当前位置: 首页 > 面试题库 >

如何在Airflow中运行Spark代码?

法兴德
2023-03-14
问题内容

地球人你好!我正在使用Airflow计划和运行Spark任务。我这次发现的所有内容都是Airflow可以管理的python DAG。
DAG示例:

spark_count_lines.py
import logging

from airflow import DAG
from airflow.operators import PythonOperator

from datetime import datetime

args = {
  'owner': 'airflow'
  , 'start_date': datetime(2016, 4, 17)
  , 'provide_context': True
}

dag = DAG(
  'spark_count_lines'
  , start_date = datetime(2016, 4, 17)
  , schedule_interval = '@hourly'
  , default_args = args
)

def run_spark(**kwargs):
  import pyspark
  sc = pyspark.SparkContext()
  df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
  logging.info('Number of lines in people.txt = {0}'.format(df.count()))
  sc.stop()

t_main = PythonOperator(
  task_id = 'call_spark'
  , dag = dag
  , python_callable = run_spark
)

问题是我的Python代码不好,并且有一些用Java编写的任务。我的问题是如何在python DAG中运行Spark Java
jar?或者,也许还有其他方法吗?我发现了spark提交:http : //spark.apache.org/docs/latest/submitting-
applications.html
但我不知道如何将所有内容连接在一起。也许有人以前使用过它并有可行的例子。感谢您的时间!


问题答案:

您应该可以使用BashOperator。保持其余代码不变,导入所需的类和系统软件包:

from airflow.operators.bash_operator import BashOperator

import os
import sys

设置所需的路径:

os.environ['SPARK_HOME'] = '/path/to/spark/root'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))

并添加运算符:

spark_task = BashOperator(
    task_id='spark_java',
    bash_command='spark-submit --class {{ params.class }} {{ params.jar }}',
    params={'class': 'MainClassName', 'jar': '/path/to/your.jar'},
    dag=dag
)

您可以使用Jinja模板轻松扩展它以提供其他参数。

您当然可以通过替换bash_command适合您情况的模板来针对非火花场景进行调整,例如:

bash_command = 'java -jar {{ params.jar }}'

和调整params



 类似资料:
  • 问题内容: 运行Airflow的常规说明不适用于Windows环境: Airflow实用程序在命令行中不可用,我在其他地方找不到要手动添加的实用程序。Airflow如何在Windows上运行? 问题答案: 您可以在Windows中激活,并直接按照本教程进行操作。我能够按照上面的步骤启动并成功运行。 安装完成后,请进行编辑以将所有配置指向Windows系统中的某个位置,而不是lxss(ubuntu)

  • 我正在使用docker compose和气流图像puckel/docker气流设置一个新的气流服务器运行良好,但当我尝试使用DockerOperator时,没有名为“docker”的模块。 我需要如何更改我的docker-compose文件才能在我的docker中使用docker? 我试图在谷歌上查找错误消息,但没有一个建议的解决方案奏效。 这是我正在使用的< code>WebServer的doc

  • 我想通过将spark Java/Scala api转换为dll文件来运行C#中的apache spark源代码。我引用了IKVM/IKVMC将spark jar文件转换为dll文件,但无法得到正确的结果。有没有办法在C#中运行spark源?

  • 问题内容: 我使用以下Dockerfile创建了一个Spark容器: 我还有两个用Scala编程语言编写的文件,这对我来说听起来很新。问题在于容器只知道Java,而没有安装任何其他命令。有什么方法可以在容器上没有安装任何程序的情况下运行Scala? 文件名是和。这是initDocuments.scala文件: 我也尝试了以下方法,但不起作用。 错误: PS: 我试图使用以下命令来更改代理地址,但我

  • 我想定期运行代码在django为此我下载了芹菜并试图配置它 添加到设置中。派克 他做了一个芹菜。py文件 创建了一个任务。py文件 依次发射了一切 节拍带给我的 芹菜。0.5(奇点)正在启动本地时间- 但工人每15秒就扔下一个 [2021-02-19 16:18:02,275:警告/SpawnPoolWorker-1]c:\用户\admin\appdata\本地\程序\python\python3

  • null 大多数文档描述了如何在Kubernetes上运行Spark集群。在Kubernetes上独立运行Spark的方法是什么?