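Introduction to NiFi
Apache NiFi is designed for data flow. It supports highly configurable directed graphs of data routing, transformation, and system mediation logic, can pull data dynamically from many kinds of sources, and is an easy-to-use, powerful, and reliable system for ingesting, processing, and distributing data.
How do you implement a custom processor?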
package org.apache.nifi.processor;

import org.apache.nifi.processor.exception.ProcessException;

public abstract class AbstractProcessor extends AbstractSessionFactoryProcessor {

    public AbstractProcessor() {
    }

    @Override
    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        final ProcessSession session = sessionFactory.createSession();
        try {
            onTrigger(context, session);
            session.commit();
        } catch (final Throwable t) {
            session.rollback(true);
            throw t;
        }
    }

    public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
}
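A custom processor normally extends AbstractProcessor, directly or indirectly (strictly speaking, the framework only requires the Processor interface, which AbstractProcessor implements for you). The core of any processor is the onTrigger method, which the framework calls each time the processor is scheduled to run, typically when a flow file is waiting in an incoming queue. As the code above shows, AbstractProcessor creates the session, commits it when onTrigger returns normally, and rolls it back when onTrigger throws.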
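The commit-or-rollback wrapper can be tried out without NiFi on the classpath. What follows is only a minimal sketch in plain Java: `TriggerDemo`, `DemoProcessor` and `DemoSession` are hypothetical stand-ins invented for this illustration, not NiFi classes, and the sketch catches RuntimeException where the real class catches Throwable.

```java
import java.util.ArrayList;
import java.util.List;

final class TriggerDemo {
    /** Runs a processor once and returns the events recorded by the session. */
    static List<String> run(DemoProcessor processor) {
        DemoSession session = new DemoSession();
        try {
            processor.trigger(session);
        } catch (RuntimeException e) {
            // The wrapper rethrows after rolling back; record that for the demo.
            session.events.add("rethrown: " + e.getMessage());
        }
        return session.events;
    }

    public static void main(String[] args) {
        DemoProcessor ok = new DemoProcessor() {
            @Override
            void onTrigger(DemoSession s) {
                s.events.add("work");
            }
        };
        DemoProcessor failing = new DemoProcessor() {
            @Override
            void onTrigger(DemoSession s) {
                throw new IllegalStateException("boom");
            }
        };
        System.out.println(run(ok));      // [work, commit]
        System.out.println(run(failing)); // [rollback, rethrown: boom]
    }
}

/** Hypothetical stand-in for ProcessSession: it only records what happened. */
final class DemoSession {
    final List<String> events = new ArrayList<>();

    void commit() {
        events.add("commit");
    }

    void rollback() {
        events.add("rollback");
    }
}

/** Hypothetical stand-in for AbstractProcessor. */
abstract class DemoProcessor {
    // Template method: subclasses never call commit or rollback themselves.
    final void trigger(DemoSession session) {
        try {
            onTrigger(session);
            session.commit();
        } catch (RuntimeException e) {
            session.rollback();
            throw e;
        }
    }

    abstract void onTrigger(DemoSession session);
}
```

The point of the pattern is that the subclass only describes the work; whether that work is kept or discarded is decided in one place by the base class.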
Development
tree nifi-workshop
nifi-workshop
├── Chuck_Norris_Speaking.xml
├── nifi-workshop-demo-nar
│   └── pom.xml
├── nifi-workshop-demo-processors
│   ├── pom.xml
│   └── src
│       ├── main
│       │   ├── java
│       │   │   └── com
│       │   │       └── hortonworks
│       │   │           └── iot
│       │   │               └── MyProcessor.java
│       │   └── resources
│       │       └── META-INF
│       │           └── services
│       │               └── org.apache.nifi.processor.Processor
│       └── test
│           └── java
│               └── com
│                   └── hortonworks
│                       └── iot
│                           └── MyProcessorTest.java
├── pom.xml
├── README.md
└── screenshot.png

16 directories, 9 files
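The pom files in this project target a rather old NiFi version (0.3.0; latest commit on 29 Sep 2015). They still build and run, but to stay in line with the official release (1.7.0) the corresponding pom files need to be updated.
- The updated pom file in the nifi-workshop directory: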
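<project>
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-nar-bundles</artifactId>
        <version>1.7.0</version>
    </parent>

    <groupId>hwx.iot</groupId>
    <artifactId>workshop.demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <modules>
        <module>nifi-workshop-demo-processors</module>
        <module>nifi-workshop-demo-nar</module>
    </modules>
</project>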
- The pom file in the nifi-workshop-demo-nar directory stays unchanged
- The updated pom file in the nifi-workshop-demo-processors directory:
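<project>
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>hwx.iot</groupId>
        <artifactId>workshop.demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>nifi-workshop-demo-processors</artifactId>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <nifi.version>1.7.0</nifi.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-api</artifactId>
            <version>${nifi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-utils</artifactId>
            <version>${nifi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-processor-utils</artifactId>
            <version>${nifi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-mock</artifactId>
            <version>${nifi.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>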
The code for MyProcessor:
package com.hortonworks.iot;

import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@Tags({"string"})
@CapabilityDescription("Awesome string manipulator")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "", description = "")})
public class MyProcessor extends AbstractProcessor {

    public static final PropertyDescriptor PROP_TO_UPPERCASE = new PropertyDescriptor
            .Builder().name("Convert to UPPERCASE")
            .description("All caps")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("false")
            .build();

    public static final PropertyDescriptor PROP_BUFFER_SIZE = new PropertyDescriptor
            .Builder().name("Buffer size")
            .description("Size of the working buffer. If an incoming string is bigger it will be routed to a failure path")
            .required(false)
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .defaultValue("8192 B")
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Success, all done")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("Where all bad people end up")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    // Register the supported properties and relationships once, at initialization.
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(PROP_TO_UPPERCASE);
        descriptors.add(PROP_BUFFER_SIZE);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            // Nothing queued for this run.
            return;
        }

        final Boolean toUppercase = context.getProperty(PROP_TO_UPPERCASE).asBoolean();
        final Double bufSize = context.getProperty(PROP_BUFFER_SIZE).asDataSize(DataUnit.B);

        // Flow files larger than the configured buffer go straight to failure.
        if (flowFile.getSize() > bufSize) {
            getLogger().warn("Incoming string too big: {} vs buffer: {}",
                    new Object[]{flowFile.getSize(), bufSize});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        try {
            if (toUppercase) {
                // Rewrite the content in uppercase.
                flowFile = session.write(flowFile, new StreamCallback() {
                    @Override
                    public void process(InputStream inputStream, OutputStream outputStream) throws IOException {
                        String s = IOUtils.toString(inputStream);
                        IOUtils.write(s.toUpperCase(), outputStream);
                    }
                });
            }
            session.transfer(flowFile, REL_SUCCESS);
        } catch (Exception e) {
            getLogger().error("Failed while processing a string", e);
            session.transfer(flowFile, REL_FAILURE);
        }
    }
}
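MyProcessor defines two properties
- PROP_BUFFER_SIZE (the size of the working buffer; a flow file larger than this is routed to failure)
- PROP_TO_UPPERCASE (whether to convert the content to uppercase)
MyProcessor routes its result to one of two relationships
- REL_SUCCESS (success)
- REL_FAILURE (failure)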
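The interplay of the two properties and the two relationships can be checked without a running NiFi instance. The sketch below restates the decision made in onTrigger as a plain Java function. `RoutingSketch`, `Result` and `route` are hypothetical names used only here. Decoding the content as UTF-8 and uppercasing with Locale.ROOT are assumptions of the sketch; the processor itself relies on the platform defaults.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

final class RoutingSketch {
    /** Hypothetical result holder: the relationship name plus the resulting content. */
    static final class Result {
        final String relationship;
        final byte[] content;

        Result(String relationship, byte[] content) {
            this.relationship = relationship;
            this.content = content;
        }

        @Override
        public String toString() {
            return relationship + ": " + new String(content, StandardCharsets.UTF_8);
        }
    }

    /** Mirrors the order of checks in onTrigger: size first, then the optional uppercase step. */
    static Result route(byte[] content, long bufferSizeBytes, boolean toUppercase) {
        if (content.length > bufferSizeBytes) {
            // Too big for the buffer: content is passed on untouched to "failure".
            return new Result("failure", content);
        }
        if (!toUppercase) {
            return new Result("success", content);
        }
        String text = new String(content, StandardCharsets.UTF_8);
        byte[] upper = text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
        return new Result("success", upper);
    }

    public static void main(String[] args) {
        byte[] input = "test string".getBytes(StandardCharsets.UTF_8); // 11 bytes
        System.out.println(route(input, 8192, false)); // success: test string
        System.out.println(route(input, 8192, true));  // success: TEST STRING
        System.out.println(route(input, 10, true));    // failure: test string
    }
}
```

Note that the size check wins over the uppercase setting: with a 10-byte buffer the 11-byte input goes to failure regardless of PROP_TO_UPPERCASE.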
Testing
The test code is as follows:
package com.hortonworks.iot;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

import static com.hortonworks.iot.MyProcessor.PROP_BUFFER_SIZE;
import static com.hortonworks.iot.MyProcessor.PROP_TO_UPPERCASE;
import static com.hortonworks.iot.MyProcessor.REL_FAILURE;
import static com.hortonworks.iot.MyProcessor.REL_SUCCESS;

public class MyProcessorTest {

    private TestRunner testRunner;

    @Before
    public void init() {
        testRunner = TestRunners.newTestRunner(MyProcessor.class);
    }

    @Test
    public void testStringTooBig() {
        // A 10-byte buffer is too small for the input, with or without uppercasing.
        testRunner.setProperty(MyProcessor.PROP_BUFFER_SIZE, "10 B");
        byte[] testData = "this string is too long".getBytes();
        testRunner.enqueue(testData);
        testRunner.run();
        testRunner.assertAllFlowFilesTransferred(REL_FAILURE);

        testRunner.setProperty(PROP_TO_UPPERCASE, "true");
        testRunner.enqueue(testData);
        testRunner.run();
        testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
    }

    @Test
    public void testUppercase() throws Exception {
        // With the default properties the content passes through unchanged.
        byte[] testData = "test string".getBytes();
        testRunner.enqueue(testData);
        testRunner.run();
        testRunner.assertAllFlowFilesTransferred(REL_SUCCESS);

        List<MockFlowFile> ff = testRunner.getFlowFilesForRelationship(REL_SUCCESS);
        Assert.assertNotNull(ff);
        Assert.assertEquals("wrong number of messages", 1, ff.size());
        ff.get(0).assertContentEquals(testData);

        // Shrinking the buffer sends the same input to failure.
        testRunner.clearTransferState();
        testRunner.setProperty(PROP_BUFFER_SIZE, "10 B");
        testRunner.enqueue(testData);
        testRunner.run();
        testRunner.assertAllFlowFilesTransferred(REL_FAILURE);
    }
}
Running the tests needs no special explanation: they are ordinary JUnit tests and run as part of the Maven build.
The Processor’s JAR file must contain an entry in the META-INF/services directory named org.apache.nifi.processor.Processor. This is a text file where each line contains the fully-qualified class name of a Processor.
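For this project, the file src/main/resources/META-INF/services/org.apache.nifi.processor.Processor would therefore be expected to hold the single line com.hortonworks.iot.MyProcessor. The layout is the standard Java provider-configuration format, the one read by java.util.ServiceLoader, where blank lines and anything after a `#` are ignored. The sketch below only illustrates that file format; it is not NiFi's loading code, and `ServiceFileSketch` and `parseProviders` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class ServiceFileSketch {
    /** Extracts the class names from the text of a provider-configuration file. */
    static List<String> parseProviders(String fileContent) {
        List<String> names = new ArrayList<>();
        for (String rawLine : fileContent.split("\\R")) {
            // Drop a trailing comment, then surrounding whitespace.
            int hash = rawLine.indexOf('#');
            String line = (hash >= 0 ? rawLine.substring(0, hash) : rawLine).trim();
            if (!line.isEmpty()) {
                names.add(line);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        String content = "# processors shipped in this JAR\n"
                + "com.hortonworks.iot.MyProcessor\n";
        System.out.println(parseProviders(content)); // [com.hortonworks.iot.MyProcessor]
    }
}
```

If a second processor were added to the same JAR, its fully-qualified class name would go on a line of its own in the same file.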
Deployment
- Check the NiFi status
$NIFI_HOME/bin/nifi.sh status
If NiFi is already running, stop it first
$NIFI_HOME/bin/nifi.sh stop
- Build the project above
cd nifi-workshop
mvn clean install
- Copy the NAR file built under nifi-workshop-demo-nar/target into $NIFI_HOME/lib, then start NiFi
$NIFI_HOME/bin/nifi.sh start
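Open http://localhost:8080/nifi/ in a browser (replace localhost with the IP of the machine where NiFi is deployed).
Before the final verification, you may want to read https://blog.csdn.net/u0/article/details/ first.
- Case 1 (output identical to the input)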

- Case 2 (lowercase input, uppercase output)

- Case 3 (input too large, failure)
Change the property value (a smaller Buffer size)

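With the input unchanged, the flow file is now routed to a different relationship.
Published by 全栈程序员-站长. Please credit the source when reposting: https://javaforall.net/222719.html Original link: https://javaforall.net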
