你好我有一个非常简单的骆驼路线(尝试消费和记录Kafka主题消息):
@Override
public void configure() throws Exception {
super.configure();
from(kafka())
.routeId(INPUT_BROKER_ROUTE_ID)
.log("KAFKA BODY ::: ${body}");
}
private static String kafka() {
return new StringBuilder("kafka:")
.append("{{kafka.topic}}")
.append("?brokers=")
.append("{{kafka.brokers}}")
.append("&groupId=")
.append("{{kafka.group.id}}")
.append("&clientId=")
.append("myClientId")
.append("&autoOffsetReset=")
.append("earliest")
.append("&saslMechanism=")
.append("PLAIN")
.append("&securityProtocol=")
.append(SASL_PLAINTEXT)
.append("&saslJaasConfig=")
.append(saslJaasConfig())
.toString();
}
private static String saslJaasConfig() {
return new StringBuilder(format("%s", PlainLoginModule.class.getCanonicalName()))
.append(' ').append("required").append(' ')
.append("username=")
.append('"').append("{{kafka.username}}").append('"')
.append(' ')
.append("password=")
.append('"').append("{{kafka.password}}").append('"')
.append(';')
.toString();
}
如果我在此路由上运行单元测试,我会看到主题消息,但当我部署此捆绑包时,我没有从主题中收到任何消息,也没有错误,Karaf只是向我显示此日志:
11:37:27.064 INFO [Blueprint Event Dispatcher: 1] Successfully logged in.
11:37:27.067 WARN [Blueprint Event Dispatcher: 1] The configuration 'specific.avro.reader' was supplied but isn't a known config.
11:37:27.068 INFO [Blueprint Event Dispatcher: 1] Kafka version: 2.6.0
11:37:27.069 INFO [Blueprint Event Dispatcher: 1] Kafka commitId: 62abe01bee039651
11:37:27.069 INFO [Blueprint Event Dispatcher: 1] Kafka startTimeMs: 1624354647068
11:37:27.070 INFO [Blueprint Event Dispatcher: 1] Route: input-broker-route started and consuming from: kafka://test
11:37:27.070 INFO [Camel (integ-norauto-v1-input) thread #75 - KafkaConsumer[test]] Subscribing test-Thread 0 to topic test
11:37:27.071 INFO [Blueprint Event Dispatcher: 1] Total 1 routes, of which 1 are started
11:37:27.073 INFO [Camel (integ-norauto-v1-input) thread #75 - KafkaConsumer[test]] [Consumer clientId=myClientId, groupId=myGroupId] Subscribed to topic(s): test
11:37:27.073 INFO [Blueprint Event Dispatcher: 1] Apache Camel 3.7.4 (integ-norauto-v1-input) started in 38ms
我做了一些测试,我发现即使我提供了错误的用户名或密码,也不会发生任何事情(只有在Karaf上运行时),看起来它没有连接。
我终于发现LTS 3 . 7 . 4版本的camel bug是通过使用最新的camel版本(3.10.0)解决的...
因为我是新的Kafka,所以我能够从文件中读取记录,并通过生产者将消息发送到Kafka主题,但不能通过消费者消费相同的主题。 注意:您可以从任何文本文件中读取数据,我使用的是Kafka2.11-0.9。0.0版本 这是我的密码: 下面是输出:
我们正在使用Kafka流将数据写入接收器主题。我正在运行一个avro消费者命令行来检查接收器主题中是否有数据: bin/kafka-avro控制台-消费者-主题sink.output.topic-从开始-新消费者-引导-服务器 当我在kafka streams应用程序运行时同时运行消费者时,我会看到数据,但如果我停止消费者并在几分钟后再次运行,我不会看到任何数据。几乎没有可能: 1) 这是因为Ka
Debezium连接器的Kafka connect事件是Avro编码的。 在传递给Kafka connect standalone服务的connect-standalone.properties中提到了以下内容。 使用这些属性配置Kafka使用者代码: 在消费者实现中,下面是读取键和值组件的代码。我使用REST从模式注册表中获取键和值的模式。 解析密钥工作正常。在解析消息的值部分时,我得到了Arr
我创建了一个制作人和一个消费者,使用“Kafka节点”包发送和消费Kafka主题的消息。生产者和消费者通过API进行调用。POST方法用于向主题发送消息,而GET方法用于在消费者处从主题获取消息。 当我向KAFKA发送消息后调用consumer API时,之前的所有消息都会在。 我只需要最后一条消息,这是生产者发送的。 如何在不使用任何数组或任何东西的情况下获取最后一条消息。 有没有办法删除这个话
我正在使用@StreamListener(Spring-Cloud-Stream)来使用来自主题(输入通道)的消息,进行一些处理并保存到一些缓存或数据库中。 我的要求是,如果DB在处理消费的消息时停止,我想暂停主消费者(输入通道),并从另一个主题(输入56通道)开始消费,一旦它消费了来自输入56通道的所有消息(没有很多),我想再次恢复主消费者(输入通道)。 这能做到吗??
向源生成特殊的clear-message,这将导致聚合的消息变为空 将消息直接写入具有空数据的中间主题 另一种方式,也许kafka-streams已经有一个API调用了? 加分问题:如果我知道我不想让消息坐在中间话题中的时间超过6个月,我可以指示kafka-streams创建6M留存的中间话题,还是在我运行App之前我自己手动创建话题?