当前位置: 首页 > 知识库问答 >
问题:

从消费者开始的主题中获取最新值,然后正常继续

满博
2023-03-14

我们有一个Kafka制作人,它以非常高的频率为保留时间=10小时的主题生成键控消息。这些消息是实时更新,使用的键是值已更改的元素的ID。所以这个主题就像一个变更日志,会有很多重复的键。

现在,我们试图实现的是,当Kafka消费品启动时,不管最后的已知状态(新消费品、崩溃、重新启动等),它将以某种方式构造一个包含主题中所有键的最新值的表,然后继续正常侦听新的更新,保持Kafka服务器上的最小负载,并让使用者完成大部分工作。我们尝试了很多方法,但似乎没有一种是最好的。

我们尝试的是:

  1. 生产者向封装在事务中的两个主题发送相同的消息,以确保成功发送
  2. 使用者启动并请求更改日志主题的最新偏移量
  3. 从一开始就使用压缩主题来构造表
  4. 自请求的偏移量之后,继续使用变更日志

欺骗:

  • 即使将日志压缩频率设置为可能的最高值,压缩主题中也很可能存在重复项

使用KSQL,我们要么将KTable重写为主题,以便消费者可以看到它(额外的主题),要么我们需要消费者执行KSQLSELECTusing to KSQL Rest Server并查询表(速度和性能不如Kafka api)。

消费者从一开始就开始和消费话题。这非常有效,但是使用者必须使用10小时的更改日志来构建最后一个值表。

通过以下方式使用KTable:

KTable<Integer, MarketData> tableFromTopic = streamsBuilder.table("topic_name", Consumed.with(Serdes.Integer(), customSerde));
KTable<Integer, MarketData> filteredTable = tableFromTopic.filter((key, value) -> keys.contains(value.getRiskFactorId()));

Kafka Streams将在Kafka服务器上为每个KTable创建一个主题(名为{consumer\u app\u id}-{topic\u name}-STATE-STORE-0000000000-changelog),这将产生大量主题,因为我们拥有大量消费者。

从我们已经尝试过的情况来看,似乎我们需要增加服务器负载或消费者启动时间。难道没有一种“完美”的方式来实现我们的目标吗?

提前谢谢。

共有2个答案

司空健
2023-03-14

消费者从一开始就开始和消费话题。这非常有效,但是使用者必须使用10小时的更改日志来构建最后一个值表。

在您的应用html" target="_blank">程序第一次启动期间,您所说的是正确的。

为了避免每次重新启动时出现这种情况,请将键值数据存储在文件中。

例如,您可能希望使用持久映射(如MapDB)。

因为您给了消费者组。id并且您定期提交偏移量,或者在每个记录存储在映射中之后提交偏移量,下次应用程序重新启动时,它将从该组的上次提交的偏移量中读取偏移量。id

因此,花费大量时间的问题只会在最初(第一次)出现。只要您拥有该文件,就不需要从一开始就使用它。

在这种情况下,如果文件不存在或被删除,只是Kafka消费者中查找并再次构建它。

在某个地方,您需要存储这个键值以便检索,为什么它不能是一个持久存储?

如果您出于任何原因想要使用Kafka流,那么另一种选择(不像上面那样简单)是使用持久支持的存储。

例如,一个持久的全局存储。

streamsBuilder.addGlobalStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(topic), keySerde, valueSerde), topic, Consumed.with(keySerde, valueSerde), this::updateValue);

P. S:在存储偏移量的目录中会有一个名为的文件。检查点。如果主题在中间被删除,你会得到OffsetOutOfRangeExc0019。您可能想要避免这种情况,也许可以使用UncaughtExceptionHandler

有关更多信息,请参阅https://stackoverflow.com/a/57301986/2534090。

最后,

对于这一点,最好将消费者与持久文件一起使用,而不是使用流,因为它提供了简单性。

养慈
2023-03-14

通过使用KTables,Kafka Streams将在Kafka服务器上为每个KTables创建一个主题,这将产生大量的主题,因为我们有大量的消费者。

如果您只是将现有的主题读取到KTable中(通过StreamsBuilder#table()),那么Kafka Streams不会创建额外的主题。KSQL也一样。

如果您能够明确您想要对KTable做什么,这将有所帮助。显然,您正在做的事情确实会导致创建其他主题?

1变更日志主题1紧凑主题:

你为什么想有两个不同的话题?通常情况下,变更日志主题应该始终被压缩。鉴于您的用例描述,我看不出为什么它不应该是:

现在,我们试图实现的是,当Kafka消费者启动时,不管最后已知的状态是什么(新消费者、崩溃、重启等),它都会以某种方式构建一个包含主题中所有键的最新值的表,然后继续监听对于新的更新正常[...]

因此,压缩对您的用例非常有用。它还可以防止您描述的此问题:

消费者从一开始就开始和消费话题。这非常有效,但是使用者必须使用10小时的更改日志来构建最后一个值表。

请注意,要重建最新的表值,Kafka Streams、KSQL和Kafka消费者必须完全读取表的基本主题(从头到尾)。如果该主题没有被压缩,这可能确实需要很长时间,具体取决于数据量、主题保留设置等。

从我们已经尝试过的情况来看,似乎我们需要增加服务器负载或消费者启动时间。难道没有一种“完美”的方式来实现我们的目标吗?

如果不了解更多关于你的用例,特别是KTable填充后你想做什么,我的回答是:

  • 确保“变更日志主题”也已压缩
  • 首先尝试KSQL。如果这不能满足您的需求,请尝试Kafka Streams。如果这不能满足您的需求,请尝试Kafka消费者

例如,如果Kafka使用者应该对“表”数据进行任何有状态处理,我就不会使用它,因为Kafka使用者缺少用于容错有状态处理的内置功能。

 类似资料:
  • 虽然auto.offset.reset的值是最新的,但使用者从属于2天前的消息开始,然后就会赶上最新的消息。 我错过了什么?

  • 生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较

  • 我正在开发一个spring boot kafka消费者应用程序。它将有不同的消费者在不同的主题上工作。使用者的所有信息都来自application.yml文件。 我无法将应用程序属性中的主题列表设置到KafKalistener。 在这两种情况下,我都得到以下错误: java.lang.IllegalArgumentException:无法解析占位符 从应用程序属性获取主题并将其设置在KafkaLi

  • 我以前是学习Kafka的传统ActiveMQ用户。我有一个问题。 使用Active MQ,您可以执行以下操作: 将100条消息提交到队列中 我试着在Kafka做同样的事情 如果不启动Consumer,等待它启动,然后运行producer,则此示例不起作用。 谁能告诉我如何修改我的示例程序,以在消息等待被消费的地方执行操作?

  • 我在本地机器上安装了Kafka,并启动了zookeeper和一个代理服务器。 现在我有一个单独的主题,描述如下: 我有一个生产者在消费者启动之前产生了一些消息,如下所示: 当我使用--从头开始选项启动消费者时,它不会显示生产者生成的所有消息: 但是,它显示的是新添加的消息。 我在这里怎么了?有什么帮助吗?

  • 我是 kafka 的新手,并试图了解是否有办法从上次使用的偏移量读取消息,但不是从头开始。 我正在写一个例子,这样我的意图就不会偏离。 有没有一种方法可以获取从上次使用的偏移量生成的消息。?