当前位置: 首页 > 知识库问答 >
问题:

从Kafka到另一个Kafka的阿帕奇骆驼之路

宿嘉
2023-03-14

我正在使用Apache Camel使用来自kafka主题的消息,然后处理该消息,同时处理如果发生异常,我将该消息重定向到另一个kafka主题,并以单独的路由处理该消息。所以我有一个如下所示的路线。

from ("kafka1").process("someProcessor").end();
onException(Throwable.class).process(exchange->{exchange.getIn().setBody("Message with error details")}).to("kafka2");

上面的代码实际上是以相同的kafka(kafka1)发送错误消息。

我通过在onexception进程中设置Exchange.getin().setheader(kafkaconstants.topic,“kafka2”)解决了这一问题。这是预期的行为吗?它为什么忽略kafka2而使用kafka1?

> 使用的

  • 版本的camel-2.14.0

    消费者:

    from("kafka:" + ("kafka.broker") + "?topic="
                + ("offer.kafka.topic")
                + "&zookeeperHost=" + ("kafka.zookeeper.host")
                + "&zookeeperPort=" + ("kafka.zookeeper.port")
                + "&groupId=" + ("offer.kafka.group.id")
                + "&consumerStreams=" + ("kafka.streams")
                + "&autoCommitIntervalMs=" + ("product.kafka.consumer.auto.commit.intervals")
                + "&zookeeperConnectionTimeoutMs=" + ("zookeeper.connection.timeout")
                + "&rebalanceMaxRetries=" + ("kafka.rebalance.max.retries")
                + "&rebalanceBackoffMs=" + ("kafka.rebalance.backoffs.ms")
                + "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout")
                + "&autoOffsetReset=" + ("kafka.auto.offset.reset")
                + "&fetchMessageMaxBytes=" + ("kafka.fetch.message.max.bytes")
                + "&socketReceiveBufferBytes=" + ("receive.buffer.bytes"))
                .routeId("offerEventRoute").to("direct:offerEventRoute");
    

    制作人:

    to("kafka:" + ("error.kafka.broker") + "?topic="
                            + ("error.kafka.topic")
                            + "&zookeeperHost=" + ("error.kafka.zookeeper.host")
                            + "&zookeeperPort=" + ("error.kafka.zookeeper.port")
                            + "&groupId=" + ("error.kafka.group.id")
                            + "&zookeeperConnectionTimeoutMs=" + ("error.zookeeper.connection.timeout")
                            + "&rebalanceMaxRetries=" + ("rebalance.max.retries")
                            + "&rebalanceBackoffMs=" + ("rebalance.backoffs.ms")
                            + "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout")
                            + "&autoOffsetReset=" + ("auto.offset.reset")
                            + "&messageSendMaxRetries=" + ("error.max.retries")
                            + "&serializerClass=kafka.serializer.StringEncoder"
            );
    
  • 共有1个答案

    艾望
    2023-03-14

    您需要在您的producer kafkaendpoint中将bridgeEndPoint设置为true。否则,它将在exchange标头中查找主题名称,并将其用作生产者的主题名称。

    默认情况下为false。

     类似资料:
    • 我们需要的是直接的API来设置和使用集群消息队列。我们最初的计划是使用Camel在集群JMS或ActiveMQ队列上进行消费/生产。Kafka如何使这项任务变得更容易?在任何一种情况下,应用程序本身都将在WebLogic服务器上运行。 消息传递将是点对点类型,其中有多个相同服务的实例在运行,但根据负载平衡策略,只有一个实例应该处理消息并发出结果。消息队列也是群集的,因此服务实例或队列实例的失败都不

    • 我试图使用Apache Camel Quartz2实现一个调度器,它每分钟执行一次路由,并按预期执行一些任务。我使用spring DSL实现与apache camel相关联的路由,如下所示: 根据日志,它不会记录为路由记录的消息,例如Direct:DomainsWithFTPUsers等等。请指导如何实现同样的目标。

    • 遵循官方文件(https://camel.apache.org/manual/component-dsl.html#_using_component_dsl)我创建了以下代码: 但是中的告诉我: 并且中的特性不建议导入相应的库。 有人能给我指出正确的方向吗? 我必须理解的概念才能做到这一点吗?

    • 考虑到apache Camel,我有一个问题:是否可以通过代码来创建全局拦截器,例如AOP?拦截器应该跳过endpoint还是模仿endpoint? 提前致谢

    • 下面是我试图实现的场景: 谢谢你的帮助。此外,这里是已经工作的FTP部分。

    • 我对骆驼生产商有很好的了解,但我不能对各种骆驼消费者保持清醒的头脑。特别是事件驱动消费者和轮询消费者,camel如何知道为这些消费者调用回调? 消费者的一般流量是多少?