(精华)转:RDD:创建的几种方式(scala和java)

(精华)转:RDD:创建的几种方式(scala和java)转:https://blog.csdn.net/weixin_38750084/article/details/82769600下面开始初始化sparkspark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉spark如何访问一个集群,而要创建一个SparkContext对象,你首先要创建一个SparkConf对象,该对象访问了你的应用程序的信息比如下面的代码是运行在spark模式下 publicclasssparkTestCon{ …

大家好,又见面了,我是你们的朋友全栈君。

转: https://blog.csdn.net/weixin_38750084/article/details/82769600 

这篇文章非常棒, 用代码实际演示了如何创建RDD; 本文主要转载了 java创建RDD的两种方式, 

 

【方式1】

下面开始初始化spark
spark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉spark如何访问一个集群,而要创建一个SparkContext对象,你首先要创建一个SparkConf对象,该对象访问了你的应用程序的信息
比如下面的代码是运行在spark模式下

public class sparkTestCon {
 
    public static void main(String[] args) {
        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "2147480000");     //因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("spark://192.168.52.140:7077", "First Spark App",conf);
        System.out.println(sc);
    }
 
}

下面是运行在本机,把上面的第6行代码改为如下

JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);

RDD的创建有两种方式 
1.引用外部文件系统的数据集(HDFS) 
2.并行化一个已经存在于驱动程序中的集合(并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD)

第一种方式创建 
下面通过代码来理解RDD和怎么操作RDD

package com.tg.spark;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
 * 引用外部文件系统的数据集(HDFS)创建RDD
 *  匿名内部类定义函数传给spark
 * @author 汤高
 *
 */
public class RDDOps {
    //完成对所有行的长度求和
    public static void main(String[] args) {
 
        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "2147480000");     //因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
        System.out.println(sc);
 
        //通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件
        JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
 
        //定义lineLengths作为Map转换的结果 由于惰性,不会立即计算lineLengths
        //第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型
        JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
          public Integer call(String s) { 
              System.out.println("每行长度"+s.length());
              return s.length(); }
        });
        //运行reduce  这是一个动作action  这时候,spark才将计算拆分成不同的task,
        //并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序
        int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer a, Integer b) { return a + b; }
        });
 
        System.out.println(totalLength);
        //为了以后复用  持久化到内存...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());
 
 
    }
}
 

如果觉得刚刚那种写法难以理解,可以看看第二种写法

package com.tg.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
 * 引用外部文件系统的数据集(HDFS)创建RDD 
 *  外部类定义函数传给spark
 * @author 汤高
 *
 */
public class RDDOps2 {
    // 完成对所有行的长度求和
    public static void main(String[] args) {
 
        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
        System.out.println(sc);
 
 
        //通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件
        JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
        //定义lineLengths作为Map转换的结果 由于惰性,不会立即计算lineLengths
        JavaRDD<Integer> lineLengths = lines.map(new GetLength());
 
 
        //运行reduce  这是一个动作action  这时候,spark才将计算拆分成不同的task,
                //并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序
        int totalLength = lineLengths.reduce(new Sum());
 
        System.out.println("总长度"+totalLength);
        // 为了以后复用 持久化到内存...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());
 
    }
    //定义map函数
    //第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型
    static class GetLength implements Function<String, Integer> {
        public Integer call(String s) {
            return s.length();
        }
    }
    //定义reduce函数 
    //第一个参数为内容,第三个参数为函数操作完后返回的结果类型
    static class Sum implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }
}

【方式2】 (java编程推荐)

并行化一个已经存在于驱动程序中的集合(并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD)

package com.tg.spark;
 
import java.util.Arrays;
import java.util.List;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
 
import com.tg.spark.RDDOps2.GetLength;
import com.tg.spark.RDDOps2.Sum;
/**
 * 并行化一个已经存在于驱动程序中的集合创建RDD
 * @author 汤高
 *
 */
public class RDDOps3 {
    // 完成对所有数求和
    public static void main(String[] args) {
 
        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
        System.out.println(sc);
 
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        //并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD
        JavaRDD<Integer> distData = sc.parallelize(data);
 
        JavaRDD<Integer> lineLengths = distData.map(new GetLength());
 
        // 运行reduce 这是一个动作action 这时候,spark才将计算拆分成不同的task,
        // 并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序
        int totalLength = lineLengths.reduce(new Sum());
 
        System.out.println("总和" + totalLength);
        // 为了以后复用 持久化到内存...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());
 
    }
 
    // 定义map函数
    static class GetLength implements Function<Integer, Integer> {
 
