datax(4): datax.py解读

datax(4): datax.py解读datax直接使用py文件进行任务提交,今天读一读它1文件位置原始文件位置在xx/DataX/core/src/main/bin/下,datax项目打包后会将文件拷贝到xx/DataX\target\datax\datax\bin下。core模块的pom.xml指定‘拷贝’datax.py文件的方式maven-assembly-plugin<plugin><artifactId>maven-asse.

大家好,又见面了,我是你们的朋友全栈君。

datax 直接使用py文件进行任务提交,今天读一读它


一、文件位置

原始文件位置在 xx/DataX/core/src/main/bin/下,datax项目打包后会将文件拷贝到 xx/DataX\target\datax\datax\bin 下。

 core模块的pom.xml 指定‘拷贝’datax.py文件的方式maven-assembly-plugin
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.alibaba.datax.core.Engine</mainClass>
                        </manifest>
                    </archive>
                    <finalName>datax</finalName>
                    <descriptors>
                        <!--指定装配的配置文件-->
                        <descriptor>src/main/assembly/package.xml</descriptor>
                    </descriptors>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
  
DataX\core\src\main\assembly\package.xml里面是一些打包的细节


二、文件的作用

该py文件主要用来提交datax任务,相当于datax的入口;样例执行datax任务如下

python datax.py { 
   YOUR_JOB.json}

三、文件解读

文件的主入口 if name == “main”:

if __name__ == "__main__":
    # 1 打印版权信息
    printCopyright()
    # 2 获取选项的解析器
    parser = getOptionParser()
    # 3 根据入参,使用解析器解析出参数值
    # 3.1 parse_args方法返回俩参,分别为instance类型的options和list类型的args
    # 3.2 用sys.argv[1:]来获取命令参数,返回一个list类型的返回值
    options, args = parser.parse_args(sys.argv[1:])
    if options.reader is not None and options.writer is not None:
        # 4 如果解析后,入参的 reader和writer不为空,在从github上构建出一个 json的样例模板
        generateJobConfigTemplate(options.reader,options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    # 5 根据入参 构建执行脚本
    startCommand = buildStartCommand(options, args)
    # print startCommand 该命令可以打印出 用户输入的参数+py文件构建的参数,作为整体形成一个执行脚本。(执行脚本最后调用java类)
    # 打印出来的startCommand 如下:
    # java -server -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\idea-workspace\github\DataX\target\datax\datax/log -Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=D:\idea-workspace\github\DataX\target\datax\datax/log -Dloglevel=info -Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=D:\idea-workspace\github\DataX\target\datax\datax -Dlogback.configurationFile=D:\idea-workspace\github\DataX\target\datax\datax/conf/logback.xml -classpath D:\idea-workspace\github\DataX\target\datax\datax/lib/* -Dlog.file.name=x\datax\job\job_json com.alibaba.datax.core.Engine -mode standalone -jobid -1 -job D:\idea-workspace\github\DataX\target\datax\datax\job\job.json

    # 6 创建并返回一个子进程,并在这个进程中执行指定的shell 脚本
    child_process = subprocess.Popen(startCommand, shell=True)
    # 7 将执行结果保存在信号量中
    register_signal()
    # 8 父子进程进行通信,并将通信结果保存到 stdout, stderr
    (stdout, stderr) = child_process.communicate()

    # 9 退出(根据子进程的状态码)
    sys.exit(child_process.returncode)

文件中的方法和变量,通过变量名或函数名就可以直接猜到含义,本文不在赘述;
在这里插入图片描述
整体的datax.py文件如下

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import sys
import os
import signal
import subprocess
import time
import re
import socket
import json
from optparse import OptionParser
from optparse import OptionGroup
from string import Template
import codecs
import platform

def isWindows():
    return platform.system() == 'Windows'

DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
if isWindows():
    codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
    CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
else:
    CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)
LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
    DATAX_HOME, LOGBACK_FILE)
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
    DEFAULT_PROPERTY_CONF, CLASS_PATH)
REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"

RET_STATE = { 
   
    "KILL": 143,
    "FAIL": -1,
    "OK": 0,
    "RUN": 1,
    "RETRY": 2
}

# 获取本地ip
def getLocalIp():
    try:
        return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
    except:
        return "Unknown"

# 根据信号值,结束本 子进程
def suicide(signum):
    global child_process
    print >> sys.stderr, "[Error] DataX receive unexpected signal %d, starts to suicide." % (signum)

    if child_process:
        child_process.send_signal(signal.SIGQUIT)
        time.sleep(1)
        child_process.kill()
    print >> sys.stderr, "DataX Process was killed ! you did ?"
    sys.exit(RET_STATE["KILL"])

