当前位置: 首页 > 知识库问答 >
问题:

Apache Flink作业中的多数据流支持

葛鸿熙
2023-03-14

其他流式框架(如Apache Samza、Storm或Nifi)是否可以实现这一点?

我们非常期待得到答复。

共有1个答案

全昊焜
2023-03-14

是的,这在闪烁和风暴中是可能的(没有关于Samza或Nifi的线索...)

您可以添加任意多个源运算符,每个运算符都可以从不同的源使用。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = ... // see Flink webpage for more details    

DataStream<String> stream1 = env.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);)
DataStream<String> stream2 = env.readTextFile("/tmp/myFile.txt");

DataStream<String> allStreams = stream1.union(stream2);

对于使用低级别API的Storm,模式类似。请参阅Apache Storm bolt从不同的喷注/螺栓接收多个输入元组

 类似资料:
  • 当我在GCP中运行作业时,这工作很好,但如果没有任何更新,它将失败。如果我删除update标志,只要没有正在运行的作业,它就可以正常工作。 是否有一种方法来指定,如果作业存在,则更新它,而不仅仅是开始一个新的作业?

  • 我们正在将我们的工作和数据流从Spring XD迁移到Kubernetes上的Spring cloud数据流。 Spring cloud数据流有商业支持吗?任何链接都有帮助。

  • 我注意到,每次我运行一个新作业时,它所花费的时间比我再次启动它时长20%左右? 如果一个作业运行多次,flink是否缓存一些结果并重用它们?如果是,我如何控制这一点? 我想测量我的任务运行了多长时间,但每次我重新运行它们时,速度都比以前快。

  • 我当前正尝试将Dataflow与pub/sub一起使用,但出现以下错误: 工作流失败。原因:(6E74E8516C0638CA):刷新凭据时出现问题。请检查:1。已为项目启用Dataflow API。2.您的项目有一个机器人服务帐户:service-[project number]@dataflow-service-producer-prod.iam.gserviceAccount.com应该可以

  • 在我当前的架构中,多个数据流作业在不同阶段被触发,作为ABC框架的一部分,我需要捕获这些作业的作业id作为数据流管道中的审计指标,并在BigQuery中更新它。 如何使用JAVA从管道中获取数据流作业的运行id?有没有我可以使用的现有方法,或者我是否需要在管道中使用google cloud的客户端库?

  • 我试图从一个数据流作业中运行两个分离的管道,类似于下面的问题: 一个数据流作业中的并行管道 如果我们使用单个p.run()使用单个数据流作业运行两个分离的管道,如下所示: 我认为它将在一个数据流作业中启动两个独立的管道,但它会创建两个包吗?它会在两个不同的工人上运行吗?