当前位置: 首页 > 知识库问答 >
问题:

如何在Apache Flink中连接两个流

王经赋
2023-03-14

例如,我想在单个中组合1,2,34,5的流,因此结果应该是:1, 2, 3, 4, 5。换句话说:如果第一个源已耗尽-从第二个源获取元素。我最近的尝试是:

val a = streamEnvironment.fromElements(1, 2, 3)

val b = streamEnvironment.fromElements(4, 5)

val c = a.union(b)

c.map(x => println(s"X=$x")) // X=4, 5, 1, 2, 3 or something like that

也对datetime进行了类似的尝试,但结果相同。

共有3个答案

廉高邈
2023-03-14

您可以通过一个包含堆缓冲区的平面图大致实现这一点。但实际上,这取决于一些问题。例如,如果来自某个输入流的元素被延迟,则输出将不会严格排序。

def process(): StreamExecutionEnvironment = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

implicit val typeInfo = TypeInformation.of(classOf[Int])
implicit val typeInfo2 = TypeInformation.of(classOf[Unit])

val BUF_SIZE = 3
val STREAM_NUM = 2

val a = env.fromElements(1, 2, 3, 3, 4, 5, 6, 7, Int.MaxValue)
val b = env.fromElements(4, 5, 9, 10 , 11, 13, Int.MaxValue)

val c = a.union(b).flatMap(new FlatMapFunction[Int, Int] {
  val heap = collection.mutable.PriorityQueue[Int]().reverse
  var endCount = 0

  override def flatMap(value: Int, out: Collector[Int]): Unit = {
    if (value == Int.MaxValue) {
      endCount += 1

      if (endCount == STREAM_NUM) {
        heap.foreach(out.collect)
      }
    }
    else {
      heap += value

      while (heap.size > BUF_SIZE) {
        val v = heap.dequeue()
        out.collect(v)
      }
    }
  }
}).setParallelism(1)

c.map(x => println(s"X=$x")).setParallelism(1)

env
}
艾骏
2023-03-14

如果您有N个源(不是流)要连续排序,那么您可以用外部源包装它们。类似于:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

@SuppressWarnings("serial")
public class SequentialSources<T> implements SourceFunction<T>, ResultTypeQueryable<T> {

    private TypeInformation<T> type;
    private SourceFunction<T>[] sources;
    private volatile boolean isRunning = true;

    public SequentialSources(TypeInformation<T> type, SourceFunction<T>...sources) {
        this.type = type;
        this.sources = sources;
    }

    @Override
    public void run(SourceContext<T> context) throws Exception {
        int index = 0;

        while (isRunning) {
            sources[index++].run(context);
            isRunning = index < sources.length;
        }
    }

    @Override
    public void cancel() {
        isRunning = false;

        for (SourceFunction<T> source : sources) {
            source.cancel();
        }
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return type;
    }
}
赖浩荡
2023-03-14

这在现在是不可能的,至少在高级DataStream API中是不可能的。

可以实现一个低级运算符,首先读取输入,然后读取另一个输入。但是,这将完全阻止一个输入,该输入与Flink处理水印和执行检查点的方式不一致。

在未来,这将有可能使用所谓的侧输入。

 类似资料:
  • 我有一个两层神经网络的例子。第一层接受两个参数并有一个输出。第二个应作为第一层的结果使用一个参数和一个附加参数。应该是这样的: 因此,我创建了一个具有两个层的模型,并尝试将它们合并,但它返回了一个错误:<代码>顺序模型中的第一层必须获得“input\u shape”或“batch\u input\u shape”参数 在线<代码>结果。添加(合并)。 型号:

  • 问题内容: 如何在Java中连接两个数组? 问题答案: 使用Apache Commons Lang库 例如: 在Java 8中使用Stream: 或者像这样,使用flatMap: 为此,你必须使用反射:

  • 比如说,我有一个名为“Contact”的表,其中“first\u name”和“last\u name”作为列。基本上,“从联系人c中选择concat(c.firstname,,,c.lastname)作为全名”是我在hibernate中想要做的。 我可以将整个查询放在createQuery中,并获得所需的输出。但是,我不想在hibernate中执行sql查询。我在这里找到了一篇类似的帖子“我们可

  • 问题内容: 我有两个相等的大小。列表1由10个名称组成,列表2由其电话号码组成。 我想将姓名和号码合而为一。我该怎么做呢? 问题答案: 您可以用于将第二个列表的元素添加到第一个列表: 编辑: 根据上面的说明(“ 我想要新的Arraylist中具有名称和编号的单个String。 ”),您需要循环浏览第一个列表并将第二个列表中的项目追加到它。 像这样: 如果输入: 你会得到:

  • 问题内容: 我正在尝试连接Java中的字符串。为什么这不起作用? 问题答案: 你可以使用运算符来连接字符串: 被隐式转换为。

  • 这是我制作的hashmap(Ik它不是最好的,也没有太多的逻辑),我想知道如果用户输入像“DVIII”这样的数据,其中“d”的值为“500”,“viii”的值为“8”,然后将它们打印为DVIII=5008,那么如何连接hashmap的值