当您运行Kafka streams应用程序重置工具将应用程序重置为特定时间戳(例如T-n)时,状态存储会发生什么变化? 文档内容为:“内部主题:删除内部主题(这会自动删除任何提交的偏移)”(内部主题由Kafka Streams应用程序在执行状态存储的更改日志主题时在内部使用) 这是否意味着我在T-n时失去了状态存储/RocksDB的状态? 例如,假设我在该时间戳处理状态存储上的“会话窗口”。在应用
我看到并实现了KafkaTemplate方法。使用默认分区器类发送(主题、消息)。 但在这里,我不是在传递钥匙。我有一个简单的自定义分区器类,我还想发送到kafka服务器,比如KafkaTemplate(主题、键、消息),在producerConfig中,我为分区设置了customPartitioner类。 如果我提供自定义分区器,我看到KafkaTemboard的这个Will send(Topi
我正在使用Spring Kafka 2.3.9编写一个Kafka制作人,该制作人假设向一个主题发布大约200000条消息。例如,我有一个从数据库中提取的200000个对象的列表,我想将这些对象的json消息发布到一个主题。 我写的制作人在发布1000条消息方面做得很好。然后它创建了一些空指针错误(我已经包括了下面的屏幕截图)。 在调试过程中,我发现Kafka Producer网络线程的数量非常高。
我需要向Kafka的特定主题发送消息。我使用以下KafkaTemplate来实现这一点:KafkaTemplate 以下参数放在Kafka生产者中: 创建生产者: 当我执行“发送”方法时,我有一条消息发送到Kafka主题,但同时我发送了标头,其中包含请求的DTO文件的路径。 偏移资源管理器中的示例标头 由于这个标题,应用程序中出现了一些问题,这是一个消费者,我对此无能为力。有没有办法从查询中删除此
我刚开始接触Kafka。我已经经历了这一切。它只表示kafka流DSL的数据/主题管理。任何人都可以共享Kafka流处理器API的相同数据管理的任何链接吗?我对处理器API的用户和内部主题管理特别感兴趣。 在流处理器开始使用输入数据之前,从哪里用输入数据填充此源主题? 简而言之,我们可以像制片人写主题一样,使用流来写Kafka的“源”主题吗?或者流仅用于主题的并行消费?我相信我们应该像“Kafka
我正在开发kafka-stream api。基本上Kafka-stream从源主题获取数据,并在应用一些过滤器后将其写回目标kafka主题。 使用的依存关系: 下面是相同的代码。: { ... 这是我的应用程序架构: 生产者API(源主题中的生产者)= 我想要的是,当流将数据写入目标主题时,我想要捕获事件,无论它是否成功。 有没有办法捕捉到回调?谢谢
我有一个非常简单的Java/Spring应用程序来演示KStream的功能,但不幸的是,我无法使KStream加载数据。想法是创建一个KStream对象,并使用controller GET方法简单地检索其内容。示例代码: 问题-主题中有消息,但foreach(...)中的KStream枚举没有从中检索任何结果。KStream对象状态为“RUNning”,日志中没有错误。 生成随机应用程序ID并将A
“输入主题”已创建。我没有创建“输出主题”,似乎“Kstream”为我和其他内部主题创建了一个。此外,在“to”函数的javadoc中看到了这一点,指定的主题应该在使用之前手动创建(即,在Kafka Streams应用程序启动之前) 所以我的问题是,我们总是必须手动创建“输出主题”吗?
假设我对同一个消息键有不同的值。 例如: 在上述情况下,我只需要用户更新的最新值,即“user789@xyz.com”。 我的kafka流应该只给我第三个值,而不是前两个值。
在为Apache Kafka创建主题时,什么是最佳实践<每个人都允许自动创建主题吗?或者你是如何做到的?您是否将主题创建步骤与kafka实例的开始捆绑在一起 我有一个基于docker的Kafka安装,它已经被多个应用程序使用了。如何将每个应用程序的主题创建与Kafka容器的启动分开?。在Confluents音乐演示中,他们通过旋转一个新的Kafka图像来创建主题,调用“创建主题脚本”,然后让容器消
我试图使用kafka-node从kafka主题读取压缩消息。 问题是,最近插入的消息留在EOL上方,在插入其他消息之前无法访问。实际上,EOL和高水位偏移之间存在间隙,这会阻止读取最新消息。原因尚不清楚。 已使用创建主题 主题中产生了许多关键值。有些钥匙是一样的。 这是插入的键和值 然后请求主题键集。 有一个高水位偏移量,表示最新的值10。然而,消费者看到的偏移值只有7。不知何故,压缩阻止了消费者
出身背景 我们公司有由动物园管理员管理的阿帕奇·Kafka。我们的一个Spring Boot应用程序需要检查所有可用主题的列表,并列出哪些主题启用了日志压缩(cleanup.policy=compact)。 当前代码 问题 使用上述代码,应用程序可以获得主题列表。有没有办法也知道单个主题是否被日志压缩?我所寻找的是某种“Java”方式,以获得与从终端运行以下Apache Kafka CLI命令时相
我在《掌握Kafka Streams and ksqlDB》一书中遇到了以下两个短语,作者使用了两个术语,它们的真正含义是“压缩主题”和“未压缩主题” 他们对“日志压缩”有什么看法吗? 表可以被认为是对数据库的更新。在日志的这种视图中,只保留每个键的当前状态(给定键的最新记录或某种聚合)。表通常是从压缩的主题构建的。 用数据库的说法,流可以被视为插入。每个不同的记录都保留在此日志视图中。流通常是从
只是关于Kafka的后续问题-未压缩主题与压缩主题 正如那里所说, 用数据库的说法,流可以被视为插入。每个不同的记录都保留在此日志视图中。流通常是从未压缩的主题构建的。 作为最佳实践,关于未压缩主题的语义,是否应禁用要在日志启用程序中取消压缩的主题,以便不会发生压缩(清理),其属性如下: 日志清洁工enable=false或log。清洁工启用=true(默认),清除策略为“delete”(默认)
我想问你们一些关于阿帕奇·Kafka和压缩主题的问题。我们想提供一些Kafka压缩主题的PII数据。我们想通过墓碑删除这个主题的数据。目前有多个问题需要验证我们的假设: 有没有其他公司像KIP-354那样通过压缩主题和墓碑生成来满足Kafka的gdpr要求(忘记的权利)https://cwiki.apache.org/confluence/display/KAFKA/KIP-354:添加最大日志压