Cassandra を Java から使う

プロジェクト作成

Eclipse からの利用。新規Javaプロジェクト作成し、cassandra の lib 配下の jar をコピーしてクラスパスに追加します。conf ディレクトリを作成し、log4j.properties と storage-conf.xml を作成します。

log4j.properties は以下の内容

log4j.rootLogger=INFO,stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n


storage-conf.xml は以下の内容

<Storage>
  <ClusterName>Test Cluster</ClusterName>
  <HintedHandoffEnabled>true</HintedHandoffEnabled>
  <Keyspaces>
    <Keyspace Name="Keyspace1">
      <ColumnFamily Name="Standard1" CompareWith="BytesType"/>
      <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
      <ReplicationFactor>1</ReplicationFactor>
      <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
    </Keyspace>
  </Keyspaces>

  <Authenticator>org.apache.cassandra.auth.AllowAllAuthenticator</Authenticator>
  <Partitioner>org.apache.cassandra.dht.RandomPartitioner</Partitioner>

  <CommitLogDirectory>db/cassandra/commitlog</CommitLogDirectory>
  <DataFileDirectories>
      <DataFileDirectory>db/cassandra/data</DataFileDirectory>
  </DataFileDirectories>

  <Seeds>
      <Seed>127.0.0.1</Seed>
  </Seeds>
  <ListenAddress>localhost</ListenAddress>
  <StoragePort>7000</StoragePort>

  <ThriftAddress>localhost</ThriftAddress>
  <ThriftPort>9160</ThriftPort>
  <ThriftFramedTransport>false</ThriftFramedTransport>

  <DiskAccessMode>auto</DiskAccessMode>
  <CommitLogSync>periodic</CommitLogSync>
  <CommitLogSyncPeriodInMS>10000</CommitLogSyncPeriodInMS>

</Storage>

こんな感じの構成になります。

Java から cassandra を使う

CassandraService を初期化してサーバ起動

    EmbeddedCassandraService cassandra = new EmbeddedCassandraService();
    cassandra.init();
    Thread t = new Thread(cassandra);
    t.start();

Cassandra へは Thrift(スリフト) による RPC 経由で接続します。

    TSocket transport = new TSocket("localhost", 9160);
    TProtocol protocol = new TBinaryProtocol(transport);
    transport.open();
    Cassandra.Client client = new Cassandra.Client(protocol);

Cassandra.Client 経由にて insert。

    final ColumnPath columnPath = new ColumnPath("Standard1");
    columnPath.setColumn("column".getBytes());
    client.insert("Keyspace1", "sample1", columnPath, 
            "sample_value".getBytes(), System.currentTimeMillis(),
            ConsistencyLevel.ONE);


全ソースはこんな感じ。

package etc9;

import org.apache.cassandra.service.EmbeddedCassandraService;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;

public class Sample01 {
    
    public static void main(String[] args) throws Exception {
        System.setProperty("storage-config", "conf");
        
        EmbeddedCassandraService cassandra = new EmbeddedCassandraService();
        cassandra.init();
        Thread t = new Thread(cassandra);
        t.start();
        
        TSocket transport = new TSocket("localhost", 9160);
        TProtocol protocol = new TBinaryProtocol(transport);
        transport.open();
        Cassandra.Client client = new Cassandra.Client(protocol);

        final ColumnPath columnPath = new ColumnPath("Standard1");
        columnPath.setColumn("column".getBytes());
        client.insert("Keyspace1", "sample1", columnPath, 
                "sample_value".getBytes(), System.currentTimeMillis(),
                ConsistencyLevel.ONE);

        transport.flush();
        transport.close();
    }
}

実行

普通に実行します。CassandraService は終了させてないので cassandra-cli から登録内容確認。

cassandra> connect localhost/9160
Connected to: "Test Cluster" on localhost/9160
cassandra> get Keyspace1.Standard1['sample1']
=> (column=636f6c756d6e, value=sample_value, timestamp=1277398809812)
Returned 1 results.

、Insertされてます。