NIFI自定义开发

NIFI自定义开发NIFI 自定义开发 NIFI 自定义开发组件的结果目标 是能够在 NIFI 主页 Processor 里面拖动出来可以使用自己设置的属性 以及自己操作的逻辑处理 NIFI 自定义组件的开发 其实就是继承 AbstractProc 类 实现其方法即可 下面直接放上一个例子进行说明 以及各对象的使用用途说明 自定义开发组件步骤 1 mavenPOM 文件中引入 nifi 对应包 2 resources 目录下新建文件夹 META INF services3 META INF services 下新建一个 Fileo

NIFI自定义开发

自定义开发组件步骤

目录

在这里插入图片描述

事例源码如下

此Class功能为提取JSON属性中某个属性值并返回,代码较少,比较简单

package com.leach.nifi.processor; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.leach.nifi.common.LeachCommon; import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.*; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.StandardValidators; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.*; / * @title * @author xiaobin.wang * @create 2020/01/02 */ //不需要关注上下文 @SideEffectFree //支持批量 @SupportsBatching //用于标记这个processor的标签,可以用于搜索 @Tags({ 
    "leach","extract", "json", "attribute","key"}) // 声明允许输入 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) //针对这个processor的注释 @CapabilityDescription("提取json中某个属性作为新的flowfile传递到下方") @WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json") public class LeachExtractAttrFromJson extends AbstractProcessor { 
    private List<PropertyDescriptor> properties; private Set<Relationship> relationships; public static final PropertyDescriptor ATTRIBUTE_NAME=new PropertyDescriptor.Builder() .name("ATTRIBUTE NAME") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .description("json对象中的key键,如果提取嵌套中的key键,则使用英文“:”号分割。例:key:key1:key2。key键不允许存在于数组中,只允许存在于单个对象中") .build(); public static final PropertyDescriptor RESULT_IS_JSON=new PropertyDescriptor.Builder() .name("RESULT IS JSON") .required(true) .description("结果是否格式化为json格式") .defaultValue("true") .allowableValues("true", "false") .build(); public static final Relationship SUCCESS = new Relationship.Builder().name("success") .description("提取成功的flowfile内容标识").build(); public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description( "提取失败的flowfile内容标识") .build(); @Override public void init(final ProcessorInitializationContext context) { 
    ArrayList<PropertyDescriptor> properties = new ArrayList<>(); properties.add(ATTRIBUTE_NAME); properties.add(RESULT_IS_JSON); // 防止多线程ADD this.properties = Collections.unmodifiableList(properties); Set<Relationship> relationships = new HashSet<>(); relationships.add(SUCCESS); relationships.add(REL_FAILURE); // 防止多线程ADD this.relationships = Collections.unmodifiableSet(relationships); } @Override public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException { 
    final ComponentLog logger =getLogger(); FlowFile flowFile = processSession.get(); if (flowFile == null) { 
    logger.error("FlowFile is null", new Object[] { 
   }); return; } final String keys_name = processContext.getProperty(ATTRIBUTE_NAME).getValue(); final String is_json = processContext.getProperty(RESULT_IS_JSON).getValue(); String[] keys=keys_name.split(":"); try { 
    flowFile = processSession.write(flowFile, new StreamCallback() { 
    @Override public void process(InputStream rcdIn, OutputStream rcdOut) throws IOException { 
    String json = IOUtils.toString(rcdIn,"utf-8"); JSONObject sourceTmp = JSON.parseObject(json); List<String> list=new ArrayList<>(); Collections.addAll(list,keys); Object val= LeachCommon.extractAttrVal(list,sourceTmp); if("true".equalsIgnoreCase(is_json)){ 
    val=JSONObject.parseObject(String.valueOf(val),Object.class); } //根据key获取值 rcdOut.write(JSON.toJSONString(val).getBytes()); } }); flowFile = processSession.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); flowFile = processSession.putAttribute(flowFile, "inputProcessor", this.getIdentifier()); flowFile = processSession.putAttribute(flowFile, "relationship", "success"); flowFile = processSession.putAttribute(flowFile, "level", "INFO"); processSession.transfer(flowFile, SUCCESS); } catch (final Exception pe) { 
    logger.error("从json中提取属性失败 {} due to {}; transferring to failure", new Object[] { 
    flowFile, pe }); flowFile = processSession.putAttribute(flowFile, "inputProcessor", this.getIdentifier()); flowFile = processSession.putAttribute(flowFile, "relationship", "failure"); flowFile = processSession.putAttribute(flowFile, "level", "ERROR"); processSession.transfer(flowFile, REL_FAILURE); return; } } @Override public Set<Relationship> getRelationships() { 
    return relationships; } @Override public List<PropertyDescriptor> getSupportedPropertyDescriptors() { 
    return properties; } 

编辑Resources文件

POM文件

<modelVersion>4.0.0</modelVersion> <groupId>com.leach.nifi</groupId> <artifactId>leach_nifi</artifactId> <version>1.0-SNAPSHOT</version> <packaging>nar</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <nifi.version>1.11.3</nifi.version> </properties> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-api</artifactId> <version>${nifi.version}</version> </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-processor-utils</artifactId> <version>${nifi.version}</version> </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-utils</artifactId> <version>${nifi.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>5.1.6.RELEASE</version> </dependency> <build> <plugins> <plugin> <groupId>org.apache.nifi</groupId> <artifactId>nifi-nar-maven-plugin</artifactId> <version>1.3.1</version> <extensions>true</extensions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>3.0.0-M4</version> </plugin> </plugins> </build> 

代码说明

AbstractProcessor:自定义开发nifi插件包,继承此类
PropertyDescriptor:此对象为nifi插件前端供用户进行配置的属性设置
例下图中属性,对应前端组件情形
在这里插入图片描述
在这里插入图片描述
PropertyDescriptor.addValidator:此参数为验证前端输入的数据是否符合此设置格式,所有的验证方式参考源码中StandardValidators类
PropertyDescriptor.allowableValues:此参数设置后,前端将变成下拉框选择样式;
Relationship:此对象为插件执行后,返回的结果状态关系,一般设置两个即可,一个成功一个失败;有特殊情况,可自行增加设置。需要注意,每个Processor执行后,所有的结果必须配置一个结果状态。
在这里插入图片描述
init:此方法目的是将此类中的属性和状态码组装起来返回给前端插件,每增加一个属性,都需要增加进去。固定








onTrigger:此方法为插件执行后,触发的事件执行器,组件的所有逻辑及数据处理均在此方法里面;
ProcessContext:此对象中包含前端传入的所有参数信息;
ProcessSession:此对象包含上游传入进此插件的所有Flowfile内容,最终返回给前端的FlowFile也是存放进此对象之中的

打包项目

更新nifi服务

重启

备注

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/232227.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号