问题:在设置一个 Kafka 管道后,该管道使用 Kafka Connect JDBC 源和 Avro 序列化程序和反序列化程序将数据拉入,一旦我尝试使用 Kafka Streams Java 应用程序将该数据读入 KStream,就会收到以下错误。 org.apache.kafka命令错误。SerializationException:LongDeserializer接收的数据大小不是8 我试图
我正在使用Kafka Streams API (KTable,GlobalKTable..).我在用KStreams消费Kafka主题。我需要根据一些配置过滤出一些传入的Kafka事件,并在配置发生变化时处理它们。主题的持续时间限制至少为7天。以下是要求: 键值状态 K1 V1加工 K2 V2 未处理(基于某些业务逻辑) K3 V3 已处理 K4 V4加工 K1 V5加工 ------ 现在我想再
我有数据流 但我错了
我对流媒体有一个普遍的问题,但对于问题的范围,让我们限制自己使用Kafka Streams。让我们进一步缩小范围,将我们的问题局限于单词计数,或者可能是一般的计数。假设我有一个由某个键和一个值组成的流,键可以是一个字符串(假设我们可以有很多字符串,除了空字符串,由世界上的任何字符组成),值是一个整数,现在我们正在构建一个单词计数应用程序,如果词汇表中的单词总数是一万亿,我们不能将它们存储在本地缓存
所以基本上我有会计课。我有数据。我想将这些对象发送到我与生产者的主题中。现在没关系。稍后,我想使用 Kafka 流进行聚合,但我不能,因为某些 Serde 属性在我的配置中是错误的,我认为 :/。我不知道错误在哪里。我的制作人工作正常,但我无法聚合。有人帮我查看我的 kafka 流代码吗?我的帐户类: 我的Account类有两个类Serializer和Deserializer。序列化程序: 反序列
我让消费者在我的机器上运行。当我停止Kafka broker时,我在应用程序中得到警告 但是在2-4分钟后被触发。根据此文档 https://github.com/spring-projects/spring-kafka/blob/master/src/reference/asciidoc/kafka.adoc#idle-containers 它说“如果轮询未在 pollInterval 属性的
这是一个场景:我知道,使用与Spring kafka相关的最新API(如Spring集成kafka 2.10),我们可以执行以下操作: 以及来自与相同kafka主题相关的不同分区的读取。 我想知道我们是否可以使用同样的方法,例如spsping-集成-Kafka1.3.1 我没有找到任何关于如何做到这一点的提示(我对xml版本很感兴趣)。
我有4个分区和4个消费者(例如A、B、C、D)。如何使用使用者组配置哪个使用者将从哪个分区读取数据。我用的是Kafka的春靴。
我在使用不同版本的 Spring-kafka 的应用程序的 kafka 使用者方面遇到了问题,特别是在 2.3.13.RELEASE 和 2.8.3 之间。 当使用蓝色/绿色策略部署到PRO时,我遇到的问题是,当在使用者上使用并发时,所有分区都被分配给使用版本2.3.13.RELEASE的应用程序,而在完成分区重新平衡时,使用Spring kafka版本2.8.3(同一主题和组名)部署使用者的新应
我们有一个Kafka集群,由3个节点组成,每个节点有32GB内存和6个内核2.5 CPU。 我们写了一个 kafka 制作人,它接收来自 Twitter 的推文,然后分批发送给 Kafka,每批 5000 条推文。 在生产者中,我们使用
我正在使用 kafka模板向 kafka 主题发送消息。我遇到了一个要求,如果将消息发送到 kafka 主题时出现故障,那么我应该重试在具有相同偏移量的同一分区上发送消息。请帮助如何使用Kafka模板实现这一点?
我正在尝试将KAFKA与Spring集成,我的JAVA应用程序正在与KAFKA服务器通信,当我使用HTTP运行应用程序时,我也会收到消息。 现在我想使用 Spring 在 KAFKA 上添加 SSL,我已经完成了在 SSL KAFKA 和 SPRING KAFKA 上指定的更改 当我使用命令行(使用 SSL)运行生产者和消费者时,通信会正常发生,但是当我更改 Java 应用程序的配置并尝试生成和使
我正在使用Kafka制作器将价格发送到主题。当我发送第一条消息时,它会打印生产者配置,然后发送消息,因此发送第一条消息需要更多时间。 第一条消息后,发送消息几乎不需要 1/2 毫秒。 我的问题是我们可以做些什么,以便配置部分将跳过,或者我们可以在发送第一条消息之前开始? 我正在我的项目中使用SpringKafka。我也读了其他问题,但不是很有帮助。 应用程序.yml 生产者价值: 我提到了以下问题
我用的是SpringKafka2.2.9的Spring靴2.1.9 如果消息多次失败(在afterRollbackProcessor中定义),消费者将停止轮询记录。但如果消费者重新启动,它会再次轮询相同的消息和进程。 但是我不希望消息再次被重新轮询,最好的阻止方法是什么? 这是我的配置 我怎样才能做到这一点?
在我们的系统中有一个场景,kafka主题XYZ用户详细信息由某个其他生产应用程序A(不同的系统)发布,而我的应用程序B正在使用该主题。 要求是应用程序B需要消耗该事件45分钟后(或任何可配置的时间),它被放在kafka主题XYZ由A(这种延迟的原因是,一些系统C的另一个REST api需要根据特定用户的此用户详细信息事件触发,以确认它是否为该用户设置了一些标志,并且该标志可以在45分钟持续时间内的