我正在学习Flink,我从使用DataStream的简单字数统计开始。为了增强处理能力,我过滤了输出,以仅显示找到3个或更多单词的结果。
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9000)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.apply(new MyWindowFunction())
.sum(1)
.filter(word -> word.f1 >= 3);
我想创建一个WindowFunction,根据找到的单词值对输出进行排序。我试图实现的WindowFunction根本不编译。我正在努力定义WindowFunction接口的apply方法和参数。
public static class MyWindowFunction implements WindowFunction<
Tuple2<String, Integer>, // input type
Tuple2<String, Integer>, // output type
Tuple2<String, Integer>, // key type
TimeWindow> {
void apply(Tuple2<String, Integer> key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {
String word = ((Tuple2<String, Integer>)key).f0;
Integer count = ((Tuple2<String, Integer>)key).f1;
.........
out.collect(new Tuple2<>(word, count));
}
}
只要Splitter
类(应该是一个FlatMapFunction
)正在发射Tuple2,
. sum(1)
方法将完成您需要的一切(无需使用Application()
)
那么
.sum(1)
将为您进行聚合。如果你需要的东西与 sum()
不同,你通常会使用 .reduce(new MyCustomReduceFunction()),
因为这将是最有效和可扩展的方法,因为就不需要在内存中缓冲大量数据而言。
我正在更新这个答案,使用Flink 1.12.0。为了对中的流元素进行排序,我必须在使用< code>ReduceFunction对流进行计数后使用< code > KeyedProcessFunction 。然后,我必须将最后一个转换的并行度设置为< code>1,以便不改变我使用< code > KeyedProcessFunction 排序的元素的顺序。我使用的序列是< code>socketTextStream -
public class SocketWindowWordCountJava {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.socketTextStream("localhost", 9000)
.flatMap(new SplitterFlatMap())
.keyBy(new WordKeySelector())
.reduce(new SumReducer())
.keyBy(new WordKeySelector())
.process(new SortKeyedProcessFunction(3 * 1000))
.print().setParallelism(1);
String executionPlan = env.getExecutionPlan();
System.out.println("ExecutionPlan ........................ ");
System.out.println(executionPlan);
System.out.println("........................ ");
env.execute("Window WordCount sorted");
}
}
我用来对流进行排序的自定义项是SortKeyedProcessFunction
,它扩展了keyedProcessFunctions
。我使用<code>值状态
public class SortKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, Event> {
private static final long serialVersionUID = 7289761960983988878L;
// delay after which an alert flag is thrown
private final long timeOut;
// state to remember the last timer set
private ValueState<List<Event>> listState = null;
private ValueState<Long> lastTime = null;
public SortKeyedProcessFunction(long timeOut) {
this.timeOut = timeOut;
}
@Override
public void open(Configuration conf) {
// setup timer and HLL state
ValueStateDescriptor<List<Event>> descriptor = new ValueStateDescriptor<>(
// state name
"sorted-events",
// type information of state
TypeInformation.of(new TypeHint<List<Event>>() {
}));
listState = getRuntimeContext().getState(descriptor);
ValueStateDescriptor<Long> descriptorLastTime = new ValueStateDescriptor<Long>(
"lastTime",
TypeInformation.of(new TypeHint<Long>() {
}));
lastTime = getRuntimeContext().getState(descriptorLastTime);
}
@Override
public void processElement(Tuple2<String, Integer> value, Context context, Collector<Event> collector) throws Exception {
// get current time and compute timeout time
long currentTime = context.timerService().currentProcessingTime();
long timeoutTime = currentTime + timeOut;
// register timer for timeout time
context.timerService().registerProcessingTimeTimer(timeoutTime);
List<Event> queue = listState.value();
if (queue == null) {
queue = new ArrayList<Event>();
}
Long current = lastTime.value();
queue.add(new Event(value.f0, value.f1));
lastTime.update(timeoutTime);
listState.update(queue);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
// System.out.println("onTimer: " + timestamp);
// check if this was the last timer we registered
System.out.println("timestamp: " + timestamp);
List<Event> queue = listState.value();
Long current = lastTime.value();
if (timestamp == current.longValue()) {
Collections.sort(queue);
queue.forEach( e -> {
out.collect(e);
});
queue.clear();
listState.clear();
}
}
}
class Event implements Comparable<Event> {
String value;
Integer qtd;
public Event(String value, Integer qtd) {
this.value = value;
this.qtd = qtd;
}
public String getValue() { return value; }
public Integer getQtd() { return qtd; }
@Override
public String toString() {
return "Event{" +"value='" + value + '\'' +", qtd=" + qtd +'}';
}
@Override
public int compareTo(@NotNull Event event) {
return this.getValue().compareTo(event.getValue());
}
}
因此,当我使用$ nc -lk 9000
并在控制台上键入单词时,我会在输出上按顺序看到它们
...
Event{value='soccer', qtd=7}
Event{value='swim', qtd=5}
...
Event{value='basketball', qtd=9}
Event{value='soccer', qtd=8}
Event{value='swim', qtd=6}
其他UDF用于流程序的其他转换,它们在这里是为了完整性。
public class SplitterFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 3121588720675797629L;
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : sentence.split(" ")) {
out.collect(Tuple2.of(word, 1));
}
}
}
public class WordKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}
public class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> event1, Tuple2<String, Integer> event2) throws Exception {
return Tuple2.of(event1.f0, event1.f1 + event2.f1);
}
}
最近我尝试使用ApacheFlink进行快速批处理。我有一个表,它有一个列:value和一个不相关的索引列 基本上我想计算每5行值的平均值和范围。然后我将根据我刚才计算的平均值计算平均值和标准偏差。所以我想最好的方法是使用窗口。 看起来是这样的 但是我不知道用。我试过,但它说没有这样的输入。我只希望它在从源代码读取时按顺序分组。但是它必须是一个时间属性,所以我不能使用索引列作为排序。 我是否必须添
问题内容: 在我可以排序的使用此声明: 我无法使用 Swift 重现相同的语句。我发现的只是使用。 问题答案: 您可以使用Swift的内置排序函数,也可以使用Swift数组,因为Swift数组是桥接的,因此可以直接从swift 调用。 使用Swift的功能: 或者,使用的:
问题内容: 我有可能需要按1-n键排序的大文件。其中一些键可能是数字键,有些则可能不是数字键。这是一个固定宽度的柱状文件,因此没有定界符。 有没有一种很好的方法可以用Unix排序呢?使用一个键,就像使用“ -n”一样简单。我已经阅读了手册页并简短地搜索了Google,但是没有找到一个很好的例子。我将如何实现这一目标? 注意:由于文件大小可能,我排除了Perl。这将是不得已的办法。 问题答案: 使用
问题内容: 是否可以使用排序数组,然后再将另一个相关数组定位为与排序数组相同,例如: 从这一点出发,我想对数组进行排序,这样,如果“人”有一个cellNo“ x”,则在对数组进行排序后,他将具有相同的“ cellNo”“ x” 问题答案: 我会采用另一种方法: 创建一个新对象: 创建一个比较器: 打电话一对阵列
问题内容: 为什么我的打印输出数组未在以下代码中排序? 问题答案: 您需要两个循环来实现Bubble Sort。 样例代码:
问题内容: 我有一组三个列表项,它们希望在页面加载时从高到低自动显示。理想情况下使用jquery或javascript。 每个列表项都需要有自己的ID,因为它们每个都有各自的背景图像。数字必须是文本节点,以便用户可以编辑它们。 问题答案: 这可能是最快的方法,因为它不使用jQuery: 像下面这样调用函数: 您可以以相同的方式对其他列表进行排序,如果列表类在同一页面上还有其他元素,则应给您的ul一