我需要将一个由整数组成的立方体拆分为向量,对每个向量执行一些操作(比如简单的加法),然后将向量合并回一个立方体。向量操作应该并行执行(即每个流一个向量)。多维数据集是包含ID的对象。
我可以将多维数据集拆分为向量,并使用多维数据集ID创建一个元组,然后使用keyBy(ID),并为每个多维数据集的向量创建一个分区。然而,似乎我必须使用某个时间单位的窗口来做这件事。应用程序对延迟非常敏感,所以我更喜欢在向量到达时将它们组合起来,也许使用某种逻辑时钟(我知道一个多维数据集中有多少个向量),然后在最后一个向量到达时向下游发送重新组装的多维数据集。这在Flink有可能吗?
下面是一个示例性的代码片段:
//Stream topology..
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Cube> stream = env
//Take cubes from collection and send downstream
.fromCollection(cubes)
//Split the cube(int[][][]) to vectors(int[]) and send downstream
.flatMap(new VSplitter()) //returns tuple with id at pos 1
.keyBy(1)
//For each value in each vector element, add its value with one.
.map(new MapFunction<Tuple2<CubeVector, Integer>, Tuple2<CubeVector, Integer>>() {
@Override
public Tuple2<CubeVector, Integer> map(Tuple2<CubeVector, Integer> cVec) throws Exception {
CubeVector cv = cVec.getField(0);
cv.cubeVectorAdd(1);
cVec.setField(cv, 0);
return cVec;
}
})
//** Merge vectors back to a cube **//
.
.
.
//The cube splitter to vectors..
public static class VSplitter implements FlatMapFunction<Cube, Tuple2<CubeVector, Integer>> {
@Override
public void flatMap(Cube cube, Collector<Tuple2<CubeVector, Integer>> out) throws Exception {
for (CubeVector cv : cubeVSplit(cube)) {
//out.assignTimestamp()
out.collect(new Tuple2<CubeVector, Integer>(cv, cube.getId()));
}
}
}
您可以使用FlatMapFunction
,它不断追加CubeVector
,直到它看到足够多的CubeVector
来重构多维数据集
。下面的代码段应该可以完成这个任务:
DataStream<Tuple2<CubeVector, Integer> input = ...
input.keyBy(1).flatMap(
new RichFlatMapFunction<Tuple2<CubeVector, Integer>, Cube> {
private static final ListStateDescriptor<CubeVector> cubeVectorsStateDescriptor = new ListStateDescriptor<CubeVector>(
"cubeVectors",
new CubeVectorTypeInformation());
private static final ValueStateDescriptor<Integer> cubeVectorCounterDescriptor = new ValueStateDescriptor<>(
"cubeVectorCounter",
BasicTypeInfo.INT_TYPE_INFO);
private ListState<CubeVector> cubeVectors;
private ValueState<Integer> cubeVectorCounter;
@Override
public void open(Configuration parameters) {
cubeVectors = getRuntimeContext().getListState(cubeVectorsStateDescriptor);
cubeVectorCounter = getRuntimeContext().getState(cubeVectorCounterDescriptor);
}
@Override
public void flatMap(Tuple2<CubeVector, Integer> cubeVectorIntegerTuple2, Collector<Cube> collector) throws Exception {
cubeVectors.add(cubeVectorIntegerTuple2.f0);
final int oldCounterValue = cubeVectorCounter.value();
final int newCounterValue = oldCounterValue + 1;
if (newCounterValue == NUMBER_CUBE_VECTORS) {
Cube cube = createCube(cubeVectors.get());
cubeVectors.clear();
cubeVectorCounter.update(0);
collector.collect(cube);
} else {
cubeVectorCounter.update(newCounterValue);
}
}
});
我正在使用Flink处理来自某些数据源(如Kafka、Pravega等)的数据。 在我的例子中,数据源是Pravega,它为我提供了一个flink连接器。 我的数据源正在向我发送一些JSON数据,如下所示: 以下是我的代码: 如您所见,我使用FlinkPravegaReader和适当的反序列化程序来获取来自Pravega的JSON流。 然后我尝试将JSON数据转换为String,它们并对它们进行计
我想分割交换消息体(它是MyCustomClass对象的列表),处理它们(一个接一个),然后将所有的交换聚合在一起。拆分可以,一个一个处理也可以,但是我想不出怎么把它们聚合起来。 我不需要复杂的聚合,只需要收集分离的交换列表,并在最终的处理器中处理它们。
我有一个来自Kafka数据流,它对MyModel中的一个字段有2个可能的值。MyModel是一个pojo,具有从Kafka的消息解析的特定于领域的字段。
整理周二的数据集,我看不到如何拆分“流派”列。我试过:
本文向大家介绍如何在R中将向量拆分为块?,包括了如何在R中将向量拆分为块?的使用技巧和注意事项,需要的朋友参考一下 这可以在seq_along,split和ceiling的帮助下完成。 示例
我正在尝试使用ApacheFlink从读取数据 我的Flink工作是连接到webSocket,但它不是从webSocket拉数据。 下面是我尝试使用ApacheFlink API连接到websocket的示例代码 中的run()