我们希望侦听特定的Kafka主题,并构建它的“历史”--所以对于指定的键,提取一些数据,将其添加到该键的现有列表中(如果不存在,则创建一个新的列表),并将其放到另一个主题中,该主题只有一个分区,并且高度压缩。另一个应用程序可以只听这个主题并更新它的历史列表。
我在想它如何适合Kafka流库。我们当然可以使用聚合:
msgReceived.map((key, word) -> new KeyValue<>(key, word))
.groupBy((k,v) -> k, stringSerde, stringSerde)
.aggregate(String::new,
(k, v, stockTransactionCollector) -> stockTransactionCollector + "|" + v,
stringSerde, "summaries2")
.to(stringSerde, stringSerde, "transaction-summary50");
它创建一个由Kafka支持的本地存储,并将其用作历史表。
问题是,如果我只是为每个正在运行的实例设置相同的applicationId,那么它是否最终会重播来自同一个kafka主题的所有数据,即每个正在运行的实例具有相同的本地状态?
为什么要创建多个具有不同ID的应用程序来执行相同的工作?Kafka实现并行的方式是通过任务:
应用程序的处理器拓扑是通过将其分解为多个任务来扩展的。
更具体地说,Kafka Streams根据应用程序的输入流分区创建固定数量的任务,每个任务分配一个来自输入流的分区列表(即,Kafka主题)。分配给任务的分区永远不会改变,因此每个任务都是应用程序并行性的固定单元。
我有一个spring boot应用程序(比方说它叫app-1),它连接到一个kafka集群,并从一个特定的主题进行消费,比方说这个主题叫做“foo”。当另一个应用程序(比如称为app-2)将新的foo项导入数据库时,主题foo总是会收到一条消息。该主题主要用于第三个应用程序(比如app-3),它向可能对这个新foo项目感兴趣的人发送一些电子邮件通知。App-3是集群的,这意味着它有多个实例同时运行
我在本地机器中运行多个kafka流消费者实例(2个实例),每个实例都有自己的自定义本地存储,每个实例的名称不同。 根据文档,如果其中一个实例发生故障,则kafka必须将死实例的存储同步到活实例的存储(如果我错了,请更正我)。 我用相同的应用程序id配置了两个实例,让kafka知道这些实例属于同一个组。 当其中一个实例被杀死时,另一个(活动)实例的存储未与死实例的存储同步。我在两个商店都启用了更改日
我是KSQL的新手。我想使用KSQL查询将kafka集群中存在的所有主题名称存储到另一个主题。来自KSQL CLI的给我主题列表。我想通过创建流将所有这些主题信息存储在另一个主题中。我将轮询这个新主题(使用消费者),每当在集群中创建新主题时,我的消费者都会收到一条消息。我想要一个KSQL查询来完成这一点。 提前感谢。
我有2个Kafka的主题流完全相同的内容从不同的来源,所以我可以有高可用性的情况下,其中一个来源失败。我正在尝试使用Kafka Streams0.10.1.0将2个主题合并为1个输出主题,这样我就不会错过任何关于失败的消息,并且当所有源都启动时没有重复的消息。 当使用KStream的方法时,其中一个主题可以毫无问题地关闭(次要主题),但是当主主题关闭时,将不会向输出主题发送任何内容。这似乎是因为,
我有多个冗余的应用程序实例,希望消费一个主题的所有事件,并存储它们独立的磁盘查找(通过一个rocksdb)。 为了便于讨论,让我们假设这些冗余消费者正在服务无状态http请求;因此,不使用kafka共享负载,而是使用kafka将数据从生产者复制到每个实例LocalStore中。 在查看生成的主题时,每个消费应用程序创建了3个额外的主题: null null 下面是创建存储区的代码
我最近在一个streams应用程序中遇到了一个以前没有遇到过的问题,它很难跟踪与键控/连接相关的问题(以及更新后的分区问题)。 我有两个主题(raw_events和processed_users),这两个主题的密钥相同,但是当我试图对这两个主题执行连接时,尽管密钥相同,但只有一些连接是成功的。 为简洁起见,应用程序的基本工作流程如下: null 问题本身是在步骤5中产生的。由于主题和主题之间的连接