我们有一个Kafka制作人,它以非常高的频率为保留时间=10小时的主题生成键控消息。这些消息是实时更新,使用的键是值已更改的元素的ID。所以这个主题就像一个变更日志,会有很多重复的键。
现在,我们试图实现的是,当Kafka消费品启动时,不管最后的已知状态(新消费品、崩溃、重新启动等),它将以某种方式构造一个包含主题中所有键的最新值的表,然后继续正常侦听新的更新,保持Kafka服务器上的最小负载,并让使用者完成大部分工作。我们尝试了很多方法,但似乎没有一种是最好的。
我们尝试的是:
欺骗:
使用KSQL,我们要么将KTable重写为主题,以便消费者可以看到它(额外的主题),要么我们需要消费者执行KSQLSELECT
using to KSQL Rest Server并查询表(速度和性能不如Kafka api)。
消费者从一开始就开始和消费话题。这非常有效,但是使用者必须使用10小时的更改日志来构建最后一个值表。
通过以下方式使用KTable:
KTable<Integer, MarketData> tableFromTopic = streamsBuilder.table("topic_name", Consumed.with(Serdes.Integer(), customSerde));
KTable<Integer, MarketData> filteredTable = tableFromTopic.filter((key, value) -> keys.contains(value.getRiskFactorId()));
Kafka Streams将在Kafka服务器上为每个KTable创建一个主题(名为{consumer\u app\u id}-{topic\u name}-STATE-STORE-0000000000-changelog
),这将产生大量主题,因为我们拥有大量消费者。
从我们已经尝试过的情况来看,似乎我们需要增加服务器负载或消费者启动时间。难道没有一种“完美”的方式来实现我们的目标吗?
提前谢谢。
消费者从一开始就开始和消费话题。这非常有效,但是使用者必须使用10小时的更改日志来构建最后一个值表。
在您的应用html" target="_blank">程序第一次启动期间,您所说的是正确的。
为了避免每次重新启动时出现这种情况,请将键值数据存储在文件中。
例如,您可能希望使用持久映射(如MapDB)。
因为您给了消费者组。id
并且您定期提交偏移量,或者在每个记录存储在映射中之后提交偏移量,下次应用程序重新启动时,它将从该组的上次提交的偏移量中读取偏移量。id
。
因此,花费大量时间的问题只会在最初(第一次)出现。只要您拥有该文件,就不需要从一开始就使用它。
在这种情况下,如果文件不存在或被删除,只是在
并再次构建它。Kafka消费者
中查找
在某个地方,您需要存储这个键值以便检索,为什么它不能是一个持久存储?
如果您出于任何原因想要使用Kafka流,那么另一种选择(不像上面那样简单)是使用持久支持的存储。
例如,一个持久的全局存储。
streamsBuilder.addGlobalStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(topic), keySerde, valueSerde), topic, Consumed.with(keySerde, valueSerde), this::updateValue);
P. S:在存储偏移量的目录中会有一个名为的文件。检查点
。如果主题在中间被删除,你会得到OffsetOutOfRangeExc0019
。您可能想要避免这种情况,也许可以使用UncaughtExceptionHandler
有关更多信息,请参阅https://stackoverflow.com/a/57301986/2534090。
最后,
对于这一点,最好将消费者与持久文件一起使用,而不是使用流,因为它提供了简单性。
通过使用KTables,Kafka Streams将在Kafka服务器上为每个KTables创建一个主题,这将产生大量的主题,因为我们有大量的消费者。
如果您只是将现有的主题读取到KTable中(通过StreamsBuilder#table()),那么Kafka Streams不会创建额外的主题。KSQL也一样。
如果您能够明确您想要对KTable做什么,这将有所帮助。显然,您正在做的事情确实会导致创建其他主题?
1变更日志主题1紧凑主题:
你为什么想有两个不同的话题?通常情况下,变更日志主题应该始终被压缩。鉴于您的用例描述,我看不出为什么它不应该是:
现在,我们试图实现的是,当Kafka消费者启动时,不管最后已知的状态是什么(新消费者、崩溃、重启等),它都会以某种方式构建一个包含主题中所有键的最新值的表,然后继续监听对于新的更新正常[...]
因此,压缩对您的用例非常有用。它还可以防止您描述的此问题:
消费者从一开始就开始和消费话题。这非常有效,但是使用者必须使用10小时的更改日志来构建最后一个值表。
请注意,要重建最新的表值,Kafka Streams、KSQL和Kafka消费者必须完全读取表的基本主题(从头到尾)。如果该主题没有被压缩,这可能确实需要很长时间,具体取决于数据量、主题保留设置等。
从我们已经尝试过的情况来看,似乎我们需要增加服务器负载或消费者启动时间。难道没有一种“完美”的方式来实现我们的目标吗?
如果不了解更多关于你的用例,特别是KTable填充后你想做什么,我的回答是:
例如,如果Kafka使用者应该对“表”数据进行任何有状态处理,我就不会使用它,因为Kafka使用者缺少用于容错有状态处理的内置功能。
虽然auto.offset.reset的值是最新的,但使用者从属于2天前的消息开始,然后就会赶上最新的消息。 我错过了什么?
生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较
我正在开发一个spring boot kafka消费者应用程序。它将有不同的消费者在不同的主题上工作。使用者的所有信息都来自application.yml文件。 我无法将应用程序属性中的主题列表设置到KafKalistener。 在这两种情况下,我都得到以下错误: java.lang.IllegalArgumentException:无法解析占位符 从应用程序属性获取主题并将其设置在KafkaLi
我在本地机器上安装了Kafka,并启动了zookeeper和一个代理服务器。 现在我有一个单独的主题,描述如下: 我有一个生产者在消费者启动之前产生了一些消息,如下所示: 当我使用--从头开始选项启动消费者时,它不会显示生产者生成的所有消息: 但是,它显示的是新添加的消息。 我在这里怎么了?有什么帮助吗?
我是 kafka 的新手,并试图了解是否有办法从上次使用的偏移量读取消息,但不是从头开始。 我正在写一个例子,这样我的意图就不会偏离。 有没有一种方法可以获取从上次使用的偏移量生成的消息。?
我以前是学习Kafka的传统ActiveMQ用户。我有一个问题。 使用Active MQ,您可以执行以下操作: 将100条消息提交到队列中 我试着在Kafka做同样的事情 如果不启动Consumer,等待它启动,然后运行producer,则此示例不起作用。 谁能告诉我如何修改我的示例程序,以在消息等待被消费的地方执行操作?