(精华)转:RDD:创建的几种方式(scala和java)

(精华)转:RDD:创建的几种方式(scala和java)转:https://blog.csdn.net/weixin_38750084/article/details/82769600下面开始初始化sparkspark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉spark如何访问一个集群,而要创建一个SparkContext对象,你首先要创建一个SparkConf对象,该对象访问了你的应用程序的信息比如下面的代码是运行在spark模式下 publicclasssparkTestCon{ …

大家好,又见面了,我是你们的朋友全栈君。

转: https://blog.csdn.net/weixin_38750084/article/details/82769600 

这篇文章非常棒, 用代码实际演示了如何创建RDD; 本文主要转载了 java创建RDD的两种方式, 

 

【方式1】

下面开始初始化spark
spark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉spark如何访问一个集群,而要创建一个SparkContext对象,你首先要创建一个SparkConf对象,该对象访问了你的应用程序的信息
比如下面的代码是运行在spark模式下

public class sparkTestCon {
 
    public static void main(String[] args) {
        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "2147480000");     //因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("spark://192.168.52.140:7077", "First Spark App",conf);
        System.out.println(sc);
    }
 
}

下面是运行在本机,把上面的第6行代码改为如下

JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);

RDD的创建有两种方式 
1.引用外部文件系统的数据集(HDFS) 
2.并行化一个已经存在于驱动程序中的集合(并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD)

第一种方式创建 
下面通过代码来理解RDD和怎么操作RDD

package com.tg.spark;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
 * 引用外部文件系统的数据集(HDFS)创建RDD
 *  匿名内部类定义函数传给spark
 * @author 汤高
 *
 */
public class RDDOps {
    //完成对所有行的长度求和
    public static void main(String[] args) {
 
        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "2147480000");     //因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
        System.out.println(sc);
 
        //通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件
        JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
 
        //定义lineLengths作为Map转换的结果 由于惰性,不会立即计算lineLengths
        //第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型
        JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
          public Integer call(String s) { 
              System.out.println("每行长度"+s.length());
              return s.length(); }
        });
        //运行reduce  这是一个动作action  这时候,spark才将计算拆分成不同的task,
        //并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序
        int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer a, Integer b) { return a + b; }
        });
 
        System.out.println(totalLength);
        //为了以后复用  持久化到内存...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());
 
 
    }
}
 

如果觉得刚刚那种写法难以理解,可以看看第二种写法

package com.tg.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
 * 引用外部文件系统的数据集(HDFS)创建RDD 
 *  外部类定义函数传给spark
 * @author 汤高
 *
 */
public class RDDOps2 {
    // 完成对所有行的长度求和
    public static void main(String[] args) {
 
        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
        System.out.println(sc);
 
 
        //通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件
        JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
        //定义lineLengths作为Map转换的结果 由于惰性,不会立即计算lineLengths
        JavaRDD<Integer> lineLengths = lines.map(new GetLength());
 
 
        //运行reduce  这是一个动作action  这时候,spark才将计算拆分成不同的task,
                //并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序
        int totalLength = lineLengths.reduce(new Sum());
 
        System.out.println("总长度"+totalLength);
        // 为了以后复用 持久化到内存...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());
 
    }
    //定义map函数
    //第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型
    static class GetLength implements Function<String, Integer> {
        public Integer call(String s) {
            return s.length();
        }
    }
    //定义reduce函数 
    //第一个参数为内容,第三个参数为函数操作完后返回的结果类型
    static class Sum implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }
}

【方式2】 (java编程推荐)

并行化一个已经存在于驱动程序中的集合(并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD)

package com.tg.spark;
 
import java.util.Arrays;
import java.util.List;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
 
import com.tg.spark.RDDOps2.GetLength;
import com.tg.spark.RDDOps2.Sum;
/**
 * 并行化一个已经存在于驱动程序中的集合创建RDD
 * @author 汤高
 *
 */
