当前位置: 首页 > 知识库问答 >
问题:

使用基于计时的PollingConsumer到直接endpoint

南门洋
2023-03-14

从功能上讲,我希望在从 JMS (WMQ) endpoint使用之前检查 URL 是否处于活动状态。
如果无法访问 URL 或服务器错误,则我不想从队列中选取。所以我想继续尝试(无限次重试)通过轮询消费者的 URL。因此,一旦可用,我就可以从JMS中获取。

我有一个使用直接endpoint设置的RouteBuilder,它被配置为运行将ping服务的处理器。

所以:

public class PingRoute extends RouteBuilder {
        @Override
        public void configureCamel() {
            from("direct:pingRoute").routeId(PingRoute.class.getSimpleName())
                .process(new PingProcessor(url))
                .to("log://PingRoute?showAll=true");
        }
    }

在另一条路线上,我设置了我的计时器:

    @Override
public void configureCamel() {
       from(timerEndpoint).beanRef(PollingConsumerBean.class.getSimpleName(), "checkPingRoute");
   ...
    }

使用PollingConsumerBean,我试图通过消费者接收身体:

public void checkPingRoute(){
    // loop to check the consumer.  Check we can carry on with the pick up from the JMS queue.
    while(true){
        Boolean pingAvailable = consumer.receiveBody("direct:pingRoute", Boolean.class);
   ...
    }

我将路由添加到上下文,并使用生产者发送:

context.addRoutes(new PingRoute());
context.start();
producer.sendBody(TimerPollingRoute.TIMER_POLLING_ROUTE_ENDPOINT, "a body");

我得到以下非法参数异常

Cannot add a 2nd consumer to the same endpoint. Endpoint Endpoint[direct://pingRoute] only allows one consumer.

有没有办法将直接路由设置为轮询消费者?

共有3个答案

岳昊空
2023-03-14

我很难弄清楚你到底想做什么,但是在我看来,你想在一段时间内使用来自endpoint的数据。对此,最好的模式是轮询消费者:http://camel.apache.org/polling-consumer.html

您当前收到的错误是因为您有两个使用者都试图从“direct://pingRoute”中读取数据。如果这是有意的,您可以将direct更改为seda://pingRoute,这样它就是您的数据所在的内存队列。

翟俊
2023-03-14

根据OP对其用例的澄清,他们有几个问题需要解决:

    < li >当且仅当对URL的ping是肯定的时,使用JMS队列中的消息。 < li >如果URL没有响应,JMS消息应该不会从队列中消失,必须进行重试,直到URL再次有响应,在这种情况下,消息将最终被使用。 < Li > OP没有指定重试次数是有限的还是无限的。

基于这个问题场景,我建议重新设计他们的解决方案,利用骆驼中的ActiveMQ重试、代理端重新交付和JMS事务:

  1. 如果 URL ping 失败(通过事务回滚),则将消息返回到队列。
  2. 确保消息不会丢失(通过使用 JMS 持久性和代理端重新传递,AMQ 将持久安排重试周期)。
  3. 能够为每个消息指定复杂的重试周期,例如指数退避、最大重试次数等。
  4. 如果重试周期已用尽而没有正结果,则可以选择将消息发送到死信队列,以便可以计划其他一些(可能是手动)操作。

现在,在实现方面:

from("activemq:queue:abc?transacted=true")          // (1)
    .to("http4://host.endpoint.com/foo?method=GET") // (2) (3)
    .process(new HandleSuccess());                  // (4)

评论:

  1. 请注意事务标志
  2. 如果 HTTP 调用失败,HTTP4 终结点将引发异常。
  3. 由于没有配置异常处理程序,Camel会将异常传播到消费方endpoint(activemq),这将回滚事务。
  4. 如果调用成功,流将继续,交换正文现在将包含 HTTP 服务器返回的有效负载,您可以按照任何您希望的方式处理它。这里我使用的是处理器。

接下来,重要的是在ActiveMQ中配置重新交付策略,并启用代理端重新交付。您可以在activemq中执行此操作。xml配置文件:

<plugins>
  <redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
    <redeliveryPolicyMap>
      <redeliveryPolicyMap>
        <redeliveryPolicyEntries>
          <redeliveryPolicy queue="my.queue" 
                            initialRedeliveryDelay="30000" 
                            maximumRedeliveries="17" 
                            maximumRedeliveryDelay="259200000" 
                            redeliveryDelay="30000" 
                            useExponentialBackOff="true"
                            backOffMultiplier="2" />
        </redeliveryPolicyEntries>
      </redeliveryPolicyMap>
    </redeliveryPolicyMap>
  </redeliveryPlugin>
</plugins>

并确保在顶层启用了调度程序支持

<broker xmlns="http://activemq.apache.org/schema/core" 
        brokerName="mybroker" 
        schedulerSupport="true">
    ...
</broker>

我希望那有帮助。

编辑1: OP使用IBM WebSphere MQ作为代理,我错过了这一点。您可以使用JMS QueueBrowser来查看消息,并在实际使用消息之前尝试它们对应的URL,但是不可能有选择地使用单个消息

因此,我坚持认为您应该探索JMS事务,但不是让代理重新传递消息,您可以在TX主体本身中启动对URL的ping循环。关于Camel,您可以实现如下:

from("jms:queue:myqueue?transacted=true")
    .bean(new UrlPinger());

UrlPinger.java:

public class UrlPinger {

    @EndpointInject
    private ProducerTemplate template;

    private Pattern pattern = Pattern.compile("^(http(?:s)?)\\:");

    @Handler
    public void pingUrl(@Body String url, CamelContext context) throws InterruptedException {
        // Replace http(s): with http(s)4: to use the Camel HTTP4 endpoint.
        Matcher m = pattern.matcher(url);
        if (m.matches()) {
            url = m.replaceFirst(m.group(1) + "4:");
        }

        // Try forever until the status code is 200.
        while (getStatusCode(url, context) != 200) {
            Thread.sleep(5000);
        }
    }

    private int getStatusCode(String url, CamelContext context) {
        Exchange response = template.request(url + "?method=GET&throwExceptionOnFailure=false", new Processor() {
            @Override public void process(Exchange exchange) throws Exception {
                // No body since this is a GET request.
                exchange.getIn().getBody(null);
            }
        });

        return response.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class);
    }

}

注意事项:

    < li >请注意< code > throwExceptionOnFailure = false 选项。不会引发异常,因此循环将一直执行,直到条件为真。 < li >在bean内部,我一直循环,直到HTTP状态为200。当然,你的逻辑会不一样。 < li >在尝试和尝试之间,我睡了5000毫秒。 < li >我假设ping的URL在传入JMS消息的正文中。我将前导的< code>http(s):替换为< code>http(s)4:,以便使用Camel HTTP4endpoint。 < li >在TX内部执行ping操作可以保证只有当ping条件为真时(在本例中为HTTP status == 200)才会使用该消息。 < li >您可能希望引入一个终止条件(您不想永远尝试下去)。也许引入一些后退,以不压倒对方。 < li >如果Camel或代理在重试周期内关闭,消息将自动回滚。 < li >考虑到JMS事务是与< code >会话绑定的,因此如果要启动许多并发使用者(< code > concurrent consumers JMSendpoint选项),您需要为每个线程设置< code > CACHE level name = CACHE _ NONE 以使用不同的JMS 会话
景建业
2023-03-14

不幸的是,商业逻辑并不十分清晰。据我所知,您需要等待服务的响应。IMHO您必须使用Content Enricher EIPhttp://camel.apache.org/content-enricher.html.<code>pollEnrich</code>是计时器路由所需的。

<代码>。pollEnrich(“direct:waitForResponce”,-1)或。pollEnrich(“seda:waitForResponce”,-1)

public class PingRoute extends RouteBuilder {
        @Override
        public void configureCamel() {
             from("direct:pingRoute").routeId(PingRoute.class.getSimpleName())
                .process(new PingProcessor(url))
             .choice().when(body())                       
                  .to("log://PingRoute?showAll=true")
                  .to("direct:waitForResponce") 
                .otherwise()
                  .to("direct:pingRoute")
                .end(); 
        }
};

定时器:

    @Override
    public void configureCamel() {                           
      from(timerEndpoint)
      .inOnly("direct:pingRoute")
      .pollEnrich("direct:waitForResponce", -1)
       ...
    }
 类似资料:
  • 我是Flink Streaming API的新手,我想完成以下简单(IMO)任务。我有两个流,我想使用基于计数的窗口加入它们。到目前为止,我拥有的代码如下: 我的代码可以正常工作,但不会产生任何结果。实际上,从未调用方法(通过在调试模式下添加断点进行验证)。我认为,前面的主要原因是我的数据没有时间属性。因此,窗口化(通过实现)没有正确完成。因此,我的问题是如何表示我希望根据计数窗口进行加入。例如,

  • 基类既可能是派生类的直接基类,也可能是派生类的间接基类。在声明派生类时,派生类的首部要显式地列出直接基类。间接基类不是显式地列在派生类的首部,而是沿着类的多个层次向上继承。

  • 问题内容: 关于此问题,有什么方法可以将[文件从ASP.NET应用程序直接上传到Amazon S3并具有进度条? -—编辑---- 两天后,仍然没有直接的运气。发现了一件看起来很有前途但又不是免费的东西:http : //www.flajaxian.com/ 使用Flash通过进度条直接上传到S3。 问题答案: 我也在寻找解决方案。也许这会有所帮助, 来自AWS Dev Commnity, 但在许

  • 问题内容: 有没有一种方法可以使用客户端(而不是Node.js)JavaScript直接连接到Redis? 我已经为一些项目成功使用了Node.js + PHP + Redis + Socket.io(用于客户端)。但是,我确实认为这可以进一步简化为类似PHP + Redis + Browser javascript的东西- 取出Node.js服务器,这是我不愿意使用的另一台服务器。对于简单的事情

  • 我想用基于历史事件的流计算Flink中基于窗口的平均值(或我定义的任何其他函数),因此流必须是事件时间(而不是基于处理时间): 我已经了解了如何在摄入时添加时间戳: 但是当我进行计算(应用函数)时,当我只是以与没有EventTime时相同的方式进行计算时,它就不起作用了。我读过一些关于我必须设置的水印的东西: 有没有人举一个简单的Scala例子? 尊敬的安德烈亚斯

  • 我正在使用Spring配置文件(xml配置)进行Spring自动布线。我想根据一个条件注射豆子。让我详述一下。 有两个类“”和“”,它们实现了接口。在配置文件中为这两个类配置了bean。 > 我有另一个类,其中有一个类型为的实例变量。 我想根据SenderUser.getType()的值将注入到SenderUser的bean中。这意味着首先应该设置sernderuser.type,然后基于它的值(