Spark: sortBy和sortByKey函数详解

Spark: sortBy和sortByKey函数详解在很多应用场景都需要对结果数据进行排序 Spark 中有时也不例外 在 Spark 中存在两种对 RDD 进行排序的函数 分别是 sortBy 和 sortByKey 函数 sortBy 是对标准的 RDD 进行排序 它是从 Spark nbsp 0 9 0 之后才引入的 可以参见 SPARK 1063 而 sortByKey 函数是对 PairRDD 进行排序 也就是有 Key 和 Value 的 RDD 下面将分别对这两个函数的实现以及使用进

在很多应用场景都需要对结果数据进行排序,Spark中有时也不例外。在Spark中存在两种对RDD进行排序的函数,分别是 sortBy和sortByKey函数。sortBy是对标准的RDD进行排序,它是从Spark 0.9.0之后才引入的(可以参见SPARK-1063)。而sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD。下面将分别对这两个函数的实现以及使用进行说明。
  


一、sortBy函数实现以及使用

  sortBy函数是在org.apache.spark.rdd.RDD类中实现的,它的实现如下:

01 /
02  * Return this RDD sorted by the given key function.
03  */
04 def sortBy[K](
05     f: (T) => K,
06     ascending: Boolean = true,
07     numPartitions: Int = this.partitions.size)
08     (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
09   this.keyBy[K](f)
10       .sortByKey(ascending, numPartitions)
11       .values

1 /
2 * Creates tuples of the elements in this RDD by applying `f`.
3 */
4 def keyBy[K](f: => K): RDD[(K, T)] = {
5     map(x => (f(x), x))
6 }

  那么,如何使用sortBy函数呢?

01 /
02  * User: 过往记忆
03  * Date: 14-12-26
04  * Time: 上午10:16
05  * bolg: http://www.iteblog.com
06  * 本文地址:http://www.iteblog.com/archives/1240
07  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
08  * 过往记忆博客微信公共帐号:iteblog_hadoop
09  */
10 scala> val data = List(3,1,90,3,5,12)
11 data: List[Int] = List(31903512)
12  
13 scala> val rdd = sc.parallelize(data)
14 rdd: org.apache.spark.rdd.RDD[Int] =ParallelCollectionRDD[0] at parallelize at
:14
15  
16 scala> rdd.collect
17 res0: Array[Int] = Array(31903512)
18  
19 scala> rdd.sortBy(x => x).collect
20 res1: Array[Int] = Array(13351290)
21  
22 scala> rdd.sortBy(x => x, false).collect
23 res3: Array[Int] = Array(90125331)
24  
25 scala> val result = rdd.sortBy(x => x, false)
26 result: org.apache.spark.rdd.RDD[Int] = MappedRDD[23] at sortBy at
:16
27  
28 scala> result.partitions.size
29 res9: Int = 2
30  
31 scala> val result = rdd.sortBy(x => x, false1)
32 result: org.apache.spark.rdd.RDD[Int] = MappedRDD[26] at sortBy at
:16
33  
34 scala> result.partitions.size
35 res10: Int = 1

二、sortByKey函数实现以及使用

  sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的,实现如下

1 def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
2     : RDD[(K, V)] =
3 {
4   val part = new RangePartitioner(numPartitions, self, ascending)
5   new ShuffledRDD[K, V, V](self, part)
6     .setKeyOrdering(if (ascending) ordering else ordering.reverse)
7 }

  从函数的实现可以看出,它主要接受两个函数,含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。下面对sortByKey的使用进行说明:

01 /
02  * User: 过往记忆
03  * Date: 14-12-26
04  * Time: 上午10:16
05  * bolg: http://www.iteblog.com
06  * 本文地址:http://www.iteblog.com/archives/1240
07  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
08  * 过往记忆博客微信公共帐号:iteblog_hadoop
09  */
10 scala> val = sc.parallelize(List("wyp""iteblog""com""""test"), 2)
11 a: org.apache.spark.rdd.RDD[String] =
12 ParallelCollectionRDD[30] at parallelize at
:12
13  
14 scala> val = sc. parallelize (1 to a.count.toInt , 2)
15 b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at
:14
16  
17 scala> val = a.zip(b)
18 c: org.apache.spark.rdd.RDD[(String, Int)] =ZippedPartitionsRDD2[32] at zip at
:16
19  
20 scala> c.sortByKey().collect
21 res11: Array[(String, Int)] = Array((,4), (com,3), (iteblog,2), (test,5), (wyp,1))

  上面对Key进行了排序。细心的读者可能会问,soryKy函数中的第一个参数可以对排序方式进行重写。为什么sortByKey没有呢?难道只能用默认的排序规则。不是,是有的。其实在OrderedRDDFunctions类中有个变量ordering它是隐形的:private val ordering = implicitly[Ordering[K]]。他就是默认的排序规则,我们可以对它进行重写,如下:

01 scala> val = sc.parallelize(List(3,1,9,12,4))
02 b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at parallelize at
:12
03  
04 scala> val = b.zip(a)
05 c: org.apache.spark.rdd.RDD[(Int, String)] =ZippedPartitionsRDD2[39] at zip at
:16
06  
07 scala> c.sortByKey().collect
08 res15: Array[(Int, String)] =Array((1,iteblog), (3,wyp), (4,test), (9,com), (12,))
09  
10 scala> implicit val sortIntegersByString = new Ordering[Int]{
11      override def compare(a: Int, b: Int) =
12      | a.toString.compare(b.toString)}
13 sortIntegersByString: Ordering[Int] = $iwC$$iwC$$iwC$$iwC$$iwC$$anon$1@5d533f7a
14  
15 scala>  c.sortByKey().collect
16 res17: Array[(Int, String)] =Array((1,iteblog), (12,), (3,wyp), (4,test), (9,com))

  例子中的sortIntegersByString就是修改了默认的排序规则。这样将默认按照Int大小排序改成了对字符串的排序,所以12会排序在3之前。

转载自过往记忆(http://www.iteblog.com/)

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/226250.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 上午7:30
下一篇 2026年3月17日 上午7:31


相关推荐

  • 串口服务器调试助手使用教程,comassistant串口调试助手使用说明.pdf

    串口服务器调试助手使用教程,comassistant串口调试助手使用说明.pdf作者:温子祺wenziqi@wenziqi@单片机多功能调试助手简介单片机多功能调试助手简介单单片片机机多多功功能能调调试试助助手手简简介介1111简介图1单片机多功能调试助手单片机多功能调试助手一款集串口/USB/网络调试、进制转换、字模与数码管字型码制作、常用校验值计算、UNICODE码转换、位图输出C文件等众多功能于一身的综合型调试软件,最值得庆幸的是该软件会一直保持更新,并支持在…

    2022年6月12日
    34
  • 浅谈ECMAScript 6下的promises API

    浅谈ECMAScript 6下的promises API9 2 构造函数 nbsp nbsp nbsp nbsp nbsp nbsp promise 的构造函数为以下形式 nbsp nbsp nbsp var nbsp p nbsp nbsp new nbsp Promise executor resolve nbsp reject nbsp nbsp nbsp 上面创建了一个行为由回调函数 exector 决定的 promise 使用参数来处理解决或者拒绝 p nbsp nbsp nbsp nbsp nbsp nbsp nbsp resolve x

    2026年3月17日
    2
  • ASCII码16进制对照表

    ASCII码16进制对照表ASCII 码对照表 ASCII AmericanStan 美国信息互换标准代码 ASC 是基于拉丁字母的一套电脑编码系统 它主要用于显示现代英语和其他西欧语言 它是现今最通用的单字节编码系统 并等同于国际标准 ISO IEC646 ASCII 第一次以规范标准的型态发表是在 1967 年 最后一次更新则是在 1986 年 至今为止共定义了 128 个字符 其中 33 个字符无法显示 这是以现今操作系统为依归 但在 DOS 模式下可显示出一些诸如笑

    2025年9月17日
    7
  • 用Navicat连接MySQL的安装及配置

    用Navicat连接MySQL的安装及配置Nothingisimp 用 Navicat 连接 MySQL 的安装及配置步骤及所遇到问题安装 MySQL 一 下载 MySQL 二 下载完后文件内容三 配置系统的环境变量 Path 四 执行进入 MySQL 安装位置的 bin 目录命令 五 打开 Navicat 连接总结 用 Navicat 连接 MySQL 的安装及配置步骤及所遇到问题背景 由于电脑连续启动持续修复磁盘而不断重启 在网上找不到解决办法只好重置磁盘所重新配置环境之一的问题解决记录下来 如有更好的解决不断重启的问题的好方法求评论区告知 部分图片侵删此

    2026年3月18日
    1
  • 在WAMPSERVER下增加多版本的PHP(PHP5.3,PHP5.4,PHP5.5)支持。

    在WAMPSERVER下增加多版本的PHP(PHP5.3,PHP5.4,PHP5.5)支持。

    2021年9月2日
    57
  • Java 并发编程中的死锁 ( Kotlin 语言讲解)

    Java 并发编程中的死锁 ( Kotlin 语言讲解)什么是死锁?在操作系统中的并发处理场景中,进程对资源的持有与请求过程中,会产生死锁.Say,ProcessAhasresourceR1,ProcessBhasresourceR2.IfProcessArequestresourceR2andProcessBrequestsresourceR1,atthesametime,thend…

    2022年7月16日
    17

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号