当前位置: 首页 > 知识库问答 >
问题:

如何在Kafka接收器连接器中手动提交偏移量

朱炜
2023-03-14

我有一个Kafka接收器任务,通过put()方法收听Kafka主题
但我不想自动提交偏移量,因为一旦从Kafka取出记录,我就有一些处理逻辑
从Kafka获取记录后,如果处理成功,则只有我想提交偏移量,否则它应该再次从同一偏移量读取。

我可以在Kafka consumer中看到方法commitSync(),但在Sink Connector中找不到替代方法。

共有2个答案

柳英资
2023-03-14

添加此属性:("enable.auto.commit","false")

使可能汽车commit的默认值为true,第二个属性为auto。犯罪间隔ms的默认值为5000

皇甫浩壤
2023-03-14

接收Kafka连接器提交

如果选项(enable.auto.commit)为False,则根据下面的选项(offset.flush.interval.ms)每60秒自动提交一次。如果put()方法中没有错误,它将正常提交。

offset.flush.interval.ms
Interval at which to try committing offsets for tasks.

Type: long
Default: 60000
Importance: low

在Sink Kafka中管理偏移量

Kafka Connect应提交通过预提交传递给连接器的所有偏移量。但是,如果预提交返回一组空的偏移量,那么Kafka Connect将不会记录任何偏移量。在此处输入链接描述

SinkTask.java

/**
 * Pre-commit hook invoked prior to an offset commit.
 *
 * The default implementation simply invokes {@link #flush(Map)} and is thus able to assume all {@code currentOffsets} are committable.
 *
 * @param currentOffsets the current offset state as of the last call to {@link #put(Collection)}},
 *                       provided for convenience but could also be determined by tracking all offsets included in the {@link SinkRecord}s
 *                       passed to {@link #put}.
 *
 * @return an empty map if Connect-managed offset commits are not desired, otherwise a map of committable offsets by topic-partition.
 */
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    flush(currentOffsets);
    return currentOffsets;
}

SinkTaskContext.java

/**
 * Request an offset commit. Sink tasks can use this to minimize the potential for redelivery
 * by requesting an offset commit as soon as they flush data to the destination system.
 *
 * This is a hint to the runtime and no timing guarantee should be assumed.
 */
void requestCommit();
 类似资料:
  • 我想重置AerospikeSink Kafka Connector偏移量,我首先删除连接器消费组()偏移量,然后重新创建它。当我使用策略重新创建时,它以正确的偏移量重新创建,但是然后,当任务状态从更改为任务时,它会从连接器的前一个实例到达的点继续处理,这会阻止从一开始就读取来自kafka的所有消息(我正在尝试再次读取来自kafka的所有消息)。 注意:使用新名称创建新连接器并不能解决问题。 使用任

  • 我们正在使用kafka拓扑转发向kafka主题发送记录。 我们之前使用了一个单独的生产者来发布消息,我们能够获取消息的偏移量和分区。现在我们想用上下文替换它。向前地 如何使用上下文获取Kafka接收器处理器发送的记录的偏移量和分区。向前地

  • 我使用MANUAL_IMMEDIATEack模式,Spring-kafka 1.3.9(不能更改为Java8),并在监听器代码中完成处理时提交偏移量。我使用自定义反序列化器及其工作正常,除非我遇到反序列化异常。然后Kafka卡住了。我已经处理了这个由Deserializer,喜欢而不是抛出异常(当反序列化异常发生)我得到一个反序列化对象的新实例,并设置原始消息(导致反序列化异常)在一个字段(异常数

  • 我使用的是camel kafka组件,我不清楚在提交补偿时引擎盖下发生了什么。如下所示,我正在聚合记录,我认为对于我的用例来说,只有在记录保存到SFTP后提交偏移量才有意义。 是否可以手动控制何时可以执行提交?

  • 我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连