当前位置: 首页 > 知识库问答 >
问题:

为什么Flink文件系统接收器会拆分为多个文件

钱季
2023-03-14

我想使用Flink读取输入文件,进行聚合,然后将结果写入输出文件。作业处于批处理模式。请参见字数。py如下:

from pyflink.table import EnvironmentSettings, BatchTableEnvironment

# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html

env_settings = EnvironmentSettings.new_instance().in_batch_mode().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)

my_source_ddl = """
    create table mySource (
        word VARCHAR
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/input'
    )
"""

my_sink_ddl = """
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output'
    )
"""

transform_dml = """
INSERT INTO mySink
SELECT word, COUNT(1) FROM mySource GROUP BY word
"""

table_env.execute_sql(my_source_ddl)
table_env.execute_sql(my_sink_ddl)
table_env.execute_sql(transform_dml).wait()

# before run: echo -e  "flink\npyflink\nflink" > /tmp/input
# after run: cat /tmp/output

在运行python wordcount之前。py,我运行echo-e“flink\npyflink\nflink”

> ls /tmp/output
part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0 part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-6-file-0
pyflink,1
> cat /tmp/output/part-305680d0-e680-420f-ab17-3e558ceaeba3-cp-0-task-7-file-0
flink,2

虽然我希望有一个包含内容的单个文件/tmp/输出:

pyflink,1
flink,2

实际上,我通过调整下面生成单个文件/tmp/输出的参数,得到了上面的python程序。

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()

运行此版本将生成一个 /tmp/output.注意,它没有逗号分隔符。

> cat /tmp/output
flink   2
pyflink 1

知道为什么吗?谢谢

共有1个答案

苍意智
2023-03-14

第一次在没有指定并行度的情况下运行它时,得到了默认的并行度,即大于1(可能是4或8,取决于计算机的内核数)。

Flink被设计为可伸缩的,为了实现这一点,操作符的并行实例(如接收器)彼此解耦。例如,想象一个拥有100或1000个节点的大型集群。为了使其正常工作,每个实例都需要写入自己的文件。

由于您指定了,逗号已更改为制表符。field\u分隔符(“\t”)

 类似资料:
  • 问题内容: 我有从mongodb导出的json文件,如下所示: 大约有30000行,我想将每一行拆分成自己的文件。 (我正在尝试将我的数据转移到榻榻米群集上) 我尝试这样做: 但是我发现它似乎减少了行的负载,而当我期望30000个奇数时,运行此命令的输出仅给了我50个奇数文件! 有没有一种逻辑方法可以使此操作不使用任何适合的方法删除任何数据? 问题答案: 假设您不在乎确切的文件名,如果要将输入拆分

  • 问题内容: 将Spring的配置拆分为多个xml文件的正确方法是什么? 此刻我有 /WEB-INF/foo-servlet.xml /WEB-INF/foo-service.xml /WEB-INF/foo-persistence.xml 我有以下内容: 实际问题: 这种方法正确/最佳吗? 我真的需要同时指定中的配置位置 和该板块? 我需要记住什么才能能够引用中定义的?这与 指定有关吗? 更新1:

  • 问题内容: 我的体积太大了,很难找到正确的视图。 如何将其拆分为多个文件,然后导入?是否涉及速度损失? 我可以这样吗? 问题答案: 在Django中,所有内容都是Python模块(* .py)。你可以创建一个具有内部视图的文件夹,并且仍然可以导入视图,因为这也实现了Python模块。但是一个例子会更好。 你的原始图片可能如下所示: 使用以下文件夹/文件结构,它将起到相同的作用: viewsa.py

  • 问题内容: 有什么方法可以将.tfrecords文件直接拆分为多个.tfrecords文件,而无需回写每个Dataset示例? 问题答案: 您可以使用如下函数: 例如,要将文件分成100条记录,您可以执行以下操作: 这将创建多个较小的记录文件,等等。

  • 问题内容: 我有一个文件,我想用Java读取并将其拆分为(用户输入)输出文件。这是我读取文件的方式: 如何将文件拆分为文件? 注意-由于文件中的条目数约为100k,因此我无法将文件内容存储到数组中,然后将其拆分并保存到多个文件中。 问题答案: 由于一个文件可能很大,因此每个拆分文件也可能很大。 例: 源文件大小:5GB 数字分割:5:目的地 档案大小:每个1GB(5个档案) 即使我们有这样的内存,

  • 问题内容: 我有以下代码: 我将有许多服务(如一项服务),并且我不想将它们全部放在同一个文件中。 我在Stack Overflow中读了另一个问题,我可能需要这样的其他文件:在该文件中写入所有服务,但是当我启动Node时会抛出该错误。 如何分隔代码? 问题答案: 您可以在不同的文件(例如 test-routes.js)中 定义路由,如下所示: 现在在您的主文件中说出 server.js, 您可以像