Ich habe mir mir einen Zugriffs-Wrapper für die Thrift-API in Groovy geschrieben. Alle Zugriffe erfolgen über eine diese zentrale Basisklasse, welche die CRUD Standardfunktionalitäten mitbringt.
package de.ronnyfriedland.cassandra.wrapper import de.ronnyfriedland.cassandra.entity.BaseEntity /** * @author Ronny Friedland */ class EntityWrapper { def private static EXLUCDE_PROPERTIES = ["metaClass"] def private static READONLY_PROPERTIES = ["class"] static Map fromEntity(BaseEntity entity) { Expando ex = new Expando() entity?.properties?.each { if(!EXLUCDE_PROPERTIES.grep(it.key)) { ex.setProperty(it.key, it.value) } } ex.properties } static BaseEntity toEntity(Map properties) { def clazzProperty = properties?.get("class") def clazz if(clazzProperty instanceof Class) { clazz = clazzProperty.name } else if(clazzProperty instanceof String) { clazz = clazzProperty.replaceAll("class", "").trim() } else { throw new RuntimeException("Unexpected type for property 'class' !") } READONLY_PROPERTIES?.each { properties.remove(it) } BaseEntity entity = Class.forName(clazz).newInstance(properties) } }
Der Zugriff auf diese Wrapper-Klasse muss aber nicht direkt erfolgen. Ich habe zusätzlich einen CassandraClient geschrieben, welcher auch den Auf- und Abbau der Connections verwaltet.
package de.ronnyfriedland.cassandra.api import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer import org.apache.cassandra.thrift.Cassandra import org.apache.cassandra.thrift.Column import org.apache.cassandra.thrift.ColumnOrSuperColumn import org.apache.cassandra.thrift.ColumnParent import org.apache.cassandra.thrift.ColumnPath import org.apache.cassandra.thrift.ConsistencyLevel import org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.thrift.SlicePredicate import org.apache.cassandra.thrift.SliceRange import org.apache.cassandra.thrift.TBinaryProtocol import org.apache.cassandra.thrift.TimedOutException; import org.apache.cassandra.thrift.UnavailableException; import org.apache.thrift.TException; import org.apache.thrift.transport.TFramedTransport import org.apache.thrift.transport.TSocket import org.apache.thrift.transport.TTransport import de.ronnyfriedland.cassandra.entity.BaseEntity import de.ronnyfriedland.cassandra.wrapper.EntityWrapper /** * Einfacher Cassandra-Client, welcher Zugriffsmethoden für * * SELECT * INSERT * DELETE * * zur Verfügung stellt. * * @author Ronny Friedland */ class CassandraClient { protected final String ENCODING = "UTF-8"; protected final TTransport transport protected final Cassandra.Client client /** * Erzeugt eine neue CassandraClient-Instanz. * @param host Cassandra-Host * @param port Cassandra-Port * @param keySpace zu nutzender Keyspace */ public CassandraClient(String host, Integer port, String keySpace) { transport = new TFramedTransport(new TSocket(host, port)) client = new Cassandra.Client(new TBinaryProtocol(transport)) transport.open() client.set_keyspace(keySpace) } /** * Schliesst die Verbindung zur Cassandra-DB. */ public void close() { transport.flush() transport.close() } /** * Einfügen eines neuen Datensatzes * @param entity zu speicherndes Entity */ public void insert(BaseEntity entity) throws UnsupportedEncodingException, InvalidRequestException, UnavailableException, TimedOutException, TException { doInsert(EntityWrapper.fromEntity(entity)) } protected void doInsert(def properties) throws UnsupportedEncodingException, InvalidRequestException, UnavailableException, TimedOutException, TException { if(!transport.open) { throw new RuntimeException("Connection already closed !") } long timeStamp = System.currentTimeMillis() ByteBuffer uuid = ByteBuffer.wrap(properties?.uuid?.getBytes(ENCODING)) ColumnParent parent = new ColumnParent(properties?.columnFamily) properties?.each { Column col = new Column(ByteBuffer.wrap("${it.key}".getBytes(ENCODING)), ByteBuffer.wrap("${it.value}" .getBytes(ENCODING)), timeStamp) client.insert(uuid, parent, col, ConsistencyLevel.ONE) } } /** * Selektieren eines Datensatzes anhand des eindeutigen Schlüssels. * @param uuid eindeutige ID * @param cf zu nutzende KeyFamily * @return BaseEntity */ public BaseEntity select(String uuid, String cf) throws UnsupportedEncodingException, InvalidRequestException, UnavailableException, TimedOutException, TException { def resultProperties = doSelect(uuid, cf) BaseEntity entity = EntityWrapper.toEntity(resultProperties) return entity } protected Map doSelect(String uuid, String cf) throws UnsupportedEncodingException, InvalidRequestException, UnavailableException, TimedOutException, TException { if(!transport.open) { throw new RuntimeException("Connection already closed !") } ColumnParent parent = new ColumnParent(cf) SlicePredicate slicePredicate = new SlicePredicate(); SliceRange sliceRange = new SliceRange(); sliceRange.setStart(new byte[0]); sliceRange.setFinish(new byte[0]); slicePredicate.setSlice_range(sliceRange); List resultColumns = client.get_slice(ByteBuffer.wrap(uuid?.getBytes(ENCODING)), parent, slicePredicate, ConsistencyLevel.ONE); def timeStamp = -1; def resultProperties = [:] resultColumns?.each { resultProperties[new String(it?.column?.name, ENCODING)] = new String(it?.column?.value, ENCODING) resultProperties["creationTime"] = it?.column?.timestamp } return resultProperties } /** * Löschen eines Datensatzes. * @param entity das zu löschende Entity */ public void remove(BaseEntity entity) throws UnsupportedEncodingException, InvalidRequestException, UnavailableException, TimedOutException, TException { remove(entity?.uuid, entity?.creationTime, entity?.columnFamily) } /** * Löschen eines Datensatzes. * @param uuid eindeutige ID des Datensatzes * @param creationTime Erstellungszeitpunkt * @param cf zu nutzende KeyFamily */ public void remove(String uuid, Long creationTime, String cf) throws UnsupportedEncodingException, InvalidRequestException, UnavailableException, TimedOutException, TException { if(!transport.open) { throw new RuntimeException("Connection already closed !") } ColumnPath path = new ColumnPath(); path.column_family = cf; client.remove(ByteBuffer.wrap(uuid.getBytes(ENCODING)), path, creationTime, ConsistencyLevel.ALL); } }
Die Daten können als Java-Klasse - welche von einer Basisklasse
BaseEntity erben muss - übergeben werden und werden durch den
Wrapper als Key-Value Paare in der Datenbank abgelegt. Beim Auslesen
werden die Key-Value Paare wieder auf die Java-Klasse gemappt und zurück
gegeben.
Es können auch eigene Java-Klassen als Entitäten genutzt werden,
solange sie von der Basisklasse ableiten.
package de.ronnyfriedland.cassandra.entity; import java.util.UUID; /** * Basis Entity für Cassandra * * @author Ronny Friedland */ public class BaseEntity { private String uuid; private String columnFamily; private long creationTime; /** * Erzeugt eine neue BaseEntity-Instanz. */ protected BaseEntity() { super(); } /** * Erzeugt eine neue BaseEntity-Instanz. * * @param aColumnFamily */ public BaseEntity(final String aColumnFamily) { uuid = UUID.randomUUID().toString(); columnFamily = aColumnFamily; } /** * Erzeugt eine neue BaseEntity-Instanz. * * @param aUuid * @param aColumnFamily */ public BaseEntity(final String aUuid, final String aColumnFamily) { uuid = aUuid; columnFamily = aColumnFamily; } public String getUuid() { return uuid; } public void setUuid(final String uuid) { this.uuid = uuid; } public void setColumnFamily(final String columnFamily) { this.columnFamily = columnFamily; } public String getColumnFamily() { return columnFamily; } public void setCreationTime(final long creationTime) { this.creationTime = creationTime; } public long getCreationTime() { return creationTime; } /** * (non-Javadoc) * * @see java.lang.Object#toString() */ @Override public String toString() { StringBuffer sbuf = new StringBuffer(super.toString()); sbuf.append(String.format("[uuid: %s]", getUuid())); sbuf.append(String.format("[columnFamily: %s]", getColumnFamily())); sbuf.append(String.format("[creationTime: %d]", getCreationTime())); return sbuf.toString(); } }
Die Datenbank muss wie folgt konfiguriert werden:
Keyspace: PLAYGROUND (create keyspace PLAYGROUND;)
ColumnFamily: User (create column family User with comparator = UTF8Type;)
update column family User with column_metadata = [ {column_name: uuid, validation_class: UTF8Type}, {column_name: creationTime, validation_class: UTF8Type}, {column_name: column_Family, validation_class: UTF8Type}, {column_name: class, validation_class: UTF8Type} ];