大数据教程:Transformation和Action算子演示

大数据教程:Transformation和Action算子演示大数据教程:Transformation和Action算子演示

大家好,又见面了,我是你们的朋友全栈君。

大数据教程:Transformation和Action算子演示

一、Transformation算子演示

val conf = new SparkConf().setAppName(“Test”).setMaster(“local”)
val sc = new SparkContext(conf)

//通过并行化生成rdd

val rdd = sc.parallelize(List(5,6,4,7,3,8,2,9,10))

//map:对rdd里面每一个元乘以2然后排序

val rdd2: RDD[Int] = rdd.map(_ * 2)

//collect以数组的形式返回数据集的所有元素(是Action算子)

println(rdd2.collect().toBuffer)

//filter:该RDD由经过func函数计算后返回值为true的输入元素组成

val rdd3: RDD[Int] = rdd2.filter(_ > 10)
println(rdd3.collect().toBuffer)

val rdd4 = sc.parallelize(Array(“a b c”,”b c d”))

//flatMap:将rdd4中的元素进行切分后压平

val rdd5: RDD[String] = rdd4.flatMap(_.split(” “))
println(rdd5.collect().toBuffer)

//假如: List(List(” a,b” ,”b c”),List(“e c”,” i o”))

//压平 flatMap(_.flatMap(_.split(” “)))

//sample随机抽样

//withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样

//fraction抽样比例例如30% 即0.3 但是这个值是一个浮动的值不准确

//seed用于指定随机数生成器种子 默认参数不传

val rdd5_1 = sc.parallelize(1 to 10)
val sample = rdd.sample(false,0.5)
println(sample.collect().toBuffer)

//union:求并集

val rdd6 = sc.parallelize(List(5,6,7,8))
val rdd7 = sc.parallelize(List(1,2,5,6))
val rdd8 = rdd6 union rdd7
println(rdd8.collect.toBuffer)

//intersection:求交集

val rdd9 = rdd6 intersection rdd7
println(rdd9.collect.toBuffer)

//distinct:去重出重复

println(rdd8.distinct.collect.toBuffer)

//join相同的key会被合并

val rdd10_1 = sc.parallelize(List((“tom”,1),(“jerry” ,3),(“kitty”,2)))
val rdd10_2 = sc.parallelize(List((“jerry” ,2),(“tom”,2),(“dog”,10)))
val rdd10_3 = rdd10_1 join rdd10_2
println(rdd10_3.collect().toBuffer)

//左连接和右连接

//除基准值外是Option类型,因为可能存在空值所以使用Option

val rdd10_4 = rdd10_1 leftOuterJoin rdd10_2

//以左边为基准没有是null

val rdd10_5 = rdd10_1 rightOuterJoin rdd10_2

//以右边为基准没有是null

println(rdd10_4.collect().toList)
println(rdd10_5.collect().toBuffer)

val rdd11_1 = sc.parallelize(List((“tom”,1),(“jerry” ,3),(“kitty”,2)))
val rdd11_2 = sc.parallelize(List((“jerry” ,2),(“tom”,2),(“dog”,10)))

//笛卡尔积

val rdd11_3 = rdd11_1 cartesian rdd11_2
println(rdd11_3.collect.toBuffer)

//根据传入的参数进行分组

val rdd11_5_1 = rdd11_4.groupBy(_._1)
println(rdd11_5_1.collect().toList)

//按照相同key进行分组,并且可以制定分区

val rdd11_5_2 = rdd11_4.groupByKey
println(rdd11_5_2.collect().toList)

//根据相同key进行分组[分组的话需要二元组]

//cogroup 和 groupBykey的区别

//cogroup不需要对数据先进行合并就以进行分组 得到的结果是 同一个key 和不同数据集中的数据集合

//groupByKey是需要先进行合并然后在根据相同key进行分组

val rdd11_6: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11_1 cogroup rdd11_2
println(rdd11_6)

二、Action算子演示

val conf = new SparkConf().setAppName(“Test”).setMaster(“local[*]”)
val sc = new SparkContext(conf)

/* Action 算子*/

//集合函数

val rdd1 = sc.parallelize(List(2,1,3,6,5),2)
val rdd1_1 = rdd1.reduce(_+_)
println(rdd1_1)

//以数组的形式返回数据集的所有元素

println(rdd1.collect().toBuffer)

//返回RDD的元素个数

println(rdd1.count())

//取出对应数量的值 默认降序, 若输入0 会返回一个空数组

println(rdd1.top(3).toBuffer)

//顺序取出对应数量的值

println(rdd1.take(3).toBuffer)

//顺序取出对应数量的值 默认生序

println(rdd1.takeOrdered(3).toBuffer)

//获取第一个值 等价于 take(1)

println(rdd1.first())

//将处理过后的数据写成文件(存储在HDFS或本地文件系统)

//rdd1.saveAsTextFile(“dir/file1”)

//统计key的个数并生成map k是key名 v是key的个数

