我有以下场景: 生产者通过Confluent的REST代理(在Confluent的模式注册表上注册模式)向Kafka主题发送Avro编码的消息,如http://docs.confluent.io/3.0.0/kafka-rest/docs/intro.html#produce-and-consument-avro-messages所述 Spring Cloud Stream enabled mes
我在读取KTABLE时得到了LongDeserializer异常。 我正在使用 该项目的github链接可在https://github.com/jaysara/kstreamanalytics获得
我有一个基于Spring cloud Streams的Kafka Streams应用程序,我将全局KTable绑定到一个紧凑的主题。当我将Tombstone记录推到主题(非空键,值为null)时-我的Kafka streams应用程序失败,出现反序列化异常。失败是因为我的反序列化程序不处理空记录。 从文档中,我认为GlobalKTable甚至不会“看到”空值记录。难道不是这样吗?我需要在反序列化程
在下,我找到了RocksDB使用的文件。我的意图是通过检查文件大小。 当看到文件名时,我很好奇文件名的含义。文件名是什么意思?我想这和Kafka Streams任务和分区有关。 前缀0和1是否意味着使用的主题数,后一个是使用的分区? 这个KafkaStreams应用程序只是使用KStream-KTable连接两个主题,其中一个主题被重新分区并还原到ktable中。
在一个使用Spring Cloud Stream连接到Kafka的Spring Boot应用程序中,我试图设置两个单独的Stream侦听器方法: null 我认为重要的部分是:“为非订阅的主题正则表达式模式分配分区t1-0;订阅模式是T3”。这是两个不相关的主题,所以据我所知,任何与t3相关的东西都不应该订阅任何与T1相关的东西。引起问题的确切主题也会断断续续地发生变化:有时被提及的是自动生成的主
在下面的示例中:https://github.com/confluentinc/kafka-streams-examples/blob/5.1.0-post/src/main/java/io/confluent/examples/streams/pageViewRegionexample.java有一个KStream和KTable连接。 在驱动程序https://github.com/conflu
此问题现在已在Message Hub上解决 我在Kafka创建一个KTable时遇到了一些麻烦。我对Kafka是初来乍到的,这大概是我问题的根源,但我原以为无论如何都可以问到这里。我有一个项目,在那里我想通过统计它们的总数来跟踪不同的ID。我正在使用IBM Cloud上的Message Hub来管理我的主题,到目前为止它工作得非常出色。 我有一个关于Message Hub的主题,它会生成诸如之类的
我是spark streaming的新手,我有一个关于其用法的一般性问题。我目前正在实现一个应用程序,它从一个Kafka主题流式传输数据。 使用应用程序只运行一次批处理是一种常见的场景吗,例如,一天结束,收集主题中的所有数据,做一些聚合和转换等等? 这意味着在用spark-submit启动应用程序后,所有这些东西将在一批中执行,然后应用程序将被关闭。或者spark stream build是为了在
我想知道是否指定了流拓扑处理消息的顺序。 示例: 使用: 在测试中,该流工作。但我认为这不是保证。它只起作用,因为消息在加入之前首先由计数节点处理。 但是那个订单有保证吗? 就我在所有的文档中所看到的,对这个处理订单没有任何保证。因此,如果收到新消息,也可能发生以下情况: null 谢谢你
null 在没有任何测试的情况下,我会说第二种选择更好/更干净、更可靠?
我想我需要为每个StreamListener方法单独的应用程序id,但是如果我正在监听相同的主题,我如何在application.yml文件中配置它呢?
我不想使用@KafKalistener或@StreamListener,但我想手动轮询Kafka。我正在使用spring-cloud-starter-stream-kafka库,我有以下Kafka制作人
我正在使用带有Avro和汇流模式注册表的Spring云流。我正在为所有服务使用一个单独的DLQ主题,因此具有不同模式的消息可能会落在这个主题中。我已禁用动态架构注册,以确保不传递错误消息()。 然而,问题是由于dlq上缺少模式,我可能会在进入这个主题时丢失一条消息。因此,我希望能够以JSON格式向dlq生成消息,并在管道的其余部分使用Avro。如果有人能帮助我如何做到这一点,或者能为我指出这件事的
我们正在使用带有汇流模式注册表、Avro和Kafka绑定器的Spring云流。我们已经将数据处理管道中的所有服务配置为使用一个共享的DLQ Kafka主题,以简化异常处理的过程,并能够重播失败的消息。然而,由于某种原因,我们似乎无法正确提取有效负载消息,因为具有不同模式的消息被发布到单个DLQ。因此,我们失去了原始消息的模式轨迹。 我想知道是否有任何方法可以在dlq中维护失败消息的原始以便将其用于
我试图使用ConsumerSeeKaware,阅读kafka主题中可用的最后一条消息。消息类型是Avro对象列表。我能成功地做到这一点。但在反序列化过程中会失败。该消息使用spring-cloud-stream-kafka框架生成。消息具有contentType。 我知道avro消息可以像下面这样反序列化。 但不管用。可能是因为两件事。 > 消息是avro对象的列表。但我正在尝试使用Avro模式创