我正在使用Apache Camel使用来自kafka主题的消息,然后处理该消息,同时处理如果发生异常,我将该消息重定向到另一个kafka主题,并以单独的路由处理该消息。所以我有一个如下所示的路线。
from ("kafka1").process("someProcessor").end();
onException(Throwable.class).process(exchange->{exchange.getIn().setBody("Message with error details")}).to("kafka2");
上面的代码实际上是以相同的kafka(kafka1)发送错误消息。
我通过在onexception
进程中设置Exchange.getin().setheader(kafkaconstants.topic,“kafka2”)
解决了这一问题。这是预期的行为吗?它为什么忽略kafka2而使用kafka1?
> 使用的
版本的camel-2.14.0
消费者:
from("kafka:" + ("kafka.broker") + "?topic="
+ ("offer.kafka.topic")
+ "&zookeeperHost=" + ("kafka.zookeeper.host")
+ "&zookeeperPort=" + ("kafka.zookeeper.port")
+ "&groupId=" + ("offer.kafka.group.id")
+ "&consumerStreams=" + ("kafka.streams")
+ "&autoCommitIntervalMs=" + ("product.kafka.consumer.auto.commit.intervals")
+ "&zookeeperConnectionTimeoutMs=" + ("zookeeper.connection.timeout")
+ "&rebalanceMaxRetries=" + ("kafka.rebalance.max.retries")
+ "&rebalanceBackoffMs=" + ("kafka.rebalance.backoffs.ms")
+ "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout")
+ "&autoOffsetReset=" + ("kafka.auto.offset.reset")
+ "&fetchMessageMaxBytes=" + ("kafka.fetch.message.max.bytes")
+ "&socketReceiveBufferBytes=" + ("receive.buffer.bytes"))
.routeId("offerEventRoute").to("direct:offerEventRoute");
制作人:
to("kafka:" + ("error.kafka.broker") + "?topic="
+ ("error.kafka.topic")
+ "&zookeeperHost=" + ("error.kafka.zookeeper.host")
+ "&zookeeperPort=" + ("error.kafka.zookeeper.port")
+ "&groupId=" + ("error.kafka.group.id")
+ "&zookeeperConnectionTimeoutMs=" + ("error.zookeeper.connection.timeout")
+ "&rebalanceMaxRetries=" + ("rebalance.max.retries")
+ "&rebalanceBackoffMs=" + ("rebalance.backoffs.ms")
+ "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout")
+ "&autoOffsetReset=" + ("auto.offset.reset")
+ "&messageSendMaxRetries=" + ("error.max.retries")
+ "&serializerClass=kafka.serializer.StringEncoder"
);
您需要在您的producer kafkaendpoint中将bridgeEndPoint设置为true。否则,它将在exchange标头中查找主题名称,并将其用作生产者的主题名称。
默认情况下为false。
我们需要的是直接的API来设置和使用集群消息队列。我们最初的计划是使用Camel在集群JMS或ActiveMQ队列上进行消费/生产。Kafka如何使这项任务变得更容易?在任何一种情况下,应用程序本身都将在WebLogic服务器上运行。 消息传递将是点对点类型,其中有多个相同服务的实例在运行,但根据负载平衡策略,只有一个实例应该处理消息并发出结果。消息队列也是群集的,因此服务实例或队列实例的失败都不
我试图使用Apache Camel Quartz2实现一个调度器,它每分钟执行一次路由,并按预期执行一些任务。我使用spring DSL实现与apache camel相关联的路由,如下所示: 根据日志,它不会记录为路由记录的消息,例如Direct:DomainsWithFTPUsers等等。请指导如何实现同样的目标。
遵循官方文件(https://camel.apache.org/manual/component-dsl.html#_using_component_dsl)我创建了以下代码: 但是中的告诉我: 并且中的特性不建议导入相应的库。 有人能给我指出正确的方向吗? 我必须理解的概念才能做到这一点吗?
考虑到apache Camel,我有一个问题:是否可以通过代码来创建全局拦截器,例如AOP?拦截器应该跳过endpoint还是模仿endpoint? 提前致谢
下面是我试图实现的场景: 谢谢你的帮助。此外,这里是已经工作的FTP部分。
我对骆驼生产商有很好的了解,但我不能对各种骆驼消费者保持清醒的头脑。特别是事件驱动消费者和轮询消费者,camel如何知道为这些消费者调用回调? 消费者的一般流量是多少?