当前位置: 首页 > 知识库问答 >
问题:

同时在两个DataStream之前,在flink中找到一个DataStream包含来自其他DataStream的值?

佟嘉祯
2023-03-14

假设我有两个 DataStream

DataStream<String> A contains values {id1_id2 , id3_id4, id99_id0, id15_id3,id11_id5....}

DataStream<String> B contains values {id2, id3,id5...}

是否可以在DataStream A上进行一些处理,以便它将在另一个中输出值

DataStream<String> C ={id1, id3, id15, id11}

所以B中存在的所有值都将与a相交。我已经尝试使用processElement()和RichCoFlatMapFunction,但它不起作用。

public class MatchAggregator
        extends RichCoFlatMapFunction<String, String, Tuple1<String>> {

    private ValueState<String> doubleState;
    private ValueState<String> singleState;

    @Override
    public void open(Configuration config) {

        doubleState = getRuntimeContext().getState(new ValueStateDescriptor<>("doubleEvents",String.class));
        singleState = getRuntimeContext().getState(new ValueStateDescriptor<>("singleEvents",String.class));
    }
    
    @Override
    public void flatMap1(String s, Collector<Tuple1<String>> collector) throws Exception {
        String single = singleState.value();
       //this is outputting null.
        System.out.println(single);
      //s is also null
        if(single.contains(s)){
            String replaceNumber = single.replace(s,"");
            String replaceEmp = replaceNumber.replace("_","");
            single.clear();
            collector.collect(Tuple1.of(replaceEmp));
        }else {
            personContactState.update(s);
        }
    }

    @Override
    public void flatMap2(String s, Collector<Tuple1<String>> collector) throws Exception {

        
    }
}

我使用了两种数据流,比如:

DataStream<Tuple1<String>> match = A.connect(B).flatMap(new MatchAggregator());

match.print();

共有1个答案

堵彬彬
2023-03-14

RichCoFlatMapFunction的确切行为将取决于您对两个连接流的键控方式。

String single=singleState。value()。在您共享的代码中,update从不在singleState,因此singleState。value()将始终为空。

 类似资料:
  • Datastream Pro是一个数据库"浏览器"和数据操作工具.它易于使用,可靠,稳定,操作直观。Datastream Pro支持所有兼容JDBC的数据库(已经在Oracle,MySQL,postgreSQL和HSQLDb上测试成功).利用它可以浏览与编辑数据库中的数据,可在一个友好的界面中运行与编辑SQL脚本,可使用查询编辑器来编辑SQL查询,可同时连接到多个数据库和易于使用的连接向导等。

  • 我想加入一个大表,不可能包含在TM内存和流(kakfa)中。我在测试中成功加入了这两个表,将table-api与datastream api混合在一起。我做了以下操作: 它正在工作,但我从未见过这种类型的实现。可以吗?缺点是什么?

  • 以下用例的最佳实践建议是什么?我们需要将流与一组“规则”相匹配,这些规则本质上是Flink数据集的概念。可以对此“规则集”进行更新,但并不频繁。必须根据“规则集”中的所有记录检查每个流事件,并且每个匹配将生成一个或多个事件到接收器数据流中。规则集中的记录数在6位数范围内。 目前,我们只是将规则加载到本地规则列表中,并在传入的数据流上使用flatMap。在flatMap中,我们只是在一个列表上迭代,

  • 我正在使用Flink流读取来自Kafka的数据,并对数据进行处理。在使用Kafka之前,当应用程序启动时,我需要使用DataSet API读取一个文件,并根据一些条件对文件进行排序,然后从中创建一个列表。然后开始以流媒体的方式从Kafka那里消费。我已经编写了一个逻辑,使用DataSet API从文件中读取数据并对其进行排序。但当我尝试调谐程序,它从来没有执行,闪烁立即开始消耗Kafka。有没有办

  • 我试图编写一个流作业,它将数据流下沉到postgres表中。为了提供完整的信息,我的工作基于以下文章:https://tech.signavio.com/2017/postgres-flink-sink,这些文章建议使用JDBCoutputFormat。 所以我的问题是:我错过了什么吗?我应该将插入的行提交到某个地方吗? 向你致意,伊格内修斯