当前位置: 首页 > 知识库问答 >
问题:

Kafka-Streams-加入前过滤GlobalKTable

谢善
2023-03-14

你能就解决下列问题的方法给我一个建议吗。我有两个主题,一个是静态内容,另一个是数据流。任务是连接数据,这在正常情况下很容易。我将静态内容理解为GlobalKTable,动态内容理解为KStream,然后简单地将它们连接起来。问题在于查找数据存在于同一主题的多个版本中。“版本”由“validFrom”字段标识。因此,流的数据需要根据其时间戳与相应版本的查找数据连接。有没有办法过滤GlobalKTable中的数据?

向马丁问好

共有1个答案

艾原
2023-03-14

您不能对GlobalKTable本身应用筛选操作,但您可以尝试测试ValueJoiner中记录的版本,并将未通过测试的连接结果记录的值设置为null...在联接之后,您可以应用一个筛选器,筛选出值为null的所有记录。

 类似资料:
  • 我一直在检查Kafka流。我一直在测试下面的Kafka流代码 生产者主题:(这是第一个生产者主题-发送以下json数据) JSON-主题的生产者: Stream Topic代码:(这是第二个Streaming代码和主题) 如果UserID值为“1”,我想对其进行归档,然后将该数据发送到目标流媒体主题。 当我使用“.filter”并打印System.out时。println(“value:”valu

  • streams streams_overview Kafka Streams is a client library for processing and analyzing data stored in Kafka and either write the resulting data back to Kafka or send the final output to an external s

  • 对于这个用例,我应该使用Kafka Consumer API还是Kafka Streams API?我有一个话题与一些消费群体消费它。本主题包含一种类型的事件,它是一个内部埋藏了一个类型字段的JSON消息。一些信息会被一些消费者群体消费,而另一些消费者群体不会消费,一个消费者群体可能根本不会消费很多信息。 我的问题是:我是否应该使用消费者API,然后在每个事件上读取type字段,并删除或处理基于t

  • Redis团队为Redis 5.0引入了新的Streams数据类型。由于Streams从第一视角看起来像Kafka主题,因此似乎很难找到使用它的真实世界示例。 在streams intro中,我们与Kafka streams进行了比较: 运行时消费者组处理。例如,如果三个消费者中的一个永久失败,Redis将继续服务第一个和第二个,因为现在我们只有两个逻辑分区(消费者)。 Redis流更快。他们从内

  • Kafka Streams 是一个用于处理和分析存储在 Kafka 系统中的数据的客户端库。 它建立在重要的流处理概念上,如恰当地区分事件时间(event time)和处理时间(processing time),支持窗口操作(window),exactly-once 处理语义以及简单高效的应用程序状态管理。 Kafka Streams 的入门门槛很低。我们可以在单节点环境上快速实现一个小规模的验证

  • 我有来自 3 个 mysql 表、1 个主表和两个子表的原始流。我尝试加入三个原始流并转换为单个输出流。如果父流上有任何更新,但如果子流发生任何变化,则不触发输出,它就可以工作。 父流上的任何新添加或更新都由处理器拾取,并将其与其他KTable连接,并在输出流上返回。但对child1stream或child2stream的任何添加或更新都不会触发输出流。 我认为将所有输入流设为 KTable,它们