Flink 异步IO
在flink我们经常需要与外部系统打交道,由于外部系统的问题,可能导致时间耗时比较长,为了不影响flink的处理性能,flink引入了异步IO来处理这个问题
实现
需要 extends RichAsyncFunction
分析
AsyncDataStream 有两个重要的方法
返回的结果可能是乱序的 public static
SingleOutputStreamOperator
unorderedWait( DataStream
in, AsyncFunction
func, long timeout, TimeUnit timeUnit, int capacity //异常操作最大个数) { return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED); } 返回结果是有序的 public static
SingleOutputStreamOperator
orderedWait( DataStream
in, AsyncFunction
func, long timeout, TimeUnit timeUnit, int capacity) { return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED); }
demo
public class FlinkAsIoTest { public static void main(String[] args) { StreamExecutionEnvironment en = StreamExecutionEnvironment.getExecutionEnvironment(); //自己实现的 KafkaUtil util = new KafkaUtil(); FlinkKafkaConsumer011 consumer1 = util.getConsumer("stream1", "test2"); DataStream
source = en.addSource(consumer1); try { source.print(); DataStream
result = AsyncDataStream.orderedWait(source, new AsIo(), 5000, TimeUnit.MILLISECONDS, 10); result.print(); en.execute("test io"); } catch (Exception e) { e.printStackTrace(); } } } class AsIo extends RichAsyncFunction
{ @Override public void asyncInvoke(String s, ResultFuture
resultFuture) throws Exception { CompletableFuture.supplyAsync(new Supplier
() { @Override public String get() { return queryResurlt(s); } }).thenAccept(new Consumer
() { @Override public void accept(String s) { resultFuture.complete(Collections.singleton(s)); } }); } / * 假定这是一个异步操作 * * @param params * @return */ public String queryResurlt(String params) { try { //sleep 4s Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } if (params.equals("beijing")) { return "this is beijing"; } else if (params.equals("chendou")) { return "this is chendou"; } else { return "this is other"; } } @Override public void timeout(String input, ResultFuture
resultFuture) throws Exception { } }
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/221978.html原文链接:https://javaforall.net
