当前位置: 首页 > 知识库问答 >
问题:

Apache Camel:轮询消费者

宋丰
2023-03-14

我是Apache Camel的新手,我试图在一个简单的项目中理解和使用轮询消费者EIP,但我感到有点迷茫…谁能帮我解释一下,甚至用一个小的工作例子。

如有任何帮助,我们将不胜感激

共有1个答案

张嘉熙
2023-03-14

对于大多数用例,只需在route...中的from()子句中定义使用者,即可创建使用者。

from("activemq:inbox").to(new MyProcessor());

但是,您也可以编写自己的POJO轮询使用者逻辑,以便对使用者逻辑进行更多的控制...只需使用计时器定期启动它,并调用receive()方法,如下所示:

from("timer://foo?period=5000").bean(MyBean, "processQueue");

public void processQueue() {
    while (true) {
        // receive the message from the queue, wait at most 3 sec
        String msg = consumer.receiveBody("activemq:inbox", 3000, String.class);
        if (msg == null) {
            // no more messages in queue
            break;
        }

        // do something with body
    }
}

更多详情请参阅文档:http://camel.apache.org/polling-consumer

 类似资料:
  • 我有一个restendpoint示例。org,返回表单的json响应 我的路线是这样的 我读过关于轮询消费者的内容,但找不到如何继续轮询endpoint的示例,直到它返回“success”响应。 是否应该使用轮询消费者?如果是这样的话,可以举一个与我的案例相关的例子。用于轮询restendpoint的任何其他资源都非常有用。

  • 我正在努力寻找一个成熟的例子,说明如何在Spring Boot框架中使用ApacheCamel进行轮询。 我已经看过了:https://camel.apache.org/manual/latest/polling-consumer.html除此之外:https://camel.apache.org/components/latest/timer-component.html但是代码示例不够广泛,我

  • 我有一个Kafka主题,并为其附加了1个消费者(主题只有1个分区)。现在对于超时,我使用默认值(心跳:3秒,会话超时:10秒,轮询超时:5分钟)。 根据留档,轮询超时定义消费者必须在其他代理将该消费者从消费者组中删除之前处理消息。现在假设,消费者只需1分钟即可完成处理消息。 现在我有两个问题

  • 我的Kafka消费者的代码是这样的 我已经意识到,这种消费者设置无法读取所有信息。我无法再现这一点,因为这是一个间歇性的问题。 当我使用 将最后 100 条消息与此消费者进行比较时,我发现我的消费者间歇性地随机错过了几条消息。我的消费者有什么问题? 在python中使用消息的方法太多了。应该有一种最好只有一种明显的方法来做到这一点。

  • 我有一个Kafka消费者,其中消息通过HTTP POST调用传递给另一个应用程序。我还使用手动提交偏移量 确认。确认(); 有一些HTTP返回错误代码,我们忽略错误并提交偏移量,还有一些错误代码我们不提交偏移量。问题是,kafka使用者仅在我重新启动使用者时才轮询未提交的消息。如果分区中有未提交的消息,是否还有轮询消息的地方?

  • 场景: session.timeout.ms:10秒 最大poll.interval.ms:5分钟 处理“Poll()”中使用的消息需要6分钟 C(6秒):发送另一个心跳 D(5分钟):发送了另一个心跳(5*60%3=0),但达到了“max.poll.interval.ms”(5分钟 在点“D”,消费者: 继续每3秒发送一次心跳? 如果是“1”点,则 a.在完成6分钟的处理后,考虑到由于在点“d”