当前位置: 首页 > 知识库问答 >
问题:

使用KafkaUtils时手动将偏移范围提交给Kafka。createDirectStream

上官凯泽
2023-03-14

我试图使用Kafka Utils Api从Kafka(0.10.0.0)到Spark(1.6.0)流媒体应用程序使用html" target="_blank">数据

Kafka提尔。createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,InputOpicSet)

要求是将偏移范围手动提交给Kafka本身。

请注意,当在java中使用Kafka消费者(或消费者)对象时,我们可以在参数中设置"enable.auto.commit"="false"后,使用委员会同步或委员会同步方法来实现这一点。

当使用KafkaUtils时,我无法找出同样的方法。

共有1个答案

法景明
2023-03-14

您可以传递"enable.auto.commit"="false"作为kafkaParams的一部分。事实上,您可以传递任何Kafka消费者设置作为其中的一部分。

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
    kafkaParams.put("enable.auto.commit", true); 
    //more kafka params goes here if needed
    JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, inputTopicsSet)

然后像这样手动提交偏移量,

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // some time later, after outputs have completed
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});

参考:https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#creating-a-direct-stream

上述代码适用于spark 2.2.0,可能不适用于spark 1.6.0。

 类似资料:
  • 我使用的是camel kafka组件,我不清楚在提交补偿时引擎盖下发生了什么。如下所示,我正在聚合记录,我认为对于我的用例来说,只有在记录保存到SFTP后提交偏移量才有意义。 是否可以手动控制何时可以执行提交?

  • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端

  • 我使用MANUAL_IMMEDIATEack模式,Spring-kafka 1.3.9(不能更改为Java8),并在监听器代码中完成处理时提交偏移量。我使用自定义反序列化器及其工作正常,除非我遇到反序列化异常。然后Kafka卡住了。我已经处理了这个由Deserializer,喜欢而不是抛出异常(当反序列化异常发生)我得到一个反序列化对象的新实例,并设置原始消息(导致反序列化异常)在一个字段(异常数

  • 我正在尝试找出使用Spring-Kafka(1.1.0. RELEASE)在Kafka消费者中手动提交偏移的方法。我明白,最好将这些偏移提交给健壮的客户端实现,这样其他消费者就不会处理重复的事件,这些事件最初可能是由现已死亡的消费者处理的,或者因为重新平衡被触发了。 我知道有两种方法可以解决这个问题- > 将ACK_MODE设置为MANUAL_IMMEDIATE,并在侦听器实现中调用ack.ack

  • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。