当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams中的metadataForKey方法为连接到同一组的多个应用程序实例提供了错误信息

梁丘波鸿
2023-03-14

我正在实现一种机制,通过从存储区本地请求元数据信息或请求远程Kafka Streams实例来提供元数据信息。

我正在使用Scala和kafka-流-版本2.4.1的scala库

我将试着给你一个简单的例子来说明我在做什么

  1. 我正在运行Kafka集群,它创建了一个带有2个分区的测试主题
  2. 我还运行了1个Kafka Streams实例,正如我前面提到的,它实现了从存储中请求本地或远程元数据的机制,并保存所有分区信息,直到没有任何其他实例连接到同一个组
  3. 我把一些记录放入测试主题
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "1", "01"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "2", "02"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "3", "03"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 0, "4", "04"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "5", "15"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "6", "16"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "7", "17"));
kafkaProducer.send(new ProducerRecord<>("test-topic", 1, "8", "18"));

下一步,为了确保Kafka流以我在第4步中描述的方式工作,我将运行以下代码。

val it: KeyValueIterator[String, String] = streams.store(TEST_REQUEST_STORE, QueryableStoreTypes.keyValueStore[String, String]).all()

while (it.hasNext) {
  val keyValue: KeyValue[String, String] = it.next();
  println(keyValue)
}

很酷,我明白我的期望。我在localhost上运行的Kafka流在重新平衡和分区重新分配后保留分区1。

KeyValue(5, 15)
KeyValue(6, 16)
KeyValue(7, 17)
KeyValue(8, 18)

但当我运行这段代码时,我看到了完全出乎意料的结果。

println(streams.metadataForKey(TEST_REQUEST_STORE, "1", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "2", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "3", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "4", stringSerializer))
println()
println(streams.metadataForKey(TEST_REQUEST_STORE, "5", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "6", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "7", stringSerializer))
println(streams.metadataForKey(TEST_REQUEST_STORE, "8", stringSerializer))
println()
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}

StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}

据我所知,我应该期待这样的事情

StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}
StreamsMetadata{hostInfo=HostInfo{host='myhostname', port=18898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-0]}

StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}
StreamsMetadata{hostInfo=HostInfo{host='localhost', port=8898}, stateStoreNames=[test-request-store], topicPartitions=[test-topic-1]}

共有2个答案

楮阳
2023-03-14

我试过用java给你提建议。我将记录从java推送到主题中,并使用相同的序列化程序从java本身查询流,但我面临的问题与上面列出的类似。

你还有其他建议吗?请

厉令
2023-03-14

首先我想注意的是,metadataForKey提供了一些信息,即使您在存储中没有任何记录,而且似乎密钥所在的位置的信息是随机的。

我意识到问题完全与版本无关,而是与序列化程序有关。

我使用StringSerializer将记录从java推入主题,并从scala尝试使用Serdes. String.serializer()查询元数据,它给我的结果与现实不符。

我创建了另一种将数据推入主题的方式,使用scala和GenericPrimitivesDestring键序列化程序,以及metadataForKey的相同序列化程序,这一次出乎意料地工作正常。

因此,对于那些将使用metadataForKey的人,请注意键序列化器,以便该方法正常工作

 类似资料:
  • 问题内容: 嗨,在一个测试套件上,我看来我有2个同一个提供程序的活动实例,一个实例用于实现,另一个实例用于实际实现。 我的结论基于以下事实:在测试中,我尝试用jest.fn调用替换方法,但仍然在我正在测试的服务上,该方法仍指向原始实现。 更奇怪的是,我能够模拟执行完全相同过程的另一个服务,好像取决于这些服务的注入方式(它们来自容器图中的位置)是否起作用。 我将尝试分享一些片段,但是,当然,只有一个

  • 我试图从Websphere Application Server连接到外部JMS提供者。我们可以不使用Websphere MQ从Websphere Application Server连接到远程JMS提供者吗?

  • 问题内容: 如果我没有以编程方式设置任何内容,而只是调用并使用hibernate.properties(如下所示),那么一切都将很好。尝试以编程方式提供用户名,密码和连接URL时,我会收到奇怪的异常提示,提示是hbm文件。我想念什么? 按照@Kshitij的建议。进行混合模式。 *现在 *的hibernate.properties 是 编码 例外 我现在得到这个异常,我的hbm文件中的每个条目都有

  • 问题内容: 我写了一些代码来测试我的Hibernate配置,但是遇到了这样的错误消息: 我的文件结构如下: 这不是一个Web应用程序,它只是一个普通的Java项目。hibernate.cfg.xml如下所示: 主要功能中的代码如下: 根据错误消息,错误发生在 我是Hibernate的新用户,并且已经多次检查了我的配置文件。有谁可以帮助您找出问题所在?谢谢! 编辑:Hibernate版本是4.3.5

  • 我编写了一些代码来测试Hibernate的配置,但是我遇到了这样的错误消息: main函数中的代码如下所示: 根据错误消息,错误发生在 我是Hibernate的一个新用户,我已经检查了我的配置文件很多次了。有人能帮我找出问题所在吗?谢了!