当前位置: 首页 > 知识库问答 >
问题:

使用kafka streams processor api处理失败消息

闾丘博超
2023-03-14

使用kafka流处理器api

场景:流处理器(使用kafka流处理器api实现)从源主题读取数据,并基于某些业务逻辑将数据写入目标主题。

  Properties props = new Properties();
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsProcessor");
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dev_cluster.org:9092");
  props.put(StreamsConfig.STATE_DIR_CONFIG, "streams-pipe");
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

  Topology topology = new Topology();
  topology.addSource("mySource", "source_topic");
  topology.addProcessor("StreamsProcessor",()->new StreamsProcessor(), "mySource"); 
  topology.addSink("sink1","output_topic","StreamsProcessor");
  topology.addSink("sink2","output_topic2","StreamsProcessor");
  topology.addSink("sink3","output_topic3","StreamsProcessor");

  KafkaStreams streams = new KafkaStreams(topology, props);
  streams.start();
  --------------------------------------------------------------
  public void init(ProcessorContext context) 
  {
      this.context = context;
      context.commit();
  }

  public void process(String key, String Value) 
  {   
      // In a loop send to sink1 sink2 or sink3
      context.forward(key,Value,To.child("sink1"));
  }
  ----------------------------------------------------------------

共有1个答案

龚睿
2023-03-14

您可以impelement某种kafkaProducer,它将是一个messageFailureHandler,使用它,您可以将所有失败的消息发送到一个专用的kafka主题。

如果您对kafka-connect中死信队列的概念很熟悉,那么它也是一样的(此外,在kafka-connect中,它只是一个配置问题)。

 类似资料:
  • 我有一个spring boot应用程序与单个Kafka消费者从一些主题获取消息。但有时在处理消息时会出现错误。 我理解我需要禁用自动提交并手动提交成功的消息,但是,在这种情况下,如果我没有为这个异常情况抛出任何异常,并手动提交每个下一个成功的消息,那么我将丢失前一个不成功的消息,对吗?

  • 类项目: hbm文件: 方法如下:

  • 我在spark streaming应用程序中看到一些失败的批处理,原因是与内存相关的问题,如 无法计算拆分,找不到块输入-0-1464774108087

  • 我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?

  • 11.1 日志项处理和失败 一个常见的用例是需要在一个步骤中特殊处理错误,chunk-oriented步骤(从创建工厂bean的这个步骤)允许用户实现一个简单的ItemReadListener用例,用来监听读入错误,和一个ItemWriteListener,用来监听写出错误.下面的代码片段说明一个监听器监听失败日志的读写: >public class ItemFailureLoggerListen