conductor 系统任务

conductor 系统任务动态任务 参数 dynamicTaskN 来自任务输入的参数的名称 其值用于调度任务 例如如果参数的值为 ABC 则调度的下一个任务类型为 ABC Example name user task taskReferenc t1 inputParamet files work

动态任务

参数:

dynamicTaskNameParam:来自任务输入的参数的名称,其值用于调度任务。 例如 如果参数的值为ABC,则调度的下一个任务类型为“ABC”。

Example

{ "name": "user_task", "taskReferenceName": "t1", "inputParameters": { "files": "${workflow.input.files}", "taskToExecute": "${workflow.input.user_supplied_task}" }, "type": "DYNAMIC", "dynamicTaskNameParam": "taskToExecute" } 
如果以user_supplied_task的值(假设值为user_task_2)来启动工作流,那么Conductor将在调度此动态任务时调度user_task_2。

决策任务:

决策任务与编程语言中的case ... switch语句相似。 该任务需要3个参数:
caseValueParam: 其值将用作开关的任务输入中的参数名称。
decisionCases: 映射其中key是caseValueParam的可能值,值是要执行的任务列表。
defaultCase:在决策案例中找不到匹配值时执行的任务列表(默认条件)







Example

{ "name": "decide_task", "taskReferenceName": "decide1", "inputParameters": { "case_value_param": "${workflow.input.movieType}" }, "type": "DECISION", "caseValueParam": "case_value_param", "decisionCases": { "Show": [ { "name": "setup_episodes", "taskReferenceName": "se1", "inputParameters": { "movieId": "${workflow.input.movieId}" }, "type": "SIMPLE" }, { "name": "generate_episode_artwork", "taskReferenceName": "ga", "inputParameters": { "movieId": "${workflow.input.movieId}" }, "type": "SIMPLE" } ], "Movie": [ { "name": "setup_movie", "taskReferenceName": "sm", "inputParameters": { "movieId": "${workflow.input.movieId}" }, "type": "SIMPLE" }, { "name": "generate_movie_artwork", "taskReferenceName": "gma", "inputParameters": { "movieId": "${workflow.input.movieId}" }, "type": "SIMPLE" } ] } }

Fork任务:

fork用于调度并行的任务。
参数:
forkTasks:任务列表的列表。 每个子列表被安排并行执行。 然而,子列表中的任务是以串行方式安排的。





Example

{ "forkTasks": [ [ { "name": "task11", "taskReferenceName": "t11" }, { "name": "task12", "taskReferenceName": "t12" } ], [ { "name": "task21", "taskReferenceName": "t21" }, { "name": "task22", "taskReferenceName": "t22" } ] ] } 
当执行时,task11和task21被调度为同时执行。


动态fork任务
动态fork与FORK_JOIN任务相同。 除了在运行时使用任务的输入提供要分配的任务的列表。 当fork的任务数量不固定并且根据输入而变化时很有用。
dynamicForkTasksParam:包含并行执行的工作流任务配置列表的参数名称
dynamicForkTasksInputParamName: 参数的名字,此参数的值是一个映射,映射的key是fork任务的别名,value是fork任务的input






Example

{ "dynamicTasks": "${taskA.output.dynamicTasksJSON}", "dynamicTasksInput": "${taskA.output.dynamicTasksInputJSON}", "type": "FORK_JOIN_DYNAMIC", "dynamicForkTasksParam": "dynamicTasks", "dynamicForkTasksInputParamName": "dynamicTasksInput" } 

假设taskA的输出为:


{ "dynamicTasksInputJSON": { "forkedTask1": { "width": 100, "height": 100, "params": { "recipe": "jpg" } }, "forkedTask2": { "width": 200, "height": 200, "params": { "recipe": "jpg" } } }, "dynamicTasksJSON": [ { "name": "encode_task", "taskReferenceName": "forkedTask1", "type": "SIMPLE" }, { "name": "encode_task", "taskReferenceName": "forkedTask2", "type": "SIMPLE" } ] }
执行时,动态fork任务将调度两个并行任务,类型为“encode_task”,引用名称为“forkedTask1”和“forkedTask2”,由_ dynamicTasksInputJSON_指定



join任务:

join任务用于等待fork任务产生的一个或多个任务的完成。
参数:
joinOn:任务参考名称列表,其中JOIN将等待完成。




Example

{ "joinOn": ["taskRef1", "taskRef3"] } 

Join Task Output

fork任务的输出将是一个JSON对象,其中key是任务别名,值是fork任务的输出。

 

子工作流任务:

Sub Workflow任务允许在另一个工作流中嵌套工作流。

参数:

subWorkflowParam:任务参考名称列表,其中JOIN将等待完成。

Example

{ "name": "sub_workflow_task", "taskReferenceName": "sub1", "inputParameters": { "requestId": "${workflow.input.requestId}", "file": "${encode.output.location}" }, "type": "SUB_WORKFLOW", "subWorkflowParam": { "name": "deployment_workflow", "version": 1 } } 
执行时,将使用两个输入参数requestId和file执行deployment_workflow。 完成生成的工作流程后,任务将被标记为已完成。 如果子工作流程终止或失败,则将任务标记为失败,并重新配置。


等待任务:

等待任务被实现为保持IN_PROGRESS状态的门,除非由外部触发器标记为COMPLETED或FAILED。 要使用等待任务,请将任务类型设置为WAIT

参数:无
外部触发等待任务
任务资源端点可用于将任务的状态更新为终止状态。
Contrib模块提供SQS集成,其中外部系统可以将消息放置在服务器侦听的预配置队列中。 消息到达时,它们被标记为COMPLETED或FAILED。

