datax (5): Upgrade - automatically detect the Python environment and run DataX jobs



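1. Thoughts

The previous article studied the datax.py file, which raised two questions:

  1. What happens if the user is not on Python 2 (the environment DataX requires by default)?
  2. Can a single script detect the user's Python environment automatically and run the DataX job accordingly?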
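
The second question can be sketched as follows. This is only a minimal illustration, not the final script of this article: the function name `pick_impl_dir` and the `py2`/`py3` sub-directory layout are assumptions made for the example.

```python
import os
import sys


def pick_impl_dir(base_dir, version_info=None):
    """Return the directory holding the scripts for the running interpreter."""
    if version_info is None:
        version_info = sys.version_info
    # Hypothetical layout: <base_dir>/py2 and <base_dir>/py3
    sub_dir = "py3" if version_info[0] >= 3 else "py2"
    return os.path.join(base_dir, sub_dir)


if __name__ == "__main__":
    # Directory of the launcher script (falls back to the current directory)
    here = os.path.dirname(os.path.abspath(sys.argv[0]))
    print("Python %d detected, would run scripts from %s"
          % (sys.version_info[0], pick_impl_dir(here)))
```

The check relies only on `sys.version_info`, which exists in both Python 2 and Python 3, so the launcher itself can be parsed by either interpreter.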

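2. Result

Run the following command under Python 2 or Python 3:
>python datax.py ../job/job.json

The same familiar recipe, the same familiar taste. Nothing changes on the surface, yet a lot happens behind the scenes.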


3. Modification process

3.1 Write the Python 3 DataX scripts

Three files in total.

===datax.py file===
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import sys
import os
import signal
import subprocess
import time
import re
import socket
import json
from optparse import OptionParser
from optparse import OptionGroup
from string import Template
import codecs
import platform


def isWindows():
    return platform.system() == 'Windows'


DATAX_HOME = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

# print("DATAX_HOME==="+DATAX_HOME)

DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
if isWindows():
    codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
    CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
else:
    CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)

print("CLASS_PATH===" + CLASS_PATH)

LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
print("LOGBACK_FILE===" + LOGBACK_FILE)

DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
print("DEFAULT_JVM===" + DEFAULT_JVM)

DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
    DATAX_HOME, LOGBACK_FILE)
print("DEFAULT_PROPERTY_CONF===" + DEFAULT_PROPERTY_CONF)

ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
    DEFAULT_PROPERTY_CONF, CLASS_PATH)
print("ENGINE_COMMAND===" + ENGINE_COMMAND)

REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"
print("REMOTE_DEBUG_CONFIG===" + REMOTE_DEBUG_CONFIG)

RET_STATE = {
    "KILL": 143,
    "FAIL": -1,
    "OK": 0,  # exit without error; 0 is also the default when no argument is passed
    "RUN": 1,  # exit with error
    "RETRY": 2
}


# Get the local IP address
def getLocalIp():
    try:
        return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
    except:
        return "Unknown"


# Self-terminate: stop the child process, then exit
def suicide(signum, e):
    # Signal handlers are called with two arguments: the signal number and the current stack frame
    global child_process
    print("[Error] DataX receive unexpected signal %d, starts to suicide." % (signum), file=sys.stderr)

    if child_process:
        child_process.send_signal(signal.SIGQUIT)
        time.sleep(1)
        child_process.kill()
    print("DataX Process was killed ! you did ?", file=sys.stderr)
    sys.exit(RET_STATE["KILL"])


def register_signal():
    if not isWindows():
        global child_process
        signal.signal(2, suicide)
        signal.signal(3, suicide)
        signal.signal(15, suicide)


# Build the option parser
def getOptionParser():
    usage = "usage: %prog [options] job-url-or-path"
    parser = OptionParser(usage)  # create the parser

    # Production environment option group
    prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                     "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                     "Make sure these options can be used in Product Env.")
    prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
                                  default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
    prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
                                  help="Set job unique id when running by Distribute/Local Mode.")
    prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
                                  action="store", default="standalone",
                                  help="Set job runtime mode such as: standalone, local, distribute. "
                                       "Default mode is standalone.")
    prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
                                  action="store", dest="params",
                                  help='Set job parameter, eg: the source tableName you want to set it by command, '
                                       'then you can use like this: -p"-DtableName=your-table-name", '
                                       'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
                                       'Note: you should config in you job tableName with ${tableName}.')
    prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
                                  action="store", dest="reader", type="string",
                                  help='View job config[reader] template, eg: mysqlreader,streamreader')
    prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
                                  action="store", dest="writer", type="string",
                                  help='View job config[writer] template, eg: mysqlwriter,streamwriter')
    # Add the production option group to the parser
    parser.add_option_group(prodEnvOptionGroup)

    # Development/debug option group
    devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
                                    "Developer use these options to trace more details of DataX.")
    devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
                                 help="Set to remote debug mode.")
    devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
                                 default="info", help="Set log level such as: debug, info, all etc.")
    # Add the development/debug option group to the parser
    parser.add_option_group(devEnvOptionGroup)
    return parser


# Generate a job configuration template
def generateJobConfigTemplate(reader, writer):
    readerRef = "Please refer to the %s document:\n https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (
        reader, reader, reader)
    writerRef = "Please refer to the %s document:\n https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (
        writer, writer, writer)
    print("readerRef" + readerRef)
    print("writerRef" + writerRef)
    jobGuid = 'Please save the following configuration as a json file and use\n python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'
    print("jobGuid" + jobGuid)
    jobTemplate = {
        "job": {
            "setting": {
                "speed": {
                    "channel": ""
                }
            },
            "content": [
                {
                    "reader": {},
                    "writer": {}
                }
            ]
        }
    }
    readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME, reader)
    print("readerTemplatePath===" + readerTemplatePath)
    writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME, writer)
    print("writerTemplatePath===" + writerTemplatePath)
    try:
        readerPar = readPluginTemplate(readerTemplatePath)
    except Exception as e:
        readerPar = {}  # fall back to an empty section so the lines below do not hit an undefined name
        print("Read reader[%s] template error: can\'t find file %s" % (reader, readerTemplatePath))
    try:
        writerPar = readPluginTemplate(writerTemplatePath)
    except Exception as e:
        writerPar = {}  # same fallback for the writer section
        print("Read writer[%s] template error: can\'t find file %s" % (writer, writerTemplatePath))
    jobTemplate['job']['content'][0]['reader'] = readerPar
    jobTemplate['job']['content'][0]['writer'] = writerPar
    print(json.dumps(jobTemplate, indent=4, sort_keys=True))


# Read a plugin template
def readPluginTemplate(plugin):
    with open(plugin, 'r') as f:
        return json.load(f)


# Check whether the input path is a URL
def isUrl(path):
    if not path:
        return False

    assert (isinstance(path, str))
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False


# Build the launch command line
def buildStartCommand(options, args):
    commandMap = {}
    tempJVMCommand = DEFAULT_JVM
    if options.jvmParameters:
        tempJVMCommand = tempJVMCommand + " " + options.jvmParameters

    if options.remoteDebug:
        tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
        print('local ip: ', getLocalIp())

    if options.loglevel:
        tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))

    if options.mode:
        commandMap["mode"] = options.mode

    # jobResource may be a URL or a local file path (relative or absolute)
    jobResource = args[0]
    if not isUrl(jobResource):
        jobResource = os.path.abspath(jobResource)
        if jobResource.lower().startswith("file://"):
            jobResource = jobResource[len("file://"):]

    jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
    if options.params:
        jobParams = jobParams + " " + options.params

    if options.jobid:
        commandMap["jobid"] = options.jobid

    commandMap["jvm"] = tempJVMCommand
    commandMap["params"] = jobParams
    commandMap["job"] = jobResource

    return Template(ENGINE_COMMAND).substitute(**commandMap)


def printCopyright():
    print(''' DataX (%s), From Alibaba ! Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved. ''' % DATAX_VERSION)
    sys.stdout.flush()


if __name__ == "__main__":
    printCopyright()  # print copyright information
    parser = getOptionParser()  # parser for the options passed on the datax command line
    options, args = parser.parse_args(sys.argv[1:])
    if options.reader is not None and options.writer is not None:
        # if both reader and writer are given, generate the job json template
        generateJobConfigTemplate(options.reader, options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    startCommand = buildStartCommand(options, args)
    print("startCommand===" + startCommand )

    # startCommand is one command string, so it has to be run through the shell
    child_process = subprocess.Popen(startCommand, shell=True)
    register_signal()
    (stdout, stderr) = child_process.communicate()

    sys.exit(child_process.returncode)
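
The launch command in buildStartCommand is assembled with `string.Template`. The sketch below shows, under simplified assumptions, how the `${...}` placeholders get filled: the shortened `COMMAND` string, the helper name `build_command`, and all sample values are made up for illustration and are not the real DataX defaults.

```python
from string import Template

# Shortened, hypothetical command template; the real one carries more JVM flags
COMMAND = "java ${jvm} ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}"


def build_command(values):
    """Fill every ${name} placeholder; raises KeyError if one is missing."""
    return Template(COMMAND).substitute(**values)


if __name__ == "__main__":
    print(build_command({
        "jvm": "-Xms1g -Xmx1g",
        "params": "-Dlog.file.name=job_json",
        "mode": "standalone",
        "jobid": "-1",
        "job": "/tmp/job.json",
    }))
```

Because `substitute` raises `KeyError` for a missing placeholder, every key of the template has to be set before the command is built, which is why `--mode` and `--jobid` carry defaults in the option parser.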

===dxprof.py file===
#! /usr/bin/env python
# vim: set expandtab tabstop=4 shiftwidth=4 foldmethod=marker nu:

import re
import sys
import time

REG_SQL_WAKE = re.compile(r'Begin\s+to\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
REG_SQL_DONE = re.compile(r'Finished\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
REG_SQL_PATH = re.compile(r'from\s+(\w+)(\s+where|\s*$)', re.IGNORECASE)
REG_SQL_JDBC = re.compile(r'jdbcUrl:\s*\[(.+?)\]', re.IGNORECASE)
REG_SQL_UUID = re.compile(r'(\d+\-)+reader')
REG_COMMIT_UUID = re.compile(r'(\d+\-)+writer')
REG_COMMIT_WAKE = re.compile(r'begin\s+to\s+commit\s+blocks', re.IGNORECASE)
REG_COMMIT_DONE = re.compile(r'commit\s+blocks\s+ok', re.IGNORECASE)

# {{{ function parse_timestamp() #
def parse_timestamp(line):
    try:
        ts = int(time.mktime(time.strptime(line[0:19], '%Y-%m-%d %H:%M:%S')))
    except:
        ts = 0

    return ts

# }}} #

# {{{ function parse_query_host() #
def parse_query_host(line):
    ori = REG_SQL_JDBC.search(line)
    if (not ori):
        return ''

    ori = ori.group(1).split('?')[0]
    off = ori.find('@')
    if (off > -1):
        ori = ori[off+1:len(ori)]
    else:
        off = ori.find('//')
        if (off > -1):
            ori = ori[off+2:len(ori)]

    return ori.lower()
# }}} #

# {{{ function parse_query_table() #
def parse_query_table(line):
    ori = REG_SQL_PATH.search(line)
    return (ori and ori.group(1).lower()) or ''
# }}} #

# {{{ function parse_reader_task() #
def parse_task(fname):
    global LAST_SQL_UUID
    global LAST_COMMIT_UUID
    global DATAX_JOBDICT
    global DATAX_JOBDICT_COMMIT
    global UNIXTIME
    LAST_SQL_UUID = ''
    DATAX_JOBDICT = {}
    LAST_COMMIT_UUID = ''
    DATAX_JOBDICT_COMMIT = {}

    UNIXTIME = int(time.time())
    with open(fname, 'r') as f:
        for line in f.readlines():
            line = line.strip()

            if (LAST_SQL_UUID and (LAST_SQL_UUID in DATAX_JOBDICT)):
                DATAX_JOBDICT[LAST_SQL_UUID]['host'] = parse_query_host(line)
                LAST_SQL_UUID = ''

            if line.find('CommonRdbmsReader$Task') > 0:
                parse_read_task(line)
            elif line.find('commit blocks') > 0:
                parse_write_task(line)
            else:
                continue
# }}} #

# {{{ function parse_read_task() #
def parse_read_task(line):
    ser = REG_SQL_UUID.search(line)
    if not ser:
        return

    LAST_SQL_UUID = ser.group()
    if REG_SQL_WAKE.search(line):
        DATAX_JOBDICT[LAST_SQL_UUID] = {
            'stat' : 'R',
            'wake' : parse_timestamp(line),
            'done' : UNIXTIME,
            'host' : parse_query_host(line),
            'path' : parse_query_table(line)
        }
    elif ((LAST_SQL_UUID in DATAX_JOBDICT) and REG_SQL_DONE.search(line)):
        DATAX_JOBDICT[LAST_SQL_UUID]['stat'] = 'D'
        DATAX_JOBDICT[LAST_SQL_UUID]['done'] = parse_timestamp(line)
# }}} #

# {{{ function parse_write_task() #
def parse_write_task(line):
    ser = REG_COMMIT_UUID.search(line)
    if not ser:
        return

    LAST_COMMIT_UUID = ser.group()
    if REG_COMMIT_WAKE.search(line):
        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID] = {
            'stat' : 'R',
            'wake' : parse_timestamp(line),
            'done' : UNIXTIME,
        }
    elif ((LAST_COMMIT_UUID in DATAX_JOBDICT_COMMIT) and REG_COMMIT_DONE.search(line)):
        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['stat'] = 'D'
        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['done'] = parse_timestamp(line)
# }}} #

# {{{ function result_analyse() #
def result_analyse():
    def compare(a, b):
        return b['cost'] - a['cost']

    tasklist = []
    hostsmap = {}
    statvars = {'sum' : 0, 'cnt' : 0, 'svr' : 0, 'max' : 0, 'min' : int(time.time())}
    tasklist_commit = []
    statvars_commit = {'sum' : 0, 'cnt' : 0}

    for idx in DATAX_JOBDICT:
        item = DATAX_JOBDICT[idx]
        item['uuid'] = idx
        item['cost'] = item['done'] - item['wake']
        tasklist.append(item)

        if (not (item['host'] in hostsmap)):
            hostsmap[item['host']] = 1
            statvars['svr'] += 1

        if (item['cost'] > -1 and item['cost'] < 864000):
            statvars['sum'] += item['cost']
            statvars['cnt'] += 1
            statvars['max'] = max(statvars['max'], item['done'])
            statvars['min'] = min(statvars['min'], item['wake'])

    for idx in DATAX_JOBDICT_COMMIT:
        itemc = DATAX_JOBDICT_COMMIT[idx]
        itemc['uuid'] = idx
        itemc['cost'] = itemc['done'] - itemc['wake']
        tasklist_commit.append(itemc)

        if (itemc['cost'] > -1 and itemc['cost'] < 864000):
            statvars_commit['sum'] += itemc['cost']
            statvars_commit['cnt'] += 1

    ttl = (statvars['max'] - statvars['min']) or 1
    idx = float(statvars['cnt']) / (statvars['sum'] or ttl)

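    # Python 3's list.sort() no longer accepts a comparison function; sort by cost, largest first
    tasklist.sort(key=lambda task: task['cost'], reverse=True)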
    for item in tasklist:
        print('%s\t%s.%s\t%s\t%s\t% 4d\t% 2.1f%%\t% .2f' %(item['stat'], item['host'], item['path'],
                                                           time.strftime('%H:%M:%S', time.localtime(item['wake'])),
                                                           (('D' == item['stat']) and time.strftime('%H:%M:%S', time.localtime(item['done']))) or '--',
                                                           item['cost'], 100 * item['cost'] / ttl, idx * item['cost']))

    if (not len(tasklist) or not statvars['cnt']):
        return

    print('\n--- DataX Profiling Statistics ---')
    print('%d task(s) on %d server(s), Total elapsed %d second(s), %.2f second(s) per task in average' %(statvars['cnt'],
                                                                                                         statvars['svr'], statvars['sum'], float(statvars['sum']) / statvars['cnt']))
    print('Actually cost %d second(s) (%s - %s), task concurrency: %.2f, tilt index: %.2f' %(ttl,
                                                                                             time.strftime('%H:%M:%S', time.localtime(statvars['min'])),
                                                                                             time.strftime('%H:%M:%S', time.localtime(statvars['max'])),
                                                                                             float(statvars['sum']) / ttl, idx * tasklist[0]['cost']))

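    # Only report commit statistics when commit tasks were found; otherwise the division below fails
    if statvars_commit['cnt']:
        idx_commit = float(statvars_commit['cnt']) / (statvars_commit['sum'] or ttl)
        # Python 3's list.sort() no longer accepts a comparison function; sort by cost, largest first
        tasklist_commit.sort(key=lambda task: task['cost'], reverse=True)
        print('%d task(s) done odps comit, Total elapsed %d second(s), %.2f second(s) per task in average, tilt index: %.2f' % (
            statvars_commit['cnt'],
            statvars_commit['sum'], float(statvars_commit['sum']) / statvars_commit['cnt'],
            idx_commit * tasklist_commit[0]['cost']))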

# }}} #

if (len(sys.argv) < 2):
    print("Usage: %s filename" %(sys.argv[0]))
    quit(1)
else:
    parse_task(sys.argv[1])
    result_analyse()
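
Python 3 removed the `cmp` argument of `list.sort()`, so a comparison function such as `compare` in result_analyse can no longer be passed positionally. The sketch below shows two ways to get the same ordering (largest cost first); the helper names and the sample data are made up for illustration.

```python
import functools


def compare(a, b):
    # Same ordering rule as the profiler: larger cost first
    return b['cost'] - a['cost']


def sort_by_cost(tasks):
    """Return tasks ordered by descending cost without mutating the input."""
    return sorted(tasks, key=lambda task: task['cost'], reverse=True)


def sort_with_cmp(tasks):
    """Alternative that reuses an old-style comparison function."""
    return sorted(tasks, key=functools.cmp_to_key(compare))


if __name__ == "__main__":
    # Made-up sample data
    sample = [{'uuid': 'a', 'cost': 3}, {'uuid': 'b', 'cost': 9}, {'uuid': 'c', 'cost': 5}]
    print([t['uuid'] for t in sort_by_cost(sample)])
```

The `key=` form is usually the simpler choice; `functools.cmp_to_key` is handy when an existing comparison function should be kept unchanged.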

===perftrace.py file===
#!/usr/bin/env python
# -*- coding:utf-8 -*-


""" Life's short, Python more. """

import re
import os
import sys
import json
import uuid
import signal
import time
import subprocess
from optparse import OptionParser
# Python 3 strings are Unicode by default, so the Python 2 reload(sys)/setdefaultencoding('utf8') hack is not needed

##begin cli & help logic
def getOptionParser():
    usage = getUsage()
    parser = OptionParser(usage = usage)
    #rdbms reader and writer
    parser.add_option('-r', '--reader', action='store', dest='reader', help='trace datasource read performance with specified !json! string')
    parser.add_option('-w', '--writer', action='store', dest='writer', help='trace datasource write performance with specified !json! string')

    parser.add_option('-c', '--channel',  action='store', dest='channel', default='1', help='the number of concurrent sync thread, the default is 1')
    parser.add_option('-f', '--file',   action='store', help='existing datax configuration file, include reader and writer params')
    parser.add_option('-t', '--type',   action='store', default='reader', help='trace which side\'s performance, cooperate with -f --file params, need to be reader or writer')
    parser.add_option('-d', '--delete',   action='store', default='true', help='delete temporary files, the default value is true')
    #parser.add_option('-h', '--help', action='store', default='true', help='print usage information')
    return parser

def getUsage():
    return '''
The following params are available for -r --reader:
    [these params is for rdbms reader, used to trace rdbms read performance, it's like datax's key]
    *datasourceType: datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2 etc...
    *jdbcUrl:        datasource jdbc connection string, mysql as a example: jdbc:mysql://ip:port/database
    *username:       username for datasource
    *password:       password for datasource
    *table:          table name for read data
    column:          column to be read, the default value is ['*']
    splitPk:         the splitPk column of rdbms table
    where:           limit the scope of the performance data set
    fetchSize:       how many rows to be fetched at each communicate

    [these params is for stream reader, used to trace rdbms write performance]
    reader-sliceRecordCount:  how man test data to mock(each channel), the default value is 10000
    reader-column          :  stream reader while generate test data(type supports: string|long|date|double|bool|bytes; support constant value and random function),demo: [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]

The following params are available for -w --writer:
    [these params is for rdbms writer, used to trace rdbms write performance, it's like datax's key]
    datasourceType:  datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2|ads etc...
    *jdbcUrl:        datasource jdbc connection string, mysql as a example: jdbc:mysql://ip:port/database
    *username:       username for datasource
    *password:       password for datasource
    *table:          table name for write data
    column:          column to be writed, the default value is ['*']
    batchSize:       how many rows to be storeed at each communicate, the default value is 512
    preSql:          prepare sql to be executed before write data, the default value is ''
    postSql:         post sql to be executed end of write data, the default value is ''
    url:             required for ads, pattern is ip:port
    schme:           required for ads, ads database name

    [these params is for stream writer, used to trace rdbms read performance]
    writer-print:    true means print data read from source datasource, the default value is false

The following params are available global control:
    -c --channel:    the number of concurrent tasks, the default value is 1
    -f --file:       existing completely dataX configuration file path
    -t --type:       test read or write performance for a datasource, couble be reader or writer, in collaboration with -f --file
    -h --help:       print help message

some demo:
perftrace.py --channel=10 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "where":"", "splitPk":"", "writer-print":"false"}'
perftrace.py --channel=10 --writer='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'
perftrace.py --file=/tmp/datax.job.json --type=reader --reader='{"writer-print": "false"}'
perftrace.py --file=/tmp/datax.job.json --type=writer --writer='{"reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'

some example jdbc url pattern, may help:
jdbc:oracle:thin:@ip:port:database
jdbc:mysql://ip:port/database
jdbc:sqlserver://ip:port;DatabaseName=database
jdbc:postgresql://ip:port/database
warn: ads url pattern is ip:port
warn: test write performance will write data into your table, you can use a temporary table just for test.
'''

def printCopyright():
    DATAX_VERSION = 'UNKNOWN_DATAX_VERSION'
    print(''' DataX Util Tools (%s), From Alibaba ! Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.''' % DATAX_VERSION)
    sys.stdout.flush()


def yesNoChoice():
    yes = set(['yes','y', 'ye', ''])
    no = set(['no','n'])
    choice = input().lower()  # raw_input() was renamed to input() in Python 3
    if choice in yes:
        return True
    elif choice in no:
        return False
    else:
        sys.stdout.write("Please respond with 'yes' or 'no'")
##end cli & help logic


##begin process logic
def suicide(signum, e):
    global childProcess
    print("[Error] Receive unexpected signal %d, starts to suicide." % (signum), file=sys.stderr)
    if childProcess:
        childProcess.send_signal(signal.SIGQUIT)
        time.sleep(1)
        childProcess.kill()
    print("DataX Process was killed ! you did ?", file=sys.stderr)
    sys.exit(-1)


def registerSignal():
    global childProcess
    signal.signal(2, suicide)
    signal.signal(3, suicide)
    signal.signal(15, suicide)


def fork(command, isShell=False):
    global childProcess
    childProcess = subprocess.Popen(command, shell = isShell)
    registerSignal()
    (stdout, stderr) = childProcess.communicate()
    # block until the child process exits
    childProcess.wait()
    return childProcess.returncode
##end process logic


##begin datax json generate logic
#warn: if not '': -> true; if not None: -> true
def notNone(obj, context):
    if not obj:
        raise Exception("Configuration property [%s] could not be blank!" % (context))

def attributeNotNone(obj, attributes):
    for key in attributes:
        notNone(obj.get(key), key)

def isBlank(value):
    if value is None or len(value.strip()) == 0:
        return True
    return False

def parsePluginName(jdbcUrl, pluginType):
    import re
    #warn: drds
    name = 'pluginName'
    mysqlRegex = re.compile('jdbc:(mysql)://.*')
    if (mysqlRegex.match(jdbcUrl)):
        name = 'mysql'
    postgresqlRegex = re.compile('jdbc:(postgresql)://.*')
    if (postgresqlRegex.match(jdbcUrl)):
        name = 'postgresql'
    oracleRegex = re.compile('jdbc:(oracle):.*')
    if (oracleRegex.match(jdbcUrl)):
        name = 'oracle'
    sqlserverRegex = re.compile('jdbc:(sqlserver)://.*')
    if (sqlserverRegex.match(jdbcUrl)):
        name = 'sqlserver'
    db2Regex = re.compile('jdbc:(db2)://.*')
    if (db2Regex.match(jdbcUrl)):
        name = 'db2'
    return "%s%s" % (name, pluginType)

def renderDataXJson(paramsDict, readerOrWriter = 'reader', channel = 1):
    dataxTemplate = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 1
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "",
                        "parameter": {
                            "username": "",
                            "password": "",
                            "sliceRecordCount": "10000",
                            "column": [
                                "*"
                            ],
                            "connection": [
                                {
                                    "table": [],
                                    "jdbcUrl": []
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "",
                        "parameter": {
                            "print": "false",
                            "connection": [
                                {
                                    "table": [],
                                    "jdbcUrl": ''
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }
    dataxTemplate['job']['setting']['speed']['channel'] = channel
    dataxTemplateContent = dataxTemplate['job']['content'][0]

    pluginName = ''
    if paramsDict.get('datasourceType'):
        pluginName = '%s%s' % (paramsDict['datasourceType'], readerOrWriter)
    elif paramsDict.get('jdbcUrl'):
        pluginName = parsePluginName(paramsDict['jdbcUrl'], readerOrWriter)
    elif paramsDict.get('url'):
        pluginName = 'adswriter'

    theOtherSide = 'writer' if readerOrWriter == 'reader' else 'reader'
    dataxPluginParamsContent = dataxTemplateContent.get(readerOrWriter).get('parameter')
    dataxPluginParamsContent.update(paramsDict)

    dataxPluginParamsContentOtherSide = dataxTemplateContent.get(theOtherSide).get('parameter')

    if readerOrWriter == 'reader':
        dataxTemplateContent.get('reader')['name'] = pluginName
        dataxTemplateContent.get('writer')['name'] = 'streamwriter'
        if paramsDict.get('writer-print'):
            dataxPluginParamsContentOtherSide['print'] = paramsDict['writer-print']
            del dataxPluginParamsContent['writer-print']
        del dataxPluginParamsContentOtherSide['connection']
    if readerOrWriter == 'writer':
        dataxTemplateContent.get('reader')['name'] = 'streamreader'
        dataxTemplateContent.get('writer')['name'] = pluginName
        if paramsDict.get('reader-column'):
            dataxPluginParamsContentOtherSide['column'] = paramsDict['reader-column']
            del dataxPluginParamsContent['reader-column']
        if paramsDict.get('reader-sliceRecordCount'):
            dataxPluginParamsContentOtherSide['sliceRecordCount'] = paramsDict['reader-sliceRecordCount']
            del dataxPluginParamsContent['reader-sliceRecordCount']
        del dataxPluginParamsContentOtherSide['connection']

    if paramsDict.get('jdbcUrl'):
        if readerOrWriter == 'reader':
            dataxPluginParamsContent['connection'][0]['jdbcUrl'].append(paramsDict['jdbcUrl'])
        else:
            dataxPluginParamsContent['connection'][0]['jdbcUrl'] = paramsDict['jdbcUrl']
    if paramsDict.get('table'):
        dataxPluginParamsContent['connection'][0]['table'].append(paramsDict['table'])


    traceJobJson = json.dumps(dataxTemplate, indent = 4)
    return traceJobJson

def isUrl(path):
    if not path:
        return False
    if not isinstance(path, str):
        raise Exception('Configuration file path required for the string, you configure is:%s' % path)
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False


def readJobJsonFromLocal(jobConfigPath):
    jobConfigContent = None
    jobConfigPath = os.path.abspath(jobConfigPath)
    file = open(jobConfigPath)
    try:
        jobConfigContent = file.read()
    finally:
        file.close()
    if not jobConfigContent:
        raise Exception("Your job configuration file read the result is empty, please check the configuration is legal, path: [%s]\nconfiguration:\n%s" % (jobConfigPath, str(jobConfigContent)))
    return jobConfigContent


def readJobJsonFromRemote(jobConfigPath):
    from urllib.request import urlopen  # urlopen lives in urllib.request in Python 3
    conn = urlopen(jobConfigPath)
    jobJson = conn.read()
    return jobJson

def parseJson(strConfig, context):
    try:
        return json.loads(strConfig)
    except Exception as e:
        import traceback
        traceback.print_exc()
        sys.stdout.flush()
        print('%s %s need in line with json syntax' % (context, strConfig), file=sys.stderr)
        sys.exit(-1)

def convert(options, args):
    traceJobJson = ''
    if options.file:
        if isUrl(options.file):
            traceJobJson = readJobJsonFromRemote(options.file)
        else:
            traceJobJson = readJobJsonFromLocal(options.file)
        traceJobDict = parseJson(traceJobJson, '%s content' % options.file)
        attributeNotNone(traceJobDict, ['job'])
        attributeNotNone(traceJobDict['job'], ['content'])
        attributeNotNone(traceJobDict['job']['content'][0], ['reader', 'writer'])
        attributeNotNone(traceJobDict['job']['content'][0]['reader'], ['name', 'parameter'])
        attributeNotNone(traceJobDict['job']['content'][0]['writer'], ['name', 'parameter'])
        if options.type == 'reader':
            traceJobDict['job']['content'][0]['writer']['name'] = 'streamwriter'
            if options.reader:
                traceReaderDict = parseJson(options.reader, 'reader config')
                if traceReaderDict.get('writer-print') is not None:
                    traceJobDict['job']['content'][0]['writer']['parameter']['print'] = traceReaderDict.get('writer-print')
                else:
                    traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
            else:
                traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
        elif options.type == 'writer':
            traceJobDict['job']['content'][0]['reader']['name'] = 'streamreader'
            if options.writer:
                traceWriterDict = parseJson(options.writer, 'writer config')
                if traceWriterDict.get('reader-column'):
                    traceJobDict['job']['content'][0]['reader']['parameter']['column'] = traceWriterDict['reader-column']
                if traceWriterDict.get('reader-sliceRecordCount'):
                    traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = traceWriterDict['reader-sliceRecordCount']
            else:
                columnSize = len(traceJobDict['job']['content'][0]['writer']['parameter']['column'])
                streamReaderColumn = []
                for i in range(columnSize):
                    streamReaderColumn.append({"type": "long", "random": "2,10"})
                traceJobDict['job']['content'][0]['reader']['parameter']['column'] = streamReaderColumn
                traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = 10000
        else:
            pass#do nothing
        return json.dumps(traceJobDict, indent = 4)
    elif options.reader:
        traceReaderDict = parseJson(options.reader, 'reader config')
        return renderDataXJson(traceReaderDict, 'reader', options.channel)
    elif options.writer:
        traceWriterDict = parseJson(options.writer, 'writer config')
        return renderDataXJson(traceWriterDict, 'writer', options.channel)
    else:
        print(getUsage())
        sys.exit(-1)
    #dataxParams = {}
    #for opt, value in options.__dict__.items():
    # dataxParams[opt] = value
##end datax json generate logic


if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()

    options, args = parser.parse_args(sys.argv[1:])
    #print options, args
    dataxTraceJobJson = convert(options, args)

    # uuid1 is generated from the MAC address, the current timestamp, and a random number, so it is effectively globally unique
    dataxJobPath = os.path.join(os.getcwd(), "perftrace-" + str(uuid.uuid1()))
    jobConfigOk = True
    if os.path.exists(dataxJobPath):
        print("file already exists, truncate and rewrite it? %s" % dataxJobPath)
        if yesNoChoice():
            jobConfigOk = True
        else:
            print("exit failed, because of file conflict")
            sys.exit(-1)
    with open(dataxJobPath, 'w') as fileWriter:
        fileWriter.write(dataxTraceJobJson)


    print("trace environments:")
    print("dataxJobPath: %s" % dataxJobPath)
    dataxHomePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    print("dataxHomePath: %s" % dataxHomePath)

    dataxCommand = "%s %s" % (os.path.join(dataxHomePath, "bin", "datax.py"), dataxJobPath)
    print("dataxCommand: %s" % dataxCommand)

    returncode = fork(dataxCommand, True)
    if options.delete == 'true':
        os.remove(dataxJobPath)
    sys.exit(returncode)
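
The trace logic in `convert` is easier to follow in isolation. Below is a minimal sketch of the two rewrites it performs on a job: swapping the writer for `streamwriter` (reader trace) and swapping the reader for `streamreader` with generated columns (writer trace). The function names `to_reader_trace` and `to_writer_trace` and the `sample_job` dict are made up for illustration and are not part of the script above.

```python
import copy
import json


def to_reader_trace(job, print_rows="false"):
    """Return a copy of `job` whose writer is replaced by streamwriter."""
    traced = copy.deepcopy(job)  # leave the caller's dict untouched
    writer = traced["job"]["content"][0]["writer"]
    writer["name"] = "streamwriter"
    writer["parameter"]["print"] = print_rows
    return traced


def to_writer_trace(job, record_count=10000):
    """Return a copy of `job` whose reader is replaced by streamreader."""
    traced = copy.deepcopy(job)
    content = traced["job"]["content"][0]
    # One generated "long" column per column the writer expects.
    column_size = len(content["writer"]["parameter"]["column"])
    content["reader"]["name"] = "streamreader"
    content["reader"]["parameter"]["column"] = [
        {"type": "long", "random": "2,10"} for _ in range(column_size)
    ]
    content["reader"]["parameter"]["sliceRecordCount"] = record_count
    return traced


# Hypothetical job used only to exercise the two helpers.
sample_job = {
    "job": {
        "content": [{
            "reader": {"name": "mysqlreader", "parameter": {}},
            "writer": {"name": "mysqlwriter",
                       "parameter": {"column": ["id", "name"]}},
        }]
    }
}

reader_trace = to_reader_trace(sample_job)
writer_trace = to_writer_trace(sample_job)
print(json.dumps(writer_trace["job"]["content"][0]["reader"], indent=4))
```

In both cases only one side of the job is replaced by a stream plugin, so the measured throughput reflects the other, untouched side.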

2 Modify the package.xml file

File location: xx\DataX\core\src\main\assembly\package.xml. Its main job is to copy the scripts from the source tree into target at packaging time. package.xml is referenced by xx\DataX\core\pom.xml.

===package.xml===
<assembly
        xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
    <id></id>
    <formats>
        <format>dir</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <!-- for bin -->
<!--        <fileSet>-->
<!--            <directory>src/main/bin</directory>-->
<!--            <includes>-->
<!--                <include>*.*</include>-->
<!--            </includes>-->
<!--            <excludes>-->
<!--                <exclude>*.pyc</exclude>-->
<!--            </excludes>-->
<!--            <directoryMode>775</directoryMode>-->
<!--            <outputDirectory>/bin</outputDirectory>-->
<!--        </fileSet>-->
    
        <fileSet>
            <directory>src/main/bin</directory>
            <includes>
                <include>*</include>
            </includes>
            <excludes>
                <exclude>*.pyc</exclude>
            </excludes>
            <directoryMode>775</directoryMode>
            <outputDirectory>/bin</outputDirectory>
        </fileSet>
        <fileSet>
            <directory>src/main/bin/py/py2</directory>
            <includes>
                <include>*</include>
            </includes>
            <excludes>
                <exclude>*.pyc</exclude>
            </excludes>
            <directoryMode>775</directoryMode>
            <outputDirectory>/bin/py/py2</outputDirectory>
        </fileSet>
        
        <fileSet>
            <directory>src/main/bin/py/py3</directory>
            <includes>
                <include>*</include>
            </includes>
            <excludes>
                <exclude>*.pyc</exclude>
            </excludes>
            <directoryMode>775</directoryMode>
            <outputDirectory>/bin/py/py3</outputDirectory>
        </fileSet>
        <!-- for scripts -->
        <fileSet>
            <directory>src/main/script</directory>
            <includes>
                <include>*.*</include>
            </includes>
            <directoryMode>775</directoryMode>
            <outputDirectory>/script</outputDirectory>
        </fileSet>
        <!-- for configs -->
        <fileSet>
            <directory>src/main/conf</directory>
            <includes>
                <include>*.*</include>
            </includes>
            <outputDirectory>/conf</outputDirectory>
        </fileSet>
        <!-- for engine -->
        <fileSet>
            <directory>target/</directory>
            <includes>
                <include>datax-core-0.0.1-SNAPSHOT.jar</include>
            </includes>
            <outputDirectory>/lib</outputDirectory>
        </fileSet>

        <fileSet>
            <directory>src/main/job/</directory>
            <includes>
                <include>*.json</include>
            </includes>
            <outputDirectory>/job</outputDirectory>
        </fileSet>

        <fileSet>
            <directory>src/main/tools/</directory>
            <includes>
                <include>*.*</include>
            </includes>
            <outputDirectory>/tools</outputDirectory>
        </fileSet>

        <fileSet>
            <fileMode>777</fileMode>
            <directory>src/main/tmp</directory>
            <includes>
                <include>*.*</include>
            </includes>
            <outputDirectory>/tmp</outputDirectory>
        </fileSet>
    </fileSets>

    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <outputDirectory>/lib</outputDirectory>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
</assembly>
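
After packaging, it can be worth checking that the three bin-related fileSets actually produced the scripts the dispatcher needs. The following is a rough sketch of such a check. The helper `missing_scripts` is hypothetical, and the expected paths assume that the dispatcher sits directly in `bin` and that each of `bin/py/py2` and `bin/py/py3` contains a `datax.py`. The demo runs against a throwaway directory rather than a real build output.

```python
import os
import tempfile

# Relative paths derived from the <outputDirectory> values above,
# assuming each directory holds a datax.py.
EXPECTED_SCRIPTS = [
    os.path.join("bin", "datax.py"),
    os.path.join("bin", "py", "py2", "datax.py"),
    os.path.join("bin", "py", "py3", "datax.py"),
]


def missing_scripts(package_root):
    """Return the expected scripts that are absent under package_root."""
    return [
        rel for rel in EXPECTED_SCRIPTS
        if not os.path.isfile(os.path.join(package_root, rel))
    ]


# Demo: a temporary directory that deliberately lacks the py3 script.
demo_root = tempfile.mkdtemp()
for rel in EXPECTED_SCRIPTS[:2]:
    path = os.path.join(demo_root, rel)
    os.makedirs(os.path.dirname(path))
    open(path, "w").close()

print(missing_scripts(demo_root))
```

Pointing `missing_scripts` at the real assembly output directory (whatever path your build uses) should return an empty list if the fileSets are configured correctly.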

3 Add a new datax.py file

This datax.py is not the original datax.py. It only checks the user's Python environment and then calls the matching Python script.

===datax.py (if you know a better way to write it, suggestions are welcome)===
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# Automatically choose the py2 or py3 script based on the Python version running this file

import sys
import os
currentFilePath = os.path.dirname(os.path.abspath(__file__))
print("currentFilePath===" + currentFilePath)

argvs=sys.argv[1:]
params = " ".join(argvs)
print("params===" + params)

if sys.version_info[0] >= 3:
    pyPath = "py3"
else:
    pyPath = "py2"

execPy = "python " + currentFilePath +os.sep +"py" +os.sep + pyPath + os.sep + "datax.py " + params
print("execPy===" + execPy)
os.system(execPy)
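
Since suggestions were invited, here is one possible variant of the dispatcher, offered as a sketch rather than a drop-in replacement. It reads the major version from `sys.version_info`, launches the child with `sys.executable` so it runs under the interpreter that was just detected, and passes arguments as a list so paths with spaces are not split. The helper names `pick_script_dir`, `build_command`, and `main`, and the `/opt/datax/bin` path in the demo, are my own and purely illustrative.

```python
import os
import subprocess
import sys


def pick_script_dir(version_info=sys.version_info):
    """Return "py3" for Python 3 and later, otherwise "py2"."""
    return "py3" if version_info[0] >= 3 else "py2"


def build_command(base_dir, argv, version_info=sys.version_info):
    """Build the argument list that runs the version-specific datax.py."""
    script = os.path.join(
        base_dir, "py", pick_script_dir(version_info), "datax.py")
    # sys.executable is the interpreter running this script, so the child
    # process uses the same Python version that was detected.
    return [sys.executable, script] + list(argv)


def main():
    """Run the matching datax.py and return its exit code."""
    base_dir = os.path.dirname(os.path.abspath(__file__))
    # A list (not a joined string) keeps each argument intact.
    return subprocess.call(build_command(base_dir, sys.argv[1:]))


# Show the command that would be run for a hypothetical install path.
print(build_command("/opt/datax/bin", ["../job/job.json"]))
```

In a real dispatcher file, ending with `sys.exit(main())` would also pass the child's exit code back to the caller, which `os.system` followed by a normal exit does not do.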

4 Put each file in its place

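Create the directories in the core module as shown in the figure below and put the corresponding files in them. (Ignore datax.bat; it is meant to run datax directly on Windows without a Python environment and is still being written.)
[Figure: directory layout created in the core module]
After the project is compiled and packaged, the result looks like this:

[Figure: packaged output]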

Finally, go off and play happily: py2 or py3, it just runs.


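Notes:

  1. The source code was changed slightly. The changes are mainly (1) issues flagged by the Alibaba coding guidelines scan and (2) clean-code cleanups;

  2. All the code has been uploaded to GitHub (master and dev branches) and is free to use.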

Publisher: 全栈程序员-站长. When reposting, please credit the source: https://javaforall.net/145514.html. Original link: https://javaforall.net
