当前位置: 首页 > 知识库问答 >
问题:

Kafka Connect:多个DB2 JDBC源连接器失败

葛意远
2023-03-14

我正在尝试在本地Docker容器中使用Kafka Connect(使用官方的ConFluent映像),以便将DB2数据推送到OpenShift(在AWS上)上的Kafka集群。我在使用DB2 JDBC-Jar时使用了ConFluent JDBC连接器。我有不同的连接器配置,因为我使用带有“transforms.create键”的SMT(创建我的密钥),并且我表中的键列有不同的名称。

以下是我的步骤:

  • 为Kafka Connect创建配置、偏移量和状态的主题
  • 启动/创建Kafka Connect容器(使用env vars见下文)
  • 通过对我的Connect容器的后调用创建第一个JDBC连接器(配置见下文)

到目前为止,一切都运行良好,我可以看到我的数据被推送到集群。但是,当我通过post call添加第二个JDBC连接器时,第一个连接器停止向集群推送数据,而第二个连接器开始并继续加载和推送数据。有一小段时间,似乎两个连接器都将数据推送到群集,但我假设这可能是来自连接器1的仍在刷新的数据。问题是a)即使跟踪日志也不会显示有意义的错误(至少对我来说),b)显示的错误在不同的尝试之间是不同的(我总是删除所有的主题和容器)。

我假设这不是bug,而是需要适当设置的配置组合和/或我缺乏对一些基本Kafka Connect核心功能的理解。我已经尝试添加和更改各种配置,但不幸的是,到目前为止一切都没有成功。我已经尝试了很多次,但运气不好。我附上了我最近两次尝试的日志以及配置。

有人知道我可以调整哪个配置或查看什么来修复此问题吗?感谢任何帮助-谢谢!

Kafka: 2.0.0
Docker image: confluentinc/cp-kafka-connect:5.0.0
DB2: 10.5
JDBC Jar: db2jcc4.jar with version 4.19.76

日志第一次尝试:

[2018-12-17 13:09:15,683] ERROR Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this (org.apache.kafka.connect.storage.OffsetStorageWriter)
[2018-12-17 13:09:15,684] ERROR WorkerSourceTask{id=db2-jdbc-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
    at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:110)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:409)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:238)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-12-17 13:09:15,686] ERROR WorkerSourceTask{id=db2-jdbc-source-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2018-12-17 13:09:15,686] INFO [Producer clientId=producer-4] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2018-12-17 13:09:20,682] ERROR Graceful stop of task db2-jdbc-source-0 failed. (org.apache.kafka.connect.runtime.Worker)
[2018-12-17 13:09:20,682] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

日志第二次尝试:

[2018-12-17 14:01:31,658] INFO Stopping task db2-jdbc-source-0 (org.apache.kafka.connect.runtime.Worker)
[2018-12-17 14:01:31,689] INFO Stopped connector db2-jdbc-source (org.apache.kafka.connect.runtime.Worker)
[2018-12-17 14:01:31,784] INFO WorkerSourceTask{id=db2-jdbc-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-12-17 14:01:31,784] INFO WorkerSourceTask{id=db2-jdbc-source-0} flushing 20450 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2018-12-17 14:01:36,733] ERROR Graceful stop of task db2-jdbc-source-0 failed. (org.apache.kafka.connect.runtime.Worker)
[2018-12-17 14:01:36,733] INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

Kafka集群中每秒传入消息的截图

Kafka Connect Docker env变量:

-e CONNECT_BOOTSTRAP_SERVERS=my_kafka_cluster:443 \
  -e CONNECT_PRODUCER_BOOTSTRAP_SERVERS="my_kafka_cluster:443" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="kafka-connect" \
  -e CONNECT_REST_PORT=8083 \
  -e CONNECT_GROUP_ID="kafka-connect-group" \
  -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3 \
  -e CONNECT_CONFIG_STORAGE_TOPIC="kafka-connect-config" \
  -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3 \
  -e CONNECT_OFFSET_STORAGE_TOPIC="kafka-connect-offset" \
  -e CONNECT_OFFSET_FLUSH_INTERVAL_MS=15000 \
  -e CONNECT_OFFSET_FLUSH_TIMEOUT_MS=60000 \
  -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3 \
  -e CONNECT_STATUS_STORAGE_TOPIC="kafka-connect-status" \
  -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://url_to_schemaregistry \
  -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://url_to_schemaregistry \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_KEY_CONVERTER_SCHEMAS_ENABLE="false" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.html" target="_blank">json.JsonConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE="false" \
  -e CONNECT_PLUGIN_PATH=/usr/share/java \
  -e CONNECT_PRODUCER_BUFFER_MEMORY="8388608" \
  -e CONNECT_SECURITY_PROTOCOL="SSL" \
  -e CONNECT_PRODUCER_SECURITY_PROTOCOL="SSL" \
  -e CONNECT_SSL_TRUSTSTORE_LOCATION="/usr/share/kafka.client.truststore.jks" \
  -e CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION="/usr/share/kafka.client.truststore.jks" \
  -e CONNECT_SSL_TRUSTSTORE_PASSWORD="my_ts_pw" \
  -e CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD="my_ts_pw" \
  -e CONNECT_LOG4J_LOGGERS=org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR \
  -e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO \
  -e HOSTNAME=kafka-connect \

JDBC连接器(只有表和键列不同):

{
    "name": "db2-jdbc-source",
    "config": 
    {
        "mode":"timestamp",
        "debug":"true",
        "batch.max.rows":"50",
        "poll.interval.ms":"10000",
        "timestamp.delay.interval.ms":"60000",
        "timestamp.column.name":"IBMSNAP_LOGMARKER",
        "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector" ,
        "connection.url":"jdbc:db2://myip:myport/mydb:currentSchema=myschema;",
        "connection.password":"mypw",
        "connection.user":"myuser",
        "connection.backoff.ms":"60000",
        "dialect.name": "Db2DatabaseDialect",
        "table.types": "TABLE",
        "table.poll.interval.ms":"60000",
        "table.whitelist":"MYTABLE1",
        "tasks.max":"1",
        "topic.prefix":"db2_",
        "key.converter":"io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url":"http://url_to_schemaregistry",
        "value.converter":"io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url":"http://url_to_schemaregistry",
        "transforms":"createKey",
        "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields":"MYKEY1"
    }
}

共有1个答案

栾英资
2023-03-14

我最终解决了问题:我在时间戳模式下使用JDBC连接器,而不是时间戳递增,因为我不能(总是)指定递增列。我意识到这可能会导致一个问题,即当存在多个具有相同时间戳的条目时,Connect无法知道哪些条目已经被读取。

我的数据行的很大一部分具有相同的时间戳。当我添加第二个连接器时,第一个连接器的当前时间戳被存储,Connect 开始重新平衡,因此丢失了该 stimestamp 的哪些行已被读取的信息。当连接器再次启动并运行时,第一个连接器继续使用“下一个时间戳”,因此仅加载最新的行(这只是一小部分)。

我的错误是假设,在这种情况下,第一个连接器将重新开始使用前一个时间戳,而不是继续使用“下一个时间”。对我来说,宁可冒着重复的风险,也不要冒着可能丢失数据的风险。

 类似资料:
  • 我参考了以下链接来了解Kafka的HDFS连接https://docs.confluent.io/2.0.0/Connect/connect-hdfs/docs/index.html 我能够通过配置单元集成将数据从Kafka导出到HDFS。 现在我正尝试在Java程序的帮助下将avro记录写入Kafka 当我把Avro记录写到Kafka主题时,我在Connect中出现以下错误

  • 我已经使用Kafka的汇流本地集群为Kaffa和m安装了Aerospike所需的所有配置,并已安装https://www.confluent.io/hub/aerospike/kafka-connect-aerospike-source并已开始汇流群集,但连接器仍未启动 我还发现合流的共享文件夹中没有jar,它还在开发中吗?

  • 我有一个需求,即我们应用程序之外的源将在S3存储桶中放置一个文件,我们必须在kafka主题中加载该文件。我正在查看ConFluent的S3 Source连接器,目前正在努力定义在我们的环境中设置连接器的配置。但是有几篇文章指出,只有在您使用S3 Sink连接器将文件放在S3中时,才能使用S3 Source连接器。 以上是真的吗?在配置中,我在哪里/使用什么属性来定义输出主题?当阅读S3的文章并把它

  • 我正在尝试使用Kafka连接接收器将文件从Kafka写入HDFS。 我的属性看起来像: 有什么建议吗?

  • 分布式模式下Kafka Connect集群的偏移管理行为是什么,即运行多个连接器并监听同一组主题(或一个主题)? 因此,在分布式模式下,Kafka Connect 会将偏移量信息存储在 Kafka 中,此偏移量将由集群中的工作线程读取和提交。如果我在该 Kafka Connect 集群中运行多个连接器侦听同一主题,会发生什么情况?分区的偏移量是否与所有连接器相同,或者每个连接器在分区上的偏移量是否

  • 问题内容: 目前,我的连接 mongoose.js 具有以下代码: 需要连接的文件是 test.js : 如何更新mongoose.js以使用mongoose.createConnection(…)函数使用多个连接? 当我进行如下更改时,我仅从一个连接的更改开始: 我得到“未定义不是函数”。如果我使用此代码: 我收到“错误:尝试打开未关闭的连接” 有什么建议吗? 问题答案: mongoose通过