我正在编写一个小程序,将推特公共流中的推特信息放入HBase数据库。该程序使用两个线程,一个用于收集推文,另一个用于处理推文。第一个线程使用twitter4j StatusListener获取推文,并将其放入容量为100的ArrayBlockingQueue中。第二个线程从队列中获取状态,过滤所需的数据并将其移动到数据库中。处理要比收集状态花费更多的时间。
制作人如下所示:
public void onStatus(Status status) {
try {
this.queue.put(status);
} catch(Exception ex) {
ex.printStackTrace();
}
}
使用者使用take并调用函数来处理新状态:
public void run() {
try {
while(true) {
// Get new status to process
this.status = this.queue.take();
this.analyse();
}
} catch(Exception ex) {
ex.printStackTrace();
}
}
在main函数中,创建并启动了两个线程:
ArrayBlockingQueue<Status> queue_public = new ArrayBlockingQueue<Status>(100);
Thread ta_public = new Thread(new TweetAnalyser(cl.getOptionValue("config"), queue_public));
Thread st_public = new Thread(new RunPublicStream(cl.getOptionValue("config"), queue_public));
ta_public.start();
st_public.start();
程序运行了一段时间没有出现任何问题,但随后突然停止。此时,队列已满,消费者似乎无法从队列中获取新状态。我尝试了几种不同的生产者/消费者模式,但没有成功。未引发异常。
我不知道我们在寻找失败。我希望有人能给我一个提示或解决方案。
如果使用阻塞队列,请仔细检查代码中的阻塞命令(对于ArrayBlockingQueue,请使用put和take),如果使用多个列表,请检查拼写错误。
JMS队列有2个消费者,同步和异步Java应用程序进程等待响应。1)同步应用程序发送请求,并根据JMS相关ID等待响应60秒。2)异步线程将不断侦听同一队列。
我是新手RabbitMQ java客户端。我的问题:我创建了10个consumer并将它们添加到队列中。每个消费者使用10秒来处理我的流程。我检查了Rabbit的页面,我看到我的队列有4000条消息没有发送到客户端。我检查了日志客户端,结果是为一个消费者获取一条消息,10秒后,我为一个消费者获取一条消息,依此类推…我想要得到10个消息为所有消费者在当时(10个消息-10消费者过程在当时)请帮助我,
问题内容: 我有一个带有HornetQ的JBoss-6服务器和一个队列: 有一个不同的消费者(在不同的机器)连接到这个队列中,但只有一个 单一的 消费者是活动的时间。如果我关闭此使用者,则消息将立即由其他使用者之一处理。 由于我的消息需要一些耗时的处理,因此我希望多个使用者同时处理其唯一消息。 我记得在早期版本的JBoss中也有类似的情况,该设置可以正常工作。在Jboss-6中,消息传递系统运行良
问题内容: 我想创建某种线程应用程序。但是我不确定在两者之间实现队列的最佳方法是什么。 因此,我提出了两个想法(这两个想法可能都是完全错误的)。我想知道哪种更好,如果它们都烂了,那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个内部类的Queue类,它是线程安全的。下面是两个示例,每个示例有4个类。 主班 消费阶层 生产者类别 队列类 要么 主班 消费阶层 生产者类别
问题内容: 想要使用高级消费者API实现延迟的消费者 大意: 按键生成消息(每个消息包含创建时间戳记),以确保每个分区按生成时间对消息进行排序。 auto.commit.enable = false(将在每个消息处理之后显式提交) 消费一条消息 检查消息时间戳,并检查是否经过了足够的时间 处理消息(此操作将永不失败) 提交1个偏移 有关此实现的一些担忧: 提交每个偏移量可能会使ZK变慢 Consu
我试图理解当以azure工作者角色托管消息队列使用者时的最佳实践。我有许多不同类型消息使用者,它们订阅不同的azure服务总线订阅(或者队列,如果您愿意这样称呼的话)。我想知道是应该在一个Worker角色中为每个使用者实例化多个线程,还是应该为每个使用者部署多个Worker角色。