Spark基础操作(一)

Spark基础操作(一)前言 我们来学习 Spark 基础吧 一 搭建学习环境 1 下载 spark 我使用的是 spark1 6 2 下载地址我们直接下载 然后解压 我们看看里面的目录 2 python shell 我们运行 bin pyspark 之后就进入了 spark 的 pythonshell 我们为了验证是否成功了 可以运行下面的代码 lines sc textFile README md printlines first 接下来就会看到打印出一条信息 ApacheSpark spark 提供的 pytho

前言:我们来学习Spark基础吧!

一、搭建学习环境

1、下载spark

2、python-shell

我们运行bin/pyspark之后就进入了spark的python shell。我们为了验证是否成功了,可以运行下面的代码

lines = sc.textFile("README.md") print lines.first() 

接下来就会看到打印出一条信息:# Apache Spark。 spark提供的python shell是我们良好的学习平台。我们可以在里面随意的调用spark提供的API。

3、IDE环境

可能有些同学已经习惯了IDE带来的好处(例如我),所以也希望能通过IDE来进行学习和开发。 但是spark并没有提供任何python 模块给我们下载使用, 也就是说,你无法通过pip install的方式下载spark模块。 这一点就不如java和scala了,maven是可以直接集成spark的。 所以我们要做一点额外的事情以让pycharm能够拥有开发spark程序的能力。

  1. 在pycharm找到Project Structure 把解压的目录中的python目录加进去
    在这里插入图片描述

  2. 添加run–>Edit configurations。 添加一个运行配置。并配置SPARK_HOME环境变量为解压目录。然后配置PYTHONPATH环境变量为解压目录中的python目录。
    在这里插入图片描述
    然后各位就可以在pycharm上编写spark代码并运行了。




"""SimpleApp""" from pyspark import SparkContext logFile = "/Users/sungaofei/Documents/spark/README.md" sc = SparkContext("local","Simple App") logData = sc.textFile(logFile).cache() numAs = logData.filter(lambda s: 'a' in s).count() numBs = logData.filter(lambda s: 'b' in s).count() temp = logData.first() print temp print("Lines with a: %i, lines with b: %i"%(numAs, numBs)) 

二、从demo中学习

"""SimpleApp""" from pyspark import SparkContext logFile = "/Users/sungaofei/Documents/spark/README.md" conf = SparkConf().setMaster("local").setAppName("My App") sc = SparkContext(conf = conf) logData = sc.textFile(logFile).cache() numAs = logData.filter(lambda s: 'a' in s).count() numBs = logData.filter(lambda s: 'b' in s).count() temp = logData.first() print temp print("Lines with a: %i, lines with b: %i"%(numAs, numBs)) 

三、RDD基础

上面提到过RDD,它是spark定义的固定不变的分布式数据集合

  1. 说它固定不变是因为它一经创建后你就无法改变它的内容了。你只能通过当前的RDD调用一些方法来生成新的RDD,但是你永远都无法真正改变一个RDD的数据。例如刚才的demo,我们调用filter方法过滤掉一些数据,但我们并没有改变原有RDD的数据,你在其他地方调用原RDD的时候仍然是全量的未经过滤的数据。 filter方法返回的是一个新的RDD
  2. 说它是分布式数据集合是因为每一个RDD都由多个partitions(分片)组成。上一篇我们讲到HDFS,所以首先数据是分布在不同的机器上的。在spark读取数据的时候会根据一定的规则(可以是默认64M一个partition,也可以指定partition数量)。 这些分布在不同partition也就是数据分片组成了RDD。spark在运行的时候,每个partition都会生成一个task。他们会跑在不同的计算资源上。我们知道java中万物皆对象,在spark中所有数据皆RDD。可以说RDD就是spark的一切,就如MapReduce就是Haddop的一切一样。

我们可以使用两种方式创建RDD

  1. 通过sc.textFile()从外部文件中读取。就如我们的demo一样
  2. 通过从一个集合中初始化一个RDD。如下:
lines = sc.parallelize(["pandas", "i like pandas"]) 

1、transformations and actions

RDD支持两种操作,transformation和action。ransformation的操会返回一个新的RDD,就如我们在demo中看到的filter()方法,是一种组织和准备数据的方式。为之后的action执行计算提供数据基础。action的操作是真正产生一个计算操作的过程。例如demo中的count()。 action不会返回一个RDD,它会返回实际的操作结果或者将数据保存到外部文件中。spark提供了很多函数,如果你分不清哪些是transformation哪些是action。 只要看它的返回值就好了。返回一个新RDD的就是transformation,不返回的就是action。 区分一个函数是哪种操作很重要,因为spark处理这两种操作的方式很不一样。

  1. transformation
    transformation是一种返回一个新的RDD的方法。它遵循延迟计算的规则。也就是说spark在运行的时候遇到transformation的时候并不会真正的执行它,直到碰到一个action的时候才会真正的执行。我们稍后会专门讨论延迟计算的规则。这里我们知道有这个概念就好。大部分transformation都是按行元素处理,就是说他们同一时间只处理一行数据(有少数transformation不是的)。就像上面说的,spark大部分的函数都是函数式编程,要求我们传递一个函数作为参数。那么所有transformation都是需要传递至少一个函数作为参数的, 这个参数就是我们指定的如何处理数据的逻辑。spark会将数据拆成一行一行的并作为参数调用我们指定的函数。就如demo中的filter,spark会将RDD的每一行作为参数传递给我们自定义的函数。

  2. action
    像之前说的RDD可以使用很多的transformation来组织和准备数据,但是光准备数据还是不行得,我们终究要用数据计算一些东西,这时候就需要我们的action,就如我们demo中的count()用来计算数据的行数. 我们还可以使用frist()取出第一条数据,用take(n)来取出前n条数据,saveAsTextFile()用来把数据存储到外部文件。也就是说action是我们真正使用数据来进行计算的方式,真正实现数据的价值的方式。

  3. 延迟计算
    之前提到过,transformation的操作是延迟计算的。意思是说spark在运行的时候,运行到transformation的时候实际上并不会真正的执行transformations。直到碰到了这个RDD的action的时候,才会一股脑的执行之前所有的操作。也许这对刚接触大数据处理的同学来说有点难以理解,但如果我们仔细的想一想就会发现其实这样的设计相当的合理。 因为我们在实际情况中面对的是非常庞大的数据。如果我们在一开始就执行所有的数据操作并将数据载入内存中那将是一种很大的浪费。例如在demo中,如果我们使用的不是count这种操作全部数据的方式而是使用了first()或者take(n)这种只取了一部分数据的操作。那么事先就执行transformation的操作并将所有数据载入内存的话,那将是极大的浪费。所以取而代之的,spark在每次遇到transformation的时候并不会立刻执行,而是通过一些元数据记录RDD的操作轨迹,在遇到action的时候再推断出最优的解决方案。

四、常见的transformation

  1. map
    除了我们在demo中看到的filter()方法来过滤数据,我们还可以使用map()这种MapReduce时代保留下来的函数。看下面的demo

nums = sc.parallelize([1, 2, 3, 4]) squared = nums.map(lambda x: x * x).collect() for num in squared: print "%i " % (num) 
  1. flatMap()
    与map()很相似的一个方法是flatMap()。map的操作是处理每一行的同时,返回的也是一行数据。 flatMap不一样,它返回的是一个可迭代的对象。也就是说map是一行数据转换成一行数据,flatMap是一行数据转换成多行数据。例如下面的demo

lines = sc.parallelize(["hello world", "hi"]) words = lines.flatMap(lambda line: line.split(" ")) words.first() # returns "hello" words.count() # returns 3 
  1. action
    reduce
    最常用的action操作是我们在MapReduce时期就熟悉reduce操作,此操作是一个聚合方法。demo如下:




rdd = sc.parallelize([1,2,3,4,5]) sum = rdd.reduce(lambda x, y: x + y) 