SQS队列
可以使用以下API检索服务器用于更新任务状态的SQS队列:













GET /queue

当更新任务的状态时,消息需要符合以下规范:

     消息必须是有效的JSON字符串。
     消息JSON应包含一个名为externalId的密钥,其值为包含以下密钥的JSONized字符串:
         workflowId:工作流的ID
         taskRefName:应该更新的任务引用名称。
     每个队列代表一个特定的任务状态,相应地标记任务。 例如 消息进入COMPLETED队列将任务状态标记为COMPLETED。
     使用消息更新任务的输出。








示例SQS有效负载:






{ "some_key": "valuex", "externalId": "{ 
     \"taskRefName\":\"TASK_REFERENCE_NAME\",\"workflowId\":\"WORKFLOW_ID\"}" }


HTTP任务:


HTTP任务用于通过HTTP调用另一个微服务器。
该任务期望一个名为http_request的输入参数作为任务输入的一部分,具有以下详细信息:
uri:服务的URI。 使用vipAddress时可以是部分或包含服务器地址。
method: HTTP method. One of the GET, PUT, POST, DELETE, OPTIONS, HEAD
accept: Accept header as required by server.
contentType:Content Type - 支持的类型是 text/plain, text/html and, application/json
headers:要与请求一起发送的其他http标头的map。
body:Request body
vipAddress:使用基于发现的服务URL时。

HTTP 任务输出:
response: 包含response的JSON主体(如果存在)
headers: Response Headers
statusCode: Integer status code















Example

Task Input payload using vipAddress

{ "http_request": { "vipAddress": "examplevip-prod", "uri": "/", "method": "GET", "accept": "text/plain" } } 

Task Input using an absolute URL

{ "http_request": { "uri": "http://example.com/", "method": "GET", "accept": "text/plain" } } 
如果请求无法完成或远程服务器返回不成功的状态代码,任务将被标记为FAILED。


Event任务:

事件任务提供将事件(消息)发布到conductor或外部事件系统(如SQS)的能力。 事件任务对于为工作流和任务创建基于事件的依赖非常有用。
参数:
sink:生产的事件的合格名称。 例如 conductor或sqs:sqs_queue_name







Example

{ "sink": 'sqs:example_sqs_queue_name' } 
当使用conductor作为接收器生成事件时,事件名称遵循以下结构:conductor:<workflow_name>:<task_reference_name>
对于SQS,使用队列的名称,而不是URI。 conductor根据名称查找URI。

Supported Sinks

  • Conductor
  • SQS

事件任务输入:

给予事件任务的输入作为有效载荷提供给发布的消息。 例如 如果将消息放入SQS队列(sink is sqs)中,则消息有效内容将作为任务的输入。

时间任务输出:

event_produced生成的事件的名称。


 

 

 


转载于:https://www.cnblogs.com/mhc-fly/p/7010549.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/233748.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 深度优先遍历和广度优先遍历[通俗易懂]

    深度优先遍历和广度优先遍历[通俗易懂]深度优先遍历和广度优先遍历什么是深度/广度优先遍历?深度优先遍历简称DFS(DepthFirstSearch),广度优先遍历简称BFS(BreadthFirstSearch),它们是遍历图当中所有顶点的两种方式。这两种遍历方式有什么不同呢?我们来举个栗子:我们来到一个游乐场,游乐场里有11个景点。我们从景点0开始,要玩遍游乐场的所有景点,可以有什么…

    2022年6月13日
    42
  • ttyLinux安装完整指南「建议收藏」

    ttyLinux安装完整指南「建议收藏」本文地址:http://blog.csdn.net/useway《Java程序员,上班那点事儿》ttyLinux号称是最小的Linux系统,这两天找到了一个很好用虚拟机VirtualBox,这个虚拟机是SUN开发的一个免费开源的虚拟机,原来一直用vmware,不过发现SUN的这个免费的虚拟机更好用,主要是比较轻量级的,感觉挺好。正好想试试ttyLinux到底有多小,就用这个虚拟机试试…

    2022年8月12日
    3
  • ORACLE触发器具体解释

    ORACLE触发器具体解释

    2021年12月4日
    38
  • AVX512与AVX2比较「建议收藏」

    AVX512与AVX2比较「建议收藏」采用,SHA256(SHA256哈希计算是有效负载处理管道的重要部分)优点:1、寄存器变化(与AVX2相比,不仅寄存器的宽度从256位增加到512位,而且寄存器的数量也增加了一倍,达到32)2、比AVX2提供高达8倍的性能提升,由于并行处理了16条消息如何最好地利用为了获得AVX512实现的最佳性能,这里有一些提示:有很多例行程序并行进行SHA256计算。 尝试使用…

    2022年5月30日
    69
  • MSDP,Anycast — overview

    MSDP,Anycast — overviewMSDP,Anycast–overviewIPmulticastisdeployedasanintegralcomponentinmission-criticalnetworkedapplicationsthroughouttheworld.Theseapplicationsmustberobust,hardened,andsc

    2022年5月15日
    34
  • 细谈 axios和ajax区别

    细谈 axios和ajax区别刚刚接触axios有好多疑惑。它和ajax有什么关系呢和区别呢?接下来一起看下:1.区别axios是通过promise实现对ajax技术的一种封装,就像jQuery实现ajax封装一样。简单来说:ajax技术实现了网页的局部数据刷新,axios实现了对ajax的封装。axios是ajaxajax不止axios。下面列出代码来对比一下:axios:axios({…

    2022年10月22日
    0

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号