        @Override
        public Integer call(Integer a) throws Exception {
 
            return a;
        }
    }
 
    // 定义reduce函数
    static class Sum implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }
}

注意:上面的写法是基于jdk1.7或者更低版本 
基于jdk1.8有更简单的写法 
下面是官方文档的说明

所以如果要完成上面第一种创建方式,在jdk1.8中可以简单的这么写

JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);


List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

主要不同就是在jdk1.7中我们要自己写一个函数传到map或者reduce方法中,而在jdk1.8中可以直接在map或者reduce方法中写lambda表达式

参考原文:https://blog.csdn.net/tanggao1314/article/details/51570452/

 

扩展:

SparkContext的parallelize的参数

通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。

var data = [1, 2, 3, 4, 5]  
var distData = sc.parallelize(data)  

在一个Spark程序的开始部分,有好多是用sparkContext的parallelize制作RDD的,是ParallelCollectionRDD,创建一个并行集合。

例如sc.parallelize(0 until numMappers, numMappers)

创建并行集合的一个重要参数,是slices的数目(例子中是numMappers),它指定了将数据集切分为几份。

在集群模式中,Spark将会在一份slice上起一个Task。典型的,你可以在集群中的每个cpu上,起2-4个Slice (也就是每个cpu分配2-4个Task)。

一般来说,Spark会尝试根据集群的状况,来自动设定slices的数目。当让,也可以手动的设置它,通过parallelize方法的第二个参数。(例如:sc.parallelize(data, 10)).

参考:https://blog.csdn.net/caoli98033/article/details/41777065

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/136308.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • webpack 版本依赖关系[通俗易懂]

    webpack 版本依赖关系[通俗易懂]依赖 版本 新版本 webpack 4 5 webpack-cli 3 4 webpack-dev-server 3 3

    2022年8月10日
    8
  • 怎么完全卸载赛门铁克_如何干净彻底卸载诺顿?

    怎么完全卸载赛门铁克_如何干净彻底卸载诺顿?诺顿有那么难卸载吗?我来教你如何彻底卸载诺顿!本方法跟重装的新系统差不多,没痕迹!很多朋友都遇到过无法卸载诺顿的问题,其实这是有原因的,因為它和操作系统高度整合,所以很多文件会直接注册到系统中,所以比起一般软件来是难点,但这样可以更好的保护您的系统安全。通过以下的方法可以完全卸载掉诺顿,可以顺利的安装诺顿新版本或者其它的杀毒软件。1.单击【开始】-【控制面板】,打开控制面板窗口,单击【添加删除程…

    2022年6月3日
    46
  • ubuntu 安裝deb_ubuntu安装deb

    ubuntu 安裝deb_ubuntu安装deb编程语言中文网今天精心准备的是《ubuntu安装deb》,下面是详解!Ubuntu下如何安装.deb文件我用Vmware安装了Ubuntu系统,打开.deb文件时一直提示错误,提示信息如下:Error:Dependencyisnotsatisfiable:iw我用命令终端输入命令安装也不行…希望各位帮帮忙啊,本人菜鸟一个。。…我用Vmware安装了Ubuntu系统,打开.deb文件时一直提示错…

    2022年6月4日
    29
  • windows查看mysql服务_win10启动错误

    windows查看mysql服务_win10启动错误2.Mysql不同的日志文件。日志文件记如文件中的信息类型log-error(错误日志)记录启动、运行或停止mysql时候出现的问题。log_queries(查询日志)记录建立的客户端连接和执行的语句。log_slave_updates(更新日志)记录更改数据的语句。不赞成使用该日志。log-bin(二进制日志)记录所有更改数据的语句。还用于复制。log_show_queries…

    2022年10月14日
    16
  • Linux三剑客(grep、sed、awk)

    下面所说的是Linux中最重要的三个命令在业界被称为“三剑客”,它们是awk,sed,grep。我们现在知道Linux下一切皆文件,对Linux的操作就是对文件的处理,那么怎么能更好的处理文件呢?这就要用到我们上面的三剑客命令。在说这三个命令前我们要插入一个小插曲就是“正则表达式”。一、正则表达式所谓的正则表达式我个人理解就是正规的表示方法。他是用简…

    2022年4月4日
    83
  • Android布局优化之ViewStub、include、merge使用与源码分析

    Android布局优化之ViewStub、include、merge使用与源码分析在开发中UI布局是我们都会遇到的问题,随着UI越来越多,布局的重复性、复杂度也会随之增长。首先用得最多的应该是include,按照官方的意思,include就是为了解决重复定义相同布局的

    2022年6月28日
    30

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号