当前位置: 首页 > 知识库问答 >
问题:

在 Kafka 连接源连接器中使用消息密钥

尉迟边浩
2023-03-14

我正在使用Kafka连接JDBC源连接器从数据库中的视图中读取并将其发布在kafka上,它工作正常。

我的用例是用户可以创建多个对象,并且对象的顺序在我的应用程序中很重要。我想使用用户 ID 作为我发布到主题中的所有消息的消息密钥,以保持它们的顺序。

我的问题是,如何在Kafka connect source连接器中定义消息键?

共有1个答案

常自强
2023-03-14

您可以通过添加以下代码来连接文件源配置文件,从而使用Kafka Connect的SMT(单消息转换)功能。

transforms=createKey
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=UserId <name of user id column>

有关 SMT 的更多信息,请单击此处

 类似资料:
  • Kafka 接收器连接器如何在从分区提取消息时确保消息排序。我有多个分区,并且在发布每个分区带有哈希键的消息时确保了消息排序。现在,当多个接收器任务(及其工作线程)从多个 JVM 扩展,负责从同一分区获取消息并通过 HTTP 通知目标系统时,我如何保证目标系统将按顺序接收消息。

  • 我已经使用Kafka的汇流本地集群为Kaffa和m安装了Aerospike所需的所有配置,并已安装https://www.confluent.io/hub/aerospike/kafka-connect-aerospike-source并已开始汇流群集,但连接器仍未启动 我还发现合流的共享文件夹中没有jar,它还在开发中吗?

  • 我有一个需求,即我们应用程序之外的源将在S3存储桶中放置一个文件,我们必须在kafka主题中加载该文件。我正在查看ConFluent的S3 Source连接器,目前正在努力定义在我们的环境中设置连接器的配置。但是有几篇文章指出,只有在您使用S3 Sink连接器将文件放在S3中时,才能使用S3 Source连接器。 以上是真的吗?在配置中,我在哪里/使用什么属性来定义输出主题?当阅读S3的文章并把它

  • 我在 AWS S3 中备份了以下文件,这些文件由 Kafka 连接接收器连接器备份: 当使用Kafka connect S3源恢复主题时,密钥文件被忽略,我在日志中看到以下调试消息: 我的源配置如下所示: 我应该做什么改变才能让密钥和消息一起存储在Kafka中?

  • 我使用kafka connect从mongo读取数据并将其写入kafka主题。 我正在使用 mongo kafka 源连接器。 我收到以下错误: 罐子里好像有一个小盒子。为了得到这个罐子,我使用了两种不同的方法,但是我得到了同样的错误。首先,我使用了下载的from:maven资源库,然后我从github repo中克隆了源代码,并自己构建了jar。我将jar推到plugins.path中,当我解压

  • 我们的基础设施中有融合平台。核心是,我们使用kafka代理来分发事件。许多设备会生成Kafka主题的事件(每种类型的事件都有一个Kafka主题),事件在谷歌的protobuf中序列化。我们有confluent的模式注册表来跟踪protobuf模式。 我们需要的是,对于一些事件,我们需要应用一些转换,然后将转换输出发布到其他一些Kafka主题。当然,Kafka流是实现这一点的一种方法,如本例中所示。