当前位置: 首页 > 知识库问答 >
问题:

在Kafka中,我可以创建一个单独的Kafka主题并让多个制作人写入吗

冯驰
2023-03-14

我有以下用例:将来自单个数据源的日志文件推送到Kafka主题(例如主题1)。有一个消费者将从中读取并转换为json格式并写回另一个主题(主题2)。另一个期望json中的数据将从主题2读取的消费者将进行一些其他修改并写回另一个主题(主题3)。

我的问题是,除了创建3个不同的主题之外,我能否创建一个主题,并让这些多个制作者写同一个主题?既然不能为生产者设置组id,我的消费者如何知道从哪个分区读取?我从SO中学到的一个解决方案是创建分区,并让每个生产者单独写入特定的分区。这种方法的问题是生产者和消费者的数量可能会改变,因此不需要修改主题。请指教。

共有1个答案

曹浩
2023-03-14

正如有人已经评论过的,你不应该把不同类型的模式推到单个主题上。Kafka中的主题数量不是问题。你可以使用一些术语来管理它们。比如“topic1”、“topic1_json”、“topic1_modificathtml" target="_blank">ion”。

如果您的用例有不可管理的主题列表,同一个消费者可以阅读所有json主题

用一般模式创建一个对象或设置一些模式注册表(检查汇合模式注册表)。所有模式适合作为子记录或记录的地方将携带模式信息。然后为所有json响应创建一个主题(例如:topic_json_generic)。从“topic1”中读取数据后,将其推送到“topic_json_generic”。后续主题类似。在消费者层次上,你可以处理哪种类型的对象需要做什么。

 类似资料:
  • 假设我有一个主题T1,它有三个分区,即P1、P2和P3。其中p1是领导者,rest是追随者。

  • 所以我有一个设计,其中我有多个生产者P1、P2、P3、P4... PN写入单个主题T1,它有32个分区。 另一方面,我在一个消费者组中最多有32个消费者。 我想负载平衡我的消息消耗 阅读文档时,我可以看到3个选项: 1。自己定义分区(缺点是我必须知道最后一条消息发送到哪里,或者为每个生产者定义分区范围P) 2。定义一个密钥并将分区决定权交给Kafka哈希算法(缺点-负载平衡将在运气好的情况下定义)

  • 然后,我对一个方法使用了注释,该方法执行以下操作: 这不起作用。是事务性的,但是当调用方法时,没有正在进行的事务,并且我得到一个。 我打算尝试方法,但Javadoc声明这只用于本地事务,因此它似乎不符合我的需要。 我的下一步是尝试直接使用Kafka的Producer API,看看这种模式是否有效,但如果有人能告诉我知道我在浪费时间,Kafka不支持事务性地写多个主题,我会很感激。 我确实在Conf

  • 我想为几个主题创建一个kafka消费者。consumer的方法构造函数允许我在订阅中传输主题列表的参数,如下所示: 之后,我想轮询记录从Kafka流每3秒并处理它们,但我想知道什么是这个消费者-如何将不同主题的记录轮询-首先一个主题,然后另一个,或并行。会不会一个消息量大的主题会一直处理,另一个消息量小的主题会等待?

  • 我正在考虑创建一个独立的Kafka生产者,它作为守护进程运行,通过套接字接收消息,并将其可靠地发送给Kafka。 但是,我决不能是第一个想到这个想法的人。这样做的目的是避免使用PHP或Node编写Kafka生成器,而只是通过套接字将消息从这些语言传递到独立的守护进程,这些语言负责传递,而主应用程序则一直在做自己的事情。 此守护进程应负责在发生中断时进行重试传递,并充当服务器上运行的所有程序的传递点

  • 我想使用spring cloud stream framework创建一个kafkaendpoint,它将有一个http post api到。如何动态更改属性 我可以使用实现来实现上述功能,但不知道是否有可能在Spring中开发此功能。