importjava.io.UnsupportedEncodingException;importjava.nio.ByteBuffer;importjava.util.ArrayList;importjava.util.HashMap;importjava.util.List;importjava.util.Map;importjava.util.Set;importorg.apache.cassandra.thrift.InvalidRequestException;importorg.apache.cassandra.thrift.TimedOutException;importorg.apache.cassandra.thrift.UnavailableException;importorg.apache.thrift.TException;importorg.apache.cassandra.thrift.ColumnOrSuperColumn;importorg.apache.cassandra.thrift.ColumnPath;importorg.apache.cassandra.thrift.ConsistencyLevel;importorg.apache.cassandra.thrift.Cassandra;importorg.apache.cassandra.thrift.Column;importorg.apache.cassandra.thrift.ColumnParent;importorg.apache.cassandra.thrift.Mutation;importorg.apache.cassandra.thrift.NotFoundException;importorg.apache.cassandra.thrift.SlicePredicate;importorg.apache.cassandra.thrift.SliceRange;importorg.apache.cassandra.thrift.SuperColumn;importorg.apache.thrift.protocol.TBinaryProtocol;importorg.apache.thrift.protocol.TProtocol;importorg.apache.thrift.transport.TFramedTransport;importorg.apache.thrift.transport.TSocket;importorg.apache.thrift.transport.TTransport;importorg.apache.thrift.transport.TTransportException;importorg.junit.Test;public classCassandraClient {/** 对cassandra数据库的基础操作
**/
private static String host = "127.0.0.1";private static int port = 9160;private static String keyspace = "cocoon";//暴露client供外界批量插入数据
public static Cassandra.Client client = null;private static ThreadLocal ttrans = new ThreadLocal();//打开数据库连接
public staticTTransport openConnection(){
TTransport tTransport=ttrans.get();if( tTransport == null){
tTransport= new TFramedTransport(newTSocket(host, port));
TProtocol tProtocol= newTBinaryProtocol(tTransport);
client= newCassandra.Client(tProtocol);try{
tTransport.open();
client.set_keyspace(keyspace);
ttrans.set(tTransport);
System.out.println(tTransport);
}catch(TTransportException e) {
e.printStackTrace();
}catch(InvalidRequestException e) {
e.printStackTrace();
}catch(TException e) {
e.printStackTrace();
}
}returntTransport;
}//关闭数据库连接
public static voidcloseConnection(){
TTransport tTransport=ttrans.get();
ttrans.set(null);if( tTransport != null &&tTransport.isOpen() )
tTransport.close();
}//测试线程局部变量
@Testpublic void testThreadLocal() throwsException{
TTransport t1=CassandraClient.openConnection();
System.out.println(t1);
TTransport t2=CassandraClient.openConnection();
System.out.println(t2);newThread(){public voidrun(){
TTransport t3=CassandraClient.openConnection();
System.out.println(t3);
CassandraClient.closeConnection();
}
}.start();
Thread.sleep(100);
CassandraClient.closeConnection();
System.out.println(t1.isOpen());
}/*插入一个supercolumn(for super column family)
* @param columns column的map集合
**/
public void insertSuperColumn(String superColumnFamily, String key, String superName, Mapcolumns)throwsUnsupportedEncodingException, InvalidRequestException,
UnavailableException, TimedOutException, TException{
openConnection();
Map>>map;
map= new HashMap>>();
List list = new ArrayList();
SuperColumn superColumn= newSuperColumn();
superColumn.setName(CassandraClient.toByteBuffer(superName));
Set columnNames =columns.keySet();for(String columnName: columnNames) {
Column c= newColumn();
c.setName(CassandraClient.toByteBuffer(columnName));
c.setValue(CassandraClient.toByteBuffer(columns.get(columnName)));
c.setTimestamp(System.currentTimeMillis());
superColumn.addToColumns(c);
}
ColumnOrSuperColumn cos= newColumnOrSuperColumn();
cos.super_column=superColumn;
Mutation mutation= newMutation();
mutation.column_or_supercolumn=cos;
list.add(mutation);
Map> supers = new HashMap>();
supers.put(superColumnFamily, list);
map.put(toByteBuffer(key), supers);
client.batch_mutate(map, ConsistencyLevel.ONE);
closeConnection();
}//插入一个column(for standard column family)
public void insertColumn(String columnFamily, String key, String columnName, String columnValue) throwsUnsupportedEncodingException{
openConnection();
ColumnParent parent= newColumnParent(columnFamily);if( client != null) {
Column column= newColumn( toByteBuffer(columnName) );
column.setValue(toByteBuffer(columnValue));long timestamp =System.currentTimeMillis();
column.setTimestamp(timestamp);try{
client.insert(toByteBuffer(key), parent, column, ConsistencyLevel.ONE);
}catch(InvalidRequestException e) {
e.printStackTrace();
}catch(UnavailableException e) {
e.printStackTrace();
}catch(TimedOutException e) {
e.printStackTrace();
}catch(TException e) {
e.printStackTrace();
}
closeConnection();
}
}/*获取key对应的column集合(for standard column family)
* @return column的map集合
**/
public HashMap getColumns(String columnFamily, String key) throwsInvalidRequestException, UnavailableException, TimedOutException, UnsupportedEncodingException, TException{
openConnection();
ColumnParent parent= newColumnParent(columnFamily);if( client != null) {
SlicePredicate predicate= newSlicePredicate();//定义查询的columnName范围(begin~end),正反方向,数目(columnName在数据库中是排好序的,所以有正方向查询,反方向查询)
SliceRange sliceRange = new SliceRange(toByteBuffer(""), toByteBuffer(""), false,100);
predicate.setSlice_range(sliceRange);
List results =client.get_slice(toByteBuffer(key), parent, predicate, ConsistencyLevel.ONE);if( results == null)return null;
HashMap map = new HashMap();for(ColumnOrSuperColumn result : results)
{
Column column=result.column;
map.put(byteBufferToString(column.name), byteBufferToString(column.value));//System.out.println(byteBufferToString(column.name) + " : " + byteBufferToString(column.value) );
}
closeConnection();returnmap;
}return null;
}/*获取key对应的superColumn集合(for super column family)
* @return superColumn的map集合
**/
public HashMap> getSuperColumns(String superColumnFamily, String key) throwsInvalidRequestException, UnavailableException, TimedOutException, UnsupportedEncodingException, TException{
openConnection();
ColumnParent parent= newColumnParent(superColumnFamily);if( client != null) {
SlicePredicate predicate= newSlicePredicate();//定义查询的superColumn.key范围,正反方向,数目(superColumn.key在数据库中是排好序的,所以有正方向查询,反方向查询)
SliceRange sliceRange = new SliceRange(toByteBuffer(""), toByteBuffer(""), false, 100);
predicate.setSlice_range(sliceRange);
List results =client.get_slice(toByteBuffer(key), parent, predicate, ConsistencyLevel.ONE);if( results == null)return null;
HashMap> supers = new HashMap>();for(ColumnOrSuperColumn result : results)
{
SuperColumn superColumn=result.super_column;
Map columns = new HashMap();for( Column column : superColumn.columns){
columns.put(byteBufferToString(column.name), byteBufferToString(column.value));
}
supers.put(byteBufferToString(superColumn.name), columns);//System.out.println(byteBufferToString(column.name) + " : " + byteBufferToString(column.value) );
}
closeConnection();returnsupers;
}return null;
}/*获取key,columnName对应的columnValue(for standard column family)
* @return String
**/
publicString getColumnValue(String columnFamily, String key, String columnName){try{
ColumnPath path= newColumnPath(columnFamily);
path.setColumn(toByteBuffer(columnName));try{
openConnection();
String result= newString( client.get(toByteBuffer(key), path, ConsistencyLevel.ONE).getColumn().getValue() );
closeConnection();returnresult;
}catch(InvalidRequestException e) {
e.printStackTrace();
}catch(NotFoundException e) {
e.printStackTrace();
}catch(UnavailableException e) {
e.printStackTrace();
}catch(TimedOutException e) {
e.printStackTrace();
}catch(TException e) {
e.printStackTrace();
}
}catch(UnsupportedEncodingException e) {
e.printStackTrace();
}return null;
}//String类型转化为ByteBuffer(UTF-8编码)
public static ByteBuffer toByteBuffer(String value) throwsUnsupportedEncodingException {return ByteBuffer.wrap(value.getBytes("UTF-8"));
}//ByteBuffer类型转化为String(UTF-8编码)
public static String byteBufferToString(ByteBuffer byteBuffer) throwsUnsupportedEncodingException{byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);return new String(bytes,"UTF-8");
}
}