datax(5): Upgrade to auto-detect the Python environment and run DataX jobs


I. Motivation

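The previous article walked through datax.py and left two open questions:

  1. What should happen when the user is not running Python 2, the environment DataX requires by default?
  2. Can a single script detect the user's Python environment automatically and run the DataX job accordingly?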
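
One way to approach the second question is a small dispatcher that inspects `sys.version_info` and hands the job to a version-specific script. The sketch below is only an illustration: the `py2/` and `py3/` subdirectory names, the `/opt/datax/bin` path, and the `pick_script` helper are assumptions made here, not necessarily the layout this series ends up with.

```python
import os
import sys


def pick_script(major, base_dir):
    """Return the path of the datax.py variant for a Python major version.

    The py2/ and py3/ subdirectories are hypothetical names for this sketch.
    """
    if major not in (2, 3):
        raise RuntimeError("unsupported Python major version: %r" % (major,))
    return os.path.join(base_dir, "py%d" % major, "datax.py")


# sys.version_info.major is 2 or 3 for the interpreter running this code.
script = pick_script(sys.version_info.major, "/opt/datax/bin")
print(script)
```

A real entry point would then launch the selected script with the same command-line arguments, so the user-facing command stays unchanged.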

II. Result

Run the following command under either Python 2 or Python 3:
> python datax.py ../job/job.json

The same recipe, the same taste: the command looks unchanged, but a lot now happens behind the scenes.


III. Implementation

1. Write the Python 3 versions of the DataX scripts

Three files in total:

===datax.py===
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import sys
import os
import signal
import subprocess
import time
import re
import socket
import json
from optparse import OptionParser
from optparse import OptionGroup
from string import Template
import codecs
import platform


def isWindows():
    return platform.system() == 'Windows'


DATAX_HOME = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

# print("DATAX_HOME==="+DATAX_HOME)

DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
if isWindows():
    codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
    CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
else:
    CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)

print("CLASS_PATH===" + CLASS_PATH)

LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
print("LOGBACK_FILE===" + LOGBACK_FILE)

DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
print("DEFAULT_JVM===" + DEFAULT_JVM)

DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
    DATAX_HOME, LOGBACK_FILE)
print("DEFAULT_PROPERTY_CONF===" + DEFAULT_PROPERTY_CONF)

ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
    DEFAULT_PROPERTY_CONF, CLASS_PATH)
print("ENGINE_COMMAND===" + ENGINE_COMMAND)

REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"
print("REMOTE_DEBUG_CONFIG===" + REMOTE_DEBUG_CONFIG)

RET_STATE = {
    "KILL": 143,
    "FAIL": -1,
    "OK": 0,  # clean exit; also the default when sys.exit() gets no argument
    "RUN": 1,  # exit with an error
    "RETRY": 2
}


# Get the local IP address
def getLocalIp():
    try:
        return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
    except:
        return "Unknown"


# Signal handler: stop the child process, then exit
def suicide(signum, frame):
    global child_process
    print("[Error] DataX receive unexpected signal %d, starts to suicide." % (signum), file=sys.stderr)

    if child_process:
        child_process.send_signal(signal.SIGQUIT)
        time.sleep(1)
        child_process.kill()
    print("DataX Process was killed ! you did ?", file=sys.stderr)
    sys.exit(RET_STATE["KILL"])


def register_signal():
    if not isWindows():
        global child_process
        signal.signal(2, suicide)
        signal.signal(3, suicide)
        signal.signal(15, suicide)


# Build the option parser
def getOptionParser():
    usage = "usage: %prog [options] job-url-or-path"
    parser = OptionParser(usage)  # create the parser

    # Option group for production environments
    prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                     "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                     "Make sure these options can be used in Product Env.")
    prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
                                  default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
    prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
                                  help="Set job unique id when running by Distribute/Local Mode.")
    prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
                                  action="store", default="standalone",
                                  help="Set job runtime mode such as: standalone, local, distribute. "
                                       "Default mode is standalone.")
    prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
                                  action="store", dest="params",
                                  help='Set job parameter, eg: the source tableName you want to set it by command, '
                                       'then you can use like this: -p"-DtableName=your-table-name", '
                                       'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
                                       'Note: you should config in you job tableName with ${tableName}.')
    prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
                                  action="store", dest="reader", type="string",
                                  help='View job config[reader] template, eg: mysqlreader,streamreader')
    prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
                                  action="store", dest="writer", type="string",
                                  help='View job config[writer] template, eg: mysqlwriter,streamwriter')
    # Add the production option group to the parser
    parser.add_option_group(prodEnvOptionGroup)

    # Option group for development and debugging
    devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
                                    "Developer use these options to trace more details of DataX.")
    devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
                                 help="Set to remote debug mode.")
    devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
                                 default="info", help="Set log level such as: debug, info, all etc.")
    # Add the development/debug option group to the parser
    parser.add_option_group(devEnvOptionGroup)
    return parser


# Generate a job configuration template
def generateJobConfigTemplate(reader, writer):
    readerRef = "Please refer to the %s document:\n https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (
        reader, reader, reader)
    writerRef = "Please refer to the %s document:\n https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (
        writer, writer, writer)
    print("readerRef" + readerRef)
    print("writerRef" + writerRef)
    jobGuid = 'Please save the following configuration as a json file and use\n python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'
    print("jobGuid" + jobGuid)
    jobTemplate = {
        "job": {
            "setting": {
                "speed": {
                    "channel": ""
                }
            },
            "content": [
                {
                    "reader": {},
                    "writer": {}
                }
            ]
        }
    }
    readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME, reader)
    print("readerTemplatePath===" + readerTemplatePath)
    writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME, writer)
    print("writerTemplatePath===" + writerTemplatePath)
    try:
        readerPar = readPluginTemplate(readerTemplatePath)
    except Exception as e:
        readerPar = {}  # fall back to an empty block so the template can still be printed
        print("Read reader[%s] template error: can\'t find file %s" % (reader, readerTemplatePath))
    try:
        writerPar = readPluginTemplate(writerTemplatePath)
    except Exception as e:
        writerPar = {}  # fall back to an empty block so the template can still be printed
        print("Read writer[%s] template error: can\'t find file %s" % (writer, writerTemplatePath))
    jobTemplate['job']['content'][0]['reader'] = readerPar
    jobTemplate['job']['content'][0]['writer'] = writerPar
    print(json.dumps(jobTemplate, indent=4, sort_keys=True))


# Read a plugin template
def readPluginTemplate(plugin):
    with open(plugin, 'r') as f:
        return json.load(f)


# Check whether the input path is a URL
def isUrl(path):
    if not path:
        return False

    assert (isinstance(path, str))
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False


# Build the start command line
def buildStartCommand(options, args):
    commandMap = {}
    tempJVMCommand = DEFAULT_JVM
    if options.jvmParameters:
        tempJVMCommand = tempJVMCommand + " " + options.jvmParameters

    if options.remoteDebug:
        tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
        print('local ip: ', getLocalIp())

    if options.loglevel:
        tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))

    if options.mode:
        commandMap["mode"] = options.mode

    # jobResource may be a URL or a local file path (relative or absolute)
    jobResource = args[0]
    if not isUrl(jobResource):
        jobResource = os.path.abspath(jobResource)
        if jobResource.lower().startswith("file://"):
            jobResource = jobResource[len("file://"):]

    jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
    if options.params:
        jobParams = jobParams + " " + options.params

    if options.jobid:
        commandMap["jobid"] = options.jobid

    commandMap["jvm"] = tempJVMCommand
    commandMap["params"] = jobParams
    commandMap["job"] = jobResource

    return Template(ENGINE_COMMAND).substitute(**commandMap)


def printCopyright():
    print(''' DataX (%s), From Alibaba ! Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved. ''' % DATAX_VERSION)
    sys.stdout.flush()


if __name__ == "__main__":
    printCopyright()  # print the copyright banner
    parser = getOptionParser()  # parser for the options on the datax command line
    options, args = parser.parse_args(sys.argv[1:])
    if options.reader is not None and options.writer is not None:
        # when both reader and writer are given, print a job JSON template and exit
        generateJobConfigTemplate(options.reader, options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    startCommand = buildStartCommand(options, args)
    print("startCommand===" + startCommand )

    # startCommand is a single string, so it has to run through the shell
    child_process = subprocess.Popen(startCommand, shell=True)
    register_signal()
    (stdout, stderr) = child_process.communicate()

    sys.exit(child_process.returncode)
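
The command line assembled by `buildStartCommand` relies on `string.Template`: the `${jvm}`, `${params}`, `${mode}`, `${jobid}` and `${job}` placeholders in `ENGINE_COMMAND` are filled in from `commandMap`. A reduced sketch of that step follows, with a shortened command string and made-up paths and values chosen only for illustration:

```python
from string import Template

# Shortened stand-in for ENGINE_COMMAND; the real string carries more -D flags.
engine_command = ("java -server ${jvm} -classpath /opt/datax/lib/*:. ${params} "
                  "com.alibaba.datax.core.Engine "
                  "-mode ${mode} -jobid ${jobid} -job ${job}")

# Illustrative values only; datax.py derives these from the parsed options.
command_map = {
    "jvm": "-Xms1g -Xmx1g",
    "params": "-Dlog.file.name=job_json",
    "mode": "standalone",
    "jobid": "-1",
    "job": "/opt/datax/job/job.json",
}

start_command = Template(engine_command).substitute(**command_map)
print(start_command)
```

`substitute` raises `KeyError` when a placeholder has no value, which is why every placeholder needs an entry in the mapping before the call.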

===dxprof.py===
#! /usr/bin/env python
# vim: set expandtab tabstop=4 shiftwidth=4 foldmethod=marker nu:

import re
import sys
import time

REG_SQL_WAKE = re.compile(r'Begin\s+to\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
REG_SQL_DONE = re.compile(r'Finished\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
REG_SQL_PATH = re.compile(r'from\s+(\w+)(\s+where|\s*$)', re.IGNORECASE)
REG_SQL_JDBC = re.compile(r'jdbcUrl:\s*\[(.+?)\]', re.IGNORECASE)
REG_SQL_UUID = re.compile(r'(\d+\-)+reader')
REG_COMMIT_UUID = re.compile(r'(\d+\-)+writer')
REG_COMMIT_WAKE = re.compile(r'begin\s+to\s+commit\s+blocks', re.IGNORECASE)
REG_COMMIT_DONE = re.compile(r'commit\s+blocks\s+ok', re.IGNORECASE)

# {{{ function parse_timestamp() #
def parse_timestamp(line):
    try:
        ts = int(time.mktime(time.strptime(line[0:19], '%Y-%m-%d %H:%M:%S')))
    except:
        ts = 0

    return ts

# }}} #

# {{{ function parse_query_host() #
def parse_query_host(line):
    ori = REG_SQL_JDBC.search(line)
    if (not ori):
        return ''

    ori = ori.group(1).split('?')[0]
    off = ori.find('@')
    if (off > -1):
        ori = ori[off+1:len(ori)]
    else:
        off = ori.find('//')
        if (off > -1):
            ori = ori[off+2:len(ori)]

    return ori.lower()
# }}} #

# {{{ function parse_query_table() #
def parse_query_table(line):
    ori = REG_SQL_PATH.search(line)
    return (ori and ori.group(1).lower()) or ''
# }}} #

# {{{ function parse_reader_task() #
def parse_task(fname):
    global LAST_SQL_UUID
    global LAST_COMMIT_UUID
    global DATAX_JOBDICT
    global DATAX_JOBDICT_COMMIT
    global UNIXTIME
    LAST_SQL_UUID = ''
    DATAX_JOBDICT = {}
    LAST_COMMIT_UUID = ''
    DATAX_JOBDICT_COMMIT = {}

    UNIXTIME = int(time.time())
    with open(fname, 'r') as f:
        for line in f.readlines():
            line = line.strip()

            if (LAST_SQL_UUID and (LAST_SQL_UUID in DATAX_JOBDICT)):
                DATAX_JOBDICT[LAST_SQL_UUID]['host'] = parse_query_host(line)
                LAST_SQL_UUID = ''

            if line.find('CommonRdbmsReader$Task') > 0:
                parse_read_task(line)
            elif line.find('commit blocks') > 0:
                parse_write_task(line)
            else:
                continue
# }}} #

# {{{ function parse_read_task() #
def parse_read_task(line):
    ser = REG_SQL_UUID.search(line)
    if not ser:
        return

    LAST_SQL_UUID = ser.group()
    if REG_SQL_WAKE.search(line):
        DATAX_JOBDICT[LAST_SQL_UUID] = {
            'stat' : 'R',
            'wake' : parse_timestamp(line),
            'done' : UNIXTIME,
            'host' : parse_query_host(line),
            'path' : parse_query_table(line)
        }
    elif ((LAST_SQL_UUID in DATAX_JOBDICT) and REG_SQL_DONE.search(line)):
        DATAX_JOBDICT[LAST_SQL_UUID]['stat'] = 'D'
        DATAX_JOBDICT[LAST_SQL_UUID]['done'] = parse_timestamp(line)
# }}} #

# {{{ function parse_write_task() #
def parse_write_task(line):
    ser = REG_COMMIT_UUID.search(line)
    if not ser:
        return

    LAST_COMMIT_UUID = ser.group()
    if REG_COMMIT_WAKE.search(line):
        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID] = {
            'stat' : 'R',
            'wake' : parse_timestamp(line),
            'done' : UNIXTIME,
        }
    elif ((LAST_COMMIT_UUID in DATAX_JOBDICT_COMMIT) and REG_COMMIT_DONE.search(line)):
        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['stat'] = 'D'
        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['done'] = parse_timestamp(line)
# }}} #

# {{{ function result_analyse() #
def result_analyse():
    def compare(a, b):
        return b['cost'] - a['cost']

    tasklist = []
    hostsmap = {}
    statvars = {'sum' : 0, 'cnt' : 0, 'svr' : 0, 'max' : 0, 'min' : int(time.time())}
    tasklist_commit = []
    statvars_commit = {'sum' : 0, 'cnt' : 0}

    for idx in DATAX_JOBDICT:
        item = DATAX_JOBDICT[idx]
        item['uuid'] = idx;
        item['cost'] = item['done'] - item['wake']
        tasklist.append(item);

        if (not (item['host'] in hostsmap)):
            hostsmap[item['host']] = 1
            statvars['svr'] += 1

        if (item['cost'] > -1 and item['cost'] < 864000):
            statvars['sum'] += item['cost']
            statvars['cnt'] += 1
            statvars['max'] = max(statvars['max'], item['done'])
            statvars['min'] = min(statvars['min'], item['wake'])

    for idx in DATAX_JOBDICT_COMMIT:
        itemc = DATAX_JOBDICT_COMMIT[idx]
        itemc['uuid'] = idx
        itemc['cost'] = itemc['done'] - itemc['wake']
        tasklist_commit.append(itemc)

        if (itemc['cost'] > -1 and itemc['cost'] < 864000):
            statvars_commit['sum'] += itemc['cost']
            statvars_commit['cnt'] += 1

    ttl = (statvars['max'] - statvars['min']) or 1
    idx = float(statvars['cnt']) / (statvars['sum'] or ttl)

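    # Python 3: list.sort() takes no comparison function; sort by cost, largest first
    tasklist.sort(key=lambda item: item['cost'], reverse=True)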
    for item in tasklist:
        print('%s\t%s.%s\t%s\t%s\t% 4d\t% 2.1f%%\t% .2f' %(item['stat'], item['host'], item['path'],
                                                           time.strftime('%H:%M:%S', time.localtime(item['wake'])),
                                                           (('D' == item['stat']) and time.strftime('%H:%M:%S', time.localtime(item['done']))) or '--',
                                                           item['cost'], 100 * item['cost'] / ttl, idx * item['cost']))

    if (not len(tasklist) or not statvars['cnt']):
        return

    print('\n--- DataX Profiling Statistics ---')
    print('%d task(s) on %d server(s), Total elapsed %d second(s), %.2f second(s) per task in average' %(statvars['cnt'],
                                                                                                         statvars['svr'], statvars['sum'], float(statvars['sum']) / statvars['cnt']))
    print('Actually cost %d second(s) (%s - %s), task concurrency: %.2f, tilt index: %.2f' %(ttl,
                                                                                             time.strftime('%H:%M:%S', time.localtime(statvars['min'])),
                                                                                             time.strftime('%H:%M:%S', time.localtime(statvars['max'])),
                                                                                             float(statvars['sum']) / ttl, idx * tasklist[0]['cost']))

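    # Commit statistics only make sense when at least one commit task was counted
    if not statvars_commit['cnt']:
        return

    idx_commit = float(statvars_commit['cnt']) / (statvars_commit['sum'] or ttl)
    # Python 3: list.sort() takes no comparison function; sort by cost, largest first
    tasklist_commit.sort(key=lambda item: item['cost'], reverse=True)
    print('%d task(s) done odps comit, Total elapsed %d second(s), %.2f second(s) per task in average, tilt index: %.2f' % (
        statvars_commit['cnt'],
        statvars_commit['sum'], float(statvars_commit['sum']) / statvars_commit['cnt'],
        idx_commit * tasklist_commit[0]['cost']))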

# }}} #

if (len(sys.argv) < 2):
    print("Usage: %s filename" %(sys.argv[0]))
    sys.exit(1)
else:
    parse_task(sys.argv[1])
    result_analyse()
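
In Python 3, `list.sort` accepts only the keyword arguments `key` and `reverse`, so a Python 2 style comparison function such as `compare` in `result_analyse` has to be expressed through `key`. A minimal sketch with made-up task records shows two equivalent ways to order tasks by descending `cost`:

```python
import functools

# Made-up task records; dxprof.py builds the real ones from the DataX log.
tasks = [
    {"uuid": "0-0-1-reader", "cost": 12},
    {"uuid": "0-0-2-reader", "cost": 45},
    {"uuid": "0-0-3-reader", "cost": 7},
]


def compare(a, b):
    # Python 2 style comparator: larger cost sorts first.
    return b["cost"] - a["cost"]


# Option 1: sort on the field directly, largest cost first.
by_key = sorted(tasks, key=lambda item: item["cost"], reverse=True)

# Option 2: keep the old comparator and adapt it with cmp_to_key.
by_cmp = sorted(tasks, key=functools.cmp_to_key(compare))

print([t["uuid"] for t in by_key])
```

Both options are stable, so records with equal `cost` keep their original relative order.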

===perftrace.py===
#!/usr/bin/env python
# -*- coding:utf-8 -*-


""" Life's short, Python more. """

import re
import os
import sys
import json
import uuid
import signal
import time
import subprocess
from optparse import OptionParser
# Python 3 strings are Unicode already, so the Python 2 workaround
# reload(sys) / sys.setdefaultencoding('utf8') is dropped.

##begin cli & help logic
def getOptionParser():
    usage = getUsage()
    parser = OptionParser(usage = usage)
    #rdbms reader and writer
    parser.add_option('-r', '--reader', action='store', dest='reader', help='trace datasource read performance with specified !json! string')
    parser.add_option('-w', '--writer', action='store', dest='writer', help='trace datasource write performance with specified !json! string')

    parser.add_option('-c', '--channel',  action='store', dest='channel', default='1', help='the number of concurrent sync thread, the default is 1')
    parser.add_option('-f', '--file',   action='store', help='existing datax configuration file, include reader and writer params')
    parser.add_option('-t', '--type',   action='store', default='reader', help='trace which side\'s performance, cooperate with -f --file params, need to be reader or writer')
    parser.add_option('-d', '--delete',   action='store', default='true', help='delete temporary files, the default value is true')
    #parser.add_option('-h', '--help', action='store', default='true', help='print usage information')
    return parser

def getUsage():
    return '''
The following params are available for -r --reader:
    [these params is for rdbms reader, used to trace rdbms read performance, it's like datax's key]
    *datasourceType: datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2 etc...
    *jdbcUrl:        datasource jdbc connection string, mysql as a example: jdbc:mysql://ip:port/database
    *username:       username for datasource
    *password:       password for datasource
    *table:          table name for read data
    column:          column to be read, the default value is ['*']
    splitPk:         the splitPk column of rdbms table
    where:           limit the scope of the performance data set
    fetchSize:       how many rows to be fetched at each communicate

    [these params is for stream reader, used to trace rdbms write performance]
    reader-sliceRecordCount:  how man test data to mock(each channel), the default value is 10000
    reader-column          :  stream reader while generate test data(type supports: string|long|date|double|bool|bytes; support constant value and random function),demo: [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]

The following params are available for -w --writer:
    [these params is for rdbms writer, used to trace rdbms write performance, it's like datax's key]
    datasourceType:  datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2|ads etc...
    *jdbcUrl:        datasource jdbc connection string, mysql as a example: jdbc:mysql://ip:port/database
    *username:       username for datasource
    *password:       password for datasource
    *table:          table name for write data
    column:          column to be writed, the default value is ['*']
    batchSize:       how many rows to be storeed at each communicate, the default value is 512
    preSql:          prepare sql to be executed before write data, the default value is ''
    postSql:         post sql to be executed end of write data, the default value is ''
    url:             required for ads, pattern is ip:port
    schme:           required for ads, ads database name

    [these params is for stream writer, used to trace rdbms read performance]
    writer-print:    true means print data read from source datasource, the default value is false

The following params are available global control:
    -c --channel:    the number of concurrent tasks, the default value is 1
    -f --file:       existing completely dataX configuration file path
    -t --type:       test read or write performance for a datasource, couble be reader or writer, in collaboration with -f --file
    -h --help:       print help message

some demo:
perftrace.py --channel=10 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "where":"", "splitPk":"", "writer-print":"false"}'
perftrace.py --channel=10 --writer='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'
perftrace.py --file=/tmp/datax.job.json --type=reader --reader='{"writer-print": "false"}'
perftrace.py --file=/tmp/datax.job.json --type=writer --writer='{"reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'

some example jdbc url pattern, may help:
jdbc:oracle:thin:@ip:port:database
jdbc:mysql://ip:port/database
jdbc:sqlserver://ip:port;DatabaseName=database
jdbc:postgresql://ip:port/database
warn: ads url pattern is ip:port
warn: test write performance will write data into your table, you can use a temporary table just for test.
'''

def printCopyright():
    DATAX_VERSION = 'UNKNOWN_DATAX_VERSION'
    print(''' DataX Util Tools (%s), From Alibaba ! Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.''' % DATAX_VERSION)
    sys.stdout.flush()


def yesNoChoice():
    yes = set(['yes','y', 'ye', ''])
    no = set(['no','n'])
    choice = input().lower()  # raw_input() was renamed to input() in Python 3
    if choice in yes:
        return True
    elif choice in no:
        return False
    else:
        sys.stdout.write("Please respond with 'yes' or 'no'")
##end cli & help logic


##begin process logic
def suicide(signum, e):
    global childProcess
    print("[Error] Receive unexpected signal %d, starts to suicide." % (signum), file=sys.stderr)
    if childProcess:
        childProcess.send_signal(signal.SIGQUIT)
        time.sleep(1)
        childProcess.kill()
    print("DataX Process was killed ! you did ?", file=sys.stderr)
    sys.exit(-1)


def registerSignal():
    global childProcess
    signal.signal(2, suicide)
    signal.signal(3, suicide)
    signal.signal(15, suicide)


def fork(command, isShell=False):
    global childProcess
    childProcess = subprocess.Popen(command, shell = isShell)
    registerSignal()
    (stdout, stderr) = childProcess.communicate()
    # block until the child process exits
    childProcess.wait()
    return childProcess.returncode
##end process logic


##begin datax json generate logic
#warn: if not '': -> true; if not None: -> true
def notNone(obj, context):
    if not obj:
        raise Exception("Configuration property [%s] could not be blank!" % (context))

def attributeNotNone(obj, attributes):
    for key in attributes:
        notNone(obj.get(key), key)

def isBlank(value):
    if value is None or len(value.strip()) == 0:
        return True
    return False

def parsePluginName(jdbcUrl, pluginType):
    import re
    #warn: drds
    name = 'pluginName'
    mysqlRegex = re.compile('jdbc:(mysql)://.*')
    if (mysqlRegex.match(jdbcUrl)):
        name = 'mysql'
    postgresqlRegex = re.compile('jdbc:(postgresql)://.*')
    if (postgresqlRegex.match(jdbcUrl)):
        name = 'postgresql'
    oracleRegex = re.compile('jdbc:(oracle):.*')
    if (oracleRegex.match(jdbcUrl)):
        name = 'oracle'
    sqlserverRegex = re.compile('jdbc:(sqlserver)://.*')
    if (sqlserverRegex.match(jdbcUrl)):
        name = 'sqlserver'
    db2Regex = re.compile('jdbc:(db2)://.*')
    if (db2Regex.match(jdbcUrl)):
        name = 'db2'
    return "%s%s" % (name, pluginType)

def renderDataXJson(paramsDict, readerOrWriter = 'reader', channel = 1):
    dataxTemplate = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 1
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "",
                        "parameter": {
                            "username": "",
                            "password": "",
                            "sliceRecordCount": "10000",
                            "column": [
                                "*"
                            ],
                            "connection": [
                                {
                                    "table": [],
                                    "jdbcUrl": []
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "",
                        "parameter": {
                            "print": "false",
                            "connection": [
                                {
                                    "table": [],
                                    "jdbcUrl": ''
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }
    dataxTemplate['job']['setting']['speed']['channel'] = channel
    dataxTemplateContent = dataxTemplate['job']['content'][0]

    pluginName = ''
    if paramsDict.get('datasourceType'):
        pluginName = '%s%s' % (paramsDict['datasourceType'], readerOrWriter)
    elif paramsDict.get('jdbcUrl'):
        pluginName = parsePluginName(paramsDict['jdbcUrl'], readerOrWriter)
    elif paramsDict.get('url'):
        pluginName = 'adswriter'

    theOtherSide = 'writer' if readerOrWriter == 'reader' else 'reader'
    dataxPluginParamsContent = dataxTemplateContent.get(readerOrWriter).get('parameter')
    dataxPluginParamsContent.update(paramsDict)

    dataxPluginParamsContentOtherSide = dataxTemplateContent.get(theOtherSide).get('parameter')

    if readerOrWriter == 'reader':
        dataxTemplateContent.get('reader')['name'] = pluginName
        dataxTemplateContent.get('writer')['name'] = 'streamwriter'
        if paramsDict.get('writer-print'):
            dataxPluginParamsContentOtherSide['print'] = paramsDict['writer-print']
            del dataxPluginParamsContent['writer-print']
        del dataxPluginParamsContentOtherSide['connection']
    if readerOrWriter == 'writer':
        dataxTemplateContent.get('reader')['name'] = 'streamreader'
        dataxTemplateContent.get('writer')['name'] = pluginName
        if paramsDict.get('reader-column'):
            dataxPluginParamsContentOtherSide['column'] = paramsDict['reader-column']
            del dataxPluginParamsContent['reader-column']
        if paramsDict.get('reader-sliceRecordCount'):
            dataxPluginParamsContentOtherSide['sliceRecordCount'] = paramsDict['reader-sliceRecordCount']
            del dataxPluginParamsContent['reader-sliceRecordCount']
        del dataxPluginParamsContentOtherSide['connection']

    if paramsDict.get('jdbcUrl'):
        if readerOrWriter == 'reader':
            dataxPluginParamsContent['connection'][0]['jdbcUrl'].append(paramsDict['jdbcUrl'])
        else:
            dataxPluginParamsContent['connection'][0]['jdbcUrl'] = paramsDict['jdbcUrl']
    if paramsDict.get('table'):
        dataxPluginParamsContent['connection'][0]['table'].append(paramsDict['table'])


    traceJobJson = json.dumps(dataxTemplate, indent = 4)
    return traceJobJson

def isUrl(path):
    if not path:
        return False
    if not isinstance(path, str):
        raise Exception('Configuration file path required for the string, you configure is:%s' % path)
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False


def readJobJsonFromLocal(jobConfigPath):
    jobConfigContent = None
    jobConfigPath = os.path.abspath(jobConfigPath)
    file = open(jobConfigPath)
    try:
        jobConfigContent = file.read()
    finally:
        file.close()
    if not jobConfigContent:
        raise Exception("Your job configuration file read the result is empty, please check the configuration is legal, path: [%s]\nconfiguration:\n%s" % (jobConfigPath, str(jobConfigContent)))
    return jobConfigContent


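def readJobJsonFromRemote(jobConfigPath):
    # Python 3 moved urlopen from urllib into urllib.request
    from urllib.request import urlopen
    conn = urlopen(jobConfigPath)
    # read() returns bytes in Python 3; decode so callers receive text
    jobJson = conn.read().decode('utf-8')
    return jobJson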

def parseJson(strConfig, context):
    try:
        return json.loads(strConfig)
    except Exception as e:
        import traceback
        traceback.print_exc()
        sys.stdout.flush()
        print('%s %s need in line with json syntax' % (context, strConfig), file=sys.stderr)
        sys.exit(-1)

def convert(options, args):
    traceJobJson = ''
    if options.file:
        if isUrl(options.file):
            traceJobJson = readJobJsonFromRemote(options.file)
        else:
            traceJobJson = readJobJsonFromLocal(options.file)
        traceJobDict = parseJson(traceJobJson, '%s content' % options.file)
        attributeNotNone(traceJobDict, ['job'])
        attributeNotNone(traceJobDict['job'], ['content'])
        attributeNotNone(traceJobDict['job']['content'][0], ['reader', 'writer'])
        attributeNotNone(traceJobDict['job']['content'][0]['reader'], ['name', 'parameter'])
        attributeNotNone(traceJobDict['job']['content'][0]['writer'], ['name', 'parameter'])
        if options.type == 'reader':
            traceJobDict['job']['content'][0]['writer']['name'] = 'streamwriter'
            if options.reader:
                traceReaderDict = parseJson(options.reader, 'reader config')
                if traceReaderDict.get('writer-print') is not None:
                    traceJobDict['job']['content'][0]['writer']['parameter']['print'] = traceReaderDict.get('writer-print')
                else:
                    traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
            else:
                traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
        elif options.type == 'writer':
            traceJobDict['job']['content'][0]['reader']['name'] = 'streamreader'
            if options.writer:
                traceWriterDict = parseJson(options.writer, 'writer config')
                if traceWriterDict.get('reader-column'):
                    traceJobDict['job']['content'][0]['reader']['parameter']['column'] = traceWriterDict['reader-column']
                if traceWriterDict.get('reader-sliceRecordCount'):
                    traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = traceWriterDict['reader-sliceRecordCount']
            else:
                columnSize = len(traceJobDict['job']['content'][0]['writer']['parameter']['column'])
                streamReaderColumn = []
                for i in range(columnSize):
                    streamReaderColumn.append({"type": "long", "random": "2,10"})
                traceJobDict['job']['content'][0]['reader']['parameter']['column'] = streamReaderColumn
                traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = 10000
        else:
            pass  # do nothing
        return json.dumps(traceJobDict, indent = 4)
    elif options.reader:
        traceReaderDict = parseJson(options.reader, 'reader config')
        return renderDataXJson(traceReaderDict, 'reader', options.channel)
    elif options.writer:
        traceWriterDict = parseJson(options.writer, 'writer config')
        return renderDataXJson(traceWriterDict, 'writer', options.channel)
    else:
        print(getUsage())
        sys.exit(-1)
    #dataxParams = {}
    #for opt, value in options.__dict__.items():
    # dataxParams[opt] = value
##end datax json generate logic


if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()

    options, args = parser.parse_args(sys.argv[1:])
    #print options, args
    dataxTraceJobJson = convert(options, args)

    # uuid1 is built from the MAC address, the current timestamp, and a random number, so the name is globally unique
    dataxJobPath = os.path.join(os.getcwd(), "perftrace-" + str(uuid.uuid1()))
    jobConfigOk = True
    if os.path.exists(dataxJobPath):
        print("file already exists, truncate and rewrite it? %s" % dataxJobPath)
        if yesNoChoice():
            jobConfigOk = True
        else:
            print("exit failed, because of file conflict")
            sys.exit(-1)
    # The with-block closes the file even if the write fails
    with open(dataxJobPath, 'w') as fileWriter:
        fileWriter.write(dataxTraceJobJson)


    print("trace environments:")
    print("dataxJobPath: %s" % dataxJobPath)
    dataxHomePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    print("dataxHomePath: %s" % dataxHomePath)

    dataxCommand = "%s %s" % (os.path.join(dataxHomePath, "bin", "datax.py"), dataxJobPath)
    print("dataxCommand: %s" % dataxCommand)

    returncode = fork(dataxCommand, True)
    if options.delete == 'true':
        os.remove(dataxJobPath)
    sys.exit(returncode)
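
To make the trace rewrite above concrete, here is a minimal, self-contained sketch of what the `options.type == 'reader'` branch does to a job description: the writer is swapped for `streamwriter` and its `print` parameter is set, while the reader is left alone. The sample job dict (`mysqlreader`, `mysqlwriter`, and their parameters) and the helper name `to_reader_trace` are illustrative assumptions and are not part of perftrace.py.

```python
import copy
import json


def to_reader_trace(job_dict, writer_print="false"):
    """Return a copy of job_dict whose writer is replaced by streamwriter.

    Hypothetical helper that mirrors the `options.type == 'reader'` branch:
    only the writer name and its `print` parameter are changed.
    """
    traced = copy.deepcopy(job_dict)  # leave the caller's dict untouched
    writer = traced["job"]["content"][0]["writer"]
    writer["name"] = "streamwriter"
    writer["parameter"]["print"] = writer_print
    return traced


# Made-up job description, used only for illustration.
sample_job = {
    "job": {
        "content": [
            {
                "reader": {"name": "mysqlreader", "parameter": {"column": ["id"]}},
                "writer": {"name": "mysqlwriter", "parameter": {"column": ["id"]}},
            }
        ]
    }
}

traced_job = to_reader_trace(sample_job)
print(json.dumps(traced_job, indent=4))
```

Working on a deep copy is a choice made for this sketch; the script above modifies the parsed dict in place, which is fine there because the dict is serialized immediately afterwards.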

2 Modify the package.xml file

File location: xx\DataX\core\src\main\assembly\package.xml. Its main job is to "copy" the scripts from the source tree into target as part of packaging. package.xml is referenced by xx\DataX\core\pom.xml.

===package.xml===
<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <!-- for bin -->
<!--        <fileSet>-->
<!--            <directory>src/main/bin</directory>-->
<!--            <includes>-->
<!--                <include>*.*</include>-->
<!--            </includes>-->
<!--            <excludes>-->
<!--                <exclude>*.pyc</exclude>-->
<!--            </excludes>-->
<!--            <directoryMode>775</directoryMode>-->
<!--            <outputDirectory>/bin</outputDirectory>-->
<!--        </fileSet>-->
    
        <fileSet>
            <directory>src/main/bin</directory>
            <includes>
                <include>*</include>
            </includes>
            <excludes>
                <exclude>*.pyc</exclude>
            </excludes>
            <directoryMode>775</directoryMode>
            <outputDirectory>/bin</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>src/main/bin/py/py2</directory>
            <includes>
                <include>*</include>
            </includes>
            <excludes>
                <exclude>*.pyc</exclude>
            </excludes>
            <directoryMode>775</directoryMode>
            <outputDirectory>/bin/py/py2</outputDirectory>
        </fileSet>
        
        <fileSet>
            <directory>src/main/bin/py/py3</directory>
            <includes>
                <include>*</include>
            </includes>
            <excludes>
                <exclude>*.pyc</exclude>
            </excludes>
            <directoryMode>775</directoryMode>
            <outputDirectory>/bin/py/py3</outputDirectory>
        </fileSet>
        <!-- for scripts -->
        <fileSet>
            <directory>src/main/script</directory>
            <includes>
                <include>*.*</include>
            </includes>
            <directoryMode>775</directoryMode>
            <outputDirectory>/script</outputDirectory>
        </fileSet>
        <!-- for configs -->
        <fileSet>
            <directory>src/main/conf</directory>
            <includes>
                <include>*.*</include>
            </includes>
            <outputDirectory>/conf</outputDirectory>
        </fileSet>
        <!-- for engine -->
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>datax-core-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>/lib</outputDirectory>
        </fileSet>

        <fileSet>
            <directory>src/main/job/</directory>
            <includes>
                <include>*.json</include>
            </includes>
            <outputDirectory>/job</outputDirectory>
        </fileSet>

        <fileSet>
            <directory>src/main/tools/</directory>
            <includes>
                <include>*.*</include>
            </includes>
            <outputDirectory>/tools</outputDirectory>
        </fileSet>

        <fileSet>
            <fileMode>777</fileMode>
            <directory>src/main/tmp</directory>
            <includes>
                <include>*.*</include>
            </includes>
            <outputDirectory>/tmp</outputDirectory>
        </fileSet>
    </fileSets>

    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>/lib</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>

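3 Add a new datax.py file

This datax.py is not the datax.py from before: it only checks which Python environment the user has and then calls the matching script.

===datax.py (suggestions for a better implementation are welcome)===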
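#!/usr/bin/python
# -*- coding: UTF-8 -*-
# Detect the Python version on this machine and call the py2 or py3 script accordingly

import os
import subprocess
import sys

currentFilePath = os.path.dirname(os.path.abspath(__file__))
print("currentFilePath===" + currentFilePath)

argvs = sys.argv[1:]
print("params===" + " ".join(argvs))

# Compare the numeric major version; comparing sys.version as a string is fragile
if sys.version_info[0] >= 3:
    pyPath = "py3"
else:
    pyPath = "py2"

# Reuse the interpreter running this script, so the detected version is the one
# that actually runs the target script (a bare "python" may point elsewhere)
execPy = [sys.executable, os.path.join(currentFilePath, "py", pyPath, "datax.py")] + argvs
print("execPy===" + " ".join(execPy))
# Pass the arguments as a list (no shell re-splitting) and propagate the exit code
sys.exit(subprocess.call(execPy))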
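
The dispatch logic can also be written as a small function that is easy to test without starting a subprocess. The sketch below is one possible shape, not the code shipped in the repository: the helper name `build_command` and the `/opt/datax/bin` path are hypothetical, and it assumes the `py/py2` and `py/py3` layout described in this article. It checks the numeric `sys.version_info` major version rather than the version string, and returns the command as a list so that arguments containing spaces survive intact.

```python
import os
import sys


def build_command(bin_dir, args, version_info=sys.version_info, executable=sys.executable):
    """Build the argv list for the version-specific datax.py (hypothetical helper)."""
    py_dir = "py3" if version_info[0] >= 3 else "py2"
    script = os.path.join(bin_dir, "py", py_dir, "datax.py")
    # Reuse the running interpreter so the detected version is the one that runs the script.
    return [executable, script] + list(args)


# "/opt/datax/bin" is a made-up install location, used only for illustration.
command = build_command("/opt/datax/bin", ["../job/job.json"])
print(command)
```

The resulting list can be handed to `subprocess.call` directly, and its return value passed to `sys.exit`, so that the caller sees the exit code of the DataX run.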

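4 Put the files in place

Create the directories in the core module as shown in the figure below and put each file in its place. (Ignore datax.bat: it is meant to run DataX on Windows without a Python environment and is still being written.)
[Figure: directory layout in the core module]
The result after packaging and compiling the project is as follows:

[Figure: build output after packaging]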
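
Because the screenshots are not reproduced here, the packaged layout can be checked with a short script instead. This is a sketch under assumptions: the expected paths are derived from the `outputDirectory` entries in package.xml and from the path the dispatcher builds, `missing_scripts` is a hypothetical helper, and the demo inspects a throwaway directory tree rather than a real DataX build.

```python
import os
import tempfile

# Paths expected under the DataX home, derived from the package.xml fileSets.
EXPECTED_SCRIPTS = [
    os.path.join("bin", "datax.py"),
    os.path.join("bin", "py", "py2", "datax.py"),
    os.path.join("bin", "py", "py3", "datax.py"),
]


def missing_scripts(datax_home):
    """Return the expected scripts that do not exist under datax_home."""
    return [p for p in EXPECTED_SCRIPTS
            if not os.path.isfile(os.path.join(datax_home, p))]


# Demo on a temporary tree that contains only the py3 script.
demo_home = tempfile.mkdtemp()
os.makedirs(os.path.join(demo_home, "bin", "py", "py3"))
with open(os.path.join(demo_home, "bin", "py", "py3", "datax.py"), "w") as handle:
    handle.write("# placeholder\n")

print(missing_scripts(demo_home))
```

Pointing `missing_scripts` at the real build output directory should return an empty list when packaging succeeded.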

Finally, go run it and have fun: py2 or py3, it just runs.


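Notes:

  1. The source code was changed slightly, mainly to address (1) issues flagged by the Alibaba coding-guidelines scan and (2) clean-code tidying.

  2. All the code has been uploaded to GitHub (master and dev branches) and is free to use.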


Published by: 全栈程序员-站长. Please credit the source when reposting: https://javaforall.net/145514.html Original link: https://javaforall.net
