当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams对未订阅主题的警告

任飞鸣
2023-03-14

在我的应用程序中(它正在使用Topic1、Topic2和Topic3),我还不断看到与其他主题相关的警告。

14:02:33.910 478517 [Application-468d913c-bca0-4ad9-baec-5b51ffa597d2-StreamThread-4] WARN  o.a.kafka.clients.NetworkClient - [Consumer clientId=Application-468d913c-bca0-4ad9-baec-5b51ffa597d2-StreamThread-4-consumer, groupId=Application] 1389 partitions have leader brokers without a matching listener, including [Topic4, Topic5, Topic6, Topic7, ..., Topic8-Link-Store-changelog-5 ]

该消息通过以下Kafka代码打印:https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/networkclient.java

@Override
        public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
            // If any partition has leader with missing listeners, log up to ten of these partitions
            // for diagnosing broker configuration issues.
            // This could be a transient issue if listeners were added dynamically to brokers.
            List<TopicPartition> missingListenerPartitions = response.topicMetadata().stream().flatMap(topicMetadata ->
                topicMetadata.partitionMetadata().stream()
                    .filter(partitionMetadata -> partitionMetadata.error() == Errors.LISTENER_NOT_FOUND)
                    .map(partitionMetadata -> new TopicPartition(topicMetadata.topic(), partitionMetadata.partition())))
                .collect(Collectors.toList());
            if (!missingListenerPartitions.isEmpty()) {
                int count = missingListenerPartitions.size();
                log.warn("{} partitions have leader brokers without a matching listener, including {}",
                        count, missingListenerPartitions.subList(0, Math.min(10, count)));
            }

这是意料之中的吗?为什么这些警告消息会发布在我的应用程序中?

共有1个答案

东门新立
2023-03-14

如果将监听器动态添加到代理中,这可能是一个暂时的问题

在任何情况下,这都表明在代理上定义的Listeners属性存在问题。

 类似资料:
  • 客户对主题的订阅(即订阅者)的寿命是多少? 我之所以这么担心,是因为我认为服务总线的一个特点--持久的信息。因此,我认为在连接不稳定的情况下,可以保证提供持久的消息。 那么,当一个应用程序(多个应用程序中的一个)失去与服务总线的连接一天,然后第二天重新启动应用程序并实例化一个新的订阅客户端实例时,会发生什么呢?当其他应用程序由于自己的订阅已经处理了这些相同的消息时,应用程序是否会继续接收等待传递的

  • https://github.com/azure/azure-service-bus/tree/master/samples/dotnet/gettingstart/microsoft.azure.servicebus/topicsubscriptionwithruleoperationssample 现在我想添加一个筛选器/规则,这样只有通过筛选器中定义的特定条件的消息才应该给订阅。 例如,下面

  • 我正在试验消息驱动Beans,以便从外部ActiveMQ实例接收主题订阅消息。 我的测试首先从队列订阅开始,它工作得很好。 然后我想尝试主题订阅,但我无法让它工作。 这就是我所拥有的: 会议记录。xml 这是MDB: 我不知道为什么,但从日志中我可以看到,TomEE创建了一个队列,而不是一个主题: 另一个证明是,当我添加持续时间配置时,服务器不会启动: 然后服务器抱怨这不适合配置类型javax.j

  • 目前,我已经开始使用ActiveMQ处理JMS主题。我已经通过JAVA代码(如下所述)创建了发布者和持久订阅者,并且在订阅者端也收到了消息。 Publisher.Java 订阅者.java 我对以下主题有一些疑问, 如何检查有多少订阅者使用 Java JMS 在主题中主动查找消息? 如何从主题中获取活动和持久订阅者列表? 我们是否可以删除主题中发布的消息? 在这些情况下帮助我。 提前致谢。

  • 我试图创建基于@JmsListener注释的发布-订阅示例:https://github.com/lkrnac/book-eiws-code-samples/tree/master/05-jms/0515-publish-subscribe 相关代码片段: 问题是,要获得这种行为: 但每个消息都应该由两个侦听器根据主题的定义来使用。我错过了什么?

  • 我有一个将消息发布到AWS SNS主题字符串的NodeJS应用程序和一个AWS SQS订阅。在SQS控制台上,我可以看到发布的消息。但是,我不清楚SQS队列的访问策略。 问题 当将消息传递到SQS队列时,作为订阅的结果,哪个用户是有效的?与发表该主题的人相同? 只有在使用时,才能使消息流入队列。那么,我应该如何定义一个限制性策略,使消息只能作为订阅的结果写入队列? 创建具有权限的队列的AWS SQ