package com.xx.cassandra; import java.util.List; import java.util.Map; import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.ColumnOrSuperColumn; import org.apache.cassandra.thrift.ColumnParent; import org.apache.cassandra.thrift.ConsistencyLevel; import org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.thrift.Mutation; import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.thrift.TimedOutException; import org.apache.cassandra.thrift.UnavailableException; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; /** * 0) client is sticky to a host, load-balance is implement by * CassandraClientPool <br> * 1) if the node is broken, request redirect to other nodes <br> * 2) if the node is back, request flow back to this node <br> * 3) a retry is added to make sure request not fail because of socket <br> */ public class CassandraClient { private Cassandra.Client client = null; private TTransport tr = null; private String host = null; private int rotateCount = 0; public final static String[] hosts = { "172.16.100.17", "172.16.100.131" }; // public final static String[] hosts = // { "172.16.100.17" }; private int round = 0; private boolean redirect = false; public CassandraClient(String host) throws TTransportException { this.host = host; this.connect(host); } private synchronized void connect(String host) throws TTransportException { tr = new TSocket(host, 9160); TProtocol proto = new TBinaryProtocol(tr); client = new Cassandra.Client(proto); tr.open(); } public synchronized void close() { if (tr != null && tr.isOpen()) tr.close(); } /** * the node is broken , redirect request to other nodes. * * @throws TTransportException */ private synchronized void redirect() throws TTransportException { close(); String ahost = host; for (int i = 0; i < hosts.length; i++) { ahost = hosts[round++ % hosts.length]; if (!ahost.equalsIgnoreCase(host)) break; } connect(ahost); redirect = true; } /** * try route back to the node. * * @throws TTransportException */ private synchronized void reflux() throws TTransportException { connect(host); redirect = false; } public void batch_mutate(String keyspace, Map<String, Map<String, List<Mutation>>> mutation_map, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TException { int retry = 3; for (int i = 0; i < retry; i++) { try { if (redirect && rotateCount++ == 10000) { reflux(); rotateCount = 0; } client.batch_mutate(keyspace, mutation_map, consistency_level); return; } catch (TimedOutException e) { redirect(); } catch (TTransportException ee) { redirect(); } } } public List<ColumnOrSuperColumn> get_slice(String keyspace, String key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TException { int retry = 3; for (int i = 0; i < retry; i++) { try { if (redirect && rotateCount++ == 10000) { reflux(); rotateCount = 0; } return client.get_slice(keyspace, key, column_parent, predicate, ConsistencyLevel.ONE); } catch (TimedOutException e) { redirect(); } catch (TTransportException ee) { redirect(); } } throw new RuntimeException("timeout for 3 times"); } public Cassandra.Client getClient() { return client; } public String getHost() { return host; } }
package com.xx.cassandra; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.apache.thrift.transport.TTransportException; /** * DNS Round Robin (is simple) or a Cassandra.Client object pool. <br> * request send to each load equally <br> * */ public class CassandraClientPool { Semaphore access = null; CassandraClient[] pool = null; boolean[] used = null; int round = 0; int conn_num = 0; public CassandraClientPool(int conn_num) { this.conn_num = conn_num; init(); } private void init() { String[] host = CassandraClient.hosts; access = new Semaphore(conn_num); pool = new CassandraClient[conn_num]; used = new boolean[conn_num]; for (int i = 0; i < pool.length; i++) { used[i] = true; try { pool[i] = new CassandraClient(host[round++ % host.length]); used[i] = false; } catch (TTransportException e) { e.printStackTrace(); } } } /** * try to get an idle client in 3 seconds, otherwise throw a client busy * exception * * @return * @throws InterruptedException */ public CassandraClient nextClient() throws InterruptedException { int k = round++ % pool.length, j = 0; if (access.tryAcquire(3, TimeUnit.SECONDS)) { // do not synchronized on this method // tryAcquire would not release lock for synchronized synchronized (this) { for (int i = 0; i < pool.length; i++) { j = (k + i) % pool.length; if (!used[j]) { used[j] = true; return pool[j]; } } } } // no idle client in 3 seconds throw new RuntimeException("all client is too busy"); } public void releaseClient(CassandraClient client) { boolean released = false; synchronized (this) { for (int i = 0; i < pool.length; i++) { if (client == pool[i] && used[i]) { used[i] = false; released = true; break; } } } if (released) access.release(); } public void shutdownPool() { if (pool != null) { for (int i = 0; i < pool.length; i++) pool[i].close(); } } }