当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect中的多消息转换

戚永福
2023-03-14

我有Kafka-Connect,我需要将其与REST API集成,该API在使用单输入多数据模型调用时效果最好。

想象一下以下内容:

源主题-

Kafka源主题:

{"Foo":"Bar345354"}
{"Foo":"Bar454354"}

REST请求:

{
  "doWorkOn": {
    "message1Foo": "Bar345354",
    "message2Foo": "Bar454354",
    ...
  }
}

REST响应:

{
  "result": {
    "message1Foo": "Bar948474",
    "message2Foo": "Bar434343",
    ...
  }
}

Kafka Sink主题:

{"Foo":"Bar948474"}
{"Foo":"Bar434343"}

所以我想在给定的时间范围内转换多个消息。

Kafka-Connect转换(https://docs.confluent.io/current/connect/transforms/index.html)的留档表明它处理SMT或单输入单数据模型。所以我正在搜索与Kafka-Connect对应的MMT。

到目前为止,我没有找到它。有人能指出有哪些选择吗?

共有1个答案

劳嘉实
2023-03-14

听起来Kafka Connect并不适合这里。Kafka Connect连接器适用于:

  • 源连接器-将数据从系统拉入Kafka
  • 接收器连接器-从Kafka推送到另一个系统。

这些都不符合你在问题中描述的模式。我认为解决方案在于编写一个流处理应用程序;从主题中获取数据,执行某些操作(外部REST调用),并将其写回另一个Kafka主题。

为此,Kafka Connect的对应物将是Kafka Streams。

编辑:再想一想,这种模式在合流复制器和MirrorMaker 2中都有先例,它们都像Kafka Connect连接器一样工作,从一个主题读取并写入另一个主题。

 类似资料:
  • 我正在尝试将单个输入消息转换为多个消息。我有一个带有以下签名的方法: 类类似于: 对于中的每个,我想创建一个的实例。我如何做到这一点并处理

  • 多客服的消息转发绝对是超级的简单,转发的消息类型为 transfer: use EasyWeChat\Kernel\Messages\Transfer; // 转发收到的消息给客服 $app->server->push(function($message) { return new Transfer(); }); $response = $app->server->serve(); 当

  • 使用MVC Java编程配置方式时,如果你想替换Spring MVC提供的默认转换器,完全定制自己的HttpMessageConverter,这可以通过覆写configureMessageConverters()方法来实现。如果你只是想定制一下,或者想在默认转换器之外再添加其他的转换器,那么可以通过覆写extendMessageConverters()方法来实现。 下面是一段例子,它使用定制的Ob

  • Spring JMS中的类具有setter方法,它允许提供我们想要的任何转换器。 对于带注释的消息侦听器,这很有意义,因为我们可以直接定义 spring将负责将消息转换并传递给这个侦听器。 因此,这对于带注释的听者来说显然是有意义的。我的问题是,设置消息转换器对非注释消息侦听器有用吗?类似于 从我在docs/javadocs中的搜索和对源代码的有限理解来看,我认为为这种情况设置消息转换器是没有帮助

  • 当使用vscode的Git集成提交文件时,是否有任何方法可以编写多行提交消息,或者只限于一行? 到目前为止,VSCode在分段文件方面非常出色。但似乎我仍然必须从终端编写大部分提交消息。

  • 我在使用spring-jms模块转换来自RabbitMQ的消息时遇到了一些问题。以前,我使用Rest APIendpoint发送消息,该endpoint将消息发送到RabbitMQ队列,并使用@JMSListener方法处理它。 在内部,这种行为添加了一个字段来确定Java类型,由Spring库管理。但是,现在我想避免Rest API调用,因为它不是必需的,而且我可以直接将消息发送到RabbitM