当kafka streams应用程序运行且kafka突然停机时,应用程序进入“等待”模式,发送警告日志的消费者和生产者线程无法连接,当kafka恢复时,一切(理论上)都应该恢复正常。我正在尝试获取有关此情况的警报,但我无法找到捕获该警报的位置并发送日志/度量。我尝试了以下方法:
streams.setUncaughtExceptionHandler
但这只发生在异常情况下,而不是这里productionExceptionHandler
并将default.production.exception.handler
属性更改为我的类,从而扩展了此接口。我知道Kafka有自己的衡量标准,我可以听一听,看看经纪人是否破产。但也可能存在这样的情况:Kafka代理很好,my Kafka streams应用程序无法连接(即身份验证配置错误或vpn/vpc问题)
我能做些什么来发现这些问题并记录/报告它们?
使现代化
如果Kafka不可用,请参阅消费者/生产者日志:
2020-08-24 21:41:32,055 [my-kafka-streams-app-23a462fe-28c6-415a-a08a-b11d3ffffc2e-StreamThread-1] WARN o.apache.kafka.clients.NetworkClient - [] [Consumer clientId=my-kafka-streams-app-23a462fe-28c6-415a-a08a-b11d3ffffc2e-StreamThread-1-consumer, groupId=my-kafka-streams-app] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2020-08-24 21:41:32,186 [kafka-admin-client-thread | my-kafka-streams-app-23a462fe-28c6-415a-a08a-b11d3ffffc2e-admin] WARN o.apache.kafka.clients.NetworkClient - [] [AdminClient clientId=my-kafka-streams-app-23a462fe-28c6-415a-a08a-b11d3ffffc2e-admin] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2020-08-24 21:41:32,250 [kafka-producer-network-thread | my-kafka-streams-app-23a462fe-28c6-415a-a08a-b11d3ffffc2e-StreamThread-1-producer] WARN o.apache.kafka.clients.NetworkClient - [] [Producer clientId=my-kafka-streams-app-23a462fe-28c6-415a-a08a-b11d3ffffc2e-StreamThread-1-producer] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
这种情况不容易通过编程检测。问题是,客户端并没有真正向Kafka Streams公开其状态,因此Kafka Streams并不真正了解断开连接的情况。KIP建议添加一个断开连接
状态,但这并不容易实现(参见https://cwiki.apache.org/confluence/display/KAFKA/KIP-457:将断开连接状态添加到Kafka流)。
您提到的异常处理程序对这种情况没有帮助,因为没有抛出异常(至少在Kafka Streams代码库中没有)。
您可以尝试监视消费者延迟或一些Kafka流指标(如处理速率)。他们可能会提供足够好的代理。查阅https://docs.confluent.io/current/streams/monitoring.html
我有两个Kafka主题-和。第一个主题包含由唯一Id(称为)键入的recommendations对象。每个产品都有一个用户可以单击的URL。 主题获取通过单击推荐给用户的产品URL生成的消息。它是如此设置的,这些单击消息也由键控。 请注意 > 每个单击对象都会有一个相应的推荐对象。 click对象的时间戳将晚于Recommensions对象。 建议和相应的点击之间的间隔可能是几秒钟到几天(最多7天
我在使用合流Kafka连接图像中的AWS MSK TLSendpoint时遇到了麻烦,因为它超时了创建/阅读主题。当我传递明文endpoint时,工作完全正常。 INFO org.apache.kafka.clients.admin.AdminClientConfig-AdminClientConfig值:
我想知道是否可以在Kafka制作程序中配置2个不同的Kafka集群。 目前我正试图让我的制片人 我正在使用Apache Kafka 2.8和Python 3.7的confluent_kafka==1.8.2包。 生产商代码下方: 当我杀死clusterB时,我得到了以下错误消息。
我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我
我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?
我正在导入一个DB,其中包含一些表示多对多和一对多关系的链接表。 1-到目前为止,根据我对Kafka流的理解,我似乎需要为每个链接表提供一个流,以便执行聚合。KTable将不可用,因为记录是按键更新的。但是,聚合的结果可能是Ktable中的一个。 2-然后是外键上的连接问题。似乎唯一的方法是通过GlobalKtable。link-table-topic->link-table-stream->li