当前位置: 首页 > 知识库问答 >
问题:

如何利用JDBC操作符在airflow中提取sql查询结果

尹正奇
2023-03-14

我已经在Airflow Connections中配置了JDBC连接。DAG的My Task部分如下所示,其中包含一个select语句。当触发DAG是成功的,但我的查询结果不打印在日志中。如何使用JDBC操作符获取查询的结果。

dag=dag(DAG_ID='test_azure_sqldw_v1',default_args=default_args,schedule_interval=none,dagrun_timeout=timedelta(Seconds=120))

sql=“从表名中选择计数(*)”

azure_sqldw=jdbcopetask_id='azure_sqldw',sql=sql,jdbc_conn_id=“cdf_sqldw”,autocommit=true,dag=dag)

共有1个答案

汤洋
2023-03-14

操作员不打印到日志。它只是运行查询。如果您想要获取结果来执行某些操作,您需要使用钩子。

from airflow.providers.jdbc.hooks.jdbc import JdbcHook

def func(jdbc_conn_id, sql, **kwargs):
    """Print df from JDBC """
    pprint(kwargs)
    hook = JdbcHook(jdbc_conn_id=jdbc_conn_id)
    df = hook.get_pandas_df(sql=sql,autocommit=True)
    print(df.to_string())


run_this = PythonOperator(
    task_id='task',
    python_callable=func,
    op_kwargs={'jdbc_conn_id': 'cdf_sqldw', 'sql': 'select count(*) from tablename' }, 
    dag=dag,
)

还可以创建执行所需操作的自定义运算符。

 类似资料:
  • 我正在尝试更新数据库表。如果表单中的密码为空,我如何通过提供一个条件来防止表单中的密码(“memberpassword”,$pass)被sql代码更新?有可能吗?

  • 要在spark sql中运行sql语句以联接PostgreSQL中的两个表,请执行以下操作: 数据库引擎会执行联接操作并发回联接结果吗?或者数据库会将表_1和表_2的所有记录发送给spark job和spark job进行连接吗?是否有一些文档来解释此操作?谢谢

  • 问题内容: 我试图从字符串中提取括号之间的值。我怎样才能做到这一点。 例如:我有这个字符串:Abta Gupta(01792) 我想得到括号之间的结果,即:01792 我正在尝试编写这样的查询: 这实际上给了我结果。但是问题是括号之间的字符数不一定总是5。 因此,我想避免在查询中对“ 5”进行硬编码。 有人可以让我知道如何使这个查询更通用。 谢谢阿卜哈 问题答案: 我们可以在这里尝试使用:

  • 编辑-上下文:我正在使用Talend ETL工具,并在查询中使用ISODate或Date或new Date,如以下错误导致失败,因此我需要解决方法: 如果没有以下错误,我无法做到这一点: 大概是因为ETL工具调用: 考虑到我无法在com的查询中使用新日期()。mongodb。util。JSON。parse()方法,是否有解决方法? 我正在使用MongoDB v2.6.3,无法让$date运算符工作

  • 问题内容: 是否可以获取MySQL中sql查询结果的大小(以字节为单位)? 例如: 这将返回10000行。我不需要行,但结果集的大小以字节为单位。可能吗? 问题答案: 对,可能不准确,请注意

  • 我的代码在这里: 但它不起作用。我该怎么做?我在jcombobox get from database中有一个列表表名,当我选择其中一个时,这个框架将显示表的所有列和信息。