我有一个Kafka接收器任务,通过put()
方法收听Kafka主题
但我不想自动提交偏移量,因为一旦从Kafka取出记录,我就有一些处理逻辑
从Kafka获取记录后,如果处理成功,则只有我想提交偏移量,否则它应该再次从同一偏移量读取。
我可以在Kafka consumer中看到方法commitSync()
,但在Sink Connector
中找不到替代方法。
添加此属性:("enable.auto.commit","false")
使可能汽车commit的默认值为true,第二个属性为auto。犯罪间隔ms的默认值为5000
接收Kafka连接器提交
如果选项(enable.auto.commit
)为False,则根据下面的选项(offset.flush.interval.ms
)每60秒自动提交一次。如果put()
方法中没有错误,它将正常提交。
offset.flush.interval.ms
Interval at which to try committing offsets for tasks.
Type: long
Default: 60000
Importance: low
在Sink Kafka中管理偏移量
Kafka Connect应提交通过预提交传递给连接器的所有偏移量。但是,如果预提交返回一组空的偏移量,那么Kafka Connect将不会记录任何偏移量。在此处输入链接描述
SinkTask.java
/**
* Pre-commit hook invoked prior to an offset commit.
*
* The default implementation simply invokes {@link #flush(Map)} and is thus able to assume all {@code currentOffsets} are committable.
*
* @param currentOffsets the current offset state as of the last call to {@link #put(Collection)}},
* provided for convenience but could also be determined by tracking all offsets included in the {@link SinkRecord}s
* passed to {@link #put}.
*
* @return an empty map if Connect-managed offset commits are not desired, otherwise a map of committable offsets by topic-partition.
*/
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
flush(currentOffsets);
return currentOffsets;
}
或
SinkTaskContext.java
/**
* Request an offset commit. Sink tasks can use this to minimize the potential for redelivery
* by requesting an offset commit as soon as they flush data to the destination system.
*
* This is a hint to the runtime and no timing guarantee should be assumed.
*/
void requestCommit();
我想重置AerospikeSink Kafka Connector偏移量,我首先删除连接器消费组()偏移量,然后重新创建它。当我使用策略重新创建时,它以正确的偏移量重新创建,但是然后,当任务状态从更改为任务时,它会从连接器的前一个实例到达的点继续处理,这会阻止从一开始就读取来自kafka的所有消息(我正在尝试再次读取来自kafka的所有消息)。 注意:使用新名称创建新连接器并不能解决问题。 使用任
我们正在使用kafka拓扑转发向kafka主题发送记录。 我们之前使用了一个单独的生产者来发布消息,我们能够获取消息的偏移量和分区。现在我们想用上下文替换它。向前地 如何使用上下文获取Kafka接收器处理器发送的记录的偏移量和分区。向前地
我使用MANUAL_IMMEDIATEack模式,Spring-kafka 1.3.9(不能更改为Java8),并在监听器代码中完成处理时提交偏移量。我使用自定义反序列化器及其工作正常,除非我遇到反序列化异常。然后Kafka卡住了。我已经处理了这个由Deserializer,喜欢而不是抛出异常(当反序列化异常发生)我得到一个反序列化对象的新实例,并设置原始消息(导致反序列化异常)在一个字段(异常数
我使用的是camel kafka组件,我不清楚在提交补偿时引擎盖下发生了什么。如下所示,我正在聚合记录,我认为对于我的用例来说,只有在记录保存到SFTP后提交偏移量才有意义。 是否可以手动控制何时可以执行提交?
我正在尝试将来自主题的数据(json数据)写入MySql数据库。我想我需要一个JDBC接收器连接器。 我如何配置连接器以将主题中的json数据映射到如何将数据插入数据库。 我能找到的文件只有这个。 “接收器连接器需要了解架构,因此您应该使用合适的转换器,例如架构注册表附带的Avro转换器,或启用了架构的JSON转换器。如果存在Kafka记录键,则可以是基元类型或连接结构,记录值必须是连接结构。从连