假设我有两个 DataStream
DataStream<String> A contains values {id1_id2 , id3_id4, id99_id0, id15_id3,id11_id5....}
DataStream<String> B contains values {id2, id3,id5...}
是否可以在DataStream A上进行一些处理,以便它将在另一个中输出值
DataStream<String> C ={id1, id3, id15, id11}
所以B中存在的所有值都将与a相交。我已经尝试使用processElement()和RichCoFlatMapFunction,但它不起作用。
public class MatchAggregator
extends RichCoFlatMapFunction<String, String, Tuple1<String>> {
private ValueState<String> doubleState;
private ValueState<String> singleState;
@Override
public void open(Configuration config) {
doubleState = getRuntimeContext().getState(new ValueStateDescriptor<>("doubleEvents",String.class));
singleState = getRuntimeContext().getState(new ValueStateDescriptor<>("singleEvents",String.class));
}
@Override
public void flatMap1(String s, Collector<Tuple1<String>> collector) throws Exception {
String single = singleState.value();
//this is outputting null.
System.out.println(single);
//s is also null
if(single.contains(s)){
String replaceNumber = single.replace(s,"");
String replaceEmp = replaceNumber.replace("_","");
single.clear();
collector.collect(Tuple1.of(replaceEmp));
}else {
personContactState.update(s);
}
}
@Override
public void flatMap2(String s, Collector<Tuple1<String>> collector) throws Exception {
}
}
我使用了两种数据流,比如:
DataStream<Tuple1<String>> match = A.connect(B).flatMap(new MatchAggregator());
match.print();
RichCoFlatMapFunction
的确切行为将取决于您对两个连接流的键控方式。
String single=singleState。value()。在您共享的代码中,
update
从不在singleState
,因此singleState。value()
将始终为空。
Datastream Pro是一个数据库"浏览器"和数据操作工具.它易于使用,可靠,稳定,操作直观。Datastream Pro支持所有兼容JDBC的数据库(已经在Oracle,MySQL,postgreSQL和HSQLDb上测试成功).利用它可以浏览与编辑数据库中的数据,可在一个友好的界面中运行与编辑SQL脚本,可使用查询编辑器来编辑SQL查询,可同时连接到多个数据库和易于使用的连接向导等。
我想加入一个大表,不可能包含在TM内存和流(kakfa)中。我在测试中成功加入了这两个表,将table-api与datastream api混合在一起。我做了以下操作: 它正在工作,但我从未见过这种类型的实现。可以吗?缺点是什么?
以下用例的最佳实践建议是什么?我们需要将流与一组“规则”相匹配,这些规则本质上是Flink数据集的概念。可以对此“规则集”进行更新,但并不频繁。必须根据“规则集”中的所有记录检查每个流事件,并且每个匹配将生成一个或多个事件到接收器数据流中。规则集中的记录数在6位数范围内。 目前,我们只是将规则加载到本地规则列表中,并在传入的数据流上使用flatMap。在flatMap中,我们只是在一个列表上迭代,
我正在使用Flink流读取来自Kafka的数据,并对数据进行处理。在使用Kafka之前,当应用程序启动时,我需要使用DataSet API读取一个文件,并根据一些条件对文件进行排序,然后从中创建一个列表。然后开始以流媒体的方式从Kafka那里消费。我已经编写了一个逻辑,使用DataSet API从文件中读取数据并对其进行排序。但当我尝试调谐程序,它从来没有执行,闪烁立即开始消耗Kafka。有没有办
我试图编写一个流作业,它将数据流下沉到postgres表中。为了提供完整的信息,我的工作基于以下文章:https://tech.signavio.com/2017/postgres-flink-sink,这些文章建议使用JDBCoutputFormat。 所以我的问题是:我错过了什么吗?我应该将插入的行提交到某个地方吗? 向你致意,伊格内修斯