当前位置: 首页 > 知识库问答 >
问题:

如何使用KTable作为参考数据来更新KStream?

关项明
2023-03-14

我有一个Kafka主题,包含Json格式的数据:

{"id": "A", "country": "France"}
{"id": "B", "currency": "£"}

我想用类似“参考表”的东西来规范内容:

country ( "France" ) -> "FR"
currency ( "£" ) -> "GBP"

为了输出:

{"id": "A", "country": "FR"}
{"id": "B", "currency": "GBP"}

我认为这是使用KTable存储参考数据的典型用例。但我在实施上有点纠结。

当前状态

摄取参考数据

在Kafka上创建的专用主题:poc映射在

主题提供了示例Json数据:

{"mapping":"ccy",     "from":"£",      "to":"GBP"}
{"mapping":"country", "from":"France", "to":"FR"}

在对键和值进行返工后,在KTable中摄入的数据:

         KStream<String, String> mappingStream = builder
                .stream("poc-mapping-in",consumed)
                .map(
                     (key, value) -> KeyValue.pair(
                         value.get("mapping")+"#"+value.get("from"), 
                         value.get("to").asText())
         );

         KGroupedStream<String, String> mappingGroupedStream = mappingStream.groupByKey(
                 Serialized.with(Serdes.String(),Serdes.String() ));


         KTable<String,String> mappingTable = mappingGroupedStream.aggregate(
                () -> "", //initializer 
                (aggKey, newValue, aggValue) -> newValue, // adder 
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("ReferenceStore")
                    .withValueSerde(Serdes.String())
                    .withKeySerde(Serdes.String())
                );

           // Testing
           mappingTable.toStream().to("poc-mapping-in-content", 
                Produced.with(Serdes.String(), Serdes.String()));

在主题poc-maping-in-content中,我得到以下行:

"currency"#"£"      GBP
"country"#"France"  FR

这看起来像我所期望的。双引号很奇怪,但这并不妨碍我走得更远。

数据被/应该存储在名为ReferenceStore的本地存储中。

获取业务流

在Kafka上创建的主题:poc原始事件

主题提供了示例Json数据:

{"id": "A", "country": "France"}
{"id": "B", "currency": "£"}

KStream中摄取的数据:

  final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
  KStream<String, JsonNode> businessData = builder.stream("poc-raw-events", consumed);

从这里我不知道该怎么办。从技术上讲,我知道如何更新JsonNode中的属性。因此,我尝试使用foreachKStream上循环,方法如下:

    businessData.foreach(new ForeachAction<String, JsonNode>()  {
        public void apply(String k, JsonNode v) {
            System.out.println(k+ " : " +v);
                    if (v==null) {System.out.println("NULL detected"); return;}
            Iterator<Entry<String, JsonNode>> fields = v.fields();
            int i=0;
            while (fields.hasNext()) {
                i++;
                Entry<String, JsonNode> next = fields.next();
                System.out.println(k+ " field #"+i+" : " +next.getKey() + " -- " + next.getValue());

                String key = next.getKey() + "#" + next.getValue());
//              ((ObjectNode) v).put(next.getKey(), "  WHAT HERE ??? ");

            }

        }
    });

我的想法是替换“这里是什么?” 位于最后一行中,数据显示在参考KTable中。但是怎么做???

  • 我没有在KTable上找到类似. findByKey()的东西。
  • 我不知道如何访问参考存储本地存储,因为访问它的方式类似于myKafkaStream.store(...),此时myKafkaStream还没有开始,甚至还没有构建。

我考虑的另一种方法是使用KStream leftJoin KTable功能。但我在某个地方读到过(我没有书签…)要做到这一点,我们应该在两个kTable中使用相同的密钥。但在我的例子中,在Json方面,我不处理join键,而是处理一个简单的属性。

你将如何实现这一点?

共有2个答案

杜思远
2023-03-14

如果ReferenceTable有一个与数据匹配的键。getAltKey()

streamToMap.selectKey((originalKey, data) -> data.getAltKey()).leftJoin(referenceKTable, valueJoiner)

我们可以做到这一点。valueJoiner(或lambda)的实现必须结合这两个输入。

赵雪峰
2023-03-14

由于使用引用数据,我认为您要考虑使用的是<代码> GualCalabe< /Cord>。每个KafkaStreams实例都会完全复制GlobalKTable,并显式创建该表,以保存上述用例的参考数据。

KStream GlobalKTable联接的独特之处在于,您可以使用流的KeyValue映射到GlobalKTable的键。因此,只要您可以从JsonNode中提取属性,就应该能够与GlobalKTable中的相应记录联接

 类似资料:
  • 问题内容: 我有两个Spark数据框: 数据框A: 和数据框B: 数据框B可以包含来自数据框A的重复行,更新行和新行。我想在spark中编写操作,在其中可以创建一个新数据框,其中包含数据框A的行以及数据框B的更新行和新行。 我首先创建一个仅包含不可更新列的哈希列。这是唯一的ID。所以我们可以说,并可以改变值(可更新),但是是唯一的。我创建了一个哈希函数为: 现在,我想编写一些火花代码,基本上从B中

  • 我们已经讨论了如何使用指针实现call by reference概念。 这是另一个通过引用调用的例子,它使用C ++引用 - #include <iostream> using namespace std; // function declaration void swap(int& x, int& y); int main () { // local variable declaratio

  • 我正在用JavaFx乞讨,我意识到我需要一些帮助在运行时用一些TreeItems更新一个TreeView,它应该在主窗口中更新。 在这里,可以看到两个窗口的截图: 较大的是主窗口,它调用(通过在文件中单击>>New Project),New Small。在较小的窗口中,我可以得到键入的字符串,然后单击enter按钮。 请假设我在FXML文件中映射了所有的内容。谢谢

  • 我有一个类似这样的pytest测试: 现在,作为重构的一部分,我移动了这一行: 放入它自己的夹具中(在conftest.py文件中),因为它在其他地方使用。但是,除了直接导入fixture函数外,是否有其他方法在测试中引用它?我知道funcargs通常是调用fixture的方式,但是在本文中,当我想要调用fixture时,我不在测试函数中。

  • 我想用Pyglet制作一个每帧都在变化的网格。因此,我需要经常更新顶点,我认为VBO是最快的方法(如果我错了,请纠正我)。下面是一个要点示例。这是正确的做法吗?我读到应该尽量减少glBindBuffer调用的数量,但在这里它是每帧调用一次的。此外,启用了GL_DYNAMIC_DRAW,但如果我将其更改为GL_STATIC_DRAW,它仍在工作。这让我想知道这是否是一个快速计算的正确设置

  • 我正在用一些初始数据绘制一个条形图,然后在单击按钮时尝试更新。我收到错误“未捕获的类型错误:无法读取未定义的属性'长度'”。单击更新后,将发生错误。如何解决此问题并启用更新功能以绘制新的条形图? D3代码: