前言:我们来学习Spark基础吧!
一、搭建学习环境
1、下载spark
2、python-shell
我们运行bin/pyspark之后就进入了spark的python shell。我们为了验证是否成功了,可以运行下面的代码
lines = sc.textFile("README.md") print lines.first()
接下来就会看到打印出一条信息:# Apache Spark。 spark提供的python shell是我们良好的学习平台。我们可以在里面随意的调用spark提供的API。
3、IDE环境
可能有些同学已经习惯了IDE带来的好处(例如我),所以也希望能通过IDE来进行学习和开发。 但是spark并没有提供任何python 模块给我们下载使用, 也就是说,你无法通过pip install的方式下载spark模块。 这一点就不如java和scala了,maven是可以直接集成spark的。 所以我们要做一点额外的事情以让pycharm能够拥有开发spark程序的能力。
- 在pycharm找到Project Structure 把解压的目录中的python目录加进去

- 添加run–>Edit configurations。 添加一个运行配置。并配置SPARK_HOME环境变量为解压目录。然后配置PYTHONPATH环境变量为解压目录中的python目录。

然后各位就可以在pycharm上编写spark代码并运行了。
"""SimpleApp""" from pyspark import SparkContext logFile = "/Users/sungaofei/Documents/spark/README.md" sc = SparkContext("local","Simple App") logData = sc.textFile(logFile).cache() numAs = logData.filter(lambda s: 'a' in s).count() numBs = logData.filter(lambda s: 'b' in s).count() temp = logData.first() print temp print("Lines with a: %i, lines with b: %i"%(numAs, numBs)) 二、从demo中学习
"""SimpleApp""" from pyspark import SparkContext logFile = "/Users/sungaofei/Documents/spark/README.md" conf = SparkConf().setMaster("local").setAppName("My App") sc = SparkContext(conf = conf) logData = sc.textFile(logFile).cache() numAs = logData.filter(lambda s: 'a' in s).count() numBs = logData.filter(lambda s: 'b' in s).count() temp = logData.first() print temp print("Lines with a: %i, lines with b: %i"%(numAs, numBs)) 三、RDD基础
上面提到过RDD,它是spark定义的固定不变的分布式数据集合
- 说它固定不变是因为它一经创建后你就无法改变它的内容了。你只能通过当前的RDD调用一些方法来生成新的RDD,但是你永远都无法真正改变一个RDD的数据。例如刚才的demo,我们调用filter方法过滤掉一些数据,但我们并没有改变原有RDD的数据,你在其他地方调用原RDD的时候仍然是全量的未经过滤的数据。 filter方法返回的是一个新的RDD
- 说它是分布式数据集合是因为每一个RDD都由多个partitions(分片)组成。上一篇我们讲到HDFS,所以首先数据是分布在不同的机器上的。在spark读取数据的时候会根据一定的规则(可以是默认64M一个partition,也可以指定partition数量)。 这些分布在不同partition也就是数据分片组成了RDD。spark在运行的时候,每个partition都会生成一个task。他们会跑在不同的计算资源上。我们知道java中万物皆对象,在spark中所有数据皆RDD。可以说RDD就是spark的一切,就如MapReduce就是Haddop的一切一样。
我们可以使用两种方式创建RDD
- 通过sc.textFile()从外部文件中读取。就如我们的demo一样
- 通过从一个集合中初始化一个RDD。如下:
lines = sc.parallelize(["pandas", "i like pandas"])
1、transformations and actions
RDD支持两种操作,transformation和action。ransformation的操会返回一个新的RDD,就如我们在demo中看到的filter()方法,是一种组织和准备数据的方式。为之后的action执行计算提供数据基础。action的操作是真正产生一个计算操作的过程。例如demo中的count()。 action不会返回一个RDD,它会返回实际的操作结果或者将数据保存到外部文件中。spark提供了很多函数,如果你分不清哪些是transformation哪些是action。 只要看它的返回值就好了。返回一个新RDD的就是transformation,不返回的就是action。 区分一个函数是哪种操作很重要,因为spark处理这两种操作的方式很不一样。
- transformation
transformation是一种返回一个新的RDD的方法。它遵循延迟计算的规则。也就是说spark在运行的时候遇到transformation的时候并不会真正的执行它,直到碰到一个action的时候才会真正的执行。我们稍后会专门讨论延迟计算的规则。这里我们知道有这个概念就好。大部分transformation都是按行元素处理,就是说他们同一时间只处理一行数据(有少数transformation不是的)。就像上面说的,spark大部分的函数都是函数式编程,要求我们传递一个函数作为参数。那么所有transformation都是需要传递至少一个函数作为参数的, 这个参数就是我们指定的如何处理数据的逻辑。spark会将数据拆成一行一行的并作为参数调用我们指定的函数。就如demo中的filter,spark会将RDD的每一行作为参数传递给我们自定义的函数。
- action
像之前说的RDD可以使用很多的transformation来组织和准备数据,但是光准备数据还是不行得,我们终究要用数据计算一些东西,这时候就需要我们的action,就如我们demo中的count()用来计算数据的行数. 我们还可以使用frist()取出第一条数据,用take(n)来取出前n条数据,saveAsTextFile()用来把数据存储到外部文件。也就是说action是我们真正使用数据来进行计算的方式,真正实现数据的价值的方式。
- 延迟计算
之前提到过,transformation的操作是延迟计算的。意思是说spark在运行的时候,运行到transformation的时候实际上并不会真正的执行transformations。直到碰到了这个RDD的action的时候,才会一股脑的执行之前所有的操作。也许这对刚接触大数据处理的同学来说有点难以理解,但如果我们仔细的想一想就会发现其实这样的设计相当的合理。 因为我们在实际情况中面对的是非常庞大的数据。如果我们在一开始就执行所有的数据操作并将数据载入内存中那将是一种很大的浪费。例如在demo中,如果我们使用的不是count这种操作全部数据的方式而是使用了first()或者take(n)这种只取了一部分数据的操作。那么事先就执行transformation的操作并将所有数据载入内存的话,那将是极大的浪费。所以取而代之的,spark在每次遇到transformation的时候并不会立刻执行,而是通过一些元数据记录RDD的操作轨迹,在遇到action的时候再推断出最优的解决方案。
四、常见的transformation
- map
除了我们在demo中看到的filter()方法来过滤数据,我们还可以使用map()这种MapReduce时代保留下来的函数。看下面的demo
nums = sc.parallelize([1, 2, 3, 4]) squared = nums.map(lambda x: x * x).collect() for num in squared: print "%i " % (num)
- flatMap()
与map()很相似的一个方法是flatMap()。map的操作是处理每一行的同时,返回的也是一行数据。 flatMap不一样,它返回的是一个可迭代的对象。也就是说map是一行数据转换成一行数据,flatMap是一行数据转换成多行数据。例如下面的demo
lines = sc.parallelize(["hello world", "hi"]) words = lines.flatMap(lambda line: line.split(" ")) words.first() # returns "hello" words.count() # returns 3
- action
reduce
最常用的action操作是我们在MapReduce时期就熟悉reduce操作,此操作是一个聚合方法。demo如下:
rdd = sc.parallelize([1,2,3,4,5]) sum = rdd.reduce(lambda x, y: x + y) reduce接受一个函数当做参数,而这个函数也接受两个参数x和y。 这俩个参数代表着RDD中的两行,reduce是聚合函数。 它会不断的将之前计算出的两行传递给函数进行聚合计算。上面demo中的sum为15.因为reduce做了一个累加的操作。
- 其他
此外还有我们早就见过的count(),以及一些其他的例如:
我们上面说过从性能上考虑RDD是延迟计算的,每遇到一个action都会从头开始执行。这样是不够的,因为有的时候我们需要重复使用一个RDD很多次。如果这个RDD的每一个action都要重新载入那么多的数据,那也是很蛋疼的。 所以spark提供了persist函数来让我们缓存RDD。
lines = sc.parallelize(["hello world", "hi"]) a = lines.flatMap(lambda line: line.split(" ")).persist() a.count() a.take(10)
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/210879.html原文链接:https://javaforall.net