reduce接受一个函数当做参数,而这个函数也接受两个参数x和y。 这俩个参数代表着RDD中的两行,reduce是聚合函数。 它会不断的将之前计算出的两行传递给函数进行聚合计算。上面demo中的sum为15.因为reduce做了一个累加的操作。

  1. 其他
    此外还有我们早就见过的count(),以及一些其他的例如:

我们上面说过从性能上考虑RDD是延迟计算的,每遇到一个action都会从头开始执行。这样是不够的,因为有的时候我们需要重复使用一个RDD很多次。如果这个RDD的每一个action都要重新载入那么多的数据,那也是很蛋疼的。 所以spark提供了persist函数来让我们缓存RDD。

lines = sc.parallelize(["hello world", "hi"]) a = lines.flatMap(lambda line: line.split(" ")).persist() a.count() a.take(10) 
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/210879.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月18日 下午11:42
下一篇 2026年3月18日 下午11:42


相关推荐

  • Spring Boot默认数据库连接池HikariPool

    Spring Boot默认数据库连接池HikariPoolHikariPool加入了启动Springboot的启动器后看到可以看到连接池是HikariPool,不是常用的C3P0,DBCPSpringBoot1用的是tomcat连接池,2开始就是HikariPool了

    2022年6月23日
    58
  • JVM调优之 -Xms -Xmx -Xmn -Xss[通俗易懂]

    原文地址  http://unixboy.iteye.com/blog/174173堆大小设置JVM中最大堆大小有三方面限制:相关操作系统的数据模型(32-bt还是64-bit)限制;系统的可用虚拟内存限制;系统的可用物理内存限制。32位系统下,一般限制在1.5G~2G;64为操作系统对内存无限制。我在WindowsServer2003系统,3.5G物理内存,J

    2022年4月8日
    105
  • 思科配置VLAN的实例

    思科配置VLAN的实例在我们上面的环境,是一个传统的网络,所有的主机都在一个广播域地址,正是这个原因,使得网络当中的广播包,给整个网络带来巨大的压力。总之,在这种情况下,同一个vlan的主机可以通信,不同一个vlan的主机不可以通信。pc2和pc4同属vlan20,(同一个vlan的主机可以通信)VLAN就是虚拟局域网的意思,它的特点是灵活性高,可扩展性高。好了,我们有关于思科配置VLAN的实例就到这里了,谢谢大家。在sw1上将相应的接口加入到相应的vlan,(在sw2上将相应的接口加入到相应的vlan,(…

    2026年1月24日
    4
  • a卡eth挖矿教程_a卡挖eth用什么内核

    a卡eth挖矿教程_a卡挖eth用什么内核  对于ETH挖矿来说,A卡无疑是最合适的选择,性价比高。如果你只想挖ETH,那选择A卡无疑是最明智的。但是,在使用A卡挖矿的过程中,往往会出现很多难以解决的问题,影响挖矿的效率。其中很大一部分原因是由于A卡本身的特性导致的。很多人在使用A卡挖ETH的过程中会出现这样一种情况:开始一段时间还是正常的运行,但是运行一段时间后就开始报错,导致无法正常挖矿。这是由于A卡具有自动更新的特性。所以在使用A卡…

    2022年10月16日
    5
  • layui 如何去dom_layui 弹出层

    layui 如何去dom_layui 弹出层这是一个可以重要也可以不重要的方法,重要的是,它的权利真的很大,尤其是在模块化加载layer时,你会发现你必须要用到它。它不仅可以配置一些诸如路径、加载的模块,甚至还可以决定整个弹层的默认参数。而说它不重要,是因为多数情况下,你会发现,你似乎不是那么十分需要它。但你真的需要认识一下这位伙计。如果您是采用seajs或者requirejs加载layer,你需要执行该方法来完成初始化的配置。比如:lay…

    2022年6月11日
    38
  • Putty(菩提)远程连接服务器教程听语音

    Putty(菩提)远程连接服务器教程听语音

    2021年10月8日
    59

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号