"UserID":111,"UpdateTime":06-13-2018 12:13:43.200Z,"Comments":2,"Like":10
"UserID":111,"UpdateTime":06-13-2018 12:13:40.200Z,"Comments":0,"Like":6
"UserID":222,"UpdateTime":06-13-2018 12:13:43.200Z,"Comments":1,"Like":10
"UserID":111,"UpdateTime":06-13-2018 12:13:44.600Z,"Comments":3,"Like":12
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-sorting");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("UnsortedMessages");
TimeWindowedKStream<String, String> countss = source.groupByKey().windowedBy(TimeWindows.of(10000L)
.until(10000L));
/*
SORTING CODE
*/
outputMessage.toStream().to("SortedMessages", Produced.with(Serdes.String(), Serdes.Long()));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-sorting-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
事先多谢。
如果您希望忽略键对消息进行排序,那么只有在输入主题具有与输出主题相同的分区数的情况下,才能根据分区进行排序。对于这种情况,您应该提取分区号并将其用作消息键(cf:https://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-suck-as-topic-partition-and-offset-information)
对于排序来说,就比较棘手了。注意,Kafka Streams遵循“连续输出”模型,并使用DSL为每个输入记录发出更新。因此,使用处理器API可能更好。您可以使用带有附加存储的处理器
并将记录放入存储。作为内存中的结构,您保留一个排序的记录列表。当时间向前推进时,您可以发出“完成”窗口,并从存储中删除相应的记录。
我不认为你可以使用DSL来建立这个。
我的Kafka publisher发送以下格式的字符串消息: 例如: 另外,我们为每个消息添加一些消息键,将它们发送到相应的分区。 我如何在1分钟窗口中重新排序消息并将它们发送到另一个主题?
有没有办法在Kafka消息有效载荷中添加时间戳标头?我想检查消息是何时在消费者端创建的,并基于此应用自定义逻辑。 编辑: 我试图找到一种方法,将一些自定义值(基本上是时间戳)附加到生产者发布的消息上,这样我就可以在特定的时间段内消费消息。现在Kafka只确保消息将按照它们被放入队列的顺序传递。但是在我的例子中,先前生成的记录可能在某个延迟之后到达(因此在时间T1生成的消息可能比在稍后时间T2生成的
任何建议或忠告都会真的很有帮助。提前道谢。
根据生产者配置,有:重试和max.in.flight.requests.per.connection.假设重试 在主题的一个分区中,消息是否可以无序到达(例如,如果第一条消息有重试次数,但是第二条消息在第一次尝试时被传递给代理)? 或者乱序只发生在主题的几个分区之间,但在分区内顺序被保留?
我没有找到任何关于Debezium如何设置Kafka消息时间戳的文档,以及是否设置了时间戳。 通过比较这些值,Kafka消息的时间戳总是在数据库表(source.ts_ms)更改的时间戳之后,也在Debezium(ts_ms)处理更改的时间之后。这表明Kafka消息时间戳只是Kafka代理设置为摄入时间的时间戳。 有人知道关于Debezium是否以及如何在其填充的接收器主题中设置Kafka消息时间
我有一个Java Spring Kafka应用程序,它将Book类型的对象发送到Kafka主题。然后我试着用Kafka流来映射每一条信息,把这本书的作者作为它的关键字。然后,我尝试将它们添加到一个KTable中,该表保存了密钥和拥有该密钥的消息的数量。然后,该表被发送到一个输出kafka主题。 书籍型号: 流结构: 运行应用后,输出显示每个唯一键,但随机表情,而不是计数。 在搞砸之后,我发现了一件