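Does anyone know how to run Beam Python pipelines with Flink when Flink is running as pods in Kubernetes?
I have successfully run a Beam Python pipeline using the Portable runner, with the job service pointing to a local Flink server running in a Docker container.
Is there a way to deploy a sidecar container and use a different factory to run the Python harness process? What is the correct approach?
This is the patch for DockerEnvironmentFactory: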
diff -pr beam-release-2.15.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java beam-release-2.15.0-1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
*** beam-release-2.15.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java 2019-08-14 22:33:41.000000000 +0100
--- beam-release-2.15.0-1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java 2019-09-09 16:02:07.000000000 +0100
*************** package org.apache.beam.runners.fnexecut
*** 19,24 ****
--- 19,26 ----
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
*************** public class DockerEnvironmentFactory im
*** 127,133 ****
ImmutableList.<String>builder()
.addAll(gcsCredentialArgs())
// NOTE: Host networking does not work on Mac, but the command line flag is accepted.
! .add("--network=host")
// We need to pass on the information about Docker-on-Mac environment (due to missing
// host networking on Mac)
.add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
--- 129,135 ----
ImmutableList.<String>builder()
.addAll(gcsCredentialArgs())
// NOTE: Host networking does not work on Mac, but the command line flag is accepted.
! .add("--network=flink")
// We need to pass on the information about Docker-on-Mac environment (due to missing
// host networking on Mac)
.add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
*************** public class DockerEnvironmentFactory im
*** 222,228 ****
private static ServerFactory getServerFactory() {
ServerFactory.UrlFactory dockerUrlFactory =
! (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString();
if (RUNNING_INSIDE_DOCKER_ON_MAC) {
// If we're already running in a container, we need to use a fixed port range due to
// non-existing host networking in Docker-for-Mac. The port range needs to be published
--- 224,230 ----
private static ServerFactory getServerFactory() {
ServerFactory.UrlFactory dockerUrlFactory =
! (host, port) -> HostAndPort.fromParts(getCanonicalHostName(), port).toString();
if (RUNNING_INSIDE_DOCKER_ON_MAC) {
// If we're already running in a container, we need to use a fixed port range due to
// non-existing host networking in Docker-for-Mac. The port range needs to be published
*************** public class DockerEnvironmentFactory im
*** 237,242 ****
--- 239,252 ----
}
}
+ private static String getCanonicalHostName() throws RuntimeException {
+ try {
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/** Provider for DockerEnvironmentFactory. */
public static class Provider implements EnvironmentFactory.Provider {
private final boolean retainDockerContainer;
*************** public class DockerEnvironmentFactory im
*** 269,275 ****
public ServerFactory getServerFactory() {
switch (getPlatform()) {
case LINUX:
! return ServerFactory.createDefault();
case MAC:
return DockerOnMac.getServerFactory();
default:
--- 279,286 ----
public ServerFactory getServerFactory() {
switch (getPlatform()) {
case LINUX:
! return DockerOnMac.getServerFactory();
! // return ServerFactory.createDefault();
case MAC:
return DockerOnMac.getServerFactory();
default:
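To make the effect of the patch concrete, here is a minimal Python sketch (not Beam code) of the two things it changes: the network flag passed to `docker run` for the harness container, and the address the harness uses to connect back to the runner's gRPC services. Because the patch also makes the LINUX branch return `DockerOnMac.getServerFactory()`, the host-name-based address applies on Linux too. The helper names `harness_docker_args` and `callback_url` are hypothetical, `socket.getfqdn()` is only an approximate stand-in for Java's `InetAddress.getLocalHost().getCanonicalHostName()`, and the IPv6 bracketing imitates Guava's `HostAndPort.fromParts`.

```python
import socket
from typing import List, Optional


def harness_docker_args(network: str = "flink",
                        mac_flag: Optional[str] = None) -> List[str]:
    """Networking-related `docker run` flags added by the patched factory.

    The real builder also prepends GCS credential arguments; omitted here.
    """
    return [
        # Was "--network=host" before the patch.
        f"--network={network}",
        # Java string concatenation turns an unset variable into "null".
        "--env=DOCKER_MAC_CONTAINER=" + ("null" if mac_flag is None else mac_flag),
    ]


def callback_url(port: int, host: Optional[str] = None) -> str:
    """Address the harness container dials to reach the runner's services.

    Before the patch the host was the fixed DOCKER_FOR_MAC_HOST; after it,
    the canonical host name of the machine running the factory is used.
    """
    if host is None:
        # Rough stand-in for InetAddress.getLocalHost().getCanonicalHostName().
        host = socket.getfqdn()
    # HostAndPort.fromParts wraps IPv6 literals such as "::1" in brackets.
    if ":" in host:
        host = f"[{host}]"
    return f"{host}:{port}"
```

For example, `callback_url(8099, host="taskmanager")` gives `taskmanager:8099`, a name that other containers on the same `flink` network can resolve, whereas the fixed Docker-for-Mac host name only works on Docker Desktop.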
This is the Docker Compose file I use to run Flink:
version: '3.4'
services:
  jobmanager:
    image: tenx/flink:1.8.1
    command: 'jobmanager'
    environment:
      JOB_MANAGER_RPC_ADDRESS: 'jobmanager'
      DOCKER_MAC_CONTAINER: 1
      FLINK_JM_HEAP: 128
    volumes:
      - jobmanager-data:/data
      - /var/run/docker.sock:/var/run/docker.sock
    ports:
      - target: 8081
        published: 8081
        protocol: tcp
        mode: ingress
    networks:
      - flink
  taskmanager:
    image: tenx/flink:1.8.1
    command: 'taskmanager'
    environment:
      JOB_MANAGER_RPC_ADDRESS: 'jobmanager'
      DOCKER_MAC_CONTAINER: 1
      FLINK_TM_HEAP: 1024
      TASK_MANAGER_NUMBER_OF_TASK_SLOTS: 2
    networks:
      - flink
    volumes:
      - taskmanager-data:/data
      - /var/run/docker.sock:/var/run/docker.sock
      - /var/folders:/var/folders
volumes:
  jobmanager-data:
  taskmanager-data:
networks:
  flink:
    external: true
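Because the compose file declares `flink` as an `external` network, Compose will not create it, and the patched factory hard-codes `--network=flink` for the harness containers, so the network has to exist before the stack is started. Below is a small Python sketch of that one-time step, shelling out to the `docker network inspect` and `docker network create` CLI commands. The helper name `ensure_docker_network` is hypothetical, and the `run` parameter exists only so the logic can be exercised without a Docker daemon.

```python
import subprocess
from typing import Callable


def ensure_docker_network(name: str = "flink",
                          run: Callable[..., subprocess.CompletedProcess] = subprocess.run) -> bool:
    """Create the Docker network `name` if it does not exist yet.

    Returns True if the network was created, False if it already existed.
    """
    # `docker network inspect` exits with a non-zero status when the network is missing.
    probe = run(["docker", "network", "inspect", name],
                capture_output=True, text=True)
    if probe.returncode == 0:
        return False
    # check=True raises CalledProcessError if creation fails.
    run(["docker", "network", "create", name], check=True)
    return True
```

Calling `ensure_docker_network()` once before `docker-compose up` is enough; later calls are no-ops because the inspect step succeeds.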
This is the Python pipeline I am running:
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class LogElements(beam.PTransform):
    """Logs every element of the input PCollection, with an optional prefix."""

    class _LoggingFn(beam.DoFn):
        def __init__(self, prefix=''):
            super(LogElements._LoggingFn, self).__init__()
            self.prefix = prefix

        def process(self, element, **kwargs):
            logging.info(self.prefix + str(element))
            yield element

    def __init__(self, label=None, prefix=''):
        super(LogElements, self).__init__(label)
        self.prefix = prefix

    def expand(self, input):
        # Return the output PCollection so the transform can be chained.
        return input | beam.ParDo(self._LoggingFn(self.prefix))


options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099"])
p = beam.Pipeline(options=options)
(p | beam.Create([1, 2, 3, 4, 5]) | LogElements())
p.run()
I can change the image used to run the Python container:
options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099", "--environment_type=docker", "--environment_config=beam/python:latest"])
I can disable Docker and enable the ExternalEnvironmentFactory:
Is there an implementation available?
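I found a solution. The new release of Apache Beam, 2.16.0, provides an implementation to use in combination with the external environment type. The implementation is based on worker_pool_main, which was created to support Kubernetes.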
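A minimal sketch of how the pieces fit, assuming Beam 2.16.0 or later: a worker pool is started from the `apache_beam.runners.worker.worker_pool_main` module (for example in a sidecar container next to each Flink task manager), and the pipeline selects the `EXTERNAL` environment type with `--environment_config` pointing at that pool, so the runner asks the pool for Python SDK workers instead of launching Docker containers. Port 50000, the `localhost` pool address (which relies on the sidecar sharing the task manager pod's network namespace), the use of the module's `--service_port` flag, and the helper names `worker_pool_command` and `external_pipeline_args` are all assumptions. The code only builds argument lists and starts nothing.

```python
import sys
from typing import List


def worker_pool_command(port: int = 50000) -> List[str]:
    """Command line for the Python SDK worker pool (run inside the sidecar)."""
    return [
        sys.executable, "-m", "apache_beam.runners.worker.worker_pool_main",
        f"--service_port={port}",
    ]


def external_pipeline_args(job_endpoint: str = "localhost:8099",
                           pool_address: str = "localhost:50000") -> List[str]:
    """PipelineOptions arguments that route the harness to the external pool."""
    return [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        "--environment_type=EXTERNAL",
        f"--environment_config={pool_address}",
    ]
```

In the sidecar the command would be launched with something like `subprocess.Popen(worker_pool_command())`, while the submitting script would use `PipelineOptions(external_pipeline_args())` in place of the Docker-based options shown earlier.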