Flink 异步IO

Flink 异步IOFlink 异步 IO 在 flink 我们经常需要与外部系统打交道 由于外部系统的问题 可能导致时间耗时比较长 为了不影响 flink 的处理性能 flink 引入了异步 IO 来处理这个问题实现需要 extendsRichA IN OUT 分析 AsyncDataStr 有两个重要的方法返回的结果可能是乱序的 publicstatic IN OUT

Flink 异步IO

在flink我们经常需要与外部系统打交道,由于外部系统的问题,可能导致时间耗时比较长,为了不影响flink的处理性能,flink引入了异步IO来处理这个问题

实现

需要 extends RichAsyncFunction

分析

AsyncDataStream 有两个重要的方法

 返回的结果可能是乱序的 public static 
  
    SingleOutputStreamOperator 
   
     unorderedWait( DataStream 
    
      in, AsyncFunction 
     
       func, long timeout, TimeUnit timeUnit, int capacity //异常操作最大个数) { return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED); } 返回结果是有序的 public static 
      
        SingleOutputStreamOperator 
       
         orderedWait( DataStream 
        
          in, AsyncFunction 
         
           func, long timeout, TimeUnit timeUnit, int capacity) { return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED); } 
          
         
        
       
      
     
    
  
demo
 public class FlinkAsIoTest { public static void main(String[] args) { StreamExecutionEnvironment en = StreamExecutionEnvironment.getExecutionEnvironment(); //自己实现的 KafkaUtil util = new KafkaUtil(); FlinkKafkaConsumer011 consumer1 = util.getConsumer("stream1", "test2"); DataStream 
  
    source = en.addSource(consumer1); try { source.print(); DataStream 
   
     result = AsyncDataStream.orderedWait(source, new AsIo(), 5000, TimeUnit.MILLISECONDS, 10); result.print(); en.execute("test io"); } catch (Exception e) { e.printStackTrace(); } } } class AsIo extends RichAsyncFunction 
    
      { @Override public void asyncInvoke(String s, ResultFuture 
     
       resultFuture) throws Exception { CompletableFuture.supplyAsync(new Supplier 
      
        () { @Override public String get() { return queryResurlt(s); } }).thenAccept(new Consumer 
       
         () { @Override public void accept(String s) { resultFuture.complete(Collections.singleton(s)); } }); } / * 假定这是一个异步操作 * * @param params * @return */ public String queryResurlt(String params) { try { //sleep 4s Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } if (params.equals("beijing")) { return "this is beijing"; } else if (params.equals("chendou")) { return "this is chendou"; } else { return "this is other"; } } @Override public void timeout(String input, ResultFuture 
        
          resultFuture) throws Exception { } } 
         
        
       
      
     
    
  
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/221978.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 下午4:50
下一篇 2026年3月17日 下午4:51


相关推荐

  • countdowntimer_TIMESTAMPDIFF

    countdowntimer_TIMESTAMPDIFF需求:加载某一个界面,在页面中待5秒后再关闭效果图如下:设置了一个点击事件,当文字显示为Skipactivity时,点击跳转界面。代码及介绍如下图:核心功能代码如下Android自带的CountDownTimer这个工具类,也是通过Handler和子线程来实现的。//倒计时工具类CountDownTimer//CountDownTimer的构造方法有两个参数…

    2026年1月18日
    4
  • Ubuntu中dpkg命令的用法[通俗易懂]

    Ubuntu中dpkg命令的用法[通俗易懂]dpkg是Debianpackage的简写,为”Debian“操作系统专门开发的套件管理系统,用于软件的安装,更新和移除。所有源自”Debian”的Linux的发行版都使用dpkg,例如”Ubuntu”阅读目录安装软件 列出与该包相关联的文件 显示包的版本 移除软件(保留配置) 移除软件(不保留配置) 查找包的详细信息 列出deb包的内容安装软件命令:dpkg-i<.debfilename>实例:dpkg-i~/Download/…

    2022年5月21日
    75
  • 智能体(Agent)全景拆解:从技术核心到业务落地全指南

    智能体(Agent)全景拆解:从技术核心到业务落地全指南

    2026年3月14日
    2
  • MPLS 虚拟专用网 实验配置和抓包

    MPLS 虚拟专用网 实验配置和抓包

    2021年4月14日
    196
  • SOA是什么(转) .

    SOA是什么(转) .一 SOA 是什么 nbsp SOA 的全称是 Service OrientedArch 面向服务架构 是一种架构 不是一种具体的开发技术 nbsp 要真正理解什么是 SOA 需要从软件开发的技术发展史谈起 nbsp 真正的软件开发从开始到现在经历了四个阶段 也可以说成是四代 1 汇编语言开发 2 面向过程的软件 3 面向对象的组件开发 4 面向服务的架构开发 也是今天要谈论的 SO

    2025年8月7日
    5
  • MySQL窗口函数【转载】[通俗易懂]

    MySQL窗口函数【转载】[通俗易懂]注意MySQL窗口函数是8.0及以后才有的新特性。安装mysql8.0(docker安装方式)安装docker安装docker#下载指定版本dockerwgethttps://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo-O/etc/yum.repos.d/docker-ce.repo#安装dockeryum-yinstalldocker-ce-18.06.1.ce-3.el7#开启docke.

    2026年4月15日
    7

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号