public void streamHandler() {
Properties props = getKafkaProperties();
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> processStream = builder.stream("INTER_TOPIC",
Consumed.with(Serdes.String(), Serdes.String()));
//processStream.peek((key,value)->System.out.println("key :"+key+" value :"+value));
processStream.map((key, value) -> getTransformer().transform(key, value)).filter((key,value)->filteroutFailedRequest(key,value)).to("FINAL_TOPIC", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams IStreams = new KafkaStreams(builder.build(), props);
IStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throw-able e) {
logger.error("Thread Name :" + t.getName() + " Error while processing:", e);
}
});
IStreams.cleanUp();
IStreams.start();
try {
System.in.read();
} catch (IOException e) {
logger.error("Failed streaming ",e);
}
}
但是我的接收器只在2个分区中获取数据,但是我配置了20个流线程,并且我验证了我的生产者正在写入所有20个分区,如何知道我的转换节点转发到我的FINAL_TOPIC的所有20个分区
30 Sep 2019 10:39:41,416 INFO c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-3] Received
30 Sep 2019 10:39:41,416 INFO c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-4] Received
30 Sep 2019 10:39:41,416 INFO c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-3] Received
30 Sep 2019 10:39:41,416 INFO c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-4] Received
30 Sep 2019 10:40:57,427 INFO c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-3] Received
30 Sep 2019 10:40:57,427 INFO c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-4] Received
30 Sep 2019 10:40:57,427 INFO c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-3] Received
30 Sep 2019 10:40:57,427 INFO c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-4] Received
partition-er是循环的
为什么你认为分区器是循环的?默认情况下,Kafka Streams根据键应用基于哈希的分区。
如果要更改默认分区器,可以实现接口streampartitioner
并通过以下途径传递:
Produced.with(Serdes.String(), Serdes.String())
.withStreamPartitioner(...)
我们正在使用kafka拓扑转发向kafka主题发送记录。 我们之前使用了一个单独的生产者来发布消息,我们能够获取消息的偏移量和分区。现在我们想用上下文替换它。向前地 如何使用上下文获取Kafka接收器处理器发送的记录的偏移量和分区。向前地
对于传入记录,我需要验证值,并且基于结果对象,我需要将错误转发到不同的主题,如果成功验证,则使用context.forward()转发相同的错误。可以使用本链接中提供的DSL来完成 现在,调用者再次需要检查并根据键来区分接收器主题。我使用processorAPI是因为我需要use头。 编辑: 当条件为false时,如何推送到不同的流。当前正在创建另一个谓词,该谓词收集链中不满足上述谓词的所有其他记
乡亲们, 我在BigQuery中使用融合连接器进行数据传输。对于每个事件,我都创建了一个将在BigQuery中解释的avro模式。是否可以将任何事件字段定义为表分区? 它目前正在使用_PARTITIONTIME隐藏字段,但我需要它成为实际事件的字段,以便更容易处理重复。
我有一个kafka主题,有200万条消息,我的刷新大小是100000,默认分区为分布式模式,有4个工作者,我可以看到数据在几秒钟内立即写入HDFS(10到15秒)。 我看到创建了一个+tmp目录和文件夹,并且每次触发一个新连接器时都会创建主题。 kafka connect的行为是每次都写得这么快,还是已经将数据存储在HDFS中,并根据连接器属性将其移动到主题目录? 我需要清楚这是怎么发生的。如果我
我有一个spring boot应用程序,用于向kafka发送消息。该应用程序在每天1000万个请求的高流量下在6个实例上运行。我也有一款春装Kafka消费应用。但是这个应用程序有2个实例,这些实例不能使用所有的消息,因为这个应用程序运行的是单线程。我的主题有4个分区,我想根据分区数做消费者应用程序多线程。但是我不确定我的代码是否有效。 SpringKafka独立 配置类 根据我主题的分区计数,我将
我已经创建了一个Java记录,并且希望有一个构造函数,与默认构造函数相比,它可以接受更少的参数,并根据给定的参数计算和初始化所有成员。 然而,我发现这很难实现,因为自定义构造函数的第一行必须调用默认构造函数。我目前的方法是根据需要调用计算函数,但这会导致不必要的处理。 肯定有更好的方法来实现这一点吗?