val rdd2 = sc.parallelize(List((“key1”,2),(“key2”,1),(“key3”,3),(“key4”,6),(“key5”,5)),2)
val rdd2_1: collection.Map[String, Long] = rdd2.countByKey()
println(rdd2_1)

//遍历数据

rdd1.foreach(x => println(x))

/*其他算子*/

//统计value的个数 但是会将集合中的一个元素看做是一个vluae

val value: collection.Map[(String, Int), Long] = rdd2.countByValue
println(value)

//filterByRange:对RDD中的元素进行过滤,返回指定范围内的数据

val rdd3 = sc.parallelize(List((“e”,5),(“c”,3),(“d”,4),(“c”,2),(“a”,1)))
val rdd3_1: RDD[(String, Int)] = rdd3.filterByRange(“c”,”e”)

//包括开始和结束的

println(rdd3_1.collect.toList)

//flatMapValues对参数进行扁平化操作,是value的值

val rdd3_2 = sc.parallelize(List((“a”,”1 2″),(“b”,”3 4″)))
println( rdd3_2.flatMapValues(_.split(” “)).collect.toList)

//foreachPartition 循环的是分区数据

// foreachPartiton一般应用于数据的持久化,存入数据库,可以进行分区的数据存储

val rdd4 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)
rdd4.foreachPartition(x => println(x.reduce(_+_)))

//keyBy 以传入的函数返回值作为key ,RDD中的元素为value 新的元组

val rdd5 = sc.parallelize(List(“dog”,”cat”,”pig”,”wolf”,”bee”),3)
val rdd5_1: RDD[(Int, String)] = rdd5.keyBy(_.length)
println(rdd5_1.collect.toList)

//keys获取所有的key values 获取所有的values

println(rdd5_1.keys.collect.toList)
println(rdd5_1.values.collect.toList)

//collectAsMap 将需要的二元组转换成Map

val map: collection.Map[String, Int] = rdd2.collectAsMap()
println(map)

转载于:https://juejin.im/post/5d07547d6fb9a07ef7107428

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/106770.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • Java集合篇:Stack

    Java集合篇:Stack

    2021年10月4日
    57
  • Depix:还原马赛克工具的试用及总结[通俗易懂]

    Depix:还原马赛克工具的试用及总结[通俗易懂]背景一周前发现git上有个叫Depix的项目非常火,可以用来去除马赛克。好奇之下准备下来试用一下这个工具参考:https://github.com/beurtschipper/Depix算法说明:https://www.linkedin.com/pulse/recovering-passwords-from-pixelized-screenshots-sipke-mellemaDeBruijn序列:https://damip.net/article-de-bruijn-sequence说

    2022年6月15日
    199
  • RPC协议了解

    RPC协议了解1.RPC概述RPC(RemoteProcedureCallProtocol)远程过程调用协议。通俗的描述是:客户端在不知道调用细节的情况下,调用存在于远程计算机上的某个过程或函数,就像调用本地应用程序中的一样。正确的描述是:一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。2.RPC特点:2.1)RPC是协议:协议意味着规范。目前典型的RPC实现包括Dubbo、Thrift、Herrty等。但这些实现往往都会附加其他重要功能,例如Dubbo还包括服务管理、访问权限

    2022年5月19日
    62
  • Drupal8 入门教程(一)安装部署[通俗易懂]

    Drupal8 入门教程(一)安装部署[通俗易懂]一、Drupal简介Drupal是使用PHP语言编写的开源内容管理框架(CMF),它由内容管理系统(CMS)和PHP开发框架(Framework)共同构成。连续多年荣获全球最佳CMS大奖,是基于P

    2022年7月4日
    19
  • 如何用手机号申请163邮箱_163邮箱注册手机号注册

    如何用手机号申请163邮箱_163邮箱注册手机号注册如果你还没有邮箱,直接用手机号注册163邮箱,163.net是一款TOM的VIP邮箱,跟普通邮箱的区别是邮箱容量可以无限放大,来往的邮件信息能长期存储,国际邮件能快速收到和发出。怎么申请邮箱?163邮箱申请的好处用手机浏览器输入图片中的网址,进入邮箱官网在这里跟普通邮箱的区别是VIP邮箱有多个后缀选择,不像qq只能有一个。点击注册,接下来选择套餐,根据邮箱名字的位数、容量空间、大附件、群发数量,还有安全防护级别、误发邮件撤回次数、删除的邮件回复次数来选择套餐,不过不用担心,如果你现在已经有邮箱了

    2022年9月17日
    1
  • 关于RuntimeException[通俗易懂]

    关于RuntimeException[通俗易懂]关于RuntimeException今天在写一个异常类的时候继承了RuntimeException,想一探究竟。RuntimeException:在定义方法时不需要声明会抛出runtimeException。Exception:定义方法时必须声明所有可能会抛出的exception。于是去查看了一翻api。publicRuntimeException() 提出了一种新的null运行时异常的详细信息。原因是没有初始化,初始化后可通过调用Throwable.initCause(..

    2022年7月24日
    23

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号