当前位置: 首页 > 知识库问答 >
问题:

完全队列上的java blockingqueue使用者块

牟慎之
2023-03-14

我正在编写一个小程序,将推特公共流中的推特信息放入HBase数据库。该程序使用两个线程,一个用于收集推文,另一个用于处理推文。第一个线程使用twitter4j StatusListener获取推文,并将其放入容量为100的ArrayBlockingQueue中。第二个线程从队列中获取状态,过滤所需的数据并将其移动到数据库中。处理要比收集状态花费更多的时间。

制作人如下所示:

public void onStatus(Status status) {
    try {
        this.queue.put(status);
    } catch(Exception ex) {
        ex.printStackTrace();
    }
}

使用者使用take并调用函数来处理新状态:

public void run() {
    try {
        while(true) {
            // Get new status to process
            this.status = this.queue.take();
            this.analyse();
        }
    } catch(Exception ex) {
        ex.printStackTrace();
    }
 }

在main函数中,创建并启动了两个线程:

ArrayBlockingQueue<Status> queue_public = new ArrayBlockingQueue<Status>(100);

Thread ta_public = new Thread(new TweetAnalyser(cl.getOptionValue("config"), queue_public));
Thread st_public = new Thread(new RunPublicStream(cl.getOptionValue("config"), queue_public));

ta_public.start();
st_public.start();

程序运行了一段时间没有出现任何问题,但随后突然停止。此时,队列已满,消费者似乎无法从队列中获取新状态。我尝试了几种不同的生产者/消费者模式,但没有成功。未引发异常。

我不知道我们在寻找失败。我希望有人能给我一个提示或解决方案。

共有1个答案

孙琨
2023-03-14

如果使用阻塞队列,请仔细检查代码中的阻塞命令(对于ArrayBlockingQueue,请使用put和take),如果使用多个列表,请检查拼写错误。

 类似资料:
  • JMS队列有2个消费者,同步和异步Java应用程序进程等待响应。1)同步应用程序发送请求,并根据JMS相关ID等待响应60秒。2)异步线程将不断侦听同一队列。

  • 我是新手RabbitMQ java客户端。我的问题:我创建了10个consumer并将它们添加到队列中。每个消费者使用10秒来处理我的流程。我检查了Rabbit的页面,我看到我的队列有4000条消息没有发送到客户端。我检查了日志客户端,结果是为一个消费者获取一条消息,10秒后,我为一个消费者获取一条消息,依此类推…我想要得到10个消息为所有消费者在当时(10个消息-10消费者过程在当时)请帮助我,

  • 问题内容: 我有一个带有HornetQ的JBoss-6服务器和一个队列: 有一个不同的消费者(在不同的机器)连接到这个队列中,但只有一个 单一的 消费者是活动的时间。如果我关闭此使用者,则消息将立即由其他使用者之一处理。 由于我的消息需要一些耗时的处理,因此我希望多个使用者同时处理其唯一消息。 我记得在早期版本的JBoss中也有类似的情况,该设置可以正常工作。在Jboss-6中,消息传递系统运行良

  • 问题内容: 我想创建某种线程应用程序。但是我不确定在两者之间实现队列的最佳方法是什么。 因此,我提出了两个想法(这两个想法可能都是完全错误的)。我想知道哪种更好,如果它们都烂了,那么实现队列的最佳方法是什么。我关心的主要是这些示例中队列的实现。我正在扩展一个内部类的Queue类,它是线程安全的。下面是两个示例,每个示例有4个类。 主班 消费阶层 生产者类别 队列类 要么 主班 消费阶层 生产者类别

  • 问题内容: 想要使用高级消费者API实现延迟的消费者 大意: 按键生成消息(每个消息包含创建时间戳记),以确保每个分区按生成时间对消息进行排序。 auto.commit.enable = false(将在每个消息处理之后显式提交) 消费一条消息 检查消息时间戳,并检查是否经过了足够的时间 处理消息(此操作将永不失败) 提交1个偏移 有关此实现的一些担忧: 提交每个偏移量可能会使ZK变慢 Consu

  • 我试图理解当以azure工作者角色托管消息队列使用者时的最佳实践。我有许多不同类型消息使用者,它们订阅不同的azure服务总线订阅(或者队列,如果您愿意这样称呼的话)。我想知道是应该在一个Worker角色中为每个使用者实例化多个线程,还是应该为每个使用者部署多个Worker角色。