当前位置: 首页 > 知识库问答 >
问题:

Apache camel-apache kafka集成

燕光熙
2023-03-14

我正在学习如何将kafka与apache camel集成,我遇到了以下错误。我在c:/inbox文件夹中创建了一个文件,并希望使用kafka Consumer使用其中的文本。我使用的是apache Camel3.1.0版本。下面是我的代码

package com.javainuse;

import org.apache.camel.builder.RouteBuilder;

public class SimpleRouteBuilder extends RouteBuilder {

@Override
public void configure() throws Exception {

    String topicName = "test123";
    String kafkaServer = "kafka:localhost:9092";
    String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181";
    String serializerClass = "serializerClass=kafka.serializer.StringEncoder";
    String toKafka = "kafka:localhost:9092;kafka:test123?brokers=localhost:9092;zookeeperHost=localhost;zookeeperPort=2181;groupId=group1";

    from("file:C:/inbox?noop=true").to(toKafka);
}
}

下面是我得到的错误

SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
org.apache.camel.FailedToStartRouteException: Failed to start route route1 because of Route(route1)[From[file:C:/inbox?noop=true] -> [To[kafka:loc...
    at org.apache.camel.impl.engine.BaseRouteService.warmUp(BaseRouteService.java:133)
    at org.apache.camel.impl.engine.AbstractCamelContext.doWarmUpRoutes(AbstractCamelContext.java:3246)
    at org.apache.camel.impl.engine.AbstractCamelContext.safelyStartRouteServices(AbstractCamelContext.java:3139)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStartOrResumeRoutes(AbstractCamelContext.java:2925)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStartCamel(AbstractCamelContext.java:2725)
    at org.apache.camel.impl.engine.AbstractCamelContext.lambda$doStart$2(AbstractCamelContext.java:2527)
    at org.apache.camel.impl.engine.AbstractCamelContext.doWithDefinedClassLoader(AbstractCamelContext.java:2544)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStart(AbstractCamelContext.java:2525)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2421)
    at com.javainuse.MainApp.main(MainApp.java:12)
Caused by: org.apache.camel.RuntimeCamelException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException(RuntimeCamelException.java:52)
    at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:67)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:87)
    at org.apache.camel.processor.channel.DefaultChannel.doStart(DefaultChannel.java:144)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:73)
    at org.apache.camel.processor.Pipeline.doStart(Pipeline.java:154)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.support.processor.DelegateAsyncProcessor.doStart(DelegateAsyncProcessor.java:78)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.impl.engine.BaseRouteService.startChildService(BaseRouteService.java:339)
    at org.apache.camel.impl.engine.BaseRouteService.doWarmUp(BaseRouteService.java:189)
    at org.apache.camel.impl.engine.BaseRouteService.warmUp(BaseRouteService.java:131)
    ... 10 more
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at org.apache.camel.component.kafka.KafkaProducer.doStart(KafkaProducer.java:119)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.impl.engine.AbstractCamelContext.internalAddService(AbstractCamelContext.java:1455)
    at org.apache.camel.impl.engine.AbstractCamelContext.addService(AbstractCamelContext.java:1391)
    at org.apache.camel.processor.SendProcessor.doStart(SendProcessor.java:240)
    at org.apache.camel.support.service.ServiceSupport.start(ServiceSupport.java:121)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:70)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:87)
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.doStart(RedeliveryErrorHandler.java:1454)
    at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:60)
    ... 25 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: localhost:9092;zookeeperHost=localhost;zookeeperPort=2181;groupId=group1
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:58)
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:407)
    ... 37 more

Process finished with exit code 0```


共有1个答案

薛兴言
2023-03-14

错误stacktrace表示您的Kafka使用者URI无效(在stacktrace的底部)。而且确实是。

正确的形式是kafka:[topicname]?[options](检查Camel-Kafka文档)

因此,当我查看您的URI时,可能应该是

kafka:test123?brokers=localhost:9092&groupId=group1

您的URI存在以下问题:

  • 它包含两次Kafka:[topicname]what is invalid
  • Kafka:[topicname]之一是Kafka:[brokers],删除它
  • 分号()代替&来分隔选项
  • 旧版本骆驼-Kafka的动物园管理员选项,删除它们
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
</dependency>
 类似资料:
  • 我正在尝试向异步路由发送消息,但它不起作用。我刚刚在github上创建了一个项目来模拟这个问题

  • 我已经在kafka上工作了相当长的六个月,我对用户延迟和存储到主题分区中的数据有一些疑问。 问题1:最初,当我开始阅读Kafka并了解如何使用Kafka的功能时,我被教导说,一个只有一部分和一个复制因子的主题会创造奇迹。经过相当长的六个月的工作,将我的项目迁移到live之后,使用我的主题消息的消费者开始给我一个延迟。我阅读了许多关于消费者延迟的堆栈溢出答案,得出结论,如果我增加某个主题的分区和复制

  • 我正在使用apache camel cxf开发一个Web服务(肥皂),我遇到了这个错误。 Java . lang . illegalargumentexception:Part { http://blue print . camel . ngt . TN/}返回的类型应为[ltn . ngt . camel . blue print . WB _ subscriptions;,而不是org . A

  • 我有一个restendpoint示例。org,返回表单的json响应 我的路线是这样的 我读过关于轮询消费者的内容,但找不到如何继续轮询endpoint的示例,直到它返回“success”响应。 是否应该使用轮询消费者?如果是这样的话,可以举一个与我的案例相关的例子。用于轮询restendpoint的任何其他资源都非常有用。

  • 为了实现Kafka消费者对消息的一次处理,我一次提交一条消息,如下所示 上面的代码将消息的处理异步委托给下面的另一个类。 但是,这仍然不能保证只发送一次,因为如果处理失败,它可能仍会提交其他消息,并且以前的消息将永远不会被处理和提交,我在这里的选项是什么?

  • 我试图在聚合器完成后获得一个回复,但是我得到一个异常,我没有指定任何聚合器子项,但是当我指定一个。to()endpoint我没有收到聚合结果。。。这可能吗? 控制器: 聚合器:

  • 我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所

  • 我想测试以下骆驼路线。我在网上找到的所有例子都有以文件开头的路由,在我的例子中,我有一个Springbean方法,每隔几分钟就会被调用一次,最后消息被转换并移动到jms以及审计目录。 我对这条路线的写测试毫无头绪。目前我在测试用例中所拥有的是