我目前正在尝试使用KStream到KTable的连接来执行Kafka主题的充实。对于我的概念证明,我目前有一个Kafka流,其中有大约600,000条记录,它们都有相同的键,还有一个KTable,它是从一个主题创建的,其中KTable主题中的键与创建KStream的主题中的600,000条记录中的键匹配。
当我使用左联接(通过下面的代码)时,所有记录在ValueJoiner上都返回NULL。
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe-json-parse-" + System.currentTimeMillis());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xx.xx.xxx:9092");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.streams.processor.WallclockTimestampExtractor");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
final StreamsBuilder builder = new StreamsBuilder();
// Build a Kafka Stream from the Netcool Input Topic
KStream<String, String> source = builder.stream("output-100k");
// Join the KStream to the KTable
KStream<String, String> enriched_output = source
.leftJoin(netcool_enrichment, (orig_msg, description) -> {
String new_msg = jsonEnricher(orig_msg, description);
if (description != null) {
System.out.println("\n[DEBUG] Enriched Input Orig: " + orig_msg);
System.out.println("[DEBUG] Enriched Input Desc: " + description);
System.out.println("[DEBUG] Enriched Output: " + new_msg);
}
return new_msg;
});
下面是来自源KStream的示例输出记录(使用forEach循环):
[KSTREAM] Key: ismlogs
[KSTREAM] Value: {"severity":"debug","ingested_timestamp":"2018-07-18T19:32:47.227Z","@timestamp":"2018-06-28T23:36:31.000Z","offset":482,"@metadata":{"beat":"filebeat","topic":"input-100k","type":"doc","version":"6.2.2"},"beat":{"hostname":"abc.dec.com","name":"abc.dec.com","version":"6.2.2"},"source":"/root/100k-raw.txt","message":"Thu Jun 28 23:36:31 2018 Debug: Checking status of file /ism/profiles/active/test.xml","key":"ismlogs","tags":["ismlogs"]}
我尝试将KTable转换回KStream,并在转换后的流上使用forEach循环,我验证了KTable中的记录是否存在。
KTable<String, String> enrichment = builder.table("enrichment");
KStream<String, String> ktable_debug = enrichment.toStream();
ktable_debug.foreach(new ForeachAction<String, String>() {
public void apply(String key, String value) {
System.out.println("[KTABLE] Key: " + key);
System.out.println("[KTABLE] Value: " + value);
}
});
上面的代码输出:
[KTABLE] Key: "ismlogs"
[KTABLE] Value: "ISM Logs"
根据您的控制台消息,密钥是不同的,因此它们不会加入:
[KSTREAM] Key: ismlogs
[KTABLE] Key: "ismlogs"
对于ktable
来说,关键字实际上是带有双引号的“ismlogs”
。
我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?
我需要帮助理解在Kafka2.2中使用max.task.idle.ms时的Kafka流行为。 我有一个KStream-KTable联接,其中KStream已被重新键入: 所有主题都有10个分区,为了测试,我将max.task.idle.ms设置为2分钟。myTimeExtractor只有在消息被标记为“快照”时才更新消息的事件时间:stream1中的每个快照消息都将其事件时间设置为某个常数T,st
我试图根据Kafka的文档实现这个连接。 我不知道为什么这个连接不起作用。。。 首先我通过了所有的值。 如果未加入序列化选项,我将收到此运行时异常: 线程“StreamAPP-stream-event-b3dc5fff-abee-4fa0-92f9-e1690f8fd152-StreamThread-1”组织中出现异常。阿帕奇。Kafka。溪流。错误。StreamsException:ClassC
我对Kafka的溪流很陌生。我想执行以下KStream-GlobalKTable纯基于DSL的左联接操作,而不使用map操作。 和另一个输入主题,它是 ,其中value: 我要执行左联接操作是一个流,主数据是一个全局表,以实现结果值为 连接条件为 代码:
它还会返回这个表,遗漏一些行(我希望它给出所有流派,而不仅仅是演员在其中扮演角色的流派): 有趣的是,它为“动画”返回了“0”,但为“儿童”或“喜剧”没有行,这是我正在寻找的结果(所有流派都返回了)。我做错了什么?
问题内容: 我在MySQL中有以下查询: 该表有27行,但查询仅返回1行。基于 这个问题, 我认为可能是由于WHERE子句。该表具有字段id,属性,字段,值,其中是第三个表的外键(我不需要从中获取数据)。 有没有一种方法可以从第一个表中选择所有行,包括表#2中字段为23的值(如果没有字段23,则为NULL)? 问题答案: 当然。将WHERE条件移动到JOIN: