Spark: sortBy和sortByKey函数详解在很多应用场景都需要对结果数据进行排序 Spark 中有时也不例外 在 Spark 中存在两种对 RDD 进行排序的函数 分别是 sortBy 和 sortByKey 函数 sortBy 是对标准的 RDD 进行排序 它是从 Spark nbsp 0 9 0 之后才引入的 可以参见 SPARK 1063 而 sortByKey 函数是对 PairRDD 进行排序 也就是有 Key 和 Value 的 RDD 下面将分别对这两个函数的实现以及使用进
在很多应用场景都需要对结果数据进行排序,Spark中有时也不例外。在Spark中存在两种对RDD进行排序的函数,分别是 sortBy和sortByKey函数。sortBy是对标准的RDD进行排序,它是从Spark 0.9.0之后才引入的(可以参见SPARK-1063)。而sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD。下面将分别对这两个函数的实现以及使用进行说明。
一、sortBy函数实现以及使用
sortBy函数是在org.apache.spark.rdd.RDD类中实现的,它的实现如下:
06 |
ascending: Boolean = true, |
07 |
numPartitions: Int = this.partitions.size) |
08 |
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = |
10 |
.sortByKey(ascending, numPartitions) |
4 |
def keyBy[K](f: T => K): RDD[(K, T)] = {
|
那么,如何使用sortBy函数呢?
10 |
scala> val data = List(3,1,90,3,5,12) |
11 |
data: List[Int] = List(3, 1, 90, 3, 5, 12) |
13 |
scala> val rdd = sc.parallelize(data) |
14 |
rdd: org.apache.spark.rdd.RDD[Int] =ParallelCollectionRDD[0] at parallelize at
:14 |
17 |
res0: Array[Int] = Array(3, 1, 90, 3, 5, 12) |
19 |
scala> rdd.sortBy(x => x).collect |
20 |
res1: Array[Int] = Array(1, 3, 3, 5, 12, 90) |
22 |
scala> rdd.sortBy(x => x, false).collect |
23 |
res3: Array[Int] = Array(90, 12, 5, 3, 3, 1) |
25 |
scala> val result = rdd.sortBy(x => x, false) |
26 |
result: org.apache.spark.rdd.RDD[Int] = MappedRDD[23] at sortBy at
:16 |
28 |
scala> result.partitions.size |
31 |
scala> val result = rdd.sortBy(x => x, false, 1) |
32 |
result: org.apache.spark.rdd.RDD[Int] = MappedRDD[26] at sortBy at
:16 |
34 |
scala> result.partitions.size |
二、sortByKey函数实现以及使用
sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的,实现如下
1 |
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size) |
4 |
val part = new RangePartitioner(numPartitions, self, ascending) |
5 |
new ShuffledRDD[K, V, V](self, part) |
6 |
.setKeyOrdering(if (ascending) ordering else ordering.reverse) |
从函数的实现可以看出,它主要接受两个函数,含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。下面对sortByKey的使用进行说明:
10 |
scala> val a = sc.parallelize(List("wyp", "iteblog", "com", "", "test"), 2) |
11 |
a: org.apache.spark.rdd.RDD[String] = |
12 |
ParallelCollectionRDD[30] at parallelize at
:12 |
14 |
scala> val b = sc. parallelize (1 to a.count.toInt , 2) |
15 |
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at
:14 |
17 |
scala> val c = a.zip(b) |
18 |
c: org.apache.spark.rdd.RDD[(String, Int)] =ZippedPartitionsRDD2[32] at zip at
:16 |
20 |
scala> c.sortByKey().collect |
21 |
res11: Array[(String, Int)] = Array((,4), (com,3), (iteblog,2), (test,5), (wyp,1)) |
上面对Key进行了排序。细心的读者可能会问,soryKy函数中的第一个参数可以对排序方式进行重写。为什么sortByKey没有呢?难道只能用默认的排序规则。不是,是有的。其实在OrderedRDDFunctions类中有个变量ordering它是隐形的:private val ordering = implicitly[Ordering[K]]。他就是默认的排序规则,我们可以对它进行重写,如下:
01 |
scala> val b = sc.parallelize(List(3,1,9,12,4)) |
02 |
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at parallelize at
:12 |
04 |
scala> val c = b.zip(a) |
05 |
c: org.apache.spark.rdd.RDD[(Int, String)] =ZippedPartitionsRDD2[39] at zip at
:16 |
07 |
scala> c.sortByKey().collect |
08 |
res15: Array[(Int, String)] =Array((1,iteblog), (3,wyp), (4,test), (9,com), (12,)) |
10 |
scala> implicit val sortIntegersByString = new Ordering[Int]{
|
11 |
| override def compare(a: Int, b: Int) = |
12 |
| a.toString.compare(b.toString)} |
13 |
sortIntegersByString: Ordering[Int] = $iwC$$iwC$$iwC$$iwC$$iwC$$anon$1@5d533f7a |
15 |
scala> c.sortByKey().collect |
16 |
res17: Array[(Int, String)] =Array((1,iteblog), (12,), (3,wyp), (4,test), (9,com)) |
例子中的sortIntegersByString就是修改了默认的排序规则。这样将默认按照Int大小排序改成了对字符串的排序,所以12会排序在3之前。
转载自过往记忆(http://www.iteblog.com/)
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/226250.html原文链接:https://javaforall.net