我将一个KStream与一个KTable左联接,但我没有看到输出主题的任何输出:
val stringSerde: Serde[String] = Serdes.String()
val longSerde: Serde[java.lang.Long] = Serdes.Long()
val genericRecordSerde: Serde[GenericRecord] = new GenericAvroSerde()
val builder = new KStreamBuilder()
val networkImprStream: KStream[Long, GenericRecord] = builder
.stream(dfpGcsNetworkImprEnhanced)
// Create a global table for advertisers. The data from this global table
// will be fully replicated on each instance of this application.
val advertiserTable: GlobalKTable[java.lang.Long, GenericRecord]= builder.globalTable(advertiserTopicName, "advertiser-store")
// Join the network impr stream to the advertiser global table. As this is global table
// we can use a non-key based join with out needing to repartition the input stream
val networkImprWithAdvertiserNameKStream: KStream[java.lang.Long, GenericRecord] = networkImprStream.leftJoin(advertiserTable,
(_, networkImpr) => {
println(networkImpr)
networkImpr.get("advertiserId").asInstanceOf[java.lang.Long]
},
(networkImpr: GenericRecord, adertiserIdToName: GenericRecord) => {
println(networkImpr)
networkImpr.put("advertiserName", adertiserIdToName.get("name"))
networkImpr
}
)
networkImprWithAdvertiserNameKStream.to(networkImprProcessed)
val streams = new KafkaStreams(builder, streamsConfiguration)
streams.cleanUp()
streams.start()
// usually the stream application would be running forever,
// in this example we just let it run for some time and stop since the input data is finite.
Thread.sleep(15000L)
如果我绕过连接,直接将输入主题输出到输出,我会看到消息到达。我已经将联接更改为左联接,添加了一些printlns以查看何时提取键(但控制台上没有打印任何内容)。而且我每次都使用Kafka流重置工具,所以从头开始。我的想法快用完了。此外,我还添加了一些对存储的测试访问,它可以工作,并且包含来自流的键(尽管这不应该因为左联接而禁止任何输出)。
在我的源流中,密钥为空。虽然我没有使用这个键来连接表,但是这个键不能为空。因此创建一个带有虚拟密钥的中间流是有效的。因此,即使我在这里有一个全局KTable,对流消息的键的限制也适用于这里:http://docs.confluent.io/current/streams/developer-guide.html#kstream-ktable-join
具有null键或null值的流的输入记录将被忽略,并且不会触发联接。
我试图加入KStream与KTable。如果没有连接,我可以从中间主题“book属性-by-id”中阅读。 KTable的示例消息: KStream的示例消息: “最终聚合”主题的所需输出: 这是密码 加入KStream时出现异常 线程“xxx-StreamThread-1”组织中出现异常。阿帕奇。Kafka。溪流。错误。TopologyBuilderException:无效的拓扑构建:未找到流线
我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?
我是Ksql新手,正在尝试聚合,已经创建了一个Kstream kstream。。。 ktable... 将这些值发布到Kafka test1主题中 当我使用Kafka控制台使用者时,我只看到输出为 {“总销售额”:200}{“总销售额”:500} 我怎样才能看到身份证也印在Kafka主题中?我需要在桌子外创建某种视图吗?
我正在尝试以下列方式使用kafka流实现事件源模式。 我在一家安全服务公司工作,处理两个用例: 注册用户,处理 应生成 。 更改用户名,处理 应生成 。 我有两个主题: 命令主题,每个命令都是键控的,密钥是用户的电子邮件。例如: 实现思想可以用以下拓扑表示: 对于这个拓扑,我使用的是。 此拓扑的更显式版本: 我遇到的问题: 在具有现有记录的命令主题上启动流应用程序: 在构建这样的拓扑时,我缺少什么
有没有人发帖回应这个问题?还有其他帖子没有答案。我们的情况是,在流流程的第一步中,我们将消息推送到支持KTable的主题上。然后我们从这些消息中提取少量数据并将其传递出去。我们正在对较小数量的数据进行多次计算,以便进行分组和聚合。在流式处理的最后,我们只想通过一个KTable连接回原来的主题,以便再次获取完整的消息内容。联接的结果只是数据的一个子集,因为它无法在KTable中找到条目。 这只是问题
我尝试将构建作业从Hudson迁移到Jenkins(2.32.1)。Maven构建工作正常,但是SonarQube的Maven构建步骤不起作用。我使用Jenkins SonarQube插件2.5版。 我的SonarQube配置(下面添加SonarQube扫描仪): 我的构建环境(以下是使用环境变量配置SonarQube扫描器): 日志显示,Jenkins使用了错误的数据库URL(H2而不是Post