# 
def register_signal():
    if not isWindows():
        global child_process
        signal.signal(2, suicide)
        signal.signal(3, suicide)
        signal.signal(15, suicide)

# 构造解析器
def getOptionParser():
    usage = "usage: %prog [options] job-url-or-path"
    parser = OptionParser(usage=usage)

    prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                     "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                     "Make sure these options can be used in Product Env.")
    prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
                                  default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
    prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
                                  help="Set job unique id when running by Distribute/Local Mode.")
    prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
                                  action="store", default="standalone",
                                  help="Set job runtime mode such as: standalone, local, distribute. "
                                       "Default mode is standalone.")
    prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
                                  action="store", dest="params",
                                  help='Set job parameter, eg: the source tableName you want to set it by command, '
                                       'then you can use like this: -p"-DtableName=your-table-name", '
                                       'if you have mutiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name".'
                                       'Note: you should config in you job tableName with ${tableName}.')
    prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
                                  action="store", dest="reader",type="string",
                                  help='View job config[reader] template, eg: mysqlreader,streamreader')
    prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
                                  action="store", dest="writer",type="string",
                                  help='View job config[writer] template, eg: mysqlwriter,streamwriter')
    parser.add_option_group(prodEnvOptionGroup)

    devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
                                    "Developer use these options to trace more details of DataX.")
    devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
                                 help="Set to remote debug mode.")
    devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
                                 default="info", help="Set log level such as: debug, info, all etc.")
    parser.add_option_group(devEnvOptionGroup)
    return parser

# 根据writer和reader名, 从github拉取对应的模板,最终创建出 json任务的模板
def generateJobConfigTemplate(reader, writer):
    readerRef = "Please refer to the %s document:\n https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (reader,reader,reader)
    writerRef = "Please refer to the %s document:\n https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (writer,writer,writer)
    print readerRef
    print writerRef
    jobGuid = 'Please save the following configuration as a json file and use\n python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'
    print jobGuid
    jobTemplate={ 
   
      "job": { 
   
        "setting": { 
   
          "speed": { 
   
            "channel": ""
          }
        },
        "content": [
          { 
   
            "reader": { 
   },
            "writer": { 
   }
          }
        ]
      }
    }
    readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME,reader)
    writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME,writer)
    try:
      readerPar = readPluginTemplate(readerTemplatePath);
    except Exception, e:
       print "Read reader[%s] template error: can\'t find file %s" % (reader,readerTemplatePath)
    try:
      writerPar = readPluginTemplate(writerTemplatePath);
    except Exception, e:
      print "Read writer[%s] template error: : can\'t find file %s" % (writer,writerTemplatePath)
    jobTemplate['job']['content'][0]['reader'] = readerPar;
    jobTemplate['job']['content'][0]['writer'] = writerPar;
    print json.dumps(jobTemplate, indent=4, sort_keys=True)

# 根据插件名读取插件模板
def readPluginTemplate(plugin):
    with open(plugin, 'r') as f:
            return json.load(f)

# 判断入参是否为一个 url
def isUrl(path):
    if not path:
        return False

    assert (isinstance(path, str))
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False

# 通过各类 if else 构建启动命令。启动命令中包含2部分 JVM参数+环境变量
def buildStartCommand(options, args):
    commandMap = { 
   }
    tempJVMCommand = DEFAULT_JVM
    if options.jvmParameters:
        tempJVMCommand = tempJVMCommand + " " + options.jvmParameters

    if options.remoteDebug:
        tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
        print 'local ip: ', getLocalIp()

    if options.loglevel:
        tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))

    if options.mode:
        commandMap["mode"] = options.mode

    # jobResource 可能是 URL,也可能是本地文件路径(相对,绝对)
    jobResource = args[0]
    if not isUrl(jobResource):
        jobResource = os.path.abspath(jobResource)
        if jobResource.lower().startswith("file://"):
            jobResource = jobResource[len("file://"):]

    jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
    if options.params:
        jobParams = jobParams + " " + options.params

    if options.jobid:
        commandMap["jobid"] = options.jobid

    commandMap["jvm"] = tempJVMCommand
    commandMap["params"] = jobParams
    commandMap["job"] = jobResource

    return Template(ENGINE_COMMAND).substitute(**commandMap)

# 打印版权信息
def printCopyright():
    print ''' DataX (%s), From Alibaba ! Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved. ''' % DATAX_VERSION
    sys.stdout.flush()

