我无法将来自topic3的响应与topic1上的请求关联起来,因为correlationid在中间主题中丢失了。如果我不使用中间主题(比如topic2),那么topic1将发送一个带有相关ID的消息,相应的响应将从Topic3接收。 任何意见/建议都大有帮助。 下面是示例代码:从我的API中,我发布了一个事务
我对Kafka和Spring-Cloud-Stream还不熟悉。现在,我在启动Kafka项目以发送消息时遇到了一个问题。第一次运行应用程序时显示空指针异常。 日志 application.properties 人口应用 RsvpKafkaProducer公司 RsvpWebSocketHandler pom。xml
Spring-Kafka:当按照文档使用Pause/Resume方法暂停/恢复消费者时,当使用自动分配时不应该发生再平衡,但它不工作,再平衡发生了。如何暂停/恢复消费者并在一段时间后保持轮询而不重新平衡? 用例:消费者应该暂停一段时间,并保持轮询给心跳,并在时间结束后恢复,但Kafka不应该在消费者暂停时重新平衡。 日志:2019-02-19 15:19:49.173信息82272---[rTas
我用经纪人ID的20、21、22创建了3个Kafka经纪人设置。然后我创建了这个主题: 其结果是: 当生产者向主题zeta发送消息“Hello World”时,Kafka首先将消息写入哪个分区? 如果我有8个用户在他们自己的进程或线程中并行地运行,并订阅了zeta主题,那么Kafka如何分配分区或代理来并行地为这些用户服务?
如有任何帮助,不胜感激。
我想知道Kafka流是如何分配到主题的分区进行阅读的。据我所知,每个Kafka流线程都是一个消费者(该流有一个消费者组)。所以我猜消费者是随机分配到分区的。 话题P包含人称。它有两个分区。消息的关键是person ID,因此每个属于person的消息最终都位于同一个分区中。 主题O包含订单。它有两个分区。假设密钥也是(订购某样东西的人的)person-id。因此,在这里,属于一个人的每个订单消息总
例如,假设我有一个包含4个分区的主题。我给这个主题发4K消息。每个分区获得1K条消息。由于外部因素,3个消费者分别处理了他们所有1K的消息。但是,第四个分区只能通过200条消息,剩下800条消息需要处理。是否有一种机制允许我“重新平衡”主题中的数据,也就是说,给分区1-3 200个分区4s的数据,让所有带有200条消息的分区成为一个进程? 当前分区副本分配 建议的分区重新分配配置
我对使用kafka streams和spring cloud stream相对较新,在使用窗口聚合功能时遇到了困难。 我想做的是 获取我的初始UserInteractionEvents流,并按userProjectId(字符串)对它们进行分组 创建这些事件的窗口会话,15分钟不活动 将这些窗口会话聚合到自定义会话对象中 然后将这些会话对象转换为另一个自定义UserSession对象 我的代码是:
我收到Kafka主题中的二进制Avro文件,我必须对它们进行反序列化。在Kafka收到的消息中,我可以在每条消息的开头看到一个模式。我知道不嵌入模式并将其与实际的Avro文件分离是一种更好的做法,但我无法控制生产者,也无法更改。 我的代码在Apache Storm上运行。首先,我创建一个读卡器: 然后,我尝试反序列化消息,但不声明架构: 但当消息到达时,我会收到一个错误: 我看到的所有答案都是关于
我试图解析一条Kafka消息,它是以某种加密的AVRO格式。我有以下AvroSchema。avsc avro架构文件: 现在,我编写了以下代码来获取JSON格式的数据: 请帮我解密这封信。 加密字节消息属于以下类型:<代码>080-21-0001:�哦�@@��A.�ǐ�U:�哦�@@��A 我按照建议进行了更改,现在我有以下代码: 但我仍然得到错误为“不是数据文件”。
我尝试向kafka发布/使用我的java对象。我使用Avro模式。 我的基本程序运行良好。在我的程序中,我在生产者(用于编码)和消费者(用于解码)中使用我的模式。 如果我在接收者处将不同的对象发布到不同的主题(例如:100个主题),我不知道我收到了什么类型的消息?...我想从接收到的字节中获取avro模式,并想将其用于解码...我的理解正确吗?如果是这样,我如何从接收到的对象中检索?
我是新的Flink流处理,并需要一些帮助与FlinkKafka生产者,因为不能找到很多相关的搜索后一段时间。我目前正在阅读一个Kafka主题的流,然后在执行一些计算后,我想把这个写到新的Kafka中的一个分离主题。但我面临的问题是,我无法发送Kafka主题的关键。我使用的是Flink Kafka连接器,它给了我FlinkKafkaConsumer和flinkkafkaProducer。更详细的查看
我正在用Springboot做一个简单的Kafka示例项目,我遇到了一个错误,制作人没有创建,但其余的工作正常。 我遇到的错误似乎引发了异常,因为制作人没有创建,但没有解释原因,我也不知道: 这是我的kafka配置: 这里是控制器,endpoint“/api/kafka”:
我正在手动启动Zoomaster,然后是Kafka服务器,最后是Kafka-Rest服务器及其各自的属性文件。接下来,我正在tomcat上部署我的Spring Boot应用程序 在Tomcat日志跟踪中,我得到了错误org。springframework。上下文ApplicationContextException:无法启动bean的组织。springframework。Kafka。配置。inte
需要以下步骤的帮助,了解如何使用Kafka主题中的消息并将其存储在/tmp/kakfa messages目录中 问题陈述: 创建Kafka使用者以使用主题“Multibrokerapplication”中的消息,并将其存储在“/tmp/kafka messages”中 步骤1:我能够使用发布到主题“Multibrokerapplication”的消息,如下所示。 bin/Kafka控制台消费者。s