我试图使用Jackson库读取Kafka主题中的字符串,并从另一个流执行连接。
这是一个包含两个数据流的示例代码。我想对这些消息流执行连接操作。
例如,传入的流是:
messageStream1 = {"A":"a"}
messageStream2 = {"B":"a"}
连接条件是messageStream1。“A”=messageStream2。“B”
。我如何在Flink中实现这一点?
数据流 1:
DataStream<String> messageStream1 = env.addSource(
new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema() , parameterTool.getProperties()));
messageStream1.map(new MapFunction<String, JsonNode>() {
@Override
public JsonNode map(String value) throws Exception {
JsonFactory factory = new JsonFactory();
ObjectMapper mapper = new ObjectMapper(factory);
try {
JsonNode rootNode = mapper.readTree(value);
Iterator<Map.Entry<String,JsonNode>> fieldsIterator = rootNode.fields();
while (fieldsIterator.hasNext()) {
Map.Entry<String,JsonNode> field = fieldsIterator.next();
System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
}
return rootNode;
}catch (java.io.IOException ex){
ex.printStackTrace();
return null;
}
}
});
数据流 2:
DataStream<String> messageStream2 = env.addSource(
new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema() , parameterTool.getProperties()));
messageStream2.map(new MapFunction<String, JsonNode>() {
@Override
public JsonNode map(String value) throws Exception {
JsonFactory factory = new JsonFactory();
ObjectMapper mapper = new ObjectMapper(factory);
try {
JsonNode rootNode = mapper.readTree(value);
Iterator<Map.Entry<String,JsonNode>> fieldsIterator = rootNode.fields();
while (fieldsIterator.hasNext()) {
Map.Entry<String,JsonNode> field = fieldsIterator.next();
System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
}
return rootNode;
}catch (java.io.IOException ex){
ex.printStackTrace();
return null;
}
}
});
您需要将键字段提取到一个额外的属性中,以便 Flink 可以访问它(另一种方法是提供自定义键选择器:https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#specifying-keys)。
因此,map(...)
的返回类型可能是 Tuple2
然后,您可以按照留档(https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html)中的描述指定连接:
messageStream1.join(messageStream2)
.where(0).equalTo(0) // both zeros indicate that the join happens on the zero's attribute, ie, the String attribute of Tuple2
.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
.apply(new JoinFunction() {...});
为了使用
DataStream
API执行连接,您还需要指定连接窗口。只能加入属于同一窗口的元组。
问题内容: 我在Web API项目中使用了Dictionary,该项目的序列化方式类似于JSON: 因为我有重复的键,所以我不能再使用Dictionary类型,而现在我正在使用 但这是以这种方式序列化的: 有没有办法像字典一样进行序列化? 谢谢。 问题答案: 如果您使用Newtonsoft Json.NET库,则可以执行以下操作。 定义一个转换器,以所需的方式写入键/值对的列表: 然后使用转换器:
我遇到了类似的问题:Kafka反序列化我的Kafka生产者中的嵌套泛型类型,我发送的对象如下所示: 其基石是:。 此界面如下所示: 添加字段@class是因为@JsonTypeInfo注释,我认为这足以让反序列化器“理解”在反序列化中使用什么类型的IExternalData。不幸的是,在Kafka的听众那边,我得到了一个例外: 无法构造的实例(不存在像default construct这样的创建者
我正在建立一个网站使用Node.js和序列(与Postgres后端)。我有一个用外键返回许多对象的查询,我想向视图传递一个外键引用的对象列表。 在本例中,Attentings包含Hackathon键,我想返回Hackathon的列表。由于代码是异步的,以下内容在节点中当然不起作用: 有没有办法以同步的方式进行查询,这意味着我直到“黑客”列表充满所有对象才返回视图? 谢谢!
问题内容: 我想要一种尽可能自动地将对象序列化和反序列化为JSON的方法。 序列化: 对我来说,理想的方式是,如果我在实例JSONSerialize()中调用,它将返回带有JSON对象的字符串,该对象具有该对象的所有公共属性。对于那些原始值,这很简单,对于对象,它应该尝试调用每个JSONSerialize()或ToString()或类似的东西来递归序列化所有公共属性。对于集合,它也应该正确运行(只
问题内容: 我正在使用python包pymongo从mongodb数据库中检索数据。 然后我转换为列表 这是print(l)返回的内容: 现在,我需要转换为JSON,以便可以对其进行操作。 我还尝试遵循 http://api.mongodb.org/python/1.7/api/pymongo/json_util.html 失败:编辑:链接的最新版本为http://api.mongodb.org/
我想要一种将对象序列化和反序列化为JSON的方法,尽可能自动。 Serialize:对我来说,理想的方式是,如果我调用一个实例JSONSerialize(),它会返回一个带有JSON对象的字符串,该对象的所有公共属性都是< code >“name _ of _ property”:“value”。对于那些基本类型的值,这很简单,对于对象,它应该尝试在每个JSONSerialize()或ToStri