当前位置: 首页 > 知识库问答 >
问题:

kafka连接重启失败的任务

叶声
2023-03-14

我们有一个从rdbms读取并放入kafka的源连接器。它使用带有avro模式的模式注册表。我分别在kafka连接日志和模式注册表日志中发现以下异常

1.

 Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:426) 
WorkerSourceTask{id=A-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:443) 
Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186) 
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
.
.
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic A :
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:91)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
.
.
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema:
.
.
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register operation timed out; error code: 50002
.
.
Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)    
Stopping JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask:314) 
Closing the Kafka producer with timeoutMillis = 30000 ms. 
(org.apache.kafka.clients.producer.KafkaProducer:1182) 

     2.

 Wait to catch up until the offset at 1 (io.confluent.kafka.schemaregistry.storage.KafkaStore:304)  
    Request Failed with exception  (io.confluent.rest.exceptions.DebuggableExceptionMapper:62) 
        io.confluent.kafka.schemaregistry.rest.exceptions.RestSchemaRegistryTimeoutException: Register operation timed out
            at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.operationTimeoutException(Errors.java:132)
    .
    .
    Caused by: io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryTimeoutException: Write to the Kafka store timed out while 
    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.register(KafkaSchemaRegistry.java:508)
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.registerOrForward(KafkaSchemaRegistry.java:553)
    .
    .
    Caused by: io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException: KafkaStoreReaderThread failed to reach target offset within the timeout interval. targetOffset: 3, offsetReached: 1, timeout(ms): 50

0

因此,基本上模式注册表在注册模式之前将偏移量移动到最新,然后超时500毫秒。

我的问题是这个。

>

  • 我怎样才能找到为什么它不能从kafka读取?

    源连接器任务是否重新启动或轮询一个连接器的失败任务的数据?因为在日志的后面部分,我看到了这一点。

    提交偏移量 (org.apache.kafka.connect.runtime.WorkerSourceTask:426) WorkerSourceTask{id=A-0} 刷新 0 个用于偏移提交的未完成消息 (org.apache.kafka.connect.runtime.WorkerSourceTask:443)

    所以在这之后它失败了,但现在它没有打印它,这意味着它通过了。需要注意的关键是,当它无法读取时,它只失败了一个连接器 A 的任务,而其他连接器则通过了。后来我没有找到连接器 A 的异常。如果任务未启动或连接器未再次轮询,我需要使用 rest API 重新启动任务。

    任何帮助都将得到极大的回报。提前感谢。

  • 共有1个答案

    边翔宇
    2023-03-14

    关于您的问题标题,请阅读错误。

    任务在手动重新启动之前无法恢复

    如果您有多个任务,您仍然希望看到来自其他任务的日志。

    至于偏移提交,直到任务成功,源任务偏移才会被提交,并且没有给定的日志显示“移动到最新”

    这个错误与阅读Kafka无关。错误是AvroConverter中的模式注册表客户端超时,这不是Kafka Connect所必需的。

     类似资料:
    • 错误:无法启动连接:错误:WebSocket无法连接。在服务器上找不到连接,endpoint可能不是信号器endpoint,服务器上不存在连接ID,或者存在阻止WebSocket的代理。如果有多台服务器,请检查是否启用了粘性会话。 WebSocketTransport.js:49WebSocket连接到“ws://xxxxxx/生产/网络服务/集线器/spreadhub”失败: Angular.t

    • 我正在尝试使用Kafka连接接收器将文件从Kafka写入HDFS。 我的属性看起来像: 有什么建议吗?

    • 我正在使用kafka 2.0,kafka connect在分布式模式下运行,并尝试配置debezium mysql连接器,但得到错误 电话是这样的:

    • 问题内容: 我正在开发一个应用程序,我想做的是从android中的服务器数据库接收数据。因此,我开始运行一些教程。我发现一个正在做我想做的事。但我得到: 本教程的整个代码在这里链接 发生错误: 我检查了一下: 防火墙已禁用 ping正在工作 连接字符串与从服务器管理界面运行servlet时完全相同 --servlet正在运行 我的配置: 服务器:Glass Fish 4.0 Android应用程序

    • 当我运行spring boot应用程序时,我得到以下错误: 应用程序启动失败 描述: