http://ifeve.com/zookeeper-sharedcount/
import org.apache.curator.framework.recipes.queue.QueueSerializer;
import java.nio.charset.Charset;
public class StringQueueSerializer implements QueueSerializer<String> {
private static final Charset charset = Charset.forName("utf-8");
//as producer
@Override
public byte[] serialize(String item) {
return item.getBytes(charset);
}
//as consumer
@Override
public String deserialize(byte[] bytes) {
return new String(bytes, charset);
}
}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.framework.state.ConnectionState;
import static org.apache.curator.framework.state.ConnectionState.RECONNECTED;
public class DistributedQueueConsumer implements QueueConsumer<String> {
@Override
public void consumeMessage(String message) throws Exception {
System.out.println("Consumer:" + message);
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
switch (newState) {
case RECONNECTED: //当链接重建之后
try {
System.out.println(RECONNECTED);
} catch (Exception e) {
}
break;
default:
System.out.println(newState.toString());
}
}
}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.recipes.queue.QueueBuilder;
import org.apache.curator.framework.recipes.queue.QueueConsumer;
import org.apache.curator.utils.CloseableUtils;
public class DistributedQueueProducer {
private static CuratorFramework client;
private static String queuePath;
public static void main(String[] args) {
QueueConsumer<String> consumer = new DistributedQueueConsumer();
DistributedQueue<String> queue = QueueBuilder.builder(client,
consumer,
new StringQueueSerializer(),
queuePath)
.lockPath("queue-lock")//消费担保
//.maxItems(1024);// 有界队列,最大队列深度,如果深度达到此值,将阻塞"生产者"创建新的节点.
.buildQueue();
try {
queue.start();
queue.put("test");
Thread.sleep((long) (3 * Math.random()));
} catch (Exception e) {
} finally {
CloseableUtils.closeQuietly(queue);
CloseableUtils.closeQuietly(client);
}
}
}
Recipse还提供了其他API(
http://ifeve.com/zookeeper%EF%BC%8Dcurator/):
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
import org.apache.curator.framework.state.ConnectionState;
public class SharedCountTest {
private static CuratorFramework client;
private static String countPath;
public static void main(String[] args) throws Exception {
final SharedCount baseCount = new SharedCount(client, countPath, 0);
baseCount.addListener(new SharedCountListener() {
@Override
public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
System.out.println("count changed:" + newCount);
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
switch (newState) {
case RECONNECTED: //当链接重建之后,需要手动fresh
try {
Integer current = baseCount.getCount();
//reflush,无论更新成败,都会获取最新的值
baseCount.trySetCount(baseCount.getVersionedValue(), current);
} catch (Exception e) {
}
break;
default:
System.out.println(newState.toString());
}
}
});
//test,任意的SharedCount,只要使用相同的path,都可以得到计数值
final SharedCount count = new SharedCount(client, countPath, 0);
count.start();
count.trySetCount(count.getVersionedValue(), count.getCount() + 5);
//trySetCount尝试设置计数器,setCount是强制更新计数器的值
count.close();
baseCount.start();
//counter.close();//取消watcher
}
}
在"计数器"中,还提供了DistributedAtomicInteger,DistributedAtomicLong两个分布式自增计数器。