# 程序主入口
if __name__ == "__main__":
    # 1 打印版权信息
    printCopyright()
    # 2 获取选项的解析器
    parser = getOptionParser()
    # 3 根据入参,使用解析器解析出参数值
    # 3.1 parse_args方法返回俩参,分别为instance类型的options和list类型的args
    # 3.2 用sys.argv[1:]来获取命令参数,返回一个list类型的返回值
    options, args = parser.parse_args(sys.argv[1:])
    if options.reader is not None and options.writer is not None:
        # 4 如果解析后,入参的 reader和writer不为空,在从github上构建出一个 json的样例模板
        generateJobConfigTemplate(options.reader,options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    # 5 根据入参 构建执行脚本
    startCommand = buildStartCommand(options, args)
    # print startCommand 该命令可以打印出 用户输入的参数+py文件构建的参数,作为整体形成一个执行脚本。(执行脚本最后调用java类)

    # 6 创建并返回一个子进程,并在这个进程中执行指定的shell 脚本
    child_process = subprocess.Popen(startCommand, shell=True)
    # 7 将执行结果保存在信号量中
    register_signal()
    # 8 父子进程进行通信,并将通信结果保存到 stdout, stderr
    (stdout, stderr) = child_process.communicate()

    # 9 退出(根据子进程的状态码)
    sys.exit(child_process.returncode)


注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/145434.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 链表排序总结(全)(C++)[通俗易懂]

    链表排序总结(全)(C++)[通俗易懂]文章目录链表排序与数组排序的区别借助外部空间冒泡排序插入排序归并排序快速排序链表排序与数组排序的区别数组的排序几乎所有人都很熟悉了,常用的算法插入、冒泡、归并以及快排等都会或多或少依赖于数组可以在O(1)时间随机访问的特点。链表排序一般指单链表排序,链表是不支持随机访问的,需要访问后面的节点只能从表头顺序遍历,所以链表的排序是一个相对比较复杂的问题。那么怎样进行链表排序呢?借助外部空间既然数组排序简单,那可以借助数组进行排序:把链表中的值一次遍历导入数组(时间复杂度O(n))对数组进行排序

    2022年10月11日
    4
  • TOF相机总结[通俗易懂]

    TOF相机总结[通俗易懂]转载也要顶! 关于tof相机很好的总结~  2013-05-1113:22:30|  分类:默认分类|  标签:|字号大中小 订阅1.1TOF初探   TOF是Timeofflight的简写,直译为飞行时间的意思。所谓飞行时间法3D成像,是通过给目标连续发送光脉冲,然后用传感

    2022年5月26日
    31
  • ram和rom的区别_RAM和ROM各有什么特点

    ram和rom的区别_RAM和ROM各有什么特点RAM和ROM总结一、在解释之前先备注一些缩写的全称便于记忆:1、EPROM:(ElectricallyProgrammableRead-Only-Memory)电可编程序只读存储器2、EE

    2022年8月1日
    7
  • opencv 视频实时处理_opencv 控制摄像头

    opencv 视频实时处理_opencv 控制摄像头最近研究了通过OpenCV采集摄像头数据,并同时将视频流数据推送到RTSP和RTMP。RTSP服务采用开源的LIVE555(需要自己修改和实现部分代码)。RTMP服务采用开源CRtmpServer。

    2022年10月21日
    6
  • asp.net mvc实现文件下载「建议收藏」

    asp.net mvc实现文件下载「建议收藏」前段时间一直对如何解决文件下载的问题比较困惑,对文件下载的问题一直都是用的前端的方式解决的,代码如下//下载functiondownload(filePath){window.open(filePath);}但是这个方法有他的缺陷:1.下载的文件后缀必须为iis程序池中存在的文件2.此方法是通过浏览器打开服务器文件,无法直接下载近期看了asp.net下载文件几种方式…

    2022年7月22日
    17
  • 微信定位精灵服务器或网络异常,为什么微信定位精灵定位不了怎么办?

    微信定位精灵服务器或网络异常,为什么微信定位精灵定位不了怎么办?方法如下:1、下载“微信定位精灵”软件,安装;2、按图示设置如下,然后缩小地图,把光标定位在大马或任意地方,点击左上角的圆形定位按键完成定位,点右上角的菜单栏,选择“启动微信”,接下来的正常操作就行了。3、打开手机网络,关掉手机的网络定位,GPS等等。打开精灵,看见地图中间有个十字架,那就是你将要定位的地方,比如你的朋友身边。5.点击左上角的定位按钮。一秒你就穿越了。6、打开右边的启动微信。找身边…

    2022年5月7日
    97

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号