我有一个Kafka主题,包含Json格式的数据:
{"id": "A", "country": "France"}
{"id": "B", "currency": "£"}
我想用类似“参考表”的东西来规范内容:
country ( "France" ) -> "FR"
currency ( "£" ) -> "GBP"
为了输出:
{"id": "A", "country": "FR"}
{"id": "B", "currency": "GBP"}
我认为这是使用KTable
存储参考数据的典型用例。但我在实施上有点纠结。
当前状态
摄取参考数据
在Kafka上创建的专用主题:poc映射在
主题提供了示例Json数据:
{"mapping":"ccy", "from":"£", "to":"GBP"}
{"mapping":"country", "from":"France", "to":"FR"}
在对键和值进行返工后,在KTable
中摄入的数据:
KStream<String, String> mappingStream = builder
.stream("poc-mapping-in",consumed)
.map(
(key, value) -> KeyValue.pair(
value.get("mapping")+"#"+value.get("from"),
value.get("to").asText())
);
KGroupedStream<String, String> mappingGroupedStream = mappingStream.groupByKey(
Serialized.with(Serdes.String(),Serdes.String() ));
KTable<String,String> mappingTable = mappingGroupedStream.aggregate(
() -> "", //initializer
(aggKey, newValue, aggValue) -> newValue, // adder
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("ReferenceStore")
.withValueSerde(Serdes.String())
.withKeySerde(Serdes.String())
);
// Testing
mappingTable.toStream().to("poc-mapping-in-content",
Produced.with(Serdes.String(), Serdes.String()));
在主题poc-maping-in-content
中,我得到以下行:
"currency"#"£" GBP
"country"#"France" FR
这看起来像我所期望的。双引号很奇怪,但这并不妨碍我走得更远。
数据被/应该存储在名为ReferenceStore
的本地存储中。
获取业务流
在Kafka上创建的主题:poc原始事件
主题提供了示例Json数据:
{"id": "A", "country": "France"}
{"id": "B", "currency": "£"}
在KStream
中摄取的数据:
final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
KStream<String, JsonNode> businessData = builder.stream("poc-raw-events", consumed);
从这里我不知道该怎么办。从技术上讲,我知道如何更新JsonNode中的属性。因此,我尝试使用foreach
在KStream
上循环,方法如下:
businessData.foreach(new ForeachAction<String, JsonNode>() {
public void apply(String k, JsonNode v) {
System.out.println(k+ " : " +v);
if (v==null) {System.out.println("NULL detected"); return;}
Iterator<Entry<String, JsonNode>> fields = v.fields();
int i=0;
while (fields.hasNext()) {
i++;
Entry<String, JsonNode> next = fields.next();
System.out.println(k+ " field #"+i+" : " +next.getKey() + " -- " + next.getValue());
String key = next.getKey() + "#" + next.getValue());
// ((ObjectNode) v).put(next.getKey(), " WHAT HERE ??? ");
}
}
});
我的想法是替换“这里是什么?”
位于最后一行中,数据显示在参考KTable中。但是怎么做???
我没有在KTable上找到类似. findByKey()
的东西。参考存储
本地存储,因为访问它的方式类似于myKafkaStream.store(...)
,此时myKafkaStream
还没有开始,甚至还没有构建。我考虑的另一种方法是使用KStream leftJoin KTable功能。但我在某个地方读到过(我没有书签…)要做到这一点,我们应该在两个kTable中使用相同的密钥。但在我的例子中,在Json方面,我不处理join键,而是处理一个简单的属性。
你将如何实现这一点?
如果ReferenceTable有一个与数据匹配的键。getAltKey()
streamToMap.selectKey((originalKey, data) -> data.getAltKey()).leftJoin(referenceKTable, valueJoiner)
我们可以做到这一点。valueJoiner(或lambda)的实现必须结合这两个输入。
由于使用引用数据,我认为您要考虑使用的是<代码> GualCalabe< /Cord>。每个KafkaStreams
实例都会完全复制GlobalKTable
,并显式创建该表,以保存上述用例的参考数据。
KStream GlobalKTable联接的独特之处在于,您可以使用流的KeyValue
映射到GlobalKTable
的键。因此,只要您可以从JsonNode
中提取属性,就应该能够与GlobalKTable中的相应记录联接
问题内容: 我有两个Spark数据框: 数据框A: 和数据框B: 数据框B可以包含来自数据框A的重复行,更新行和新行。我想在spark中编写操作,在其中可以创建一个新数据框,其中包含数据框A的行以及数据框B的更新行和新行。 我首先创建一个仅包含不可更新列的哈希列。这是唯一的ID。所以我们可以说,并可以改变值(可更新),但是是唯一的。我创建了一个哈希函数为: 现在,我想编写一些火花代码,基本上从B中
我们已经讨论了如何使用指针实现call by reference概念。 这是另一个通过引用调用的例子,它使用C ++引用 - #include <iostream> using namespace std; // function declaration void swap(int& x, int& y); int main () { // local variable declaratio
我正在用JavaFx乞讨,我意识到我需要一些帮助在运行时用一些TreeItems更新一个TreeView,它应该在主窗口中更新。 在这里,可以看到两个窗口的截图: 较大的是主窗口,它调用(通过在文件中单击>>New Project),New Small。在较小的窗口中,我可以得到键入的字符串,然后单击enter按钮。 请假设我在FXML文件中映射了所有的内容。谢谢
我有一个类似这样的pytest测试: 现在,作为重构的一部分,我移动了这一行: 放入它自己的夹具中(在conftest.py文件中),因为它在其他地方使用。但是,除了直接导入fixture函数外,是否有其他方法在测试中引用它?我知道funcargs通常是调用fixture的方式,但是在本文中,当我想要调用fixture时,我不在测试函数中。
我想用Pyglet制作一个每帧都在变化的网格。因此,我需要经常更新顶点,我认为VBO是最快的方法(如果我错了,请纠正我)。下面是一个要点示例。这是正确的做法吗?我读到应该尽量减少glBindBuffer调用的数量,但在这里它是每帧调用一次的。此外,启用了GL_DYNAMIC_DRAW,但如果我将其更改为GL_STATIC_DRAW,它仍在工作。这让我想知道这是否是一个快速计算的正确设置
我正在用一些初始数据绘制一个条形图,然后在单击按钮时尝试更新。我收到错误“未捕获的类型错误:无法读取未定义的属性'长度'”。单击更新后,将发生错误。如何解决此问题并启用更新功能以绘制新的条形图? D3代码: