Apache Cassandra

Bei Cassandrahandelt es sich um eine NoSQL - Datenbank von Apache.

Die Daten werden nach dem folgenden Schema abgespeichert:

Schematische Darstellung

Diese Information ist auch notwendig, wenn man aus einer Anwendung heraus auf die Cassandra-DB zugreifen will. Wichtig ist auf jeden Fall, dass es kein festes Schema innerhalb der Column Family gibt - vielmehr handelt es sich um eine variable Menge von Key-Value Paaren.

Ich habe einen kleinen Testcase erstellt, womit aus Java heraus auf die Cassandra-DB zugegriffen wird. Dafür nutze ich die Low-Level Thrift-API.

Für den Test gehe ich davon aus, dass bereits ein Keyspace mit dem Namen PLAYGROUND ** und darin eine ColumnFamily mit dem Namen **User existiert.

Cassanda bietet eine Konsole, mit der man ganz einfach DDL und DML Kommandos ausführen kann.
Diese wird über den Befehl /bin/cassandra-cli gestartet. Ein kleines Tutorial dazu findet sich hier.

Die folgenden Dependencies sind für das Projekt erforderlich:

...
<dependencies>
      <!-- Thrift API -->
      <dependency>
              <groupId>org.apache.cassandra.deps</groupId>
              <artifactId>libthrift</artifactId>
              <version>0.5.0</version>
      </dependency>
      <!-- Cassandra Libs -->
      <dependency>
              <groupId>org.apache.cassandra</groupId>
              <artifactId>cassandra-all</artifactId>
              <version>0.7.1</version>
      </dependency>
      <!-- SLF4J Binding -->
      <dependency>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-simple</artifactId>
              <version>1.6.1</version>
      </dependency>
      <!-- JUnit -->
      <dependency>
              <groupId>junit</groupId>
              <artifactId>junit</artifactId>
              <version>4.8.2</version>
              <scope>test</scope>
      </dependency>
</dependencies>
...

Hier der eigentliche JUnit TestCase, welcher einen Datensatz erstellt, ausliest und danach wieder entfernt.

import java.nio.ByteBuffer;
import java.util.List;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class CRUDTest {

    private TTransport transport;
    private Cassandra.Client client;

    private final String ENCODING = "UTF-8";
    private final String KEYSPACE = "PLAYGROUND";
    private final String CF = "User";

    @Before
    public void setUp() throws Exception {
        // Verbindung aufbauen
        transport = new TFramedTransport(new TSocket("localhost", 9160));
        client = new Cassandra.Client(new TBinaryProtocol(transport));
        transport.open();
        // Keyspace setzen, welcher für den Test genutzt wird.
        client.set_keyspace(KEYSPACE);
    }

    @After
    public void tearDown() throws Exception {
        // Verbindung schliessen
        transport.flush();
        transport.close();
    }

    @Test
    public void testCRUD() throws Exception {
        final String id = String.valueOf(System.currentTimeMillis());
        final ColumnParent parent = new ColumnParent(CF);
        long currentTimeInMillis = System.currentTimeMillis();

        // Daten einfügen

        Column username = new Column(ByteBuffer.wrap("username".getBytes(ENCODING)), ByteBuffer.wrap("peter_lustig"
                .getBytes()), currentTimeInMillis);
        Column password = new Column(ByteBuffer.wrap("password".getBytes(ENCODING)), ByteBuffer.wrap("loewenzahn"
                .getBytes()), currentTimeInMillis);

        ByteBuffer userid = ByteBuffer.wrap(id.getBytes(ENCODING));

        client.insert(userid, parent, username, ConsistencyLevel.ONE);
        client.insert(userid, parent, password, ConsistencyLevel.ONE);

        // Daten abfragen

        SlicePredicate slicePredicate = new SlicePredicate();
        SliceRange sliceRange = new SliceRange();
        sliceRange.setStart(new byte[] {});
        sliceRange.setFinish(new byte[] {});
        slicePredicate.setSlice_range(sliceRange);

        List result = client.get_slice(ByteBuffer.wrap(id.getBytes(ENCODING)), parent,
                slicePredicate, ConsistencyLevel.ONE);

        Assert.assertFalse(result.isEmpty());
        Assert.assertTrue(1 == result.size());

        for (ColumnOrSuperColumn col : result) {
            Assert.assertNotNull(col.getColumn());

            String name = new String(col.getColumn().getName(), ENCODING);
            String value = new String(col.getColumn().getValue(), ENCODING);
            long timestamp = col.getColumn().getTimestamp();

            System.out.println("------------------------------");
            System.out.println(String.format("name: %s", name));
            System.out.println(String.format("value: %s", value));
            System.out.println(String.format("timestamp: %d", timestamp));
            System.out.println("------------------------------");
        }

        // Daten löschen

        ColumnPath path = new ColumnPath();
        path.column_family = CF;
        client.remove(ByteBuffer.wrap(id.getBytes(ENCODING)), path, currentTimeInMillis, ConsistencyLevel.ONE);

        // Prüfung, ob keine Daten mehr vorhanden sind

        result = client.get_slice(ByteBuffer.wrap(id.getBytes(ENCODING)), parent, slicePredicate, ConsistencyLevel.ONE);

        Assert.assertTrue(result.isEmpty());
    }
}