当前位置: 首页 > 知识库问答 >
问题:

Apache Flink-序列化json并执行连接操作

赵英资
2023-03-14

我试图使用Jackson库读取Kafka主题中的字符串,并从另一个流执行连接。

这是一个包含两个数据流的示例代码。我想对这些消息流执行连接操作。

例如,传入的流是:

messageStream1 = {"A":"a"}
messageStream2 = {"B":"a"}

连接条件是messageStream1。“A”=messageStream2。“B”。我如何在Flink中实现这一点?

数据流 1:

DataStream<String> messageStream1 = env.addSource(
  new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema() , parameterTool.getProperties()));

messageStream1.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String value) throws Exception {
        JsonFactory factory = new JsonFactory();
        ObjectMapper mapper = new ObjectMapper(factory);
        try {
            JsonNode rootNode = mapper.readTree(value);
            Iterator<Map.Entry<String,JsonNode>> fieldsIterator = rootNode.fields();
            while (fieldsIterator.hasNext()) {
                Map.Entry<String,JsonNode> field = fieldsIterator.next();
                System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
            }
            return rootNode;
        }catch (java.io.IOException ex){
            ex.printStackTrace();
            return null;
        }
    }
});

数据流 2:

DataStream<String> messageStream2 = env.addSource(
  new FlinkKafkaConsumer082<String>("input", new SimpleStringSchema() , parameterTool.getProperties()));

messageStream2.map(new MapFunction<String, JsonNode>() {
    @Override
    public JsonNode map(String value) throws Exception {
        JsonFactory factory = new JsonFactory();
        ObjectMapper mapper = new ObjectMapper(factory);
        try {
            JsonNode rootNode = mapper.readTree(value);
            Iterator<Map.Entry<String,JsonNode>> fieldsIterator = rootNode.fields();
            while (fieldsIterator.hasNext()) {
                Map.Entry<String,JsonNode> field = fieldsIterator.next();
                System.out.println("Key: " + field.getKey() + "\tValue:" + field.getValue());
            }
            return rootNode;
        }catch (java.io.IOException ex){
            ex.printStackTrace();
            return null;
        }
    }
});

共有1个答案

左丘成仁
2023-03-14

您需要将键字段提取到一个额外的属性中,以便 Flink 可以访问它(另一种方法是提供自定义键选择器:https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#specifying-keys)。

因此,map(...) 的返回类型可能是 Tuple2

然后,您可以按照留档(https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html)中的描述指定连接:

messageStream1.join(messageStream2)
    .where(0).equalTo(0) // both zeros indicate that the join happens on the zero's attribute, ie, the String attribute of Tuple2
    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
    .apply(new JoinFunction() {...});

为了使用DataStreamAPI执行连接,您还需要指定连接窗口。只能加入属于同一窗口的元组。

 类似资料:
  • 问题内容: 我在Web API项目中使用了Dictionary,该项目的序列化方式类似于JSON: 因为我有重复的键,所以我不能再使用Dictionary类型,而现在我正在使用 但这是以这种方式序列化的: 有没有办法像字典一样进行序列化? 谢谢。 问题答案: 如果您使用Newtonsoft Json.NET库,则可以执行以下操作。 定义一个转换器,以所需的方式写入键/值对的列表: 然后使用转换器:

  • 我遇到了类似的问题:Kafka反序列化我的Kafka生产者中的嵌套泛型类型,我发送的对象如下所示: 其基石是:。 此界面如下所示: 添加字段@class是因为@JsonTypeInfo注释,我认为这足以让反序列化器“理解”在反序列化中使用什么类型的IExternalData。不幸的是,在Kafka的听众那边,我得到了一个例外: 无法构造的实例(不存在像default construct这样的创建者

  • 我正在建立一个网站使用Node.js和序列(与Postgres后端)。我有一个用外键返回许多对象的查询,我想向视图传递一个外键引用的对象列表。 在本例中,Attentings包含Hackathon键,我想返回Hackathon的列表。由于代码是异步的,以下内容在节点中当然不起作用: 有没有办法以同步的方式进行查询,这意味着我直到“黑客”列表充满所有对象才返回视图? 谢谢!

  • 问题内容: 我想要一种尽可能自动地将对象序列化和反序列化为JSON的方法。 序列化: 对我来说,理想的方式是,如果我在实例JSONSerialize()中调用,它将返回带有JSON对象的字符串,该对象具有该对象的所有公共属性。对于那些原始值,这很简单,对于对象,它应该尝试调用每个JSONSerialize()或ToString()或类似的东西来递归序列化所有公共属性。对于集合,它也应该正确运行(只

  • 问题内容: 我正在使用python包pymongo从mongodb数据库中检索数据。 然后我转换为列表 这是print(l)返回的内容: 现在,我需要转换为JSON,以便可以对其进行操作。 我还尝试遵循 http://api.mongodb.org/python/1.7/api/pymongo/json_util.html 失败:编辑:链接的最新版本为http://api.mongodb.org/

  • 我想要一种将对象序列化和反序列化为JSON的方法,尽可能自动。 Serialize:对我来说,理想的方式是,如果我调用一个实例JSONSerialize(),它会返回一个带有JSON对象的字符串,该对象的所有公共属性都是< code >“name _ of _ property”:“value”。对于那些基本类型的值,这很简单,对于对象,它应该尝试在每个JSONSerialize()或ToStri