cassandra java客户端_cassandra client in Java——cassandra总结(五)

景永望
2023-12-01

importjava.io.UnsupportedEncodingException;importjava.nio.ByteBuffer;importjava.util.ArrayList;importjava.util.HashMap;importjava.util.List;importjava.util.Map;importjava.util.Set;importorg.apache.cassandra.thrift.InvalidRequestException;importorg.apache.cassandra.thrift.TimedOutException;importorg.apache.cassandra.thrift.UnavailableException;importorg.apache.thrift.TException;importorg.apache.cassandra.thrift.ColumnOrSuperColumn;importorg.apache.cassandra.thrift.ColumnPath;importorg.apache.cassandra.thrift.ConsistencyLevel;importorg.apache.cassandra.thrift.Cassandra;importorg.apache.cassandra.thrift.Column;importorg.apache.cassandra.thrift.ColumnParent;importorg.apache.cassandra.thrift.Mutation;importorg.apache.cassandra.thrift.NotFoundException;importorg.apache.cassandra.thrift.SlicePredicate;importorg.apache.cassandra.thrift.SliceRange;importorg.apache.cassandra.thrift.SuperColumn;importorg.apache.thrift.protocol.TBinaryProtocol;importorg.apache.thrift.protocol.TProtocol;importorg.apache.thrift.transport.TFramedTransport;importorg.apache.thrift.transport.TSocket;importorg.apache.thrift.transport.TTransport;importorg.apache.thrift.transport.TTransportException;importorg.junit.Test;public classCassandraClient {/** 对cassandra数据库的基础操作

**/

private static String host = "127.0.0.1";private static int port = 9160;private static String keyspace = "cocoon";//暴露client供外界批量插入数据

public static Cassandra.Client client = null;private static ThreadLocal ttrans = new ThreadLocal();//打开数据库连接

public staticTTransport openConnection(){

TTransport tTransport=ttrans.get();if( tTransport == null){

tTransport= new TFramedTransport(newTSocket(host, port));

TProtocol tProtocol= newTBinaryProtocol(tTransport);

client= newCassandra.Client(tProtocol);try{

tTransport.open();

client.set_keyspace(keyspace);

ttrans.set(tTransport);

System.out.println(tTransport);

}catch(TTransportException e) {

e.printStackTrace();

}catch(InvalidRequestException e) {

e.printStackTrace();

}catch(TException e) {

e.printStackTrace();

}

}returntTransport;

}//关闭数据库连接

public static voidcloseConnection(){

TTransport tTransport=ttrans.get();

ttrans.set(null);if( tTransport != null &&tTransport.isOpen() )

tTransport.close();

}//测试线程局部变量

@Testpublic void testThreadLocal() throwsException{

TTransport t1=CassandraClient.openConnection();

System.out.println(t1);

TTransport t2=CassandraClient.openConnection();

System.out.println(t2);newThread(){public voidrun(){

TTransport t3=CassandraClient.openConnection();

System.out.println(t3);

CassandraClient.closeConnection();

}

}.start();

Thread.sleep(100);

CassandraClient.closeConnection();

System.out.println(t1.isOpen());

}/*插入一个supercolumn(for super column family)

* @param columns column的map集合

**/

public void insertSuperColumn(String superColumnFamily, String key, String superName, Mapcolumns)throwsUnsupportedEncodingException, InvalidRequestException,

UnavailableException, TimedOutException, TException{

openConnection();

Map>>map;

map= new HashMap>>();

List list = new ArrayList();

SuperColumn superColumn= newSuperColumn();

superColumn.setName(CassandraClient.toByteBuffer(superName));

Set columnNames =columns.keySet();for(String columnName: columnNames) {

Column c= newColumn();

c.setName(CassandraClient.toByteBuffer(columnName));

c.setValue(CassandraClient.toByteBuffer(columns.get(columnName)));

c.setTimestamp(System.currentTimeMillis());

superColumn.addToColumns(c);

}

ColumnOrSuperColumn cos= newColumnOrSuperColumn();

cos.super_column=superColumn;

Mutation mutation= newMutation();

mutation.column_or_supercolumn=cos;

list.add(mutation);

Map> supers = new HashMap>();

supers.put(superColumnFamily, list);

map.put(toByteBuffer(key), supers);

client.batch_mutate(map, ConsistencyLevel.ONE);

closeConnection();

}//插入一个column(for standard column family)

public void insertColumn(String columnFamily, String key, String columnName, String columnValue) throwsUnsupportedEncodingException{

openConnection();

ColumnParent parent= newColumnParent(columnFamily);if( client != null) {

Column column= newColumn( toByteBuffer(columnName) );

column.setValue(toByteBuffer(columnValue));long timestamp =System.currentTimeMillis();

column.setTimestamp(timestamp);try{

client.insert(toByteBuffer(key), parent, column, ConsistencyLevel.ONE);

}catch(InvalidRequestException e) {

e.printStackTrace();

}catch(UnavailableException e) {

e.printStackTrace();

}catch(TimedOutException e) {

e.printStackTrace();

}catch(TException e) {

e.printStackTrace();

}

closeConnection();

}

}/*获取key对应的column集合(for standard column family)

* @return column的map集合

**/

public HashMap getColumns(String columnFamily, String key) throwsInvalidRequestException, UnavailableException, TimedOutException, UnsupportedEncodingException, TException{

openConnection();

ColumnParent parent= newColumnParent(columnFamily);if( client != null) {

SlicePredicate predicate= newSlicePredicate();//定义查询的columnName范围(begin~end),正反方向,数目(columnName在数据库中是排好序的,所以有正方向查询,反方向查询)

SliceRange sliceRange = new SliceRange(toByteBuffer(""), toByteBuffer(""), false,100);

predicate.setSlice_range(sliceRange);

List results =client.get_slice(toByteBuffer(key), parent, predicate, ConsistencyLevel.ONE);if( results == null)return null;

HashMap map = new HashMap();for(ColumnOrSuperColumn result : results)

{

Column column=result.column;

map.put(byteBufferToString(column.name), byteBufferToString(column.value));//System.out.println(byteBufferToString(column.name) + " : " + byteBufferToString(column.value) );

}

closeConnection();returnmap;

}return null;

}/*获取key对应的superColumn集合(for super column family)

* @return superColumn的map集合

**/

public HashMap> getSuperColumns(String superColumnFamily, String key) throwsInvalidRequestException, UnavailableException, TimedOutException, UnsupportedEncodingException, TException{

openConnection();

ColumnParent parent= newColumnParent(superColumnFamily);if( client != null) {

SlicePredicate predicate= newSlicePredicate();//定义查询的superColumn.key范围,正反方向,数目(superColumn.key在数据库中是排好序的,所以有正方向查询,反方向查询)

SliceRange sliceRange = new SliceRange(toByteBuffer(""), toByteBuffer(""), false, 100);

predicate.setSlice_range(sliceRange);

List results =client.get_slice(toByteBuffer(key), parent, predicate, ConsistencyLevel.ONE);if( results == null)return null;

HashMap> supers = new HashMap>();for(ColumnOrSuperColumn result : results)

{

SuperColumn superColumn=result.super_column;

Map columns = new HashMap();for( Column column : superColumn.columns){

columns.put(byteBufferToString(column.name), byteBufferToString(column.value));

}

supers.put(byteBufferToString(superColumn.name), columns);//System.out.println(byteBufferToString(column.name) + " : " + byteBufferToString(column.value) );

}

closeConnection();returnsupers;

}return null;

}/*获取key,columnName对应的columnValue(for standard column family)

* @return String

**/

publicString getColumnValue(String columnFamily, String key, String columnName){try{

ColumnPath path= newColumnPath(columnFamily);

path.setColumn(toByteBuffer(columnName));try{

openConnection();

String result= newString( client.get(toByteBuffer(key), path, ConsistencyLevel.ONE).getColumn().getValue() );

closeConnection();returnresult;

}catch(InvalidRequestException e) {

e.printStackTrace();

}catch(NotFoundException e) {

e.printStackTrace();

}catch(UnavailableException e) {

e.printStackTrace();

}catch(TimedOutException e) {

e.printStackTrace();

}catch(TException e) {

e.printStackTrace();

}

}catch(UnsupportedEncodingException e) {

e.printStackTrace();

}return null;

}//String类型转化为ByteBuffer(UTF-8编码)

public static ByteBuffer toByteBuffer(String value) throwsUnsupportedEncodingException {return ByteBuffer.wrap(value.getBytes("UTF-8"));

}//ByteBuffer类型转化为String(UTF-8编码)

public static String byteBufferToString(ByteBuffer byteBuffer) throwsUnsupportedEncodingException{byte[] bytes = new byte[byteBuffer.remaining()];

byteBuffer.get(bytes);return new String(bytes,"UTF-8");

}

}

 类似资料: