当前位置: 首页 > 知识库问答 >
问题:

如何使用部署在Kubernetes上的Flink运行Beam Python管道?

梁丘波
2023-03-14

当Flink在Kubernetes作为豆荚运行时,有人知道如何用Flink运行梁Python管道吗?

我已经成功地使用Portable runner和指向运行在Docker容器中的本地Flink服务器的作业服务运行了一个Beam Python管道。

是否有一种方法可以部署一个侧容器并使用不同的工厂来运行Python harness流程?正确的做法是什么?

这是DockerEnvironmentFactory的修补程序:

diff -pr beam-release-2.15.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java beam-release-2.15.0-1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
*** beam-release-2.15.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java   2019-08-14 22:33:41.000000000 +0100
--- beam-release-2.15.0-1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java 2019-09-09 16:02:07.000000000 +0100
*************** package org.apache.beam.runners.fnexecut
*** 19,24 ****
--- 19,26 ----

  import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;

+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
  import java.nio.file.Files;
  import java.nio.file.Paths;
  import java.time.Duration;
*************** public class DockerEnvironmentFactory im
*** 127,133 ****
          ImmutableList.<String>builder()
              .addAll(gcsCredentialArgs())
              // NOTE: Host networking does not work on Mac, but the command line flag is accepted.
!             .add("--network=host")
              // We need to pass on the information about Docker-on-Mac environment (due to missing
              // host networking on Mac)
              .add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
--- 129,135 ----
          ImmutableList.<String>builder()
              .addAll(gcsCredentialArgs())
              // NOTE: Host networking does not work on Mac, but the command line flag is accepted.
!             .add("--network=flink")
              // We need to pass on the information about Docker-on-Mac environment (due to missing
              // host networking on Mac)
              .add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
