spark sortBy sortByKey实战详解

spark sortBy sortByKey实战详解日常工作中 排序是道绕过不过去的侃 我们每天都会面对各种各样的排序需求 那么在 spark 中如何排序呢 我们来看一些很有代表性的例子 1 最简单的排序假设有个 RDD Int 类型的数据 需要按数据大小进行排序 那这个排序算最简单的 sc parallelize Array 1 3 2 4 6 5 sortBy x gt x collect 代码运行的结果

日常工作中,排序是道绕过不过去的侃,我们每天都会面对各种各样的排序需求。那么在spark中如何排序呢?我们来看一些很有代表性的例子。

1.最简单的排序

假设有个RDD[Int]类型的数据,需要按数据大小进行排序,那这个排序算最简单的:

sc.parallelize(Array(1,3,2,4,6,5)).sortBy(x => x).collect() 

代码运行的结果:

 Array[Int] = Array(1, 2, 3, 4, 5, 6) 

2.kv结构的排序

在kv结构的数据中,按value排序是常见的需求:

sc.parallelize(Array(("a", 1), ("c", 3), ("b", 2), ("d", 4))).sortBy(_._2) 

代码运行的结果:

Array[(String, Int)] = Array((a,1), (b,2), (c,3), (d,4)) 

3.定制排序规则

有如下结构的数据:

<10  <100  <20  <200  

希望按照
<后面的数字大小排序,得到如下结果:< p="">

<10  <20  <100  <200  

代码如下:

val array = Array(("<10",), ("<100",), ("<20",),("<200",)); sc.parallelize(array).sortBy({item => item._1.substring(1, item._1.length).toInt}).collect() 

要理解上述代码的原理,我们需要分析一下sortBy的源码。

 / * Return this RDD sorted by the given key function. */ def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope { this.keyBy[K](f) .sortByKey(ascending, numPartitions) .values } 

sortBy必需传入的一个参数为f: (T) => KT为array中的元素类型。

 / * Creates tuples of the elements in this RDD by applying `f`. */ def keyBy[K](f: T => K): RDD[(K, T)] = withScope { val cleanedF = sc.clean(f) map(x => (cleanedF(x), x)) } 

传入的f: (T) => K作用在keyBy方法上,生成了一个RDD[(K, T)]的数据。
然后调用sortByKey,最后取出里面的T,得到的就是原始array中的类型!

4.用sortByKey实现上面的功能

我们再来看看sortByKey的源码

 / * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling * `collect` or `save` on the resulting RDD will return or output an ordered list of records * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in * order of the keys). */ // TODO: this currently doesn't work on P other than Tuple2! def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)] = self.withScope { val part = new RangePartitioner(numPartitions, self, ascending) new ShuffledRDD[K, V, V](self, part) .setKeyOrdering(if (ascending) ordering else ordering.reverse) } 

大家看到sortByKey的源码可能会有疑惑,难道sortByKey不能指定排序方式么?不能像sortBy那样传入一个函数么?
其实是可以的。sortByKey位于OrderedRDDFunctions类中,OrderedRDDFunctions中有一个隐藏变量:

private val ordering = implicitly[Ordering[K]] 
implicit val sortByNum = new Ordering[String] { override def compare(x: String, y: String): Int = x.substring(1, x.length).toInt.compareTo(y.substring(1, y.length).toInt)}; val array = Array(("<10",), ("<100",), ("<20",),("<200",)); sc.parallelize(array).sortByKey().collect() 

最后的输出结果为:

Array[(String, Int)] = Array((<10,), (<20,), (<100,), (<200,)) 

同样达到了我们的目的!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/221640.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 下午5:37
下一篇 2026年3月17日 下午5:38


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号