DataX -- PostgreSQL usage

鞠建安
2023-12-01

datax1.json

{
    "job": {
        "setting": {
             "speed": {
                "byte":1048576,
                "channel":"4"
            }
        },
        "content": [
            {
                "reader": {
                    "name": "postgresqlreader",
                    "parameter": {
                        "username": "postgres",
                        "password": "asd!23",
                        "where": "",
                        "connection": [
                            {
                                "querySql": [
                                    "select id,name,age from public.person where id > 5 and id<15;"
                                ],
                                "jdbcUrl": [
                                    "jdbc:postgresql://192.168.109.106:5432/postgres"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "postgresqlwriter",
                    "parameter": {
                        "print": true,
                        "encoding": "UTF-8",
                        "username": "postgres",
                        "password": "asd!23",
                        "column": [
                       "id","name","age"
                    ],
                    "connection": [
                        {
                            "jdbcUrl": "jdbc:postgresql://192.168.109.106:5432/postgres",
                            "table": ["public.person_copy"]
                        }
                    ]
                    }
                }
            }
        ]
    }
}
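
Before running the job, the target table public.person_copy must already exist with columns compatible with the reader's query. A minimal preparation sketch, assuming public.person has columns id, name and age of types integer/text/integer (the types are an assumption; match them to your actual source table):

# create the target table on the destination database
psql -h 192.168.109.106 -U postgres -d postgres -c "
CREATE TABLE IF NOT EXISTS public.person_copy (
    id   integer,
    name text,
    age  integer
);"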


Learning notes


# 1. Reader and writer plugins supported by DataX
Data sources currently supported by DataX:

Reader

A Reader extracts data in bulk from a source storage system and converts it into DataX's standard data-exchange protocol. Any DataX Reader can be paired with any DataX Writer, enabling data transfer between arbitrary heterogeneous systems.
RDBMS (relational databases)

MysqlReader: bulk-extracts MySQL data via JDBC.
OracleReader: bulk-extracts Oracle data via JDBC.
SqlServerReader: bulk-extracts SQL Server data via JDBC.
PostgresqlReader: bulk-extracts PostgreSQL data via JDBC.
DrdsReader: bulk extraction tool for DRDS on the public cloud.
Data warehouse storage

ODPSReader: bulk-extracts ODPS data via the ODPS Tunnel SDK.
NoSQL storage

OTSReader: bulk extraction tool for OTS on the public cloud.
HBaseReader: online data extraction tool for HBase 0.94.
Unstructured storage

TxtFileReader: reads local files (recursively, with filtering).
FtpReader: reads remote FTP files (recursively, with filtering).
HdfsReader: bulk extraction tool for textfile and orcfile files on HDFS.
OssReader: bulk extraction tool for the public-cloud OSS product.
StreamReader
Writer

A Writer translates data from DataX's standard data-exchange protocol into the destination's concrete storage format and writes it out. Any DataX Writer can be paired with any DataX Reader, enabling data transfer between arbitrary heterogeneous systems.
RDBMS (relational databases)

MysqlWriter: writes to MySQL via JDBC (Insert/Replace).
OracleWriter: writes to Oracle via JDBC (Insert).
PostgresqlWriter: writes to PostgreSQL via JDBC (Insert).
SqlServerWriter: writes to SQL Server via JDBC (Insert).
DrdsWriter: writes to DRDS via JDBC (Replace).
Data warehouse storage

ODPSWriter: writes data to ODPS via the ODPS Tunnel SDK.
ADSWriter: imports data into ADS by staging it through ODPS.
NoSQL storage

OTSWriter: imports data into OTS Public-model tables via the OTS SDK.
OCSWriter
MongoDBReader
MongoDBWriter
Unstructured storage

TxtFileWriter: writes to local files.
OssWriter: writes data to OSS via the OSS SDK.
HdfsWriter: writes textfile and orcfile files to HDFS.
StreamWriter

# 2. View the job template for a given reader/writer plugin (e.g. postgresqlreader, mysqlreader, streamreader, oraclereader); each data source has its own template

python /opt/datax/bin/datax.py -r postgresqlreader -w postgresqlwriter
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "postgresqlreader", 
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [], 
                                "table": [],
                                "querySql": [
                                    "select id,name,age from public.person where id > 10;"
                                ]
                            }
                        ], 
                        "password": "", 
                        "username": ""
                    }
                }, 
                "writer": {
                    "name": "postgresqlwriter", 
                    "parameter": {
                        "column": [], 
                        "connection": [
                            {
                                "jdbcUrl": "", 
                                "table": []
                            }
                        ], 
                        "password": "", 
                        "postSql": [], 
                        "preSql": [], 
                        "username": ""
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}
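
The template printed above is assembled from each plugin's plugin_job_template.json under the DataX installation directory (see generateJobConfigTemplate in the datax.py listing below). Assuming DataX is installed at /opt/datax, you can inspect the raw template files directly:

cat /opt/datax/plugin/reader/postgresqlreader/plugin_job_template.json
cat /opt/datax/plugin/writer/postgresqlwriter/plugin_job_template.json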


# 3. Configure your own concrete .json from the template

# 4. Run the .json
python /opt/datax/bin/datax.py ./datax1.json
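
On success DataX prints a task summary and exits with code 0 (see RET_STATE in the datax.py listing below). A quick sanity check of the copy, comparing source and target row counts for the range the job selected:

psql -h 192.168.109.106 -U postgres -d postgres -c "SELECT count(*) FROM public.person WHERE id > 5 AND id < 15;"
psql -h 192.168.109.106 -U postgres -d postgres -c "SELECT count(*) FROM public.person_copy;"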

[root@azkaban bin]# cat datax.py 
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import sys
import os
import signal
import subprocess
import time
import re
import socket
import json
from optparse import OptionParser
from optparse import OptionGroup
from string import Template
import codecs
import platform

def isWindows():
    return platform.system() == 'Windows'

DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
if isWindows():
    codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
    CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
else:
    CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)
LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
    DATAX_HOME, LOGBACK_FILE)
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s  ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
    DEFAULT_PROPERTY_CONF, CLASS_PATH)
REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"

RET_STATE = {
    "KILL": 143,
    "FAIL": -1,
    "OK": 0,
    "RUN": 1,
    "RETRY": 2
}


def getLocalIp():
    try:
        return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
    except:
        return "Unknown"


def suicide(signum, e):
    global child_process
    print >> sys.stderr, "[Error] DataX receive unexpected signal %d, starts to suicide." % (signum)

    if child_process:
        child_process.send_signal(signal.SIGQUIT)
        time.sleep(1)
        child_process.kill()
    print >> sys.stderr, "DataX Process was killed ! you did ?"
    sys.exit(RET_STATE["KILL"])


def register_signal():
    if not isWindows():
        global child_process
        signal.signal(2, suicide)
        signal.signal(3, suicide)
        signal.signal(15, suicide)


def getOptionParser():
    usage = "usage: %prog [options] job-url-or-path"
    parser = OptionParser(usage=usage)

    prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                     "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                     "Make sure these options can be used in Product Env.")
    prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
                                  default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
    prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
                                  help="Set job unique id when running by Distribute/Local Mode.")
    prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
                                  action="store", default="standalone",
                                  help="Set job runtime mode such as: standalone, local, distribute. "
                                       "Default mode is standalone.")
    prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
                                  action="store", dest="params",
                                  help='Set job parameter, eg: the source tableName you want to set it by command, '
                                       'then you can use like this: -p"-DtableName=your-table-name", '
                                       'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
                                       'Note: you should config in you job tableName with ${tableName}.')
    prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
                                  action="store", dest="reader",type="string",
                                  help='View job config[reader] template, eg: mysqlreader,streamreader')
    prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
                                  action="store", dest="writer",type="string",
                                  help='View job config[writer] template, eg: mysqlwriter,streamwriter')
    parser.add_option_group(prodEnvOptionGroup)

    devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
                                    "Developer use these options to trace more details of DataX.")
    devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
                                 help="Set to remote debug mode.")
    devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
                                 default="info", help="Set log level such as: debug, info, all etc.")
    parser.add_option_group(devEnvOptionGroup)
    return parser

def generateJobConfigTemplate(reader, writer):
    readerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (reader,reader,reader)
    writerRef = "Please refer to the %s document:\n     https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (writer,writer,writer)
    print readerRef
    print writerRef
    jobGuid = 'Please save the following configuration as a json file and  use\n     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'
    print jobGuid
    jobTemplate={
      "job": {
        "setting": {
          "speed": {
            "channel": ""
          }
        },
        "content": [
          {
            "reader": {},
            "writer": {}
          }
        ]
      }
    }
    readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME,reader)
    writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME,writer)
    try:
      readerPar = readPluginTemplate(readerTemplatePath);
    except Exception, e:
       print "Read reader[%s] template error: can\'t find file %s" % (reader,readerTemplatePath)
    try:
      writerPar = readPluginTemplate(writerTemplatePath);
    except Exception, e:
      print "Read writer[%s] template error: : can\'t find file %s" % (writer,writerTemplatePath)
    jobTemplate['job']['content'][0]['reader'] = readerPar;
    jobTemplate['job']['content'][0]['writer'] = writerPar;
    print json.dumps(jobTemplate, indent=4, sort_keys=True)

def readPluginTemplate(plugin):
    with open(plugin, 'r') as f:
            return json.load(f)

def isUrl(path):
    if not path:
        return False

    assert (isinstance(path, str))
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False


def buildStartCommand(options, args):
    commandMap = {}
    tempJVMCommand = DEFAULT_JVM
    if options.jvmParameters:
        tempJVMCommand = tempJVMCommand + " " + options.jvmParameters

    if options.remoteDebug:
        tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
        print 'local ip: ', getLocalIp()

    if options.loglevel:
        tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))

    if options.mode:
        commandMap["mode"] = options.mode

    # jobResource may be a URL or a local file path (relative or absolute)
    jobResource = args[0]
    if not isUrl(jobResource):
        jobResource = os.path.abspath(jobResource)
        if jobResource.lower().startswith("file://"):
            jobResource = jobResource[len("file://"):]

    jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
    if options.params:
        jobParams = jobParams + " " + options.params

    if options.jobid:
        commandMap["jobid"] = options.jobid

    commandMap["jvm"] = tempJVMCommand
    commandMap["params"] = jobParams
    commandMap["job"] = jobResource

    return Template(ENGINE_COMMAND).substitute(**commandMap)


def printCopyright():
    print '''
DataX (%s), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

''' % DATAX_VERSION
    sys.stdout.flush()


if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()
    options, args = parser.parse_args(sys.argv[1:])
    if options.reader is not None and options.writer is not None:
        generateJobConfigTemplate(options.reader,options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    startCommand = buildStartCommand(options, args)
    # print startCommand

    child_process = subprocess.Popen(startCommand, shell=True)
    register_signal()
    (stdout, stderr) = child_process.communicate()

    sys.exit(child_process.returncode)

# 5. Passing parameters into the .json job

[root@azkaban test1]# vim datax.sh 

#python /opt/datax/bin/datax.py -p"-DtableName=public.person -DsqlWhere=id\\>20" ./datax1.json

# get the current max watermark (id) from the target table
psql -h 192.168.109.106 -U postgres -d postgres -c "SELECT max(id) FROM public.person_copy;" > /home/test1/incre.txt

# run without a password prompt by passing the password in a conninfo string
#psql "host=192.168.109.106 port=5432 user=postgres password=asd!23 dbname=postgres" -c "SELECT max(id) FROM public.person_copy;" > /home/test1/incre.txt

# assign the max watermark to a variable: the number sits on line 3 of psql's default output
declare -i maxid=`sed -n '3p' incre.txt`
echo ${maxid}
# run the incremental load
python /opt/datax/bin/datax.py -p"-DtableName=public.person -DsqlWhere=id\\>${maxid}" ./datax1.json
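
Note the double backslash in -DsqlWhere: inside double quotes the shell reduces \\> to \>, and since datax.py launches the JVM through a shell (subprocess.Popen with shell=True), the remaining backslash keeps > from being treated as output redirection. A quick way to see exactly what the option carries (a sketch; the 20 is just an example watermark):

echo "-DtableName=public.person -DsqlWhere=id\\>20"
# prints: -DtableName=public.person -DsqlWhere=id\>20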

datax1.json

{
    "job": {
        "setting": {
             "speed": {
                "byte":1048576,
                "channel":"4"
            }
        },
        "content": [
            {
                "reader": {
                    "name": "postgresqlreader",
                    "parameter": {
                        "username": "postgres",
                        "password": "asd!23",
                        "where": "",
                        "connection": [
                            {
                                "querySql": [
                                    "select id,name,age from $tableName where $sqlWhere;"
                                ],
                                "jdbcUrl": [
                                    "jdbc:postgresql://192.168.109.106:5432/postgres"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "postgresqlwriter",
                    "parameter": {
                        "print": true,
                        "encoding": "UTF-8",
                        "username": "postgres",
                        "password": "asd!23",
                        "column": [
                       "id","name","age"
                    ],  
                    "connection": [
                        {
                            "jdbcUrl": "jdbc:postgresql://192.168.109.106:5432/postgres",
                            "table": ["public.person_copy"]
                        }
                    ]
                    }
                }
            }
        ]
    }
}
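
Each run of datax.sh therefore copies only rows whose id exceeds the current maximum in public.person_copy, so repeated runs are incremental. Note that the script writes /home/test1/incre.txt but reads it back via the relative path incre.txt, so run it from /home/test1:

cd /home/test1 && sh datax.sh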

Get the value in column 2 of line 1:

sed -n 1p incre.txt | cut -d " " -f2
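
For reference, incre.txt produced by the watermark query has psql's default layout (header, separator, value, row count), which is why the script reads line 3; the value 20 below is just an example:

 max
-----
  20
(1 row)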