public class RDDOps3 {
    // 完成对所有数求和
    public static void main(String[] args) {
 
        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
        System.out.println(sc);
 
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        //并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD
        JavaRDD<Integer> distData = sc.parallelize(data);
 
        JavaRDD<Integer> lineLengths = distData.map(new GetLength());
 
        // 运行reduce 这是一个动作action 这时候,spark才将计算拆分成不同的task,
        // 并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序
        int totalLength = lineLengths.reduce(new Sum());
 
        System.out.println("总和" + totalLength);
        // 为了以后复用 持久化到内存...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());
 
    }
 
    // 定义map函数
    static class GetLength implements Function<Integer, Integer> {
 
        @Override
        public Integer call(Integer a) throws Exception {
 
            return a;
        }
    }
 
    // 定义reduce函数
    static class Sum implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }
}

注意:上面的写法是基于jdk1.7或者更低版本 
基于jdk1.8有更简单的写法 
下面是官方文档的说明

所以如果要完成上面第一种创建方式,在jdk1.8中可以简单的这么写

JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);


List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

主要不同就是在jdk1.7中我们要自己写一个函数传到map或者reduce方法中,而在jdk1.8中可以直接在map或者reduce方法中写lambda表达式

参考原文:https://blog.csdn.net/tanggao1314/article/details/51570452/

 

扩展:

SparkContext的parallelize的参数

通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。

var data = [1, 2, 3, 4, 5]  
var distData = sc.parallelize(data)  

在一个Spark程序的开始部分,有好多是用sparkContext的parallelize制作RDD的,是ParallelCollectionRDD,创建一个并行集合。

例如sc.parallelize(0 until numMappers, numMappers)

创建并行集合的一个重要参数,是slices的数目(例子中是numMappers),它指定了将数据集切分为几份。

在集群模式中,Spark将会在一份slice上起一个Task。典型的,你可以在集群中的每个cpu上,起2-4个Slice (也就是每个cpu分配2-4个Task)。

一般来说,Spark会尝试根据集群的状况,来自动设定slices的数目。当让,也可以手动的设置它,通过parallelize方法的第二个参数。(例如:sc.parallelize(data, 10)).

参考:https://blog.csdn.net/caoli98033/article/details/41777065

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/136308.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • android toast点击事件_android生命周期七种方法

    android toast点击事件_android生命周期七种方法设置AndroidToast持续时间非常长(例如1分钟)(SetAndroidToastdurationtobereallylong(e.g.,1minute))我尝试将我的Toast节目时间设置为1分钟。我试试这个:finalToasttoast=Toast.makeText(getApplicationContext(),”MESSAGE”,Toast.LE…

    2022年9月12日
    0
  • OnContextMenu事件

    OnContextMenu事件用oncontextmenu事件单禁用右键菜单 一个页面中,BODY中用oncontextmenu='returnfalse'来取消鼠标右键;在JS中设置oncontext

    2022年7月2日
    24
  • 安全帽子识别

    安全帽子识别https://www.jianshu.com/p/55bd49c22cf4

    2022年5月20日
    37
  • 研究了一天的ant,

    研究了一天的ant,

    2021年4月24日
    171
  • Python实现久坐提醒小助手程序「建议收藏」

    Python实现久坐提醒小助手程序「建议收藏」不论是日常的工作还是学习,现代年轻人在电脑屏幕时长数据能让人惊掉下巴,继而引发一系列身体不适的现象。小李也是久坐族中的一员,为了时刻提醒自己起来活动活动,我开发了一款基于PythonGU…

    2022年9月26日
    0
  • JVM优化[通俗易懂]

    JVM优化[通俗易懂]为什么要进行JVM优化?在本地开发环境中我们很少有需求对JVM进行优化,但是到了生产环境我们的程序可能出现如下问题:运行的引用“卡住了”,日志不输出,程序没反应服务器的CPU负载突然升高在多线程应用下,如何合理的分配线程的数量。。。。。。。。。通过Java-server和java-client设置JVM的运行参数serverVM的初始堆空间会大一些,默认使用的是并行垃圾回…

    2022年4月28日
    46

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号