当前位置: 首页 > 知识库问答 >
问题:

无法使用Karaf 4.3.1和Camel 3.7.4从Kafka主题中消费

丁钟展
2023-03-14

你好我有一个非常简单的骆驼路线(尝试消费和记录Kafka主题消息):

@Override
public void configure() throws Exception {
    super.configure();

    from(kafka())
            .routeId(INPUT_BROKER_ROUTE_ID)

            .log("KAFKA BODY ::: ${body}");
}

private static String kafka() {
    return new StringBuilder("kafka:")
            .append("{{kafka.topic}}")
            .append("?brokers=")
            .append("{{kafka.brokers}}")
            .append("&groupId=")
            .append("{{kafka.group.id}}")
            .append("&clientId=")
            .append("myClientId")
            .append("&autoOffsetReset=")
            .append("earliest")
            .append("&saslMechanism=")
            .append("PLAIN")
            .append("&securityProtocol=")
            .append(SASL_PLAINTEXT)
            .append("&saslJaasConfig=")
            .append(saslJaasConfig())
            .toString();
}

private static String saslJaasConfig() {
    return new StringBuilder(format("%s", PlainLoginModule.class.getCanonicalName()))
            .append(' ').append("required").append(' ')
            .append("username=")
            .append('"').append("{{kafka.username}}").append('"')
            .append(' ')
            .append("password=")
            .append('"').append("{{kafka.password}}").append('"')
            .append(';')
            .toString();
}

如果我在此路由上运行单元测试,我会看到主题消息,但当我部署此捆绑包时,我没有从主题中收到任何消息,也没有错误,Karaf只是向我显示此日志:

11:37:27.064 INFO [Blueprint Event Dispatcher: 1] Successfully logged in.
11:37:27.067 WARN [Blueprint Event Dispatcher: 1] The configuration 'specific.avro.reader' was supplied but isn't a known config.
11:37:27.068 INFO [Blueprint Event Dispatcher: 1] Kafka version: 2.6.0
11:37:27.069 INFO [Blueprint Event Dispatcher: 1] Kafka commitId: 62abe01bee039651
11:37:27.069 INFO [Blueprint Event Dispatcher: 1] Kafka startTimeMs: 1624354647068
11:37:27.070 INFO [Blueprint Event Dispatcher: 1] Route: input-broker-route started and consuming from: kafka://test
11:37:27.070 INFO [Camel (integ-norauto-v1-input) thread #75 - KafkaConsumer[test]] Subscribing test-Thread 0 to topic test
11:37:27.071 INFO [Blueprint Event Dispatcher: 1] Total 1 routes, of which 1 are started
11:37:27.073 INFO [Camel (integ-norauto-v1-input) thread #75 - KafkaConsumer[test]] [Consumer clientId=myClientId, groupId=myGroupId] Subscribed to topic(s): test
11:37:27.073 INFO [Blueprint Event Dispatcher: 1] Apache Camel 3.7.4 (integ-norauto-v1-input) started in 38ms

我做了一些测试,我发现即使我提供了错误的用户名或密码,也不会发生任何事情(只有在Karaf上运行时),看起来它没有连接。

共有1个答案

华温书
2023-03-14

我终于发现LTS 3 . 7 . 4版本的camel bug是通过使用最新的camel版本(3.10.0)解决的...

 类似资料:
  • 因为我是新的Kafka,所以我能够从文件中读取记录,并通过生产者将消息发送到Kafka主题,但不能通过消费者消费相同的主题。 注意:您可以从任何文本文件中读取数据,我使用的是Kafka2.11-0.9。0.0版本 这是我的密码: 下面是输出:

  • 我们正在使用Kafka流将数据写入接收器主题。我正在运行一个avro消费者命令行来检查接收器主题中是否有数据: bin/kafka-avro控制台-消费者-主题sink.output.topic-从开始-新消费者-引导-服务器 当我在kafka streams应用程序运行时同时运行消费者时,我会看到数据,但如果我停止消费者并在几分钟后再次运行,我不会看到任何数据。几乎没有可能: 1) 这是因为Ka

  • Debezium连接器的Kafka connect事件是Avro编码的。 在传递给Kafka connect standalone服务的connect-standalone.properties中提到了以下内容。 使用这些属性配置Kafka使用者代码: 在消费者实现中,下面是读取键和值组件的代码。我使用REST从模式注册表中获取键和值的模式。 解析密钥工作正常。在解析消息的值部分时,我得到了Arr

  • 我创建了一个制作人和一个消费者,使用“Kafka节点”包发送和消费Kafka主题的消息。生产者和消费者通过API进行调用。POST方法用于向主题发送消息,而GET方法用于在消费者处从主题获取消息。 当我向KAFKA发送消息后调用consumer API时,之前的所有消息都会在。 我只需要最后一条消息,这是生产者发送的。 如何在不使用任何数组或任何东西的情况下获取最后一条消息。 有没有办法删除这个话

  • 我正在使用@StreamListener(Spring-Cloud-Stream)来使用来自主题(输入通道)的消息,进行一些处理并保存到一些缓存或数据库中。 我的要求是,如果DB在处理消费的消息时停止,我想暂停主消费者(输入通道),并从另一个主题(输入56通道)开始消费,一旦它消费了来自输入56通道的所有消息(没有很多),我想再次恢复主消费者(输入通道)。 这能做到吗??

  • 向源生成特殊的clear-message,这将导致聚合的消息变为空 将消息直接写入具有空数据的中间主题 另一种方式,也许kafka-streams已经有一个API调用了? 加分问题:如果我知道我不想让消息坐在中间话题中的时间超过6个月,我可以指示kafka-streams创建6M留存的中间话题,还是在我运行App之前我自己手动创建话题?