我正在尝试使用SSL实现Kafka消费者,在应用程序中提供所有必需的配置。
当我启动Spring启动Kafka消费者应用程序;消费者试图连接localhost:9092而不是提到Kafka经纪人。
KafkaConfig.java公司
@Bean
public ConsumerFactory<String, AvroRecord> consumerFactory() throws IOException {
return new DefaultKafkaConsumerFactory<>(kafkaProps());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, AvroRecord>>
kafkaListenerContainerFactory() throws IOException {
ConcurrentKafkaListenerContainerFactory<String, AvroRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
kafkaProps()
正在加载所有与SSL和引导服务器相关的属性。值,我可以在调试模式中看到它。
应用程序.yml
kafka:
properties:
basic:
auth:
credentials:
source: USER_INFO
user: username
pass: password
enableAutoRegister: true
max_count: 100
max_delay: 5000
schema:
registry:
url: https://schema-registry:8081
ssl:
truststore:
location: <<location>>
password: pwd
keystore:
location: <<location>>
password: pwd
key:
password: pwd
ssl:
enabled: true
protocols: TLSv1.2,TLSv1.1,TLSv1
truststore:
type: JKS
location: <<location>>
password: pwd
keystore:
type: JKS
location: <<location>>
password: pwd
key:
password: pwd
security:
protocol: SSL
consumer:
bootstrap-servers: broker1:9092,broker2:9092
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
max-message-size: 10241024
在应用程序日志中,我得到了下面的日志
18:46:33.964 [main] INFO o.a.k.c.a.AdminClientConfig requestId=
transactionKey= | AdminClientConfig values:
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
client.id =
connections.max.idle.ms = 300000
15:53:54.608 [kafka-admin-client-thread | adminclient-1] WARN o.a.k.c.NetworkClient requestId=
transactionKey= | [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
我找不到它,为什么它连接到本地主机,而不是提到的代理
>
正确的属性是spring.kafka.bootstrap-服务器
。您似乎完全缺少Spring
前缀。此外,schema.registry.url
、ssl.truststore
等都被认为是Kafka客户端的单数属性键(字符串),因此(据我所知)不应“嵌套”在YAML对象中
您只尝试在使用者上设置bootstrap属性,而不是AdminClient
在与引导服务器字符串建立初始连接后,您的客户端将始终连接到代理的播发侦听器
,因此,如果这是 localhost:9092
,将解释 AdminClient 日志输出
我在mac上运行Kafka和Flink作为docker容器。 我已经实现了Flink作业,它应该消耗来自Kafka主题的消息。我运行一个向主题发送消息的python生产者。 工作开始时没有问题,但没有收到任何消息。我相信这些消息被发送到了正确的主题,因为我有一个能够使用消息的python消费者。 flink作业(java): Flink作业日志: 生产者作业(python):(在主机上运行-不是d
消费者使用Spring的JavaConfig类如下: Kafka主题侦听器使用@KafkaListener注释,如下所示: 我的pom包括依赖项: 现在当我打包到war并部署到tomcat时,它不会显示任何错误,即使在调试模式下也不会显示任何错误,只是部署war什么都没有。 请帮助我了解是否缺少触发kafkalistner的某些配置。 谢谢Gary我添加了上下文。xml和web。xml,但我得到了
我正在阅读Kafka常见问题解答,他们如下所示。 •每个分区不会被每个使用者组中的多个使用者线程/进程使用。这允许每个进程以单线程方式使用,以保证分区内的使用者的顺序(如果我们将有序消息分割成一个分区并将它们传递给多个使用者,即使这些消息是按顺序存储的,它们有时也会被无序地处理)。 有没有可能,
我想从Kafka的主题消费事件后,他们到达的时间。我希望使用事件的时间在消息的有效负载中。在Kafka那里有可能实现那样的事情吗?它的缺点是什么? 实际示例:一条消息M在12:10产生,在12:11到达我的Kafka主题,我希望消费者在12:41(到达后30分钟)轮询它
是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?