我在一个java类中定义某些变量,然后用另一个类访问它,以便在流中筛选唯一的元素。请参阅代码以更好地理解该问题。
我面临的问题是这个过滤函数不能很好地工作,无法过滤唯一的事件。我怀疑这个变量在不同的线程之间是共享的,它是原因!?如果这不是正确的方法,请建议另一种方法。提前道谢。
**ClassWithVariables.java**
public static HashMap<String, ArrayList<String>> uniqueMap = new HashMap<>();
**FilterClass.java**
public boolean filter(String val) throws Exception {
if(ClassWithVariables.uniqueMap.containsKey(key)) {
Arraylist<String> al = uniqueMap.get(key);
if(al.contains(val) {
return false;
} else {
//Update the hashmap list(uniqueMap)
return true;
}
} else {
//Add to hashmap list(uniqueMap)
return true;
}
}
去重复流的正确方法包括按密钥对流进行分区,这样包含相同密钥的所有元素将由相同的工作者处理,并使用Flink的托管、键控状态机制,这样状态是容错的和可重新伸缩的。下面是一个示例实现:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new EventSource())
.keyBy(e -> e.key)
.flatMap(new Deduplicate())
.print();
env.execute();
}
public static class Deduplicate extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> seen;
@Override
public void open(Configuration conf) {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
seen = getRuntimeContext().getState(desc);
}
@Override
public void flatMap(Event event, Collector<Event> out) throws Exception {
if (seen.value() == null) {
out.collect(event);
seen.update(true);
}
}
}
这也可以实现为RichFilterFunction,BTW。但是请注意,如果您有一个无界的键空间,所使用的状态将无限增长,直到您用完堆或磁盘上的空间为止,这取决于您选择的Flink的状态后端。如果这是一个问题,您可能希望通过状态生存时间设置一个状态保留策略。
还要注意,在Flink管道的不同部分之间共享状态是不可能的。您需要将事情从内到外与看起来正常的事情相比较,并将事件流带到状态,而不是获取它。
编辑2019:此问题是在2016年11月,当前方法和以前方法的接受答案如下。 我有一个<code>数据。表约有250万行的表。有两列。我想删除两列中重复的任何行。之前的数据。帧我会这样做:<code>df- 有什么建议吗? 干杯,戴维 例 在上面的data.table中,其中< code>V2是表键,只有第4、7和10行将被删除。
本文向大家介绍在 JavaScript 中包含唯一字符的筛选字符串,包括了在 JavaScript 中包含唯一字符的筛选字符串的使用技巧和注意事项,需要的朋友参考一下 问题 我们需要编写一个 JavaScript 函数来接受一个字符串 str。我们的函数应该构造一个只包含输入字符串中唯一字符的新字符串,并删除出现的所有重复字符。 示例 以下是代码- 输出结果 以下是控制台输出-
我有一个对象数组,如何在TypeScript/JavaScript中筛选唯一id数组 列表数组-
我们正在使用Debezium+PostgreSQL。 注意,我们得到了用于创建、读取、更新和删除的4种类型的事件-c、r、u和D。 事件的读取类型未用于我们的应用程序。实际上,我想不出'r'事件的用例,除非我们正在审计或镜像事务的活动。 我从一个贡献者那里得到了使用snapshot.mode的线索。我想当Debezium创建一个快照时必须要做的事情。我不知道怎么做。
我使用了将我的数据帧过滤为两列“Worker”和“Time Type”。 示例数据集 我现在只想看到那些“兼职”或“全职”的输出。 到目前为止,我构建的代码是: 但是我得到了错误 有人知道一个简单的方法来解决这个问题吗? 理想情况下,我想以两件事结束: > 显示全职和兼职员工的输出。 另一个显示此参数以外异常的输出,即第2行中的“Tom”显示“paert Tme”,这是一个异常,值得作为单独的输出
是否可以使用setState更新对象的属性? 比如: 我可以使用以下方式将事件记录到控制台:- 我在控制台上创建的solidity ABI对象是 我需要设置所需的数据,并在以后使用它,所以我使用.setState如下 但是它显示. setState不是一个函数。请帮帮我。