datax(5): upgrade - auto-detect the Python environment and run DataX jobs



1. Thinking

The previous article dug into the datax.py file, which raised two questions:

  1. What should happen when the user's environment is not py2 (the environment DataX requires by default)?
  2. Can one script automatically detect the user's Python environment and then run the DataX job?

2. Effect

Run the following command under either py2 or py3:
>python datax.py ../job/job.json

Same recipe, same taste. Nothing looks different on the surface, yet a lot now happens behind the scenes.
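The idea behind this can be sketched as a thin wrapper that checks `sys.version_info` and hands the job off to the launcher matching the running interpreter. This is only a minimal illustration of the approach, not the article's actual dispatcher; the file names `datax_py2.py` and `datax_py3.py` are assumptions made for the example.

```python
import os
import sys


def pick_datax_script(bin_dir, version=sys.version_info):
    """Return the launcher script matching the running interpreter.

    datax_py2.py / datax_py3.py are illustrative names only, not
    necessarily the layout the article's scripts use.
    """
    name = 'datax_py3.py' if version[0] >= 3 else 'datax_py2.py'
    return os.path.join(bin_dir, name)


# A wrapper datax.py could then re-exec the chosen script with the same
# interpreter and the original job arguments, e.g.:
#   script = pick_datax_script(os.path.dirname(os.path.abspath(__file__)))
#   os.execv(sys.executable, [sys.executable, script] + sys.argv[1:])
```

Because the wrapper re-executes with `sys.executable`, the user keeps typing the familiar `python datax.py ../job/job.json` regardless of which Python is installed.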


3. Modification process

3.1 Writing the py3 datax scripts

Three files in total:

=== datax.py ===
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import sys
import os
import signal
import subprocess
import time
import re
import socket
import json
from optparse import OptionParser
from optparse import OptionGroup
from string import Template
import codecs
import platform


def isWindows():
    return platform.system() == 'Windows'


DATAX_HOME = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

# print("DATAX_HOME==="+DATAX_HOME)

DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
if isWindows():
    codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
    CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
else:
    CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)

print("CLASS_PATH===" + CLASS_PATH)

LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
print("LOGBACK_FILE===" + LOGBACK_FILE)

DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
print("DEFAULT_JVM===" + DEFAULT_JVM)

DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
    DATAX_HOME, LOGBACK_FILE)
print("DEFAULT_PROPERTY_CONF===" + DEFAULT_PROPERTY_CONF)

ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
    DEFAULT_PROPERTY_CONF, CLASS_PATH)
print("ENGINE_COMMAND===" + ENGINE_COMMAND)

REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"
print("REMOTE_DEBUG_CONFIG===" + REMOTE_DEBUG_CONFIG)

RET_STATE = {
    "KILL": 143,
    "FAIL": -1,
    "OK": 0,    # clean exit (the default status when none is passed)
    "RUN": 1,   # exit with errors
    "RETRY": 2
}


# get the local IP address
def getLocalIp():
    try:
        return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
    except:
        return "Unknown"


# kill the datax process itself when an unexpected signal arrives
def suicide(signum, frame):
    global child_process
    print("[Error] DataX receive unexpected signal %d, starts to suicide." % (signum), file=sys.stderr)

    if child_process:
        child_process.send_signal(signal.SIGQUIT)
        time.sleep(1)
        child_process.kill()
    print("DataX Process was killed ! you did ?", file=sys.stderr)
    sys.exit(RET_STATE["KILL"])


def register_signal():
    if not isWindows():
        global child_process
        signal.signal(2, suicide)
        signal.signal(3, suicide)
        signal.signal(15, suicide)


# build the option parser
def getOptionParser():
    usage = "usage: %prog [options] job-url-or-path"
    parser = OptionParser(usage)  # create the parser

    # production-environment option group
    prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                     "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                     "Make sure these options can be used in Product Env.")
    prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
                                  default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
    prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
                                  help="Set job unique id when running by Distribute/Local Mode.")
    prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
                                  action="store", default="standalone",
                                  help="Set job runtime mode such as: standalone, local, distribute. "
                                       "Default mode is standalone.")
    prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
                                  action="store", dest="params",
                                  help='Set job parameter, eg: the source tableName you want to set it by command, '
                                       'then you can use like this: -p"-DtableName=your-table-name", '
                                       'if you have multiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name". '
                                       'Note: you should configure tableName in your job with ${tableName}.')
    prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
                                  action="store", dest="reader", type="string",
                                  help='View job config[reader] template, eg: mysqlreader,streamreader')
    prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
                                  action="store", dest="writer", type="string",
                                  help='View job config[writer] template, eg: mysqlwriter,streamwriter')
    # add the production option group to the parser
    parser.add_option_group(prodEnvOptionGroup)

    # develop/debug option group
    devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
                                    "Developer use these options to trace more details of DataX.")
    devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
                                 help="Set to remote debug mode.")
    devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
                                 default="info", help="Set log level such as: debug, info, all etc.")
    # add the develop/debug option group to the parser
    parser.add_option_group(devEnvOptionGroup)
    return parser


# generate a job config template
def generateJobConfigTemplate(reader, writer):
    readerRef = "Please refer to the %s document:\n https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (
        reader, reader, reader)
    writerRef = "Please refer to the %s document:\n https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (
        writer, writer, writer)
    print("readerRef" + readerRef)
    print("writerRef" + writerRef)
    jobGuid = 'Please save the following configuration as a json file and use\n python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'
    print("jobGuid" + jobGuid)
    jobTemplate = {
        "job": {
            "setting": {
                "speed": {
                    "channel": ""
                }
            },
            "content": [
                {
                    "reader": {},
                    "writer": {}
                }
            ]
        }
    }
    readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME, reader)
    print("readerTemplatePath===" + readerTemplatePath)
    writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME, writer)
    print("writerTemplatePath===" + writerTemplatePath)
    readerPar = {}
    writerPar = {}
    try:
        readerPar = readPluginTemplate(readerTemplatePath)
    except Exception:
        print("Read reader[%s] template error: can't find file %s" % (reader, readerTemplatePath))
    try:
        writerPar = readPluginTemplate(writerTemplatePath)
    except Exception:
        print("Read writer[%s] template error: can't find file %s" % (writer, writerTemplatePath))
    jobTemplate['job']['content'][0]['reader'] = readerPar
    jobTemplate['job']['content'][0]['writer'] = writerPar
    print(json.dumps(jobTemplate, indent=4, sort_keys=True))


# read a plugin's job template file
def readPluginTemplate(plugin):
    with open(plugin, 'r') as f:
        return json.load(f)


# whether the given path is a URL
def isUrl(path):
    if not path:
        return False

    assert (isinstance(path, str))
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False


# build the java start command line
def buildStartCommand(options, args):
    commandMap = {}
    tempJVMCommand = DEFAULT_JVM
    if options.jvmParameters:
        tempJVMCommand = tempJVMCommand + " " + options.jvmParameters

    if options.remoteDebug:
        tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
        print('local ip: ', getLocalIp())

    if options.loglevel:
        tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))

    if options.mode:
        commandMap["mode"] = options.mode

    # jobResource may be a URL or a local file path (relative or absolute)
    jobResource = args[0]
    if not isUrl(jobResource):
        jobResource = os.path.abspath(jobResource)
        if jobResource.lower().startswith("file://"):
            jobResource = jobResource[len("file://"):]

    jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
    if options.params:
        jobParams = jobParams + " " + options.params

    if options.jobid:
        commandMap["jobid"] = options.jobid

    commandMap["jvm"] = tempJVMCommand
    commandMap["params"] = jobParams
    commandMap["job"] = jobResource

    return Template(ENGINE_COMMAND).substitute(**commandMap)


def printCopyright():
    print(''' DataX (%s), From Alibaba ! Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved. ''' % DATAX_VERSION)
    sys.stdout.flush()


if __name__ == "__main__":
    printCopyright()  # print the copyright banner
    parser = getOptionParser()  # build the parser for the datax launch arguments
    options, args = parser.parse_args(sys.argv[1:])
    if options.reader is not None and options.writer is not None:
        # if both reader and writer are present, generate a job json template
        generateJobConfigTemplate(options.reader, options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    startCommand = buildStartCommand(options, args)
    print("startCommand===" + startCommand)

    # the second positional arg of Popen is bufsize, not shell; pass shell=True explicitly
    child_process = subprocess.Popen(startCommand, shell=True)
    register_signal()
    (stdout, stderr) = child_process.communicate()

    sys.exit(child_process.returncode)

=== dxprof.py ===
#! /usr/bin/env python
# vim: set expandtab tabstop=4 shiftwidth=4 foldmethod=marker nu:

import re
import sys
import time

REG_SQL_WAKE = re.compile(r'Begin\s+to\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
REG_SQL_DONE = re.compile(r'Finished\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
REG_SQL_PATH = re.compile(r'from\s+(\w+)(\s+where|\s*$)', re.IGNORECASE)
REG_SQL_JDBC = re.compile(r'jdbcUrl:\s*\[(.+?)\]', re.IGNORECASE)
REG_SQL_UUID = re.compile(r'(\d+\-)+reader')
REG_COMMIT_UUID = re.compile(r'(\d+\-)+writer')
REG_COMMIT_WAKE = re.compile(r'begin\s+to\s+commit\s+blocks', re.IGNORECASE)
REG_COMMIT_DONE = re.compile(r'commit\s+blocks\s+ok', re.IGNORECASE)

# {{{ function parse_timestamp() #
def parse_timestamp(line):
    try:
        ts = int(time.mktime(time.strptime(line[0:19], '%Y-%m-%d %H:%M:%S')))
    except:
        ts = 0

    return ts

# }}} #

# {{{ function parse_query_host() #
def parse_query_host(line):
    ori = REG_SQL_JDBC.search(line)
    if (not ori):
        return ''

    ori = ori.group(1).split('?')[0]
    off = ori.find('@')
    if (off > -1):
        ori = ori[off+1:len(ori)]
    else:
        off = ori.find('//')
        if (off > -1):
            ori = ori[off+2:len(ori)]

    return ori.lower()
# }}} #

# {{{ function parse_query_table() #
def parse_query_table(line):
    ori = REG_SQL_PATH.search(line)
    return (ori and ori.group(1).lower()) or ''
# }}} #

# {{{ function parse_task() #
def parse_task(fname):
    global LAST_SQL_UUID
    global LAST_COMMIT_UUID
    global DATAX_JOBDICT
    global DATAX_JOBDICT_COMMIT
    global UNIXTIME
    LAST_SQL_UUID = ''
    DATAX_JOBDICT = {}
    LAST_COMMIT_UUID = ''
    DATAX_JOBDICT_COMMIT = {}

    UNIXTIME = int(time.time())
    with open(fname, 'r') as f:
        for line in f.readlines():
            line = line.strip()

            if (LAST_SQL_UUID and (LAST_SQL_UUID in DATAX_JOBDICT)):
                DATAX_JOBDICT[LAST_SQL_UUID]['host'] = parse_query_host(line)
                LAST_SQL_UUID = ''

            if line.find('CommonRdbmsReader$Task') > 0:
                parse_read_task(line)
            elif line.find('commit blocks') > 0:
                parse_write_task(line)
            else:
                continue
# }}} #

# {{{ function parse_read_task() #
def parse_read_task(line):
    # without the global declaration the assignment below would be local,
    # and parse_task() could never attach the jdbc host to the last task
    global LAST_SQL_UUID
    ser = REG_SQL_UUID.search(line)
    if not ser:
        return

    LAST_SQL_UUID = ser.group()
    if REG_SQL_WAKE.search(line):
        DATAX_JOBDICT[LAST_SQL_UUID] = {
            'stat': 'R',
            'wake': parse_timestamp(line),
            'done': UNIXTIME,
            'host': parse_query_host(line),
            'path': parse_query_table(line)
        }
    elif ((LAST_SQL_UUID in DATAX_JOBDICT) and REG_SQL_DONE.search(line)):
        DATAX_JOBDICT[LAST_SQL_UUID]['stat'] = 'D'
        DATAX_JOBDICT[LAST_SQL_UUID]['done'] = parse_timestamp(line)
# }}} #

# {{{ function parse_write_task() #
def parse_write_task(line):
    global LAST_COMMIT_UUID
    ser = REG_COMMIT_UUID.search(line)
    if not ser:
        return

    LAST_COMMIT_UUID = ser.group()
    if REG_COMMIT_WAKE.search(line):
        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID] = {
            'stat': 'R',
            'wake': parse_timestamp(line),
            'done': UNIXTIME,
        }
    elif ((LAST_COMMIT_UUID in DATAX_JOBDICT_COMMIT) and REG_COMMIT_DONE.search(line)):
        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['stat'] = 'D'
        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['done'] = parse_timestamp(line)
# }}} #

# {{{ function result_analyse() #
def result_analyse():
    def compare(a, b):
        return b['cost'] - a['cost']

    tasklist = []
    hostsmap = {}
    statvars = {'sum': 0, 'cnt': 0, 'svr': 0, 'max': 0, 'min': int(time.time())}
    tasklist_commit = []
    statvars_commit = {'sum': 0, 'cnt': 0}

    for idx in DATAX_JOBDICT:
        item = DATAX_JOBDICT[idx]
        item['uuid'] = idx
        item['cost'] = item['done'] - item['wake']
        tasklist.append(item)

        if (not (item['host'] in hostsmap)):
            hostsmap[item['host']] = 1
            statvars['svr'] += 1

        if (item['cost'] > -1 and item['cost'] < 864000):
            statvars['sum'] += item['cost']
            statvars['cnt'] += 1
            statvars['max'] = max(statvars['max'], item['done'])
            statvars['min'] = min(statvars['min'], item['wake'])

    for idx in DATAX_JOBDICT_COMMIT:
        itemc = DATAX_JOBDICT_COMMIT[idx]
        itemc['uuid'] = idx
        itemc['cost'] = itemc['done'] - itemc['wake']
        tasklist_commit.append(itemc)

        if (itemc['cost'] > -1 and itemc['cost'] < 864000):
            statvars_commit['sum'] += itemc['cost']
            statvars_commit['cnt'] += 1

    ttl = (statvars['max'] - statvars['min']) or 1
    idx = float(statvars['cnt']) / (statvars['sum'] or ttl)

    # py3: list.sort no longer accepts a cmp function; sort by cost, descending
    tasklist.sort(key=lambda item: item['cost'], reverse=True)
    for item in tasklist:
        print('%s\t%s.%s\t%s\t%s\t% 4d\t% 2.1f%%\t% .2f' %(item['stat'], item['host'], item['path'],
                                                           time.strftime('%H:%M:%S', time.localtime(item['wake'])),
                                                           (('D' == item['stat']) and time.strftime('%H:%M:%S', time.localtime(item['done']))) or '--',
                                                           item['cost'], 100 * item['cost'] / ttl, idx * item['cost']))

    if (not len(tasklist) or not statvars['cnt']):
        return

    print('\n--- DataX Profiling Statistics ---')
    print('%d task(s) on %d server(s), Total elapsed %d second(s), %.2f second(s) per task in average' %(statvars['cnt'],
                                                                                                         statvars['svr'], statvars['sum'], float(statvars['sum']) / statvars['cnt']))
    print('Actually cost %d second(s) (%s - %s), task concurrency: %.2f, tilt index: %.2f' %(ttl,
                                                                                             time.strftime('%H:%M:%S', time.localtime(statvars['min'])),
                                                                                             time.strftime('%H:%M:%S', time.localtime(statvars['max'])),
                                                                                             float(statvars['sum']) / ttl, idx * tasklist[0]['cost']))

    idx_commit = float(statvars_commit['cnt']) / (statvars_commit['sum'] or ttl)
    tasklist_commit.sort(key=lambda item: item['cost'], reverse=True)
    print('%d task(s) done odps commit, Total elapsed %d second(s), %.2f second(s) per task in average, tilt index: %.2f' % (
        statvars_commit['cnt'],
        statvars_commit['sum'], float(statvars_commit['sum']) / statvars_commit['cnt'],
        idx_commit * tasklist_commit[0]['cost']))

# }}} #

if (len(sys.argv) < 2):
    print("Usage: %s filename" % (sys.argv[0]))
    sys.exit(1)
else:
    parse_task(sys.argv[1])
    result_analyse()

=== perftrace.py ===
#!/usr/bin/env python
# -*- coding:utf-8 -*-


""" Life's short, Python more. """

import re
import os
import sys
import json
import uuid
import signal
import time
import subprocess
from optparse import OptionParser

##begin cli & help logic
def getOptionParser():
    usage = getUsage()
    parser = OptionParser(usage = usage)
    #rdbms reader and writer
    parser.add_option('-r', '--reader', action='store', dest='reader', help='trace datasource read performance with specified !json! string')
    parser.add_option('-w', '--writer', action='store', dest='writer', help='trace datasource write performance with specified !json! string')

    parser.add_option('-c', '--channel',  action='store', dest='channel', default='1', help='the number of concurrent sync thread, the default is 1')
    parser.add_option('-f', '--file',   action='store', help='existing datax configuration file, include reader and writer params')
    parser.add_option('-t', '--type',   action='store', default='reader', help='trace which side\'s performance, cooperate with -f --file params, need to be reader or writer')
    parser.add_option('-d', '--delete',   action='store', default='true', help='delete temporary files, the default value is true')
    #parser.add_option('-h', '--help', action='store', default='true', help='print usage information')
    return parser

def getUsage():
    return '''
The following params are available for -r --reader:
    [these params are for the rdbms reader, used to trace rdbms read performance; they mirror datax's keys]
    *datasourceType: datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2 etc...
    *jdbcUrl:        datasource jdbc connection string, mysql as an example: jdbc:mysql://ip:port/database
    *username:       username for datasource
    *password:       password for datasource
    *table:          table name for read data
    column:          column to be read, the default value is ['*']
    splitPk:         the splitPk column of the rdbms table
    where:           limit the scope of the performance data set
    fetchSize:       how many rows to fetch at each communicate
    [these params are for the stream reader, used to trace rdbms write performance]
    reader-sliceRecordCount: how much test data to mock (per channel), the default value is 10000
    reader-column:   columns the stream reader uses to generate test data (type supports: string|long|date|double|bool|bytes; supports constant values and the random function), demo: [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]

The following params are available for -w --writer:
    [these params are for the rdbms writer, used to trace rdbms write performance; they mirror datax's keys]
    datasourceType:  datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2 etc...
    *jdbcUrl:        datasource jdbc connection string, mysql as an example: jdbc:mysql://ip:port/database
    *username:       username for datasource
    *password:       password for datasource
    *table:          table name for write data
    column:          column to be written, the default value is ['*']
    batchSize:       how many rows to store at each communicate, the default value is 512
    preSql:          prepare sql to be executed before writing data, the default value is ''
    postSql:         post sql to be executed at the end of writing data, the default value is ''
    url:             required for ads, the pattern is ip:port
    schema:          required for ads, the ads database name
    [these params are for the stream writer, used to trace rdbms read performance]
    writer-print:    true means print the data read from the source datasource, the default value is false

The following params are available for global control:
    -c --channel: the number of concurrent tasks, the default value is 1
    -f --file:    path of an existing complete dataX configuration file
    -t --type:    test read or write performance for a datasource, can be reader or writer, in collaboration with -f --file
    -h --help:    print this help message

some demos:
    perftrace.py --channel=10 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "where":"", "splitPk":"", "writer-print":"false"}'
    perftrace.py --channel=10 --writer='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'
    perftrace.py --file=/tmp/datax.job.json --type=reader --reader='{"writer-print": "false"}'
    perftrace.py --file=/tmp/datax.job.json --type=writer --writer='{"reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'

some example jdbc url patterns, may help:
    jdbc:oracle:thin:@ip:port:database
    jdbc:mysql://ip:port/database
    jdbc:sqlserver://ip:port;DatabaseName=database
    jdbc:postgresql://ip:port/database
    warn: the ads url pattern is ip:port
    warn: testing write performance will write data into your table, you can use a temporary table just for the test.
'''

def printCopyright():
    DATAX_VERSION = 'UNKNOWN_DATAX_VERSION'
    print(''' DataX Util Tools (%s), From Alibaba ! Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.''' % DATAX_VERSION)
    sys.stdout.flush()


def yesNoChoice():
    yes = set(['yes','y', 'ye', ''])
    no = set(['no','n'])
    choice = input().lower()  # py3: raw_input() was renamed to input()
    if choice in yes:
        return True
    elif choice in no:
        return False
    else:
        sys.stdout.write("Please respond with 'yes' or 'no'")
##end cli & help logic


##begin process logic
def suicide(signum, e):
    global childProcess
    print("[Error] Receive unexpected signal %d, starts to suicide." % (signum), file=sys.stderr)
    if childProcess:
        childProcess.send_signal(signal.SIGQUIT)
        time.sleep(1)
        childProcess.kill()
    print("DataX Process was killed ! you did ?", file=sys.stderr)
    sys.exit(-1)


def registerSignal():
    global childProcess
    signal.signal(2, suicide)
    signal.signal(3, suicide)
    signal.signal(15, suicide)


def fork(command, isShell=False):
    global childProcess
    childProcess = subprocess.Popen(command, shell = isShell)
    registerSignal()
    # communicate() already blocks until the child process terminates,
    # so a separate wait() is unnecessary
    (stdout, stderr) = childProcess.communicate()
    return childProcess.returncode
##end process logic


##begin datax json generate logic
#warn: if not '': -> true; if not None: -> true
def notNone(obj, context):
    if not obj:
        raise Exception("Configuration property [%s] could not be blank!" % (context))

def attributeNotNone(obj, attributes):
    for key in attributes:
        notNone(obj.get(key), key)

def isBlank(value):
    if value is None or len(value.strip()) == 0:
        return True
    return False

def parsePluginName(jdbcUrl, pluginType):
    # re is already imported at module level
    #warn: drds
    name = 'pluginName'
    name = 'pluginName'
    mysqlRegex = re.compile('jdbc:(mysql)://.*')
    if (mysqlRegex.match(jdbcUrl)):
        name = 'mysql'
    postgresqlRegex = re.compile('jdbc:(postgresql)://.*')
    if (postgresqlRegex.match(jdbcUrl)):
        name = 'postgresql'
    oracleRegex = re.compile('jdbc:(oracle):.*')
    if (oracleRegex.match(jdbcUrl)):
        name = 'oracle'
    sqlserverRegex = re.compile('jdbc:(sqlserver)://.*')
    if (sqlserverRegex.match(jdbcUrl)):
        name = 'sqlserver'
    db2Regex = re.compile('jdbc:(db2)://.*')
    if (db2Regex.match(jdbcUrl)):
        name = 'db2'
    return "%s%s" % (name, pluginType)

def renderDataXJson(paramsDict, readerOrWriter = 'reader', channel = 1):
    dataxTemplate = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 1
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "",
                        "parameter": {
                            "username": "",
                            "password": "",
                            "sliceRecordCount": "10000",
                            "column": [
                                "*"
                            ],
                            "connection": [
                                {
                                    "table": [],
                                    "jdbcUrl": []
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "",
                        "parameter": {
                            "print": "false",
                            "connection": [
                                {
                                    "table": [],
                                    "jdbcUrl": ''
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }
    dataxTemplate['job']['setting']['speed']['channel'] = channel
    dataxTemplateContent = dataxTemplate['job']['content'][0]

    pluginName = ''
    if paramsDict.get('datasourceType'):
        pluginName = '%s%s' % (paramsDict['datasourceType'], readerOrWriter)
    elif paramsDict.get('jdbcUrl'):
        pluginName = parsePluginName(paramsDict['jdbcUrl'], readerOrWriter)
    elif paramsDict.get('url'):
        pluginName = 'adswriter'

    theOtherSide = 'writer' if readerOrWriter == 'reader' else 'reader'
    dataxPluginParamsContent = dataxTemplateContent.get(readerOrWriter).get('parameter')
    dataxPluginParamsContent.update(paramsDict)

    dataxPluginParamsContentOtherSide = dataxTemplateContent.get(theOtherSide).get('parameter')

    if readerOrWriter == 'reader':
        dataxTemplateContent.get('reader')['name'] = pluginName
        dataxTemplateContent.get('writer')['name'] = 'streamwriter'
        if paramsDict.get('writer-print'):
            dataxPluginParamsContentOtherSide['print'] = paramsDict['writer-print']
            del dataxPluginParamsContent['writer-print']
        del dataxPluginParamsContentOtherSide['connection']
    if readerOrWriter == 'writer':
        dataxTemplateContent.get('reader')['name'] = 'streamreader'
        dataxTemplateContent.get('writer')['name'] = pluginName
        if paramsDict.get('reader-column'):
            dataxPluginParamsContentOtherSide['column'] = paramsDict['reader-column']
            del dataxPluginParamsContent['reader-column']
        if paramsDict.get('reader-sliceRecordCount'):
            dataxPluginParamsContentOtherSide['sliceRecordCount'] = paramsDict['reader-sliceRecordCount']
            del dataxPluginParamsContent['reader-sliceRecordCount']
        del dataxPluginParamsContentOtherSide['connection']

    if paramsDict.get('jdbcUrl'):
        if readerOrWriter == 'reader':
            dataxPluginParamsContent['connection'][0]['jdbcUrl'].append(paramsDict['jdbcUrl'])
        else:
            dataxPluginParamsContent['connection'][0]['jdbcUrl'] = paramsDict['jdbcUrl']
    if paramsDict.get('table'):
        dataxPluginParamsContent['connection'][0]['table'].append(paramsDict['table'])


    traceJobJson = json.dumps(dataxTemplate, indent = 4)
    return traceJobJson

def isUrl(path):
    if not path:
        return False
    if not isinstance(path, str):
        raise Exception('Configuration file path required for the string, you configure is:%s' % path)
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False


def readJobJsonFromLocal(jobConfigPath):
    jobConfigContent = None
    jobConfigPath = os.path.abspath(jobConfigPath)
    file = open(jobConfigPath)
    try:
        jobConfigContent = file.read()
    finally:
        file.close()
    if not jobConfigContent:
        raise Exception("Your job configuration file read the result is empty, please check the configuration is legal, path: [%s]\nconfiguration:\n%s" % (jobConfigPath, str(jobConfigContent)))
    return jobConfigContent


def readJobJsonFromRemote(jobConfigPath):
    # py3: urllib.urlopen moved to urllib.request.urlopen
    from urllib.request import urlopen
    conn = urlopen(jobConfigPath)
    jobJson = conn.read()
    return jobJson

def parseJson(strConfig, context):
    try:
        return json.loads(strConfig)
    except Exception as e:
        import traceback
        traceback.print_exc()
        sys.stdout.flush()
        print('%s %s need in line with json syntax' % (context, strConfig), file=sys.stderr)
        sys.exit(-1)

def convert(options, args):
    traceJobJson = ''
    if options.file:
        if isUrl(options.file):
            traceJobJson = readJobJsonFromRemote(options.file)
        else:
            traceJobJson = readJobJsonFromLocal(options.file)
        traceJobDict = parseJson(traceJobJson, '%s content' % options.file)
        attributeNotNone(traceJobDict, ['job'])
        attributeNotNone(traceJobDict['job'], ['content'])
        attributeNotNone(traceJobDict['job']['content'][0], ['reader', 'writer'])
        attributeNotNone(traceJobDict['job']['content'][0]['reader'], ['name', 'parameter'])
        attributeNotNone(traceJobDict['job']['content'][0]['writer'], ['name', 'parameter'])
        if options.type == 'reader':
            traceJobDict['job']['content'][0]['writer']['name'] = 'streamwriter'
            if options.reader:
                traceReaderDict = parseJson(options.reader, 'reader config')
                if traceReaderDict.get('writer-print') is not None:
                    traceJobDict['job']['content'][0]['writer']['parameter']['print'] = traceReaderDict.get('writer-print')
                else:
                    traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
            else:
                traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
        elif options.type == 'writer':
            traceJobDict['job']['content'][0]['reader']['name'] = 'streamreader'
            if options.writer:
                traceWriterDict = parseJson(options.writer, 'writer config')
                if traceWriterDict.get('reader-column'):
                    traceJobDict['job']['content'][0]['reader']['parameter']['column'] = traceWriterDict['reader-column']
                if traceWriterDict.get('reader-sliceRecordCount'):
                    traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = traceWriterDict['reader-sliceRecordCount']
            else:
                columnSize = len(traceJobDict['job']['content'][0]['writer']['parameter']['column'])
                streamReaderColumn = []
                for i in range(columnSize):
                    streamReaderColumn.append({"type": "long", "random": "2,10"})
                traceJobDict['job']['content'][0]['reader']['parameter']['column'] = streamReaderColumn
                traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = 10000
        else:
            pass  # do nothing
        return json.dumps(traceJobDict, indent = 4)
    elif options.reader:
        traceReaderDict = parseJson(options.reader, 'reader config')
        return renderDataXJson(traceReaderDict, 'reader', options.channel)
    elif options.writer:
        traceWriterDict = parseJson(options.writer, 'writer config')
        return renderDataXJson(traceWriterDict, 'writer', options.channel)
    else:
        print(getUsage())
        sys.exit(-1)
    #dataxParams = {}
    #for opt, value in options.__dict__.items():
    # dataxParams[opt] = value
##end datax json generate logic


if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()

    options, args = parser.parse_args(sys.argv[1:])
    #print options, args
    dataxTraceJobJson = convert(options, args)

    # uuid1 is derived from the MAC address, the current timestamp, and a random number, so it is globally unique
    dataxJobPath = os.path.join(os.getcwd(), "perftrace-" + str(uuid.uuid1()))
    jobConfigOk = True
    if os.path.exists(dataxJobPath):
        print("file already exists, truncate and rewrite it? %s" % dataxJobPath)
        if yesNoChoice():
            jobConfigOk = True
        else:
            print("exit failed, because of file conflict")
            sys.exit(-1)
    with open(dataxJobPath, 'w') as fileWriter:
        fileWriter.write(dataxTraceJobJson)


    print("trace environments:")
    print("dataxJobPath: %s" % dataxJobPath)
    dataxHomePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    print("dataxHomePath: %s" % dataxHomePath)

    dataxCommand = "%s %s" % (os.path.join(dataxHomePath, "bin", "datax.py"), dataxJobPath)
    print("dataxCommand: %s" % dataxCommand)

    returncode = fork(dataxCommand, True)
    if options.delete == 'true':
        os.remove(dataxJobPath)
    sys.exit(returncode)
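The job file above is named with `uuid.uuid1()`, which mixes the host MAC address, the current timestamp, and a random clock sequence, so two runs practically never collide. A minimal, self-contained sketch of that naming scheme (the `unique_job_path` helper is illustrative, not part of the original script):

```python
# Sketch of the perftrace job-file naming scheme; unique_job_path is
# an illustrative helper name, not part of the shipped script.
import os
import uuid


def unique_job_path(workdir):
    # uuid1 combines the MAC address, the current timestamp, and a
    # clock sequence, so consecutive calls yield distinct names.
    return os.path.join(workdir, "perftrace-" + str(uuid.uuid1()))
```

Because the name is time- and host-derived rather than purely random, even two calls in the same microsecond on the same machine differ via the clock sequence.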

2 Modify the package.xml file

File location: xx\DataX\core\src\main\assembly\package.xml. This file's main job is to "copy" the scripts in the source tree into target, which is how packaging works. package.xml is referenced by xx\DataX\core\pom.xml;

===package.xml file===
<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <!-- for bin -->
<!--        <fileSet>-->
<!--            <directory>src/main/bin</directory>-->
<!--            <includes>-->
<!--                <include>*.*</include>-->
<!--            </includes>-->
<!--            <excludes>-->
<!--                <exclude>*.pyc</exclude>-->
<!--            </excludes>-->
<!--            <directoryMode>775</directoryMode>-->
<!--            <outputDirectory>/bin</outputDirectory>-->
<!--        </fileSet>-->
    
        <fileSet>
            <directory>src/main/bin</directory>
            <includes>
                <include>*</include>
            </includes>
            <excludes>
                <exclude>*.pyc</exclude>
            </excludes>
            <directoryMode>775</directoryMode>
            <outputDirectory>/bin</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>src/main/bin/py/py2</directory>
            <includes>
                <include>*</include>
            </includes>
            <excludes>
                <exclude>*.pyc</exclude>
            </excludes>
            <directoryMode>775</directoryMode>
            <outputDirectory>/bin/py/py2</outputDirectory>
        </fileSet>
        
        <fileSet>
            <directory>src/main/bin/py/py3</directory>
            <includes>
                <include>*</include>
            </includes>
            <excludes>
                <exclude>*.pyc</exclude>
            </excludes>
            <directoryMode>775</directoryMode>
            <outputDirectory>/bin/py/py3</outputDirectory>
        </fileSet>
        <!-- for scripts -->
        <fileSet>
            <directory>src/main/script</directory>
            <includes>
                <include>*.*</include>
            </includes>
            <directoryMode>775</directoryMode>
            <outputDirectory>/script</outputDirectory>
        </fileSet>
        <!-- for configs -->
        <fileSet>
            <directory>src/main/conf</directory>
            <includes>
                <include>*.*</include>
            </includes>
            <outputDirectory>/conf</outputDirectory>
        </fileSet>
        <!-- for engine -->
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>datax-core-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>/lib</outputDirectory>
        </fileSet>

        <fileSet>
            <directory>src/main/job/</directory>
            <includes>
                <include>*.json</include>
            </includes>
            <outputDirectory>/job</outputDirectory>
        </fileSet>

        <fileSet>
            <directory>src/main/tools/</directory>
            <includes>
                <include>*.*</include>
            </includes>
            <outputDirectory>/tools</outputDirectory>
        </fileSet>

        <fileSet>
            <fileMode>777</fileMode>
            <directory>src/main/tmp</directory>
            <includes>
                <include>*.*</include>
            </includes>
            <outputDirectory>/tmp</outputDirectory>
        </fileSet>
    </fileSets>

    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>/lib</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
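For context, package.xml only takes effect because core/pom.xml hands it to the maven-assembly-plugin as an assembly descriptor. The exact plugin version, execution id, and finalName in the DataX source may differ; this is a sketch of the typical wiring:

```xml
<!-- In core/pom.xml (illustrative; ids and finalName may differ in the real file) -->
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptors>
            <!-- the descriptor shown above -->
            <descriptor>src/main/assembly/package.xml</descriptor>
        </descriptors>
        <finalName>datax</finalName>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```

A full rebuild such as `mvn -U clean package assembly:assembly -Dmaven.test.skip=true` then regenerates the target output, including the new bin/py/py2 and bin/py/py3 directories declared in the fileSets above.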

3 Add a new datax.py file

This datax.py is not the same file as the one above; its main job is to detect the user's Python environment and then call the matching script.

===datax.py file (if there is a better way to write this, suggestions are welcome)===
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# Automatically choose the py2 or py3 script based on the Python version on this machine

import sys
import os
currentFilePath = os.path.dirname(os.path.abspath(__file__))
print("currentFilePath===" + currentFilePath)

argvs = sys.argv[1:]
params = " ".join(argvs)
print("params===" + params)

if sys.version_info[0] >= 3:
    pyPath = "py3"
else:
    pyPath = "py2"

execPy = "python " + os.path.join(currentFilePath, "py", pyPath, "datax.py") + " " + params
print("execPy===" + execPy)
os.system(execPy)
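One fragile spot in this dispatcher is that it builds a single shell string, so a job path containing spaces would be split into several arguments. A more defensive variant (the helper names `choose_py_dir` and `build_command` are illustrative, not from the original) keys off `sys.version_info` and hands `subprocess` an argument list instead:

```python
# Defensive variant of the dispatcher; choose_py_dir/build_command are
# illustrative names, not part of the shipped script.
import os
import sys


def choose_py_dir(version_info):
    # version_info is a tuple such as (3, 8, 0); comparing its major
    # component is sturdier than comparing version strings.
    return "py3" if version_info[0] >= 3 else "py2"


def build_command(base_dir, argv):
    script = os.path.join(base_dir, "py", choose_py_dir(sys.version_info), "datax.py")
    # sys.executable is the interpreter running right now, so the child
    # script is guaranteed to see the same Python version; passing a
    # list avoids shell-quoting problems with paths containing spaces.
    return [sys.executable, script] + list(argv)
```

The entry point would then be `subprocess.call(build_command(os.path.dirname(os.path.abspath(__file__)), sys.argv[1:]))`, whose return code can be passed straight to `sys.exit`.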

4 Put each file in its place

As shown below, create the directories under the core module and put the corresponding files into them. (Ignore datax.bat; it is meant to run DataX on Windows without any Python environment and is still being written.)
(screenshot: directory layout under the core module)
After the project is built and packaged, the result is as follows:

(screenshot: packaged output directory)

Finally, go off and run happily: whether it's py2 or py3, it just runs.


Notes:

  1. The source code was slightly modified, mainly for (a) issues flagged by the Alibaba coding-guidelines scan and (b) clean code;

  2. All of the code has been uploaded to GitHub (master and dev branches), free for anyone to use.


Publisher: 全栈程序员-站长. When reposting, please credit the source: https://javaforall.net/145514.html Original link: https://javaforall.net
