我正在学习如何将kafka与apache camel集成,我遇到了以下错误。我在c:/inbox文件夹中创建了一个文件,并希望使用kafka Consumer使用其中的文本。我使用的是apache Camel3.1.0版本。下面是我的代码
package com.javainuse;
import org.apache.camel.builder.RouteBuilder;
public class SimpleRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
String topicName = "test123";
String kafkaServer = "kafka:localhost:9092";
String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181";
String serializerClass = "serializerClass=kafka.serializer.StringEncoder";
String toKafka = "kafka:localhost:9092;kafka:test123?brokers=localhost:9092;zookeeperHost=localhost;zookeeperPort=2181;groupId=group1";
from("file:C:/inbox?noop=true").to(toKafka);
}
}
下面是我得到的错误
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
org.apache.camel.FailedToStartRouteException: Failed to start route route1 because of Route(route1)[From[file:C:/inbox?noop=true] -> [To[kafka:loc...
at org.apache.camel.impl.engine.BaseRouteService.warmUp(BaseRouteService.java:133)
at org.apache.camel.impl.engine.AbstractCamelContext.doWarmUpRoutes(AbstractCamelContext.java:3246)
at org.apache.camel.impl.engine.AbstractCamelContext.safelyStartRouteServices(AbstractCamelContext.java:3139)
at org.apache.camel.impl.engine.AbstractCamelContext.doStartOrResumeRoutes(AbstractCamelContext.java:2925)
at org.apache.camel.impl.engine.AbstractCamelContext.doStartCamel(AbstractCamelContext.java:2725)
at org.apache.camel.impl.engine.AbstractCamelContext.lambda$doStart$2(AbstractCamelContext.java:2527)
at org.apache.camel.impl.engine.AbstractCamelContext.doWithDefinedClassLoader(AbstractCamelContext.java:2544)
at org.apache.camel.impl.engine.AbstractCamelContext.doStart(AbstractCamelContext.java:2525)
at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2421)
at com.javainuse.MainApp.main(MainApp.java:12)
Caused by: org.apache.camel.RuntimeCamelException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException(RuntimeCamelException.java:52)
at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:67)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:87)
at org.apache.camel.processor.channel.DefaultChannel.doStart(DefaultChannel.java:144)
at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:73)
at org.apache.camel.processor.Pipeline.doStart(Pipeline.java:154)
at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
at org.apache.camel.support.processor.DelegateAsyncProcessor.doStart(DelegateAsyncProcessor.java:78)
at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
at org.apache.camel.impl.engine.BaseRouteService.startChildService(BaseRouteService.java:339)
at org.apache.camel.impl.engine.BaseRouteService.doWarmUp(BaseRouteService.java:189)
at org.apache.camel.impl.engine.BaseRouteService.warmUp(BaseRouteService.java:131)
... 10 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at org.apache.camel.component.kafka.KafkaProducer.doStart(KafkaProducer.java:119)
at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
at org.apache.camel.impl.engine.AbstractCamelContext.internalAddService(AbstractCamelContext.java:1455)
at org.apache.camel.impl.engine.AbstractCamelContext.addService(AbstractCamelContext.java:1391)
at org.apache.camel.processor.SendProcessor.doStart(SendProcessor.java:240)
at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:87)
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.doStart(RedeliveryErrorHandler.java:1454)
at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:60)
... 25 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: localhost:9092;zookeeperHost=localhost;zookeeperPort=2181;groupId=group1
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:58)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:407)
... 37 more
Process finished with exit code 0```
错误stacktrace表示您的Kafka使用者URI无效(在stacktrace的底部)。而且确实是。
正确的形式是kafka:[topicname]?[options]
(检查Camel-Kafka文档)
因此,当我查看您的URI时,可能应该是
kafka:test123?brokers=localhost:9092&groupId=group1
您的URI存在以下问题:
Kafka:[topicname]
what is invalidKafka:[topicname]
之一是Kafka:[brokers]
,删除它;
)代替&
来分隔选项<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
我正在尝试向异步路由发送消息,但它不起作用。我刚刚在github上创建了一个项目来模拟这个问题
我已经在kafka上工作了相当长的六个月,我对用户延迟和存储到主题分区中的数据有一些疑问。 问题1:最初,当我开始阅读Kafka并了解如何使用Kafka的功能时,我被教导说,一个只有一部分和一个复制因子的主题会创造奇迹。经过相当长的六个月的工作,将我的项目迁移到live之后,使用我的主题消息的消费者开始给我一个延迟。我阅读了许多关于消费者延迟的堆栈溢出答案,得出结论,如果我增加某个主题的分区和复制
我正在使用apache camel cxf开发一个Web服务(肥皂),我遇到了这个错误。 Java . lang . illegalargumentexception:Part { http://blue print . camel . ngt . TN/}返回的类型应为[ltn . ngt . camel . blue print . WB _ subscriptions;,而不是org . A
我有一个restendpoint示例。org,返回表单的json响应 我的路线是这样的 我读过关于轮询消费者的内容,但找不到如何继续轮询endpoint的示例,直到它返回“success”响应。 是否应该使用轮询消费者?如果是这样的话,可以举一个与我的案例相关的例子。用于轮询restendpoint的任何其他资源都非常有用。
为了实现Kafka消费者对消息的一次处理,我一次提交一条消息,如下所示 上面的代码将消息的处理异步委托给下面的另一个类。 但是,这仍然不能保证只发送一次,因为如果处理失败,它可能仍会提交其他消息,并且以前的消息将永远不会被处理和提交,我在这里的选项是什么?
我试图在聚合器完成后获得一个回复,但是我得到一个异常,我没有指定任何聚合器子项,但是当我指定一个。to()endpoint我没有收到聚合结果。。。这可能吗? 控制器: 聚合器:
我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所
我想测试以下骆驼路线。我在网上找到的所有例子都有以文件开头的路由,在我的例子中,我有一个Springbean方法,每隔几分钟就会被调用一次,最后消息被转换并移动到jms以及审计目录。 我对这条路线的写测试毫无头绪。目前我在测试用例中所拥有的是