使用kafka流处理器api
场景:流处理器(使用kafka流处理器api实现)从源主题读取数据,并基于某些业务逻辑将数据写入目标主题。
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsProcessor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dev_cluster.org:9092");
props.put(StreamsConfig.STATE_DIR_CONFIG, "streams-pipe");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
Topology topology = new Topology();
topology.addSource("mySource", "source_topic");
topology.addProcessor("StreamsProcessor",()->new StreamsProcessor(), "mySource");
topology.addSink("sink1","output_topic","StreamsProcessor");
topology.addSink("sink2","output_topic2","StreamsProcessor");
topology.addSink("sink3","output_topic3","StreamsProcessor");
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
--------------------------------------------------------------
public void init(ProcessorContext context)
{
this.context = context;
context.commit();
}
public void process(String key, String Value)
{
// In a loop send to sink1 sink2 or sink3
context.forward(key,Value,To.child("sink1"));
}
----------------------------------------------------------------
您可以impelement某种kafkaProducer,它将是一个messageFailureHandler,使用它,您可以将所有失败的消息发送到一个专用的kafka主题。
如果您对kafka-connect中死信队列的概念很熟悉,那么它也是一样的(此外,在kafka-connect中,它只是一个配置问题)。
我有一个spring boot应用程序与单个Kafka消费者从一些主题获取消息。但有时在处理消息时会出现错误。 我理解我需要禁用自动提交并手动提交成功的消息,但是,在这种情况下,如果我没有为这个异常情况抛出任何异常,并手动提交每个下一个成功的消息,那么我将丢失前一个不成功的消息,对吗?
类项目: hbm文件: 方法如下:
我在spark streaming应用程序中看到一些失败的批处理,原因是与内存相关的问题,如 无法计算拆分,找不到块输入-0-1464774108087
我有一个生产者/消费者模式,如下所示 固定数量的生成器线程,每个线程写入它们自己的BlockingQueue,通过执行器调用 单个使用者线程,读取生产者线程 每个生产者都在运行一个数据库查询,并将结果写入其队列。消费者轮询所有生产者队列。目前,如果出现数据库错误,生产者线程就会死掉,然后消费者就会永远停留在产品队列中等待更多的结果。 我应该如何构造它来正确处理catch错误?
11.1 日志项处理和失败 一个常见的用例是需要在一个步骤中特殊处理错误,chunk-oriented步骤(从创建工厂bean的这个步骤)允许用户实现一个简单的ItemReadListener用例,用来监听读入错误,和一个ItemWriteListener,用来监听写出错误.下面的代码片段说明一个监听器监听失败日志的读写: >public class ItemFailureLoggerListen