当前位置: 首页 > 知识库问答 >
问题:

flink中非并行数据源到ParallelDataSource

李飞翼
2023-03-14

我想在Apache Flink中将非并行数据源转换为并行数据源。在伪代码中,它类似于以下内容:

int partitions = env.getParallelim();

DataSource<String> input = new CustomDataSource<String>();
DataSource<String> parallel = input.setParallelism(partitions).suffle();

我通过实现一个noop map函数来完成它,但我认为还有更优雅的方法。

谢谢

共有1个答案

凤高澹
2023-03-14

您可以使用ParallelSourceFunction而不是SourceFunction作为要在CustomDataSource中实现的接口。

参见:https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/parallelsourcefunction.html

 类似资料:
  • 不幸的是,Kafka Flink连接器只支持-csv、json和avro格式。因此,我不得不使用较低级别的API(数据流)。 问题:如果我可以从datastream对象中创建一个表,那么我就可以接受在该表上运行的查询。它将使转换部分无缝和通用。是否可以在数据流对象上运行SQL查询?

  • 我在Flink中构建了一个工作流,它由一个自定义源、一系列地图/平面地图和一个接收器组成。 我的自定义源的run()方法遍历存储在文件夹中的文件,并通过上下文的collect()方法收集每个文件的名称和内容(我有一个自定义对象,它将此信息存储在两个字段中)。 然后,我有一系列地图/平面图来转换这些对象,然后使用自定义接收器将其打印到文件中。在Flink的Web UI中生成的执行图如下所示: 我有一

  • 问题内容: 我正在尝试使用Hibernate作为提供程序将Web应用程序的JDBC代码更改为JPA。我正在使用Eclipse IDE。在那我已经定义了一个MySQL数据源。我在persistence.xml中添加了它。但是,我收到以下错误。 我的persistence.xml看起来像, 有什么建议么!!!提前致谢! 问题答案: 在中配置数据源时,您不需要。在应用程序服务器配置中配置数据源并通过JN

  • 我一直在试图找到一个连接器,将数据从Redis读取到Flink。Flink的文档中包含了要写入Redis的连接器的描述。在我的Flink工作中,我需要从Redis读取数据。在使用ApacheFlink进行数据流传输时,Fabian提到可以从Redis读取数据。可用于此目的的接头是什么?

  • 我试图理解Flink中的并行是如何工作的。本文件https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/programming-model.html似乎表明水槽的平行度等于1。在我的例子中,我正在向我的接收器中的HBase写信——这是否意味着只有一个任务(线程?)哪个将写入HBase?它是否没有为应用程序设置全局并行

  • 我正在启动一个新的Flink应用程序,允许我的公司执行大量报告。我们有一个现有的遗留系统,我们需要的大部分数据都保存在SQL Server数据库中。在开始使用新部署的Kafka流中的更多数据之前,我们首先需要使用这些数据库中的数据。 我花了很多时间阅读Flink的书和网页,但我有一些简单的问题和假设,我希望你能帮助我进步。 首先,我想使用DataStream API,这样我们既可以使用历史数据,也