*************** public class DockerEnvironmentFactory im
*** 222,228 ****

      private static ServerFactory getServerFactory() {
        ServerFactory.UrlFactory dockerUrlFactory =
!           (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString();
        if (RUNNING_INSIDE_DOCKER_ON_MAC) {
          // If we're already running in a container, we need to use a fixed port range due to
          // non-existing host networking in Docker-for-Mac. The port range needs to be published
--- 224,230 ----

      private static ServerFactory getServerFactory() {
        ServerFactory.UrlFactory dockerUrlFactory =
!               (host, port) -> HostAndPort.fromParts(getCanonicalHostName(), port).toString();
        if (RUNNING_INSIDE_DOCKER_ON_MAC) {
          // If we're already running in a container, we need to use a fixed port range due to
          // non-existing host networking in Docker-for-Mac. The port range needs to be published
*************** public class DockerEnvironmentFactory im
*** 237,242 ****
--- 239,252 ----
      }
    }

+   private static String getCanonicalHostName() throws RuntimeException {
+     try {
+       return InetAddress.getLocalHost().getCanonicalHostName();
+     } catch (UnknownHostException e) {
+       throw new RuntimeException(e);
+     }
+   }
+
    /** Provider for DockerEnvironmentFactory. */
    public static class Provider implements EnvironmentFactory.Provider {
      private final boolean retainDockerContainer;
*************** public class DockerEnvironmentFactory im
*** 269,275 ****
      public ServerFactory getServerFactory() {
        switch (getPlatform()) {
          case LINUX:
!           return ServerFactory.createDefault();
          case MAC:
            return DockerOnMac.getServerFactory();
          default:
--- 279,286 ----
      public ServerFactory getServerFactory() {
        switch (getPlatform()) {
          case LINUX:
!           return DockerOnMac.getServerFactory();
! //          return ServerFactory.createDefault();
          case MAC:
            return DockerOnMac.getServerFactory();
          default:

这是我用来运行Flink的Docker组合文件:

version: '3.4'
services:
  jobmanager:
    image: tenx/flink:1.8.1
    command: 'jobmanager'
    environment:
      JOB_MANAGER_RPC_ADDRESS: 'jobmanager'
      DOCKER_MAC_CONTAINER: 1
      FLINK_JM_HEAP: 128
    volumes:
      - jobmanager-data:/data
      - /var/run/docker.sock:/var/run/docker.sock
    ports:
      - target: 8081
        published: 8081
        protocol: tcp
        mode: ingress
    networks:
      - flink
  taskmanager:
    image: tenx/flink:1.8.1
    command: 'taskmanager'
    environment:
      JOB_MANAGER_RPC_ADDRESS: 'jobmanager'
      DOCKER_MAC_CONTAINER: 1
      FLINK_TM_HEAP: 1024
      TASK_MANAGER_NUMBER_OF_TASK_SLOTS: 2
    networks:
      - flink
    volumes:
      - taskmanager-data:/data
      - /var/run/docker.sock:/var/run/docker.sock
      - /var/folders:/var/folders
volumes:
    jobmanager-data:
    taskmanager-data:
networks:
  flink:
    external: true
import apache_beam as beam
import logging

class LogElements(beam.PTransform):

    class _LoggingFn(beam.DoFn):

        def __init__(self, prefix=''):
            super(LogElements._LoggingFn, self).__init__()
            self.prefix = prefix

        def process(self, element, **kwargs):
            logging.info(self.prefix + str(element))
            yield element

    def __init__(self, label=None, prefix=''):
        super(LogElements, self).__init__(label)
        self.prefix = prefix

    def expand(self, input):
        input | beam.ParDo(self._LoggingFn(self.prefix))


from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099"])

p = beam.Pipeline(options=options)

(p | beam.Create([1, 2, 3, 4, 5]) | LogElements())

p.run()

我可以更改用于运行Python容器的映像:

options=PipelineOptions([“--runner=portablerunner”,“--job_endpoint=localhost:8099”,“--environment_type=docker”,“--environment_config=beam/python:latest”])

我可以禁用Docker并启用ExternalEnvironmentFactory:

是否有可用的实现?

共有1个答案

黄宏旷
2023-03-14

我找到解决办法了。Apache Beam2.16.0的新版本提供了一个与外部环境类型结合使用的实现。该实现基于worker_pool_main,它是为支持Kubernetes而创建的。

 类似资料:
  • 我已经在应用程序中添加了这些字段。pom中微服务和依赖关系的yml。xml。Jaeger在my local上运行也可以识别服务 我已经在kubernetes上部署了所有的微服务。请帮助我在kubernetes部署jaeger。 更新:我已达到此步骤。我有一个用于jaeger查询的负载平衡器IP。但是我的微服务将把日志发送到哪个主机和端口??

  • 我正在kubernetes上试用最新版本的Flink1.5的flink工作。 我的问题是如何在上面的flink集群上运行一个示例应用程序。flink示例项目提供了如何使用flink应用程序构建docker映像并将该应用程序提交给flink的信息。我遵循了这个例子,只是把flink的版本改成了最新版本。我发现应用程序(example-app)提交成功,并且在kubernetes的pod中显示,但是f

  • TiCDC 是一款 TiDB 增量数据同步工具,本文介绍如何使用 TiDB Operator 在 Kubernetes 上部署 TiCDC。 前置条件 TiDB Operator 部署完成。 全新部署 TiDB 集群同时部署 TiCDC 参考 在标准 Kubernetes 上部署 TiDB 集群进行部署。 在现有 TiDB 集群上新增 TiCDC 组件 编辑 TidbCluster Custom

  • 本文介绍如何在 Kubernetes 上部署 TiFlash。 前置条件 TiDB Operator 部署完成。 全新部署 TiDB 集群同时部署 TiFlash 参考 在标准 Kubernetes 上部署 TiDB 集群进行部署。 在现有 TiDB 集群上新增 TiFlash 组件 编辑 TidbCluster Custom Resource: kubectl edit tc ${cluster

  • 本文介绍如何在 Kubernetes 上部署 TiDB Operator。 准备环境 TiDB Operator 部署前,请确认以下软件需求: Kubernetes v1.12 或者更高版本 DNS 插件 PersistentVolume RBAC 启用(可选) Helm 版本 >= 2.11.0 && < 3.0.0 && != 2.16.4 部署 Kubernetes 集群 TiDB Oper

  • Kubernetes的部署和状态集提供了在独立,分布式或共享模式下部署MinIO服务器的完美平台。 在Kubernetes上部署MinIO有多种选择,您可以选择最适合您的。 MinIO Helm Chart通过一个简单的命令即可提供自定义而且简单的MinIO部署。更多关于MinIO Helm部署的资料,请访问这里. 你也可以浏览Kubernetes MinIO示例 ,通过.yaml文件来部署Min