
DataX job JSON examples

司马念
2023-12-01

1. DataX reads data directly from a Hive table on HDFS and writes it to SQL Server

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "defaultFS": "hdfs://cnprod1ha",
                        "fieldDelimiter": ",",
                        "fileType": "orc",
                        "path": "/user/hive/userdbs/svccnahahs.db/dwd_dc_appointment_data_d/",
                        "filename":"tmp1",
                        "hadoopConfig":{
                            "dfs.nameservices": "cnprod1ha",
                            "dfs.ha.namenodes.cnprod1ha": "namenode1,namenode2",
                            "dfs.namenode.rpc-address.cnprod1ha.namenode1": "oser406433.cn.wal-mart.com:8020",
                            "dfs.namenode.rpc-address.cnprod1ha.namenode2": "oser406435.cn.wal-mart.com:8020",
                            "dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                        },
                        "column": [
                                 {
                                  "index": 0,
                                  "type": "string"
                                 },
                                 {
                                  "index": 1,
                                  "type": "string"
                                 },
                                 {
                                  "index": 2,
                                  "type": "string"
                                 }
                       ],
                       "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "sqlserverwriter",
                    "parameter": {
                        "username": "datalakereportrw",
                        "password": "Dat@r1P2w",
                        "column": [
                             "dc"
                            ,"appointment_nbr"
                            ,"scheduled_date"
                            ,"scheduled_time"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "dwd_dc_appointment_data_d"
                                ],
                                "jdbcUrl": "jdbc:sqlserver://10.88.133.71:1433;DatabaseName=DataLakeReport"
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "3"
            }
        }
    }
}
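To run this job, hand the JSON file to the DataX launcher script. A minimal sketch, assuming DataX is installed under /opt/datax and the config above is saved as hdfs2sqlserver.json (both paths are assumptions):

# launch the DataX job; the paths below are assumptions, adjust to your install
python /opt/datax/bin/datax.py /opt/datax/job/hdfs2sqlserver.json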

2. DataX reads data from HDFS and writes it to ClickHouse

{
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {   "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "defaultFS": "hdfs://cnprod1ha",
                        "fileType": "orc",
                        "path": "/tmp/sqoop/export/hadoop服务器名/rpt/dim_calendar/*",
                        "hadoopConfig":{
                            "dfs.nameservices": "cnprod1ha",
                            "dfs.ha.namenodes.cnprod1ha": "namenode1,namenode2",
                            "dfs.namenode.rpc-address.cnprod1ha.namenode1": "oser406433.cn.wal-mart.com:8020",
                            "dfs.namenode.rpc-address.cnprod1ha.namenode2": "oser406435.cn.wal-mart.com:8020",
                            "dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                        },
                        "column": [
                                 {
                                  "index": 0,
                                  "type": "string"
                                 },
                                 {
                                  "index": 1,
                                  "type": "string"
                                 },
                                 {
                                  "index": 2,
                                  "type": "string"
                                 },
                                 {
                                  "index": 3,
                                  "type": "string"
                                 },

                                  {
                                  "index": 4,
                                  "type": "string"
                                 },
                                 {
                                  "index": 5,
                                  "type": "string"
                                 }
                       ],
                       "fieldDelimiter": "|"
                    }
                },
                "writer": {
                    "name": "rdbmswriter",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:clickhouse://服务器名:8123/库",
                                "table": [
                                    "dim_calendar"
                                ]
                            }
                        ],
                        "username": "username",
                        "password": "password",
                        "table": "dim_calendar",
                        "column": [
                            "*"
                        ],
                        "preSql": [
                            "alter table dim_calendar on cluster cluster_rpt delete where event_date between ${start_date} and ${end_date}"
                        ]
                    }
                }
            }
        ]
    }
}
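Once the job completes, the load can be spot-checked from the ClickHouse side. A minimal sketch with clickhouse-client; the host and database are the same placeholders as in the jdbcUrl above:

# sanity-check the row count after the load (placeholders as in the jdbcUrl)
clickhouse-client --host <server_name> --query "SELECT count() FROM <database>.dim_calendar"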

The date placeholders __start_date__ and __end_date__ in the preSql above can be substituted in a shell wrapper like the following:

#! /bin/bash -x
source  ~/.bash_profile
declare -r CURR_DIR=$(cd `dirname $0`;pwd)
JOB_NAME=`basename $0 .sh`

start_date=${1:-`date -d '-15 day' '+%Y-%m-%d'`} # default: 15 days ago
end_date=${2:-`date -d '-1 day' '+%Y-%m-%d'`}    # default: yesterday
DB_NAME=${3:-"cn_ominchan_sales_dl_tables"}
QUEUE_NAME=${4:-"smartops"}
# to HDFS
hive --hivevar start_date="${start_date}" \
  --hivevar end_date="${end_date}" \
  -f ${CURR_DIR}/app_sales_full_channel_m_export.sql
  
# sync the data to ClickHouse
tmpfile4=`mktemp -t sql_XXXX.sql`
cat  ${CURR_DIR}/app_sales_full_channel_m.json > $tmpfile4
sed -i "s/__start_date__/${start_date}/g ; s/__end_date__/${end_date}/g" $tmpfile4
datax $tmpfile4
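Instead of rewriting the file with sed, DataX can also substitute variables itself: keep ${start_date} and ${end_date} in the JSON (quoted inside the preSql) and pass values through the -p option. A sketch, assuming datax.py sits under /opt/datax/bin:

# built-in substitution: ${start_date}/${end_date} in the JSON are resolved
# from the -D definitions passed via -p, so no temp file is needed
python /opt/datax/bin/datax.py \
  -p "-Dstart_date=${start_date} -Dend_date=${end_date}" \
  ${CURR_DIR}/app_sales_full_channel_m.json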

3. DataX reads data from SQL Server and writes it to SQL Server

{
    "job": {
        "setting": {
            "speed": {
                "channel": 5
            }
        },
        "content": [
            {
                "reader": {
                    "name": "sqlserverreader",
                    "parameter": {
                        "username": "xx",
                        "password": "xx",
                        "where": "",
                        "connection": [
                            {
                                "querySql": [
                  	"select * from dbo.test01;"
                                ],
                                "jdbcUrl": [
                                    "jdbc:sqlserver://192.168.130.122:1433;DatabaseName=HX"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "sqlserverwriter",
                    "parameter": {
                        "username": "xx",
                        "password": "xx",
                        "column": [
                            "id",
                            "ids",
                            "age"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "table_name"
                                ],
                                "jdbcUrl": "jdbc:sqlserver://192.168.130.122:1433;DatabaseName=HXS"
                            }
                        ]
                    }
                }
            }
        ]
    }
}
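sqlserverwriter does not create the target table, so it must exist before the job runs. A minimal sketch with sqlcmd; the column types are assumptions inferred from the writer's column list:

# create the target table up front (types are assumptions; adjust as needed)
sqlcmd -S 192.168.130.122 -d HXS -U xx -P xx \
  -Q "CREATE TABLE table_name (id INT, ids VARCHAR(50), age INT);"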

4. DataX syncs data from SQL Server to Hive (via HDFS)

Note: the # annotations in the JSON below are explanatory only; JSON has no comment syntax, so remove them before running the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "sqlserverreader", #sqlserver读取器
                    "parameter": {
                        "username": "your_user_name", #用户名
                        "password": "your_password",  #密码
                        "connection": [
                            {
                                "querySql": [
                                    "SELECT order_id,store_name from ${database_name}.dbo.${table_name} where date between '' and '' "
                                ],
                                "jdbcUrl": ["jdbc:sqlserver://ip:1433"] #地址和端口,默认 1433
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter", #hdfs写入器
                    "parameter": {
                        "defaultFS": "hdfs://cnprod1ha",  #hdfs上的地址
                        "fileType": "orc", #导出的文件存储格式
                        "path": "/tmp/sqoop/export/vn0c43l/lcm/${your_dest_dir}", #你要导出的目标路径,提前创建好
                        "fileName": "${sams}", #导出的文件前缀名
                        "hadoopConfig":{ #hdfs配置,就按照这个写就行
                            "dfs.nameservices": "cnprod1ha",
                            "dfs.ha.namenodes.cnprod1ha": "namenode1,namenode2",
                            "dfs.namenode.rpc-address.cnprod1ha.namenode1": "oser406433.cn.wal-mart.com:8020",
                            "dfs.namenode.rpc-address.cnprod1ha.namenode2": "oser406435.cn.wal-mart.com:8020",
                            "dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                        },
                        "column": [
                                 {
                                  "name": "order_id", #和你上面查询的一致,还有类型
                                  "type": "VARCHAR"
                                 },
                                 {
                                  "name": "store_name",
                                  "type": "VARCHAR"
                                 }                                                                                                                           
                       ],
                       "writeMode": "append", #导出模式,这里是追加
                       "fieldDelimiter": "|"  #导出的文件中字段的分隔符
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "3"
            }
        }
    }
}
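hdfswriter only lands ORC files under the given path; to complete the sync into Hive, a table has to cover that location. A minimal sketch using the hive CLI from the script above; the database, table, and resolved path are assumptions that must match the writer config:

# expose the exported ORC files to Hive through an external table
# (database/table names and the location are assumptions)
hive -e "
CREATE EXTERNAL TABLE IF NOT EXISTS tmp.sqlserver_orders (
  order_id   STRING,
  store_name STRING
)
STORED AS ORC
LOCATION '/tmp/sqoop/export/vn0c43l/lcm/your_dest_dir';"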