当前位置: 首页 > 知识库问答 >
问题:

在apache flink中筛选唯一事件

于鹏
2023-03-14

我在一个java类中定义某些变量,然后用另一个类访问它,以便在流中筛选唯一的元素。请参阅代码以更好地理解该问题。

我面临的问题是这个过滤函数不能很好地工作,无法过滤唯一的事件。我怀疑这个变量在不同的线程之间是共享的,它是原因!?如果这不是正确的方法,请建议另一种方法。提前道谢。

**ClassWithVariables.java**
public static HashMap<String, ArrayList<String>> uniqueMap = new HashMap<>();


**FilterClass.java**
public boolean filter(String val) throws Exception {

       if(ClassWithVariables.uniqueMap.containsKey(key)) {

                Arraylist<String> al = uniqueMap.get(key);

                if(al.contains(val) {
                    return false;
                } else {
                    //Update the hashmap list(uniqueMap)                    
                    return true;    
                }


       } else {

               //Add to hashmap list(uniqueMap)
               return true;
       }

}

共有1个答案

程昕
2023-03-14

去重复流的正确方法包括按密钥对流进行分区,这样包含相同密钥的所有元素将由相同的工作者处理,并使用Flink的托管、键控状态机制,这样状态是容错的和可重新伸缩的。下面是一个示例实现:

public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  env.addSource(new EventSource())
    .keyBy(e -> e.key)
    .flatMap(new Deduplicate())
    .print();

  env.execute();
}

public static class Deduplicate extends RichFlatMapFunction<Event, Event> {
  ValueState<Boolean> seen;

  @Override
  public void open(Configuration conf) {
    ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
    seen = getRuntimeContext().getState(desc);
  }

  @Override
  public void flatMap(Event event, Collector<Event> out) throws Exception {
    if (seen.value() == null) {
      out.collect(event);
      seen.update(true);
    }
  }
}

这也可以实现为RichFilterFunction,BTW。但是请注意,如果您有一个无界的键空间,所使用的状态将无限增长,直到您用完堆或磁盘上的空间为止,这取决于您选择的Flink的状态后端。如果这是一个问题,您可能希望通过状态生存时间设置一个状态保留策略。

还要注意,在Flink管道的不同部分之间共享状态是不可能的。您需要将事情从内到外与看起来正常的事情相比较,并将事件流带到状态,而不是获取它。

 类似资料:
  • 编辑2019:此问题是在2016年11月,当前方法和以前方法的接受答案如下。 我有一个<code>数据。表约有250万行的表。有两列。我想删除两列中重复的任何行。之前的数据。帧我会这样做:<code>df- 有什么建议吗? 干杯,戴维 例 在上面的data.table中,其中< code>V2是表键,只有第4、7和10行将被删除。

  • 本文向大家介绍在 JavaScript 中包含唯一字符的筛选字符串,包括了在 JavaScript 中包含唯一字符的筛选字符串的使用技巧和注意事项,需要的朋友参考一下 问题 我们需要编写一个 JavaScript 函数来接受一个字符串 str。我们的函数应该构造一个只包含输入字符串中唯一字符的新字符串,并删除出现的所有重复字符。 示例 以下是代码- 输出结果 以下是控制台输出-

  • 我有一个对象数组,如何在TypeScript/JavaScript中筛选唯一id数组 列表数组-

  • 我们正在使用Debezium+PostgreSQL。 注意,我们得到了用于创建、读取、更新和删除的4种类型的事件-c、r、u和D。 事件的读取类型未用于我们的应用程序。实际上,我想不出'r'事件的用例,除非我们正在审计或镜像事务的活动。 我从一个贡献者那里得到了使用snapshot.mode的线索。我想当Debezium创建一个快照时必须要做的事情。我不知道怎么做。

  • 我使用了将我的数据帧过滤为两列“Worker”和“Time Type”。 示例数据集 我现在只想看到那些“兼职”或“全职”的输出。 到目前为止,我构建的代码是: 但是我得到了错误 有人知道一个简单的方法来解决这个问题吗? 理想情况下,我想以两件事结束: > 显示全职和兼职员工的输出。 另一个显示此参数以外异常的输出,即第2行中的“Tom”显示“paert Tme”,这是一个异常,值得作为单独的输出

  • 是否可以使用setState更新对象的属性? 比如: 我可以使用以下方式将事件记录到控制台:- 我在控制台上创建的solidity ABI对象是 我需要设置所需的数据,并在以后使用它,所以我使用.setState如下 但是它显示. setState不是一个函数。请帮帮我。