NIFI自定义开发
自定义开发组件步骤
目录

示例源码如下
此Class的功能是从JSON对象中提取某个属性的值并返回,代码较少,比较简单
package com.leach.nifi.processor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.leach.nifi.common.LeachCommon;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * NiFi processor that replaces a FlowFile's JSON content with the value found
 * under a configured key path.
 *
 * <p>The incoming content is parsed as a single JSON object. The key path from
 * the {@code ATTRIBUTE NAME} property is split on {@code ':'} and handed to
 * {@link LeachCommon#extractAttrVal} together with the parsed object. The
 * extracted value is serialized with fastjson and written back as the
 * FlowFile content, encoded as UTF-8.
 *
 * <p>Besides {@code mime.type}, the processor also sets the attributes
 * {@code inputProcessor}, {@code relationship} and {@code level} on every
 * FlowFile it routes.
 *
 * <p>Author: xiaobin.wang. Created: 2020/01/02.
 */
// The processor is declared free of external side effects.
@SideEffectFree
// The framework may batch several sessions together.
@SupportsBatching
// Tags make the processor searchable in the NiFi UI.
@Tags({"leach", "extract", "json", "attribute", "key"})
// Incoming connections are allowed. NOTE(review): onTrigger does nothing without an
// input FlowFile, so INPUT_REQUIRED may be the better declaration - confirm before changing.
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
// Description shown for this processor in the NiFi UI.
@CapabilityDescription("提取json中某个属性作为新的flowfile传递到下方")
@WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json")
public class LeachExtractAttrFromJson extends AbstractProcessor {

    /** Property descriptors exposed to the framework; assigned once in {@link #init}. */
    private List<PropertyDescriptor> properties;

    /** Relationships exposed to the framework; assigned once in {@link #init}. */
    private Set<Relationship> relationships;

    /** Colon-separated key path of the value to extract, e.g. {@code key:key1:key2}. */
    public static final PropertyDescriptor ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
            .name("ATTRIBUTE NAME")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .description("json对象中的key键,如果提取嵌套中的key键,则使用英文“:”号分割。例:key:key1:key2。key键不允许存在于数组中,只允许存在于单个对象中")
            .build();

    /** Whether the extracted value is re-parsed as JSON before being serialized. */
    public static final PropertyDescriptor RESULT_IS_JSON = new PropertyDescriptor.Builder()
            .name("RESULT IS JSON")
            .required(true)
            .description("结果是否格式化为json格式")
            .defaultValue("true")
            .allowableValues("true", "false")
            .build();

    /** Destination for FlowFiles whose value was extracted successfully. */
    public static final Relationship SUCCESS = new Relationship.Builder()
            .name("success")
            .description("提取成功的flowfile内容标识")
            .build();

    /** Destination for FlowFiles for which extraction threw an exception. */
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("提取失败的flowfile内容标识")
            .build();

    /**
     * Registers the supported properties and relationships.
     *
     * @param context initialization context supplied by the framework (unused)
     */
    @Override
    public void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(ATTRIBUTE_NAME);
        descriptors.add(RESULT_IS_JSON);
        // Published as unmodifiable so the list cannot be altered after initialization.
        this.properties = Collections.unmodifiableList(descriptors);

        final Set<Relationship> rels = new HashSet<>();
        rels.add(SUCCESS);
        rels.add(REL_FAILURE);
        // Published as unmodifiable so the set cannot be altered after initialization.
        this.relationships = Collections.unmodifiableSet(rels);
    }

    /**
     * Takes one FlowFile from the session, overwrites its content with the
     * extracted value and routes it to {@link #SUCCESS}; any exception raised
     * while doing so routes the FlowFile to {@link #REL_FAILURE} instead.
     *
     * @param processContext gives access to the configured property values
     * @param processSession session used to read, rewrite and transfer the FlowFile
     * @throws ProcessException declared by the framework contract
     */
    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            // No FlowFile is available for this trigger. This is a normal
            // situation, so it is not reported as an error.
            return;
        }

        final ComponentLog logger = getLogger();
        final String[] keys = processContext.getProperty(ATTRIBUTE_NAME).getValue().split(":");
        final boolean resultIsJson =
                "true".equalsIgnoreCase(processContext.getProperty(RESULT_IS_JSON).getValue());

        try {
            flowFile = processSession.write(flowFile, new StreamCallback() {
                @Override
                public void process(InputStream rcdIn, OutputStream rcdOut) throws IOException {
                    final String json = IOUtils.toString(rcdIn, "utf-8");
                    final JSONObject source = JSON.parseObject(json);

                    // A fresh mutable list is passed on every call, as the original code did.
                    final List<String> keyPath = new ArrayList<>(Arrays.asList(keys));
                    Object val = LeachCommon.extractAttrVal(keyPath, source);

                    if (resultIsJson) {
                        // Re-parse the value's string form so that a JSON document stored
                        // as a string is emitted as structured JSON.
                        val = JSONObject.parseObject(String.valueOf(val), Object.class);
                    }

                    // Encode explicitly as UTF-8: the input is decoded as UTF-8 above, and
                    // the platform default charset would corrupt non-ASCII values elsewhere.
                    rcdOut.write(JSON.toJSONString(val).getBytes(StandardCharsets.UTF_8));
                }
            });

            flowFile = processSession.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
            flowFile = processSession.putAttribute(flowFile, "inputProcessor", this.getIdentifier());
            flowFile = processSession.putAttribute(flowFile, "relationship", "success");
            flowFile = processSession.putAttribute(flowFile, "level", "INFO");
            processSession.transfer(flowFile, SUCCESS);
        } catch (final Exception pe) {
            logger.error("从json中提取属性失败 {} due to {}; transferring to failure", new Object[] {flowFile, pe});
            flowFile = processSession.putAttribute(flowFile, "inputProcessor", this.getIdentifier());
            flowFile = processSession.putAttribute(flowFile, "relationship", "failure");
            flowFile = processSession.putAttribute(flowFile, "level", "ERROR");
            processSession.transfer(flowFile, REL_FAILURE);
        }
    }

    /**
     * @return the unmodifiable set of relationships built in {@link #init}
     */
    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * @return the unmodifiable list of property descriptors built in {@link #init}
     */
    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }
}
编辑Resources文件
POM文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.leach.nifi</groupId>
    <artifactId>leach_nifi</artifactId>
    <version>1.0-SNAPSHOT</version>
    <!-- "nar" packaging is provided by nifi-nar-maven-plugin, declared below with extensions enabled -->
    <packaging>nar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <nifi.version>1.11.3</nifi.version>
    </properties>

    <!-- Maven requires every <dependency> to be nested inside <dependencies> -->
    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-api</artifactId>
            <version>${nifi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-processor-utils</artifactId>
            <version>${nifi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-utils</artifactId>
            <version>${nifi.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>5.1.6.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-nar-maven-plugin</artifactId>
                <version>1.3.1</version>
                <extensions>true</extensions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>3.0.0-M4</version>
            </plugin>
        </plugins>
    </build>
</project>
代码说明
AbstractProcessor:自定义开发nifi插件包,继承此类
PropertyDescriptor:此对象为nifi插件前端供用户进行配置的属性设置
例下图中属性,对应前端组件情形


PropertyDescriptor.addValidator:此参数为验证前端输入的数据是否符合此设置格式,所有的验证方式参考源码中StandardValidators类
PropertyDescriptor.allowableValues:此参数设置后,前端将变成下拉框选择样式;
Relationship:此对象为插件执行后,返回的结果状态关系,一般设置两个即可,一个成功一个失败;有特殊情况,可自行增加设置。需要注意,每个Processor执行后,所有的结果必须配置一个结果状态。

init:此方法的目的是将此类中定义的属性(PropertyDescriptor)和结果状态关系(Relationship)组装起来返回给前端插件,每增加一个属性或关系,都需要在此方法中添加进去。此为固定写法
onTrigger:此方法为插件执行后,触发的事件执行器,组件的所有逻辑及数据处理均在此方法里面;
ProcessContext:此对象中包含前端传入的所有参数信息;
ProcessSession:此对象包含上游传入进此插件的所有Flowfile内容,最终返回给前端的FlowFile也是存放进此对象之中的
打包项目
更新nifi服务
重启
备注
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/232227.html 原文链接:https://javaforall.net
