当前位置: 首页 > 知识库问答 >
问题:

使用Kafka连接转换Kafka消息有意义吗?

商宏爽
2023-03-14

我们的基础设施中有融合平台。核心是,我们使用kafka代理来分发事件。许多设备会生成Kafka主题的事件(每种类型的事件都有一个Kafka主题),事件在谷歌的protobuf中序列化。我们有confluent的模式注册表来跟踪protobuf模式。

我们需要的是,对于一些事件,我们需要应用一些转换,然后将转换输出发布到其他一些Kafka主题。当然,Kafka流是实现这一点的一种方法,如本例中所示。然而,我们不希望每次转换都有一个java应用程序(这会增加项目和开发/部署工作的复杂性),将所有流放在一个应用程序中感觉不太合适(修改一个应用程序需要停止所有流并重新启动)。

在这一点上,我们认为ConFluent的Kafka Connect可能是更好的方法。我们可以有几个工作人员,我们可以将他们部署到一个kafka连接实例/或集群中。问题是;

使用kafka连接从一个kafka主题获取消息并将其发送到另一个kafka主题是否有意义?因为所有用例和示例都旨在将数据从外部(数据库、文件等)获取到kafka,并从kafka获取到外部。

共有1个答案

涂飞航
2023-03-14

为了澄清,Kafka连接不是“融合的”,它是阿帕奇Kafka的一部分。

虽然可以将MirrorMaker2/Confluent Replicator与转换一起使用,但老实说,这与将转换逻辑提取到共享库中,然后捆绑一个可部署的Kafka Streams应用程序没有太大区别,该应用程序接受输入和输出主题的配置参数,并在两者之间进行转换。

你对单点管理的观点很好,但这也是单点失败。。。如果使用Connect,更改转换插件还需要停止并重新启动连接服务器,如果所有主题都是同一连接器的一部分,则任何任务失败都会停止部分主题转换

无论如何,Kafka流(或KSQL)是集群间翻译的首选

您还可以查看Apache Nifi等解决方案,以实现更复杂的事件管理和路由

 类似资料:
  • Kafka 接收器连接器如何在从分区提取消息时确保消息排序。我有多个分区,并且在发布每个分区带有哈希键的消息时确保了消息排序。现在,当多个接收器任务(及其工作线程)从多个 JVM 扩展,负责从同一分区获取消息并通过 HTTP 通知目标系统时,我如何保证目标系统将按顺序接收消息。

  • 我正在使用Kafka连接JDBC源连接器从数据库中的视图中读取并将其发布在kafka上,它工作正常。 我的用例是用户可以创建多个对象,并且对象的顺序在我的应用程序中很重要。我想使用用户 ID 作为我发布到主题中的所有消息的消息密钥,以保持它们的顺序。 我的问题是,如何在Kafka connect source连接器中定义消息键?

  • 我有Kafka-Connect,我需要将其与REST API集成,该API在使用单输入多数据模型调用时效果最好。 想象一下以下内容: 源主题- Kafka源主题: REST请求: REST响应: Kafka Sink主题: 所以我想在给定的时间范围内转换多个消息。 Kafka-Connect转换(https://docs.confluent.io/current/connect/transform

  • 我有一个批处理作业,它将一天触发一次。要求是 使用该时间点上关于Kafka主题的所有可用消息 处理消息 如果进程已成功完成,则提交偏移量。 当前,我poll()while循环中的消息,直到ConsumerRecords.isEmpty()为true。当ConsumerRecords.isEmpty()为true时,我假设Topic在该时间点的所有可用记录都已被使用。应用程序维护偏移量并关闭kafk

  • 我试图用Spring Cloud Stream创建一个kafka使用者,以便监听在任何Spring上下文之外构建的kafka消息,并使用自定义头(operationType)。 我使用的是Spring Boot 1.5.x/Spring Cloud egdware.sr5和1.1.1版本的kafka-client和Kafka2.11。 我的侦听器类包含此方法 而operationType标头是存在

  • 我已经更新了我的Kafka从版本0.10.2.0到版本2.1.0,现在Kafka不能消费消息。我使用Spring引导,这是我的配置: 我已经更改了组id,以避免旧组id出现问题。我当前的spring版本是2.1。2.释放。在我的应用程序中,我可以看到我的客户是如何不断地重新连接的 你知道这个问题吗?