我最近被介绍给LMAX Disruptor,并决定试一试。多亏了开发人员,安装速度很快,没有任何麻烦。但如果有人能帮我,我想我遇到了一个问题。
问题:有人告诉我,当制作人发布活动时,它应该阻止,直到消费者有机会在包装之前取回它。我在消费者方面有一个序列障碍,我可以确认,如果生产者没有发布数据,消费者的waitFor呼叫将被阻止。但是,生产者似乎并没有受到任何监管,只会在环形缓冲区中封装并覆盖未处理的数据。
我有一个producer作为一个可运行的对象,在单独的线程上运行。
public class Producer implements Runnable {
private final RingBuffer<Event> ringbuffer;
public Producer(RingBuffer<Event> rb) {
ringbuffer = rb;
}
public void run() {
long next = 0L;
while(true) {
try {
next = ringbuffer.next();
Event e = ringbuffer.get(next);
... do stuff...
e.set(... stuff...);
}
finally {
ringbuffer.publish(next);
}
}
}
}
我有一个消费者在主线程上运行。
public class Consumer {
private final ExecutorService exec;
private final Disruptor<Event> disruptor;
private final RingBuffer<Event> ringbuffer;
private final SequenceBarrier seqbar;
private long seq = 0L;
public Consumer() {
exec = Executors.newCachedThreadPool();
disruptor = new Disruptor<>(Event.EVENT_FACTORY, 1024, Executors.defaultThreadFactory());
ringbuffer = disruptor.start();
seqbar = ringbuffer.newBarrier();
Producer producer = new Producer(ringbuffer);
exec.submit(producer);
}
public Data getData() {
seqbar.waitFor(seq);
Event e = ringbuffer.get(seq);
seq++;
return e.get();
}
}
最后,我像这样运行代码:
public class DisruptorTest {
public static void main(String[] args){
Consumer c = new Consumer();
while (true) {
c.getData();
... Do stuff ...
}
}
您需要向ringBuffer添加一个门控序列(com.lmax.disruptor.Sequence
),该序列必须根据您的消费者的位置进行更新。
您可以使用EventHandler接口和提供的内置序列的BatchEventProcessor(com.lmax.disruptor.BatchEventProcessor.BatchEventProcessor)来实现事件处理
这是一个完全可行的例子
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.dsl.Disruptor;
public class Main {
static class Event {
int id;
}
static class Producer implements Runnable {
private final RingBuffer<Event> ringbuffer;
public Producer(RingBuffer<Event> rb) {
ringbuffer = rb;
}
@Override
public void run() {
long next = 0L;
int id = 0;
while (true) {
try {
next = ringbuffer.next();
Event e = ringbuffer.get(next);
e.id = id++;
} finally {
ringbuffer.publish(next);
}
}
}
}
static class Consumer {
private final ExecutorService exec;
private final Disruptor<Event> disruptor;
private final RingBuffer<Event> ringbuffer;
private final SequenceBarrier seqbar;
private BatchEventProcessor<Event> processor;
public Consumer() {
exec = Executors.newCachedThreadPool();
disruptor = new Disruptor<>(() -> new Event(), 1024, Executors.defaultThreadFactory());
ringbuffer = disruptor.start();
seqbar = ringbuffer.newBarrier();
processor = new BatchEventProcessor<Main.Event>(
ringbuffer, seqbar, new Handler());
ringbuffer.addGatingSequences(processor.getSequence());
Producer producer = new Producer(ringbuffer);
exec.submit(producer);
}
}
static class Handler implements EventHandler<Event> {
@Override
public void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("Handling event " + event.id);
}
}
public static void main(String[] args) throws Exception {
Consumer c = new Consumer();
while (true) {
c.processor.run();
}
}
}
问题内容: 我正在使用导航和。当我要运行这个程序比我得到这个错误。 我也在使用最低版本的SDK和构建工具… 问题答案: Android以前可以在上运行。 有一个错误,允许类覆盖父级的package-private方法。 因此,当他们切换到时,他们修复了此问题,因此它不再覆盖父级的方法。现在,当检测到这种情况时,它将记录警告,以确保您知道行为的变化。 这似乎有些类(,),有这样的情况,所以通知你。
我正在使用一个Kafka产品和一个SpringKafka消费者。我正在使用Json序列化器和反序列化器。每当我试图从主题中读取消费者中的消息时,我会得到以下错误: 我没有在生产者和消费者中配置任何关于头的内容。我错过了什么?
我通读了RxJS文档,并希望确保我理解了< code > subscriber . unsubscribe()和< code > subscriber . complete()之间的区别。 假设我有一个有两个订阅者的可观察对象,订阅者1和订阅者2。如果订阅者1对其订阅调用取消订阅,它将不再接收来自可观察对象的通知,但订阅者2将继续接收它们。 <代码>的文档。complete(): 观察者回调,用于
拥有发布者和N个消费者,如果消费者使用,那么他们将错过订阅主题之前发布到主题的所有消息...众所周知,使用的消费者不会重播订阅主题之前存在的消息... 所以我需要: null 我想使用者必须检查现有消息的主题,如果有消息就使用它们,然后启动使用。对我来说这是最好的方法...
我对kafka和kafka-python相当陌生。安装kafka-python后,我从这里尝试了一个简单的消费者代码实现-http://kafka-python.readthedocs.io/en/master/usage.html 我一直在kafka的bin目录中编写消费者代码,并尝试从那里运行python代码。但是,我遇到以下错误: 回溯(最近一次调用):文件 “KafkaConsumer.p