Implementing a Custom NiFi Processor


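Introduction to NiFi

Apache NiFi is built for data flows. It supports highly configurable directed graphs of data routing, transformation, and system mediation logic, and it can pull data dynamically from many kinds of sources. It is an easy-to-use, powerful, and reliable system for ingesting, processing, and distributing data.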

How do you implement a custom processor?

    package org.apache.nifi.processor;

    import org.apache.nifi.processor.exception.ProcessException;

    public abstract class AbstractProcessor extends AbstractSessionFactoryProcessor {

        public AbstractProcessor() {
        }

        // Called by the framework; owns the session lifecycle.
        @Override
        public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
            final ProcessSession session = sessionFactory.createSession();
            try {
                onTrigger(context, session);
                session.commit();        // commit only if the subclass returned normally
            } catch (final Throwable t) {
                session.rollback(true);  // roll back and penalize the FlowFiles
                throw t;
            }
        }

        // Implemented by each concrete processor.
        public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
    }

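Custom processors normally extend AbstractProcessor, directly or indirectly. The core of a processor is its onTrigger method, which the framework calls each time the processor is scheduled to run, typically when a FlowFile is waiting in an incoming queue. As the code above shows, AbstractProcessor creates the session, calls the subclass's onTrigger, commits the session on success, and rolls it back if anything is thrown.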
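The commit-or-rollback wrapping described above can be illustrated without any NiFi dependency. The following is only a minimal sketch: ToySession, ToyProcessor, and SessionLifecycleDemo are hypothetical stand-ins invented for this illustration, not NiFi classes, and they record events in a list instead of touching real FlowFiles.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a NiFi ProcessSession; it only records what happened.
class ToySession {
    final List<String> log = new ArrayList<>();
    void commit() { log.add("commit"); }
    void rollback() { log.add("rollback"); }
}

// Mirrors the shape of AbstractProcessor: the final method owns the session lifecycle.
abstract class ToyProcessor {
    final void trigger(ToySession session) {
        try {
            onTrigger(session);   // subclass work
            session.commit();     // reached only if onTrigger returned normally
        } catch (RuntimeException e) {
            session.rollback();   // undo the unit of work, then propagate the failure
            throw e;
        }
    }

    abstract void onTrigger(ToySession session);
}

class SessionLifecycleDemo {
    // Runs a processor once and returns the session log; a rethrown failure is noted in the log.
    static List<String> run(ToyProcessor processor) {
        ToySession session = new ToySession();
        try {
            processor.trigger(session);
        } catch (RuntimeException expected) {
            session.log.add("rethrown");
        }
        return session.log;
    }

    public static void main(String[] args) {
        ToyProcessor ok = new ToyProcessor() {
            @Override void onTrigger(ToySession s) { s.log.add("work"); }
        };
        ToyProcessor bad = new ToyProcessor() {
            @Override void onTrigger(ToySession s) { throw new IllegalStateException("boom"); }
        };
        System.out.println(run(ok));   // [work, commit]
        System.out.println(run(bad));  // [rollback, rethrown]
    }
}
```

Because the wrapping method is final, a subclass cannot forget to commit or roll back; it only supplies the work inside onTrigger.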

Development

    tree nifi-workshop
    nifi-workshop
    ├── Chuck_Norris_Speaking.xml
    ├── nifi-workshop-demo-nar
    │   └── pom.xml
    ├── nifi-workshop-demo-processors
    │   ├── pom.xml
    │   └── src
    │       ├── main
    │       │   ├── java
    │       │   │   └── com
    │       │   │       └── hortonworks
    │       │   │           └── iot
    │       │   │               └── MyProcessor.java
    │       │   └── resources
    │       │       └── META-INF
    │       │           └── services
    │       │               └── org.apache.nifi.processor.Processor
    │       └── test
    │           └── java
    │               └── com
    │                   └── hortonworks
    │                       └── iot
    │                           └── MyProcessorTest.java
    ├── pom.xml
    ├── README.md
    └── screenshot.png

    16 directories, 9 files

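The pom files in this project target a rather old NiFi release (0.3.0; latest commit on 29 Sep 2015). The project still builds and runs, but to stay in line with the official release (1.7.0) the pom files need to be updated.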

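  • The updated pom file in the nifi-workshop directory:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <parent>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-nar-bundles</artifactId>
            <version>1.7.0</version>
        </parent>

        <groupId>hwx.iot</groupId>
        <artifactId>workshop.demo</artifactId>
        <version>1.0-SNAPSHOT</version>
        <packaging>pom</packaging>

        <modules>
            <module>nifi-workshop-demo-processors</module>
            <module>nifi-workshop-demo-nar</module>
        </modules>
    </project>
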
  • The pom file in the nifi-workshop-demo-nar directory stays unchanged.
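  • The updated pom file in the nifi-workshop-demo-processors directory:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <parent>
            <groupId>hwx.iot</groupId>
            <artifactId>workshop.demo</artifactId>
            <version>1.0-SNAPSHOT</version>
        </parent>

        <artifactId>nifi-workshop-demo-processors</artifactId>
        <packaging>jar</packaging>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <nifi.version>1.7.0</nifi.version>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-api</artifactId>
                <version>${nifi.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-utils</artifactId>
                <version>${nifi.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-processor-utils</artifactId>
                <version>${nifi.version}</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.12</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-mock</artifactId>
                <version>${nifi.version}</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </project>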

The code for MyProcessor:

    package com.hortonworks.iot;

    import org.apache.commons.io.IOUtils;
    import org.apache.nifi.annotation.behavior.ReadsAttribute;
    import org.apache.nifi.annotation.behavior.ReadsAttributes;
    import org.apache.nifi.annotation.behavior.WritesAttribute;
    import org.apache.nifi.annotation.behavior.WritesAttributes;
    import org.apache.nifi.annotation.documentation.CapabilityDescription;
    import org.apache.nifi.annotation.documentation.SeeAlso;
    import org.apache.nifi.annotation.documentation.Tags;
    import org.apache.nifi.annotation.lifecycle.OnScheduled;
    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.DataUnit;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.ProcessorInitializationContext;
    import org.apache.nifi.processor.Relationship;
    import org.apache.nifi.processor.exception.ProcessException;
    import org.apache.nifi.processor.io.StreamCallback;
    import org.apache.nifi.processor.util.StandardValidators;

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    @Tags({"string"})
    @CapabilityDescription("Awesome string manipulator")
    @SeeAlso({})
    @ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
    @WritesAttributes({@WritesAttribute(attribute = "", description = "")})
    public class MyProcessor extends AbstractProcessor {

        public static final PropertyDescriptor PROP_TO_UPPERCASE = new PropertyDescriptor
                .Builder().name("Convert to UPPERCASE")
                .description("All caps")
                .required(false)
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                .defaultValue("false")
                .build();

        public static final PropertyDescriptor PROP_BUFFER_SIZE = new PropertyDescriptor
                .Builder().name("Buffer size")
                .description("Size of the working buffer. If an incoming string is bigger it will be routed to a failure path")
                .required(false)
                .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
                .defaultValue("8192 B")
                .build();

        public static final Relationship REL_SUCCESS = new Relationship.Builder()
                .name("success")
                .description("Success, all done")
                .build();

        public static final Relationship REL_FAILURE = new Relationship.Builder()
                .name("failure")
                .description("Where all bad people end up")
                .build();

        private List<PropertyDescriptor> descriptors;

        private Set<Relationship> relationships;

        // Registers the supported properties and relationships once, at initialization.
        @Override
        protected void init(final ProcessorInitializationContext context) {
            final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
            descriptors.add(PROP_TO_UPPERCASE);
            descriptors.add(PROP_BUFFER_SIZE);
            this.descriptors = Collections.unmodifiableList(descriptors);

            final Set<Relationship> relationships = new HashSet<Relationship>();
            relationships.add(REL_SUCCESS);
            relationships.add(REL_FAILURE);
            this.relationships = Collections.unmodifiableSet(relationships);
        }

        @Override
        public Set<Relationship> getRelationships() {
            return this.relationships;
        }

        @Override
        public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
            return descriptors;
        }

        @OnScheduled
        public void onScheduled(final ProcessContext context) {
        }

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
            FlowFile flowFile = session.get();
            if (flowFile == null) {
                return; // nothing queued for this run
            }

            final Boolean toUppercase = context.getProperty(PROP_TO_UPPERCASE).asBoolean();
            final Double bufSize = context.getProperty(PROP_BUFFER_SIZE).asDataSize(DataUnit.B);

            // Content larger than the configured buffer goes straight to failure.
            if (flowFile.getSize() > bufSize) {
                getLogger().warn("Incoming string too big: {} vs buffer: {}",
                        new Object[]{flowFile.getSize(), bufSize});
                session.transfer(flowFile, REL_FAILURE);
                return;
            }

            try {
                if (toUppercase) {
                    flowFile = session.write(flowFile, new StreamCallback() {
                        @Override
                        public void process(InputStream inputStream, OutputStream outputStream) throws IOException {
                            // Read and write with an explicit charset instead of the platform default.
                            String s = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
                            IOUtils.write(s.toUpperCase(), outputStream, StandardCharsets.UTF_8);
                        }
                    });
                }
                session.transfer(flowFile, REL_SUCCESS);
            } catch (Exception e) {
                getLogger().error("Failed while processing a string", e);
                session.transfer(flowFile, REL_FAILURE);
            }
        }
    }

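MyProcessor exposes two properties:

  • PROP_BUFFER_SIZE (the size of the working buffer; larger content is routed to failure)
  • PROP_TO_UPPERCASE (whether to convert the content to uppercase)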
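Values such as "8192 B" and "10 B" are data-size strings that the processor converts to bytes with asDataSize(DataUnit.B). The following is only a rough sketch of what such a conversion involves. DataSizeSketch is a hypothetical class written for this illustration, it is not NiFi's parser, and it assumes 1 KB = 1024 B and only the units B, KB, MB, and GB.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only; NiFi's own validator and parser are not reproduced here.
class DataSizeSketch {
    // A number, optional whitespace, then a unit.
    private static final Pattern SIZE = Pattern.compile("(\\d+(?:\\.\\d+)?)\\s*(B|KB|MB|GB)");

    // Converts strings such as "8192 B" or "1 KB" to a byte count (assumes 1 KB = 1024 B).
    static double toBytes(String text) {
        Matcher m = SIZE.matcher(text.trim().toUpperCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a data size: " + text);
        }
        double value = Double.parseDouble(m.group(1));
        switch (m.group(2)) {
            case "KB": return value * 1024;
            case "MB": return value * 1024 * 1024;
            case "GB": return value * 1024 * 1024 * 1024;
            default:   return value; // "B"
        }
    }

    public static void main(String[] args) {
        System.out.println(toBytes("8192 B")); // 8192.0
        System.out.println(toBytes("10 B"));   // 10.0
    }
}
```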

MyProcessor routes its output to one of two relationships:

  • REL_SUCCESS (success)
  • REL_FAILURE (failure)
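The way the two properties decide between the two relationships can be restated as a plain function. This is a sketch of the decision logic only, assuming UTF-8 content. RoutingSketch, Route, and Result are hypothetical names for this illustration and involve no NiFi classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical restatement of MyProcessor's routing decision without NiFi types.
class RoutingSketch {
    enum Route { SUCCESS, FAILURE }

    // Result of one pass: where the content goes and what it looks like afterwards.
    static final class Result {
        final Route route;
        final String content;

        Result(Route route, String content) {
            this.route = route;
            this.content = content;
        }
    }

    static Result process(String content, long bufferSizeBytes, boolean toUppercase) {
        long size = content.getBytes(StandardCharsets.UTF_8).length;
        if (size > bufferSizeBytes) {
            return new Result(Route.FAILURE, content); // too big: content is left unchanged
        }
        String out = toUppercase ? content.toUpperCase(Locale.ROOT) : content;
        return new Result(Route.SUCCESS, out);
    }

    public static void main(String[] args) {
        System.out.println(process("test string", 8192, false).content); // test string
        System.out.println(process("test string", 8192, true).content);  // TEST STRING
        System.out.println(process("test string", 10, true).route);      // FAILURE
    }
}
```

The size check comes first, so an oversized input fails regardless of the uppercase setting.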

Testing

The test code is as follows:

    package com.hortonworks.iot;

    import org.apache.nifi.util.MockFlowFile;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.Test;

    import java.util.List;

    import static com.hortonworks.iot.MyProcessor.PROP_BUFFER_SIZE;
    import static com.hortonworks.iot.MyProcessor.PROP_TO_UPPERCASE;
    import static com.hortonworks.iot.MyProcessor.REL_FAILURE;
    import static com.hortonworks.iot.MyProcessor.REL_SUCCESS;

    public class MyProcessorTest {

        private TestRunner testRunner;

        @Before
        public void init() {
            testRunner = TestRunners.newTestRunner(MyProcessor.class);
        }

        @Test
        public void testStringTooBig() {
            // A 10-byte buffer is smaller than the input, so it must fail.
            testRunner.setProperty(MyProcessor.PROP_BUFFER_SIZE, "10 B");
            byte[] testData = "this string is too long".getBytes();
            testRunner.enqueue(testData);
            testRunner.run();
            testRunner.assertAllFlowFilesTransferred(REL_FAILURE);

            // Enabling uppercase does not change the outcome.
            testRunner.setProperty(PROP_TO_UPPERCASE, "true");
            testRunner.enqueue(testData);
            testRunner.run();
            testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
        }

        @Test
        public void testUppercase() throws Exception {
            // With default properties the content passes through unchanged.
            byte[] testData = "test string".getBytes();
            testRunner.enqueue(testData);
            testRunner.run();
            testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);

            List<MockFlowFile> ff = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
            Assert.assertNotNull(ff);
            Assert.assertEquals("wrong number of messages", 1, ff.size());
            ff.get(0).assertContentEquals(testData);

            // The same input fails once the buffer is smaller than the content.
            testRunner.clearTransferState();
            testRunner.setProperty(PROP_BUFFER_SIZE, "10 B");
            testRunner.enqueue(testData);
            testRunner.run();
            testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
        }
    }


How to run the tests needs no explanation here; they are ordinary JUnit tests.

The Processor’s JAR file must contain an entry in the META-INF/services directory named org.apache.nifi.processor.Processor. This is a text file where each line contains the fully-qualified class name of a Processor.
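In this project that file is src/main/resources/META-INF/services/org.apache.nifi.processor.Processor, and its single entry would be com.hortonworks.iot.MyProcessor. The file follows the provider-configuration format used by java.util.ServiceLoader: one class name per line, with blank lines and '#' comments ignored. The sketch below shows how such a file can be read. ServiceFileSketch is a hypothetical helper for this illustration, not part of NiFi, and the sample file content is an assumption based on the project layout.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; shows the file format only, not NiFi's actual discovery code.
class ServiceFileSketch {
    // Returns the class names listed in a provider-configuration file,
    // skipping blank lines and everything after a '#' comment marker.
    static List<String> classNames(String fileContent) {
        List<String> names = new ArrayList<>();
        for (String line : fileContent.split("\\R")) {
            int hash = line.indexOf('#');
            String name = (hash >= 0 ? line.substring(0, hash) : line).trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Assumed content of the services file in this project.
        String file = "# processors provided by this JAR\n"
                + "com.hortonworks.iot.MyProcessor\n";
        System.out.println(classNames(file)); // [com.hortonworks.iot.MyProcessor]
    }
}
```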

Deployment

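  • Check the NiFi status

    $NIFI_HOME/bin/nifi.sh status

    If NiFi is already running, stop it first:

    $NIFI_HOME/bin/nifi.sh stop

  • Build the project

    cd nifi-workshop
    mvn clean install

  • Copy the .nar file produced under nifi-workshop-demo-nar/target into $NIFI_HOME/lib so that NiFi can load the new processor

  • Start NiFi

    $NIFI_HOME/bin/nifi.sh start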

Open http://localhost:8080/nifi/ in a browser (replace localhost with the IP address of the machine where NiFi is deployed).

Before the final verification, you may want to read https://blog.csdn.net/u0/article/details/ first.

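  • Case 1 (output identical to the input)
    [screenshot]

  • Case 2 (lowercase input, uppercase output)
    [screenshot]

  • Case 3 (input too large, routed to failure)
    Lower the value of the Buffer size property.
    [screenshot]
    With the input unchanged, the FlowFile is now routed to a different relationship.
    [screenshot]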









Published by: 全栈程序员-站长. When reposting, please cite the source: https://javaforall.net/222719.html. Original link: https://javaforall.net
