我已经成功地在本地开发了一个超级简单的ETL进程(下面称为load_staging),它从远程位置提取数据,然后将未处理的数据写入本地Windows机器上的MongoDB容器。现在,我想使用DockerOperator为每个任务使用Apache Airflow计划这个过程,即我想创建源代码的docker映像,然后使用docker Operator执行该映像中的源代码。因为我在windows机器上工作,所以我只能使用码头集装箱内的气流。
我已经用< code>docker-compose up启动了airflow容器(下面称为webserver)和MongoDB容器(下面称为mongo ),并且在Airflow的GUI中手动触发了DAG。根据Airflow,任务正在成功执行,但似乎docker映像中的代码没有被执行,因为任务完成得太快,而且就在docker容器从我的映像启动后,任务以错误代码0执行,也就是说,我没有看到任务本身的任何日志输出。请参见下面的日志:
[2020-01-20 17:09:44,444] {{docker_operator.py:194}} INFO - Starting docker container from image myaccount/myrepo:load_staging_op
[2020-01-20 17:09:50,473] {{logging_mixin.py:95}} INFO - [[34m2020-01-20 17:09:50,472[0m] {{[34mlocal_task_job.py:[0m105}} INFO[0m - Task exited with return code 0[0m
所以,我的两个问题是:
您可以在下面找到有关如何设置DockerOPator、如何定义应该由DockerOPator执行的映像、启动webserver和mongo容器的docker-compose.yml
文件以及用于创建webserver容器的Dockerfile的更多信息。
在我的DAG定义文件中,我指定了DockerOperator,如下所示:
CONFIG_FILEPATH = "/configs/docker_execution.ini"
data_object_name = "some_name"
task_id_ = "{}_task".format(data_object_name)
cmd = "python /src/etl/load_staging_op/main.py --config_filepath={} --data_object_name={}".format(CONFIG_FILEPATH, data_object_name)
staging_op = DockerOperator(
command=cmd,
task_id=task_id_,
image="myaccount/myrepo:load_staging_op",
api_version="auto",
auto_remove=True
)
上面引用的映像load_staging_op
的 Docker 文件如下所示:
# Inherit from Python image
FROM python:3.7
# Install environment
USER root
COPY ./src/etl/load_staging_op/requirements.txt ./
RUN pip install -r requirements.txt
# Copy source code files into container
COPY ./configs /configs
COPY ./wsdl /wsdl
COPY ./src/all_constants.py /src/all_constants.py
COPY ./src/etl/load_staging_op/utils.py /src/etl/load_staging_op/utils.py
COPY ./src/etl/load_staging_op/main.py /src/etl/load_staging_op/main.py
# Extend python path so that custom modules are found
ENV PYTHONPATH "${PYTHONPATH}:/src"
ENTRYPOINT [ "sh", "-c"]
docker-compose.yml
文件外观的相关方面如下:
version: '2.1'
services:
webserver:
build: ./docker-airflow
restart: always
privileged: true
depends_on:
- mongo
- mongo-express
volumes:
- ./docker-airflow/dags:/usr/local/airflow/dags
# source code volume
- ./src:/src
- ./docker-airflow/workdir:/home/workdir
# Mount the docker socket from the host (currently my laptop) into the webserver container
# so that we can build docker images from inside the webserver container.
- //var/run/docker.sock:/var/run/docker.sock # the two "//" are needed for windows OS
- ./configs:/configs
- ./wsdl:/wsdl
ports:
# Change port to 8081 to avoid Jupyter conflicts
- 8081:8080
command: webserver
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
networks:
- mynet
mongo:
container_name: mymongo
image: mongo
restart: always
ports:
- 27017:27017
networks:
- mynet
上述多克文件中引用的 Web 服务器容器的多克文件如下所示:
FROM puckel/docker-airflow:1.10.4
# Adds DAG folder to the PATH
ENV PYTHONPATH "${PYTHONPATH}:/src:/usr/local/airflow/dags"
# Install the optional packages
COPY requirements.txt requirements.txt # make sure something like docker==4.1.0 is in this requirements.txt file!
USER root
RUN pip install -r requirements.txt
# Install docker inside the webserver container
RUN curl -sSL https://get.docker.com/ | sh
ENV SHARE_DIR /usr/local/share
# Install simple text editor for debugging
RUN ["apt-get", "update"]
RUN ["apt-get", "-y", "install", "vim"]
谢谢你的帮助,我非常感激!
我衷心感谢所有花时间帮助我解决问题的人。我需要实现以下更改才能使其正常工作:
DockerOperator:
network_mode
添加到运行 Web 服务器容器的网络中。这对我来说很困难,因为我是Docker的新手,在网上找不到很多关于这个的教程。为了查找运行Web服务器容器的网络名称,我使用Docker网络ls
之类的东西列出了主机(=Windows笔记本电脑)上所有当前处于活动状态的网络。在显示的网络列表中,我看到一个名为project_root_dirname_mynet
的网络,因此我的项目的根目录和 docker-compose.yml
文件中指定的网络名称的组合。有趣的是(很明显),在列出所有网络之后,您可以使用诸如docker网络检查之类的东西project_root_dirname_mynet
检查网络project_root_dirname_mynet
。这将返回一个带有子部分“容器”的 json 文件,您可以在其中看到 docker-compose.yml
文件中指定的所有容器。然后Dockeroperator的代码变为:
cmd = "--config_filepath {} --data_object_name {}".format(CONFIG_FILEPATH.strip(), data_object_name.strip())
print("Command: {}".format(cmd))
staging_op = DockerOperator(
command=cmd,
task_id=task_id_,
image="myaccount/myrepo:load_staging_op",
api_version="auto",
auto_remove=True,
network_mode="project_root_dirname_mynet"
)
load_staging_op任务的泊坞文件:
ENTRYPOINT["sh","-c"]
更改为ENTRYPOINT["python","/src/etl/load_staging_op/main.py"]
。我认为“python”参数将在容器中打开一个Python控制台,第二个参数只是您要在docker容器中执行的脚本的路径。然后,在运行时(或构建时或以任何方式调用),上面cmd
中的命令行参数将被传递。在图像的源代码中,您可以使用argparse
之类的库来检索这些命令。我正在尝试使用主题列表中的单个kafka使用者组合两个kafka主题,进一步将流中的json字符串转换为POJO。然后,通过keyBy(On事件时间字段)将它们加入,并将它们合并为单个胖json,我计划使用窗口流并在窗口流上应用窗口函数。假设主题A 我有几个问题。 这种方法适合合并主题并创建单个JSON吗 所有窗口流上的窗口函数似乎工作不正常;任何指点都将不胜感激 代码片段: 我得到了- AllW
我的 Windows 机器上运行了一个 docker 容器,它是使用官方文档中提供的 docker-compose 文件的改编版本构建的。 这工作得很好,但是我想将python脚本(我的任务)从已挂载的文件夹中移动到它们自己的docker容器中。 为了测试这一点,我创建了一个简单的“Hello World!”示例-脚本: 它与以下简单的docker文件一起 我可以用< code > docker
根据Docker基于LXC这一事实,我的理解是Docker容器共享来自其主机操作系统的各种资源。我关心的是CPU核心。下面是一个场景: 主机linux操作系统有8个内核 我必须在上面的主机操作系统上部署一组docker容器 我需要部署的一些docker容器更适合使用2核 a) 因此,如果我在该主机上运行所有docker容器,它们是否会像在该主机操作系统上作为正常安装的应用程序运行一样,根据需要消耗
我正在学习如何使用Flink处理流数据。 根据我的理解,我可以多次使用函数进行各种转换。 表示数据源不断向Flink发送字符串。所有字符串都是JSON格式的数据,如下所示: 下面是我的代码: 正如您所看到的,我的示例非常简单:获取并反序列化数据-->将string转换为Json对象-->将Json对象转换为string并获取所需内容(这里只需要)。 就目前而言,似乎一切都很好。我确实从日志文件中获
main.java--(src/sample文件夹) studentcontroller.java--(src/sample/controller文件夹) studentdao.java和sexdao.java(数据访问对象)--(src/sample/model文件夹) Student.java(公共类学生和构造器)--(src/sample/model文件夹) oddbc的util下的dbut
我已经在kubernetes中部署了airflow,如以下链接所述:https://github.com/apache/airflow/tree/master/chart 要访问airflow UI,我可以执行以下操作: