当前位置: 首页 > 知识库问答 >
问题:

将KTable与KStream连接起来,但输出主题中没有任何内容

江瀚昂
2023-03-14

我将一个KStream与一个KTable左联接,但我没有看到输出主题的任何输出:

  val stringSerde: Serde[String] = Serdes.String()
  val longSerde: Serde[java.lang.Long] = Serdes.Long()
  val genericRecordSerde: Serde[GenericRecord] = new GenericAvroSerde()

  val builder = new KStreamBuilder()

  val networkImprStream: KStream[Long, GenericRecord] = builder
    .stream(dfpGcsNetworkImprEnhanced)

  // Create a global table for advertisers. The data from this global table
  // will be fully replicated on each instance of this application.
  val advertiserTable: GlobalKTable[java.lang.Long, GenericRecord]= builder.globalTable(advertiserTopicName, "advertiser-store")

  // Join the network impr stream to the advertiser global table. As this is global table
  // we can use a non-key based join with out needing to repartition the input stream
  val networkImprWithAdvertiserNameKStream: KStream[java.lang.Long, GenericRecord] = networkImprStream.leftJoin(advertiserTable,
    (_, networkImpr) => {
      println(networkImpr)
      networkImpr.get("advertiserId").asInstanceOf[java.lang.Long]
    },
    (networkImpr: GenericRecord, adertiserIdToName: GenericRecord) => {
      println(networkImpr)
      networkImpr.put("advertiserName", adertiserIdToName.get("name"))
      networkImpr
    }
  )

  networkImprWithAdvertiserNameKStream.to(networkImprProcessed)

  val streams = new KafkaStreams(builder, streamsConfiguration)
  streams.cleanUp()
  streams.start()
  // usually the stream application would be running forever,
  // in this example we just let it run for some time and stop since the input data is finite.
  Thread.sleep(15000L)

如果我绕过连接,直接将输入主题输出到输出,我会看到消息到达。我已经将联接更改为左联接,添加了一些printlns以查看何时提取键(但控制台上没有打印任何内容)。而且我每次都使用Kafka流重置工具,所以从头开始。我的想法快用完了。此外,我还添加了一些对存储的测试访问,它可以工作,并且包含来自流的键(尽管这不应该因为左联接而禁止任何输出)。

共有1个答案

凤经武
2023-03-14

在我的源流中,密钥为空。虽然我没有使用这个键来连接表,但是这个键不能为空。因此创建一个带有虚拟密钥的中间流是有效的。因此,即使我在这里有一个全局KTable,对流消息的键的限制也适用于这里:http://docs.confluent.io/current/streams/developer-guide.html#kstream-ktable-join

具有null键或null值的流的输入记录将被忽略,并且不会触发联接。

 类似资料:
  • 我试图加入KStream与KTable。如果没有连接,我可以从中间主题“book属性-by-id”中阅读。 KTable的示例消息: KStream的示例消息: “最终聚合”主题的所需输出: 这是密码 加入KStream时出现异常 线程“xxx-StreamThread-1”组织中出现异常。阿帕奇。Kafka。溪流。错误。TopologyBuilderException:无效的拓扑构建:未找到流线

  • 我正在使KStream-KStream连接,其中创建2个内部主题。而KStream-KTable join将创建1个内部主题+1个表。 就性能和其他因素而言,哪个更好?

  • 我是Ksql新手,正在尝试聚合,已经创建了一个Kstream kstream。。。 ktable... 将这些值发布到Kafka test1主题中 当我使用Kafka控制台使用者时,我只看到输出为 {“总销售额”:200}{“总销售额”:500} 我怎样才能看到身份证也印在Kafka主题中?我需要在桌子外创建某种视图吗?

  • 我正在尝试以下列方式使用kafka流实现事件源模式。 我在一家安全服务公司工作,处理两个用例: 注册用户,处理 应生成 。 更改用户名,处理 应生成 。 我有两个主题: 命令主题,每个命令都是键控的,密钥是用户的电子邮件。例如: 实现思想可以用以下拓扑表示: 对于这个拓扑,我使用的是。 此拓扑的更显式版本: 我遇到的问题: 在具有现有记录的命令主题上启动流应用程序: 在构建这样的拓扑时,我缺少什么

  • 有没有人发帖回应这个问题?还有其他帖子没有答案。我们的情况是,在流流程的第一步中,我们将消息推送到支持KTable的主题上。然后我们从这些消息中提取少量数据并将其传递出去。我们正在对较小数量的数据进行多次计算,以便进行分组和聚合。在流式处理的最后,我们只想通过一个KTable连接回原来的主题,以便再次获取完整的消息内容。联接的结果只是数据的一个子集,因为它无法在KTable中找到条目。 这只是问题

  • 我尝试将构建作业从Hudson迁移到Jenkins(2.32.1)。Maven构建工作正常,但是SonarQube的Maven构建步骤不起作用。我使用Jenkins SonarQube插件2.5版。 我的SonarQube配置(下面添加SonarQube扫描仪): 我的构建环境(以下是使用环境变量配置SonarQube扫描器): 日志显示,Jenkins使用了错误的数据库URL(H2而不是Post