当前位置: 首页 > 知识库问答 >
问题:

如何指定连接器是源还是接收器?

上官凯泽
2023-03-14

我目前正在配置kafka连接(与debezium/连接docker映像),我成功地使用环境变量将其连接到Kafka:

docker run -it --rm --name AAAAAA-kafka-connect -p 8083:8083 \
    -v aaaaa.jks:aaaaa.jks \
    -v bbbbbb.jks:bbbbbb.jks \
    -e LOG_LEVEL=INFO \
    -e HOST_NAME="AAAAAA-kafka-connect" \
    -e HEAP_OPTS="-Xms256m -Xmx2g" \
    -e BOOTSTRAP_SERVERS="BBBBB:9092" \
    -e CONNECT_CLIENT_ID="xxx-kafka-connect" \
    -e CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"...\" password=\"...\";" \
    -e CONNECT_SECURITY_PROTOCOL="SASL_SSL" \
    -e CONNECT_SASL_MECHANISM="PLAIN" \
    -e CONNECT_SSL_TRUSTSTORE_LOCATION="bbbbbb.jks" \
    -e CONNECT_SSL_TRUSTSTORE_PASSWORD="..." \
    -e CONNECT_SSL_KEYSTORE_LOCATION="aaaaa.jks" \
    -e CONNECT_SSL_KEYSTORE_PASSWORD="..." \
    -e GROUP_ID="XXX.grp.kafka.connect" \
    -e CONFIG_STORAGE_TOPIC="XXX.connect.configs.v1" \
    -e OFFSET_STORAGE_TOPIC="XXX.connect.offsets.v1" \
    -e STATUS_STORAGE_TOPIC="XXX.connect.statuses.v1" \
    quay.io/debezium/connect:1.9

现在我必须创建一个源连接器(posgresql-db),我希望kafka connect从源获取的数据在kafka主题中接收。

由于数据库连接器的 json 配置中没有这样的配置,我必须在哪里设置接收器的 kafka 配置?

我必须创造一个连接Kafka主题的接收器吗?如果是,我们在哪里指定这是一个接收器连接器还是一个源连接器??

PS:我已经创建了kafka主题,我想把数据放在那里

请随意提问

共有2个答案

边明煦
2023-03-14

好的,你必须添加CONNECT_PRODUCER_*CONNECT_CONSUMER_*环境变量来指定源或接收器的配置!!!!!!

像这样:

docker run -it --rm --name AAAAAA-kafka-connect -p 8083:8083 \
    -v aaaaa.jks:aaaaa.jks \
    -v bbbbbb.jks:bbbbbb.jks \
    -e LOG_LEVEL=INFO \
    -e HOST_NAME="AAAAAA-kafka-connect" \
    -e HEAP_OPTS="-Xms256m -Xmx2g" \
    -e BOOTSTRAP_SERVERS="BBBBB:9092" \
    -e CONNECT_CLIENT_ID="xxx-kafka-connect" \
    -e CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"...\" password=\"...\";" \
    -e CONNECT_SECURITY_PROTOCOL="SASL_SSL" \
    -e CONNECT_SASL_MECHANISM="PLAIN" \
    -e CONNECT_SSL_TRUSTSTORE_LOCATION="bbbbbb.jks" \
    -e CONNECT_SSL_TRUSTSTORE_PASSWORD="..." \
    -e CONNECT_SSL_KEYSTORE_LOCATION="aaaaa.jks" \
    -e CONNECT_SSL_KEYSTORE_PASSWORD="..." \
    -e GROUP_ID="XXX.grp.kafka.connect" \
    -e CONFIG_STORAGE_TOPIC="XXX.connect.configs.v1" \
    -e OFFSET_STORAGE_TOPIC="XXX.connect.offsets.v1" \
    -e STATUS_STORAGE_TOPIC="XXX.connect.statuses.v1" \
    -e CONNECT_PRODUCER_TOPIC_CREATION_ENABLE=false \
    -e CONNECT_PRODUCER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"...\" password=\"...\";" \
    -e CONNECT_PRODUCER_SECURITY_PROTOCOL="SASL_SSL" \
    -e CONNECT_PRODUCER_SASL_MECHANISM="PLAIN" \
    -e CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION="bbbbbb.jks" \
    -e CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD="..." \
    -e CONNECT_PRODUCER_SSL_KEYSTORE_LOCATION="aaaaa.jks" \
    -e CONNECT_PRODUCER_SSL_KEYSTORE_PASSWORD="..." \
    -e CONNECT_PRODUCER_CLIENT_ID="xxx-kafka-connect" \
    -e CONNECT_PRODUCER_TOPIC_CREATION_ENABLE=false \
    quay.io/debezium/connect:1.9

sinksource属性来自连接器。在连接器的json定义中使用的类。然而,Debeziums CDC连接器只能用作从外部数据库系统捕获实时事件更改记录的源连接器(https://hevodata.com/learn/debezium-vs-kafka-connect/#:~:text=Debezium平台有大量来自外部数据库系统的记录。)

殳阳飙
2023-03-14

环境变量仅修改客户端参数。

源和汇是在实际创建连接器时确定的。您需要一个JSON配置,它将有一个< code>connector.class。

在Kafka API中有< code>SinkTask和< code>SourceTask。

Debezium始终是一个来源。来源写信给Kafka;这并不意味着Kafka就是一个水槽。你需要安装一个新的连接器插件来为你的数据库获取一个接收器,比如来自Confluent的JDBC连接器,它有源和接收器的类。

 类似资料:
  • 问题内容: 对于我来说,目前尚不清楚,在这种情况下,我想使用值接收器而不是始终使用指针接收器。 回顾一下文档: 该 文档 还说:“对于基本类型,切片和小型结构之类的类型,值接收器非常便宜,因此,除非该方法的语义要求使用指针,否则值接收器是高效且清晰的。” 首先, 它说“非常便宜”,但问题是它比指针接收器便宜。因此,我做了一个小的基准测试(基于要点的代码),向我展示了,即使对于只有一个字符串字段的结

  • 问题内容: 我一直在从关于指针接收器的话题中出错,我决定用谷歌搜索术语的含义,并且阅读了有关指针接收器的不同资源和文档。例如:http : //golang.org/doc/faq和http://jordanorelli.com/post/32665860244/how- to-use-interfaces-in-go。 虽然,尽管他们谈论这些术语,但仍未能准确定义它们。不过,从上下文来看,我认为

  • 假设您使用一个(阻塞)生物连接器,如下所示: 你做了201个并发连接,最后一个连接会发生什么? 如果您创建了1001个并发连接,那么最后一个连接会发生什么? 接下来,让我们使用一个(非阻塞)NIO连接器 如果我们进行201和1001并发连接,会有什么不同吗? 据我所知,NIO和NIO2之间没有行为差异,只有实现不同,这是真的吗? 我制作了一个小servlet如下所示: 设置这些设置并发出50个并发

  • 我有Kafka主题,有多种类型的消息流入并使用Kafka Connect写入弹性搜索。流看起来不错,直到我不得不将唯一的消息集分离到唯一的索引中。也就是说,我必须根据字段(JSON消息)为新的数据集获取新的索引。 我如何配置/定制Kafka connect以实现同样的功能?每个消息都包含一个表示消息类型和时间戳的字段。 示例 Json 如下所示: Sample1: {“log”:{“data”:“

  • 我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连

  • 我有一个Kafka连接接收器记录从Kafka主题到S3。它在工作,但太慢了。Kafka主题每秒接收约30000条消息。连接接收器无法跟上。我已经尝试增加Kafka连接器的任务。最大值从1到3,这会创建更多任务,但这似乎无助于提高消息/秒的速度。我试着增加Kafka连接工人的CPU分配,这似乎也没有帮助。 我还能试什么?哪些指标有助于监控以进一步识别瓶颈? 更新:Kafka主题有5个分区。Kafka