Window Apply,Process Function,ReduceFunction,AggregateFunction分析

Window Apply,Process Function,ReduceFunction,AggregateFunction分析本文全部来源于官方文档解释 WindowApply WindowedStre DataStream AllWindowedS DataStream Appliesagene Belowisafunc 解释 将一个通用的函数应用到整个窗口 可见 apply 用于窗口函数之后 举例

本文全部来源于官方文档解释。

Window Apply #

WindowedStream → DataStream #

AllWindowedStream → DataStream #

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

解释:将一个通用的函数应用到整个窗口。可见apply用于窗口函数之后。

举例:

windowedStream.apply(new WindowFunction 
  
    , Integer, Tuple, Window>() { public void apply (Tuple tuple, Window window, Iterable 
   
     > values, Collector 
    
      out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } }); 
     
    
  
// applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply (new AllWindowFunction 
  
    , Integer, Window>() { public void apply (Window window, Iterable 
   
     > values, Collector 
    
      out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } }); 
     
    
  

源码的角度分析:

接口函数windowFuntion中 定义了apply方法。

源码的注释为:计算窗口的值,并输出none或多个元素。  

总结:apply 是一个通用的函数应用于窗口中数据的计算,可以紧挨window  Funtion之后使用,另外在接口方法 WindowFunction中定义了apply方法,用户可以自定义apply对窗口中数据的处理规则。

Process Function #

The ProcessFunction #

ProcessFunction是一种底层流处理操作,可以访问所有(非循环)流应用程序的基本构建块:

  • 事件(流元素)
  • 状态(容错,一致,仅在键控流上)
  • 计时器(事件时间和处理时间,仅在键控流上)

可以将其ProcessFunction视为可以FlatMapFunction访问键控状态和计时器。它通过为输入流中接收到的每个事件调用来处理事件。

对于容错状态,ProcessFunction可以访问 Flink 的keyed state,可以通过 访问 RuntimeContext,类似于其他有状态函数访问 keyed state 的方式。

计时器允许应用程序对处理时间和事件时间的变化做出反应。对该函数的每次调用processElement(...)都会获得一个Context对象,该对象可以访问元素的事件时间时间戳和TimerService。可TimerService用于为将来的事件/处理时间瞬间注册回调。对于事件时间计时器,onTimer(...)当当前水印超过或超过计时器的时间戳时调用该方法,而对于处理时间计时器,onTimer(...)当挂钟时间达到指定时间时调用该方法。在该调用期间,所有状态再次限定为创建计时器的键,允许计时器操作键控状态。

如果要访问键控状态和计时器,则必须
ProcessFunction在键控流上应用:

他的具体介绍可以参照官网:Process Function | Apache Flinkicon-default.png?t=M1L8https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/process_function/

 总结:processFunction 是一个底层的处理流的函数:它能为用户提供三方面东西,1.元素2.状态3.计时器 可以让用户很方便的定义流数据的规则。一般我们通常使用ProcessFunction较多。

ReduceFunction #

ReduceFunction指定输入中的两个元素如何组合生成相同类型的输出元素。 Flink使用ReduceFunction递增地聚合窗口的元素。  

val input: DataStream[(String, Long)] = ... input .keyBy( 
  
    ) .window( 
   
     ) .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) } 
    
  

AggregateFunction

AggregateFunction是ReduceFunction的一般化版本,

它有三种类型:

输入类型(IN)、

累加类型(ACC)

输出类型(OUT)。

 输入类型是输入流中元素的类型,AggregateFunction有一个将一个输入元素添加到累加器的方法。 该接口还提供了创建初始累加器、将两个累加器合并到一个累加器以及从累加器提取输出(类型为OUT)的方法。 我们将在下面的例子中看到它是如何工作的。 输入类型和输出类型可以不相同 

与ReduceFunction一样,Flink会在一个窗口的输入元素到达时递增地聚合它们。  

/ * The accumulator is used to keep a running sum and a count. The [getResult] method * computes the average. */ class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] { override def createAccumulator() = (0L, 0L) override def add(value: (String, Long), accumulator: (Long, Long)) = (accumulator._1 + value._2, accumulator._2 + 1L) override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2 override def merge(a: (Long, Long), b: (Long, Long)) = (a._1 + b._1, a._2 + b._2) } val input: DataStream[(String, Long)] = ... input .keyBy( 
  
    ) .window( 
   
     ) .aggregate(new AverageAggregate) 
    
  

 

 

 

 

 

 

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/210771.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月18日 下午11:57
下一篇 2026年3月18日 下午11:57


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号