NIFI自定义开发

NIFI自定义开发NIFI 自定义开发 NIFI 自定义开发组件的结果目标 是能够在 NIFI 主页 Processor 里面拖动出来可以使用自己设置的属性 以及自己操作的逻辑处理 NIFI 自定义组件的开发 其实就是继承 AbstractProc 类 实现其方法即可 下面直接放上一个例子进行说明 以及各对象的使用用途说明 自定义开发组件步骤 1 mavenPOM 文件中引入 nifi 对应包 2 resources 目录下新建文件夹 META INF services3 META INF services 下新建一个 Fileo

NIFI自定义开发

自定义开发组件步骤

目录

在这里插入图片描述

事例源码如下

此Class功能为提取JSON属性中某个属性值并返回,代码较少,比较简单

package com.leach.nifi.processor; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.leach.nifi.common.LeachCommon; import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.*; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.StandardValidators; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.*; / * @title * @author xiaobin.wang * @create 2020/01/02 */ //不需要关注上下文 @SideEffectFree //支持批量 @SupportsBatching //用于标记这个processor的标签,可以用于搜索 @Tags({ 
    "leach","extract", "json", "attribute","key"}) // 声明允许输入 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) //针对这个processor的注释 @CapabilityDescription("提取json中某个属性作为新的flowfile传递到下方") @WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json") public class LeachExtractAttrFromJson extends AbstractProcessor { 
    private List<PropertyDescriptor> properties; private Set<Relationship> relationships; public static final PropertyDescriptor ATTRIBUTE_NAME=new PropertyDescriptor.Builder() .name("ATTRIBUTE NAME") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .description("json对象中的key键,如果提取嵌套中的key键,则使用英文“:”号分割。例:key:key1:key2。key键不允许存在于数组中,只允许存在于单个对象中") .build(); public static final PropertyDescriptor RESULT_IS_JSON=new PropertyDescriptor.Builder() .name("RESULT IS JSON") .required(true) .description("结果是否格式化为json格式") .defaultValue("true") .allowableValues("true", "false") .build(); public static final Relationship SUCCESS = new Relationship.Builder().name("success") .description("提取成功的flowfile内容标识").build(); public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description( "提取失败的flowfile内容标识") .build(); @Override public void init(final ProcessorInitializationContext context) { 
    ArrayList<PropertyDescriptor> properties = new ArrayList<>(); properties.add(ATTRIBUTE_NAME); properties.add(RESULT_IS_JSON); // 防止多线程ADD this.properties = Collections.unmodifiableList(properties); Set<Relationship> relationships = new HashSet<>(); relationships.add(SUCCESS); relationships.add(REL_FAILURE); // 防止多线程ADD this.relationships = Collections.unmodifiableSet(relationships); } @Override public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException { 
    final ComponentLog logger =getLogger(); FlowFile flowFile = processSession.get(); if (flowFile == null) { 
    logger.error("FlowFile is null", new Object[] { 
   }); return; } final String keys_name = processContext.getProperty(ATTRIBUTE_NAME).getValue(); final String is_json = processContext.getProperty(RESULT_IS_JSON).getValue(); String[] keys=keys_name.split(":"); try { 
    flowFile = processSession.write(flowFile, new StreamCallback() { 
    @Override public void process(InputStream rcdIn, OutputStream rcdOut) throws IOException { 
    String json = IOUtils.toString(rcdIn,"utf-8"); JSONObject sourceTmp = JSON.parseObject(json); List<String> list=new ArrayList<>(); Collections.addAll(list,keys); Object val= LeachCommon.extractAttrVal(list,sourceTmp); if("true".equalsIgnoreCase(is_json)){ 
    val=JSONObject.parseObject(String.valueOf(val),Object.class); } //根据key获取值 rcdOut.write(JSON.toJSONString(val).getBytes()); } }); flowFile = processSession.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), "application/json"); flowFile = processSession.putAttribute(flowFile, "inputProcessor", this.getIdentifier()); flowFile = processSession.putAttribute(flowFile, "relationship", "success"); flowFile = processSession.putAttribute(flowFile, "level", "INFO"); processSession.transfer(flowFile, SUCCESS); } catch (final Exception pe) { 
    logger.error("从json中提取属性失败 {} due to {}; transferring to failure", new Object[] { 
    flowFile, pe }); flowFile = processSession.putAttribute(flowFile, "inputProcessor", this.getIdentifier()); flowFile = processSession.putAttribute(flowFile, "relationship", "failure"); flowFile = processSession.putAttribute(flowFile, "level", "ERROR"); processSession.transfer(flowFile, REL_FAILURE); return; } } @Override public Set<Relationship> getRelationships() { 
    return relationships; } @Override public List<PropertyDescriptor> getSupportedPropertyDescriptors() { 
    return properties; } 

编辑Resources文件

POM文件

<modelVersion>4.0.0</modelVersion> <groupId>com.leach.nifi</groupId> <artifactId>leach_nifi</artifactId> <version>1.0-SNAPSHOT</version> <packaging>nar</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <nifi.version>1.11.3</nifi.version> </properties> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-api</artifactId> <version>${nifi.version}</version> </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-processor-utils</artifactId> <version>${nifi.version}</version> </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-utils</artifactId> <version>${nifi.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.62</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>5.1.6.RELEASE</version> </dependency> <build> <plugins> <plugin> <groupId>org.apache.nifi</groupId> <artifactId>nifi-nar-maven-plugin</artifactId> <version>1.3.1</version> <extensions>true</extensions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>3.0.0-M4</version> </plugin> </plugins> </build> 

代码说明

AbstractProcessor:自定义开发nifi插件包,继承此类
PropertyDescriptor:此对象为nifi插件前端供用户进行配置的属性设置
例下图中属性,对应前端组件情形
在这里插入图片描述
在这里插入图片描述
PropertyDescriptor.addValidator:此参数为验证前端输入的数据是否符合此设置格式,所有的验证方式参考源码中StandardValidators类
PropertyDescriptor.allowableValues:此参数设置后,前端将变成下拉框选择样式;
Relationship:此对象为插件执行后,返回的结果状态关系,一般设置两个即可,一个成功一个失败;有特殊情况,可自行增加设置。需要注意,每个Processor执行后,所有的结果必须配置一个结果状态。
在这里插入图片描述
init:此方法目的是将此类中的属性和状态码组装起来返回给前端插件,每增加一个属性,都需要增加进去。固定








onTrigger:此方法为插件执行后,触发的事件执行器,组件的所有逻辑及数据处理均在此方法里面;
ProcessContext:此对象中包含前端传入的所有参数信息;
ProcessSession:此对象包含上游传入进此插件的所有Flowfile内容,最终返回给前端的FlowFile也是存放进此对象之中的

打包项目

更新nifi服务

重启

备注

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/232227.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • HashMap扩容全过程

    HashMap扩容全过程 1.如果HashMap的大小超过了负载因子(loadfactor)定义的容量,怎么办?默认的负载因子大小为0.75,也就是说,当一个map填满了75%的bucket时候,和其它集合类(如ArrayList等)一样,将会创建原来HashMap大小的两倍的bucket数组,来重新调整map的大小,并将原来的对象放入新的bucket数组中。这个过程叫作rehashing,因为它调用hash方…

    2025年11月19日
    8
  • dts展开为platform_device结构过程分析

    dts展开为platform_device结构过程分析dts节点展开为platform_device结构过程分析1.概述本文主要是记录学习Linux解析dts的代码分析,以便进行后续回顾。平台:ARMVexpress内核版本:linux-4.92.dts节点展开为platform_device结构过程分析自从ARM引入的dts之后,bsp驱动代码产生了非常之大的变化,像在linux-2.6.32这些版本的platform驱动中,会存在大…

    2022年7月24日
    17
  • 50个多线程面试题,你会多少?(一)[通俗易懂]

    50个多线程面试题,你会多少?(一)[通俗易懂]下面是Java线程相关的热门面试题,你可以用它来好好准备面试。什么是线程? 什么是线程安全和线程不安全? 什么是自旋锁? 什么是Java内存模型? 什么是CAS? 什么是乐观锁和悲观锁? 什么是AQS? 什么是原子操作?在JavaConcurrencyAPI中有哪些原子类(atomicclasses)? 什么是Executors框架? 什么是阻塞队列?如何使用阻塞队列来…

    2022年5月2日
    39
  • 质量工具因果图_质量管理因果图例题

    质量工具因果图_质量管理因果图例题前言在项目中,我们经常需要用到不同的工具对项目质量进行评审。使用不同的质量工具可能得到的结果不太一样。下面简单说下项目中常用到的质量分析工具因果图。释义:什么是因果图因果图又称为石川图、Ishikawa或鱼骨图,它把影响质量诸因素之间的关系以树状图的方式表示出来,使人一目了然,便于分析原因并采取相应的措施。它是一种在问题发生后,寻找根本原因的一种方法。它取名石川图是因为它是由日

    2022年8月14日
    10
  • 向量函数的内积_向量的内积运算

    向量函数的内积_向量的内积运算这是我的第一篇博客,谈谈自己在读研中的一些小思考,希望能给大家的学习带来一点启发。对于函数内积,我想很多理工科的都理解,最常用的就是傅里叶变换,一个信号与很多个频率的基函数相乘,也就是信号与每个基函数做内积,求得在每个基函数上的占比,或者说是在该基函数上的投影大小,遍历全部基函数,就求得在全部基函数的占比。![在这里插入图片描述](https://img-blog.csdnimg.cn/202…

    2022年9月1日
    6
  • 微信小程序 之 40029

    微信小程序 之 40029开发版正常使用,但是生成体验版之后报40029错误。错误原因:1:openid和当前小程序不对应,或者AppSecret秘钥和当前小程序不对应的。2:当前小程序能正确获取到token,但是在code还没有失效时,另一个小程序也用这个小程序的openid或者AppSecret秘钥去请求token。这个时候也会报40029,说到底还是第一个错误,只是触发的方式不同。解决办法也很简单,检查一下,当前的openid和AppSecret是否和当前要部署使用的小程序的openid和Ap..

    2022年5月6日
    49

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号