(精华)转:RDD:创建的几种方式(scala和java)

(精华)转:RDD:创建的几种方式(scala和java)转:https://blog.csdn.net/weixin_38750084/article/details/82769600下面开始初始化sparkspark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉spark如何访问一个集群,而要创建一个SparkContext对象,你首先要创建一个SparkConf对象,该对象访问了你的应用程序的信息比如下面的代码是运行在spark模式下 publicclasssparkTestCon{ …

大家好,又见面了,我是你们的朋友全栈君。

转: https://blog.csdn.net/weixin_38750084/article/details/82769600 

这篇文章非常棒, 用代码实际演示了如何创建RDD; 本文主要转载了 java创建RDD的两种方式, 

 

【方式1】

下面开始初始化spark
spark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉spark如何访问一个集群,而要创建一个SparkContext对象,你首先要创建一个SparkConf对象,该对象访问了你的应用程序的信息
比如下面的代码是运行在spark模式下

public class sparkTestCon {
 
    public static void main(String[] args) {
        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "2147480000");     //因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("spark://192.168.52.140:7077", "First Spark App",conf);
        System.out.println(sc);
    }
 
}

下面是运行在本机,把上面的第6行代码改为如下

JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);

RDD的创建有两种方式 
1.引用外部文件系统的数据集(HDFS) 
2.并行化一个已经存在于驱动程序中的集合(并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD)

第一种方式创建 
下面通过代码来理解RDD和怎么操作RDD

package com.tg.spark;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
 * 引用外部文件系统的数据集(HDFS)创建RDD
 *  匿名内部类定义函数传给spark
 * @author 汤高
 *
 */
public class RDDOps {
    //完成对所有行的长度求和
    public static void main(String[] args) {
 
        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "2147480000");     //因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
        System.out.println(sc);
 
        //通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件
        JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
 
        //定义lineLengths作为Map转换的结果 由于惰性,不会立即计算lineLengths
        //第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型
        JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
          public Integer call(String s) { 
              System.out.println("每行长度"+s.length());
              return s.length(); }
        });
        //运行reduce  这是一个动作action  这时候,spark才将计算拆分成不同的task,
        //并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序
        int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer a, Integer b) { return a + b; }
        });
 
        System.out.println(totalLength);
        //为了以后复用  持久化到内存...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());
 
 
    }
}
 

如果觉得刚刚那种写法难以理解,可以看看第二种写法

package com.tg.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
 * 引用外部文件系统的数据集(HDFS)创建RDD 
 *  外部类定义函数传给spark
 * @author 汤高
 *
 */
public class RDDOps2 {
    // 完成对所有行的长度求和
    public static void main(String[] args) {
 
        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
        System.out.println(sc);
 
 
        //通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件
        JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
        //定义lineLengths作为Map转换的结果 由于惰性,不会立即计算lineLengths
        JavaRDD<Integer> lineLengths = lines.map(new GetLength());
 
 
        //运行reduce  这是一个动作action  这时候,spark才将计算拆分成不同的task,
                //并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序
        int totalLength = lineLengths.reduce(new Sum());
 
        System.out.println("总长度"+totalLength);
        // 为了以后复用 持久化到内存...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());
 
    }
    //定义map函数
    //第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型
    static class GetLength implements Function<String, Integer> {
        public Integer call(String s) {
            return s.length();
        }
    }
    //定义reduce函数 
    //第一个参数为内容,第三个参数为函数操作完后返回的结果类型
    static class Sum implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }
}

【方式2】 (java编程推荐)

并行化一个已经存在于驱动程序中的集合(并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD)

package com.tg.spark;
 
import java.util.Arrays;
import java.util.List;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
 
import com.tg.spark.RDDOps2.GetLength;
import com.tg.spark.RDDOps2.Sum;
/**
 * 并行化一个已经存在于驱动程序中的集合创建RDD
 * @author 汤高
 *
 */
public class RDDOps3 {
    // 完成对所有数求和
    public static void main(String[] args) {
 
        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
        System.out.println(sc);
 
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        //并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD
        JavaRDD<Integer> distData = sc.parallelize(data);
 
        JavaRDD<Integer> lineLengths = distData.map(new GetLength());
 
        // 运行reduce 这是一个动作action 这时候,spark才将计算拆分成不同的task,
        // 并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序
        int totalLength = lineLengths.reduce(new Sum());
 
        System.out.println("总和" + totalLength);
        // 为了以后复用 持久化到内存...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());
 
    }
 
    // 定义map函数
    static class GetLength implements Function<Integer, Integer> {
 
        @Override
        public Integer call(Integer a) throws Exception {
 
            return a;
        }
    }
 
    // 定义reduce函数
    static class Sum implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }
}

注意:上面的写法是基于jdk1.7或者更低版本 
基于jdk1.8有更简单的写法 
下面是官方文档的说明

所以如果要完成上面第一种创建方式,在jdk1.8中可以简单的这么写

JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);


List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

主要不同就是在jdk1.7中我们要自己写一个函数传到map或者reduce方法中,而在jdk1.8中可以直接在map或者reduce方法中写lambda表达式

参考原文:https://blog.csdn.net/tanggao1314/article/details/51570452/

 

扩展:

SparkContext的parallelize的参数

通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。

var data = [1, 2, 3, 4, 5]  
var distData = sc.parallelize(data)  

在一个Spark程序的开始部分,有好多是用sparkContext的parallelize制作RDD的,是ParallelCollectionRDD,创建一个并行集合。

例如sc.parallelize(0 until numMappers, numMappers)

创建并行集合的一个重要参数,是slices的数目(例子中是numMappers),它指定了将数据集切分为几份。

在集群模式中,Spark将会在一份slice上起一个Task。典型的,你可以在集群中的每个cpu上,起2-4个Slice (也就是每个cpu分配2-4个Task)。

一般来说,Spark会尝试根据集群的状况,来自动设定slices的数目。当让,也可以手动的设置它,通过parallelize方法的第二个参数。(例如:sc.parallelize(data, 10)).

参考:https://blog.csdn.net/caoli98033/article/details/41777065

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/136308.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • c++–继承

    c++–继承

    2021年9月29日
    36
  • 计算机三级网络技术大题暴力做题法

    计算机三级网络技术大题暴力做题法

    2021年9月28日
    57
  • 代理模式proxy_反向代理是什么

    代理模式proxy_反向代理是什么代理模式 Proxy动机模式定义实例结构要点总结笔记动机在面向对象系统中,由于某种原因(比如对象创建的开销很大,或者某些操作需要安全控制,或者需要进程额外的访问等),直接访问会给使用者,或者系统结构带来很多麻烦.如何在不是去透明操作对象的同时来管理/控制这些对象特有的复杂性?增加一层间接曾是软件开发中常见的解决方式模式定义为其他对象提供一种代理以控制(隔离,使用接口)对这个对象的访问实例朴素客户端要去使用process 但是process周围需要做很多事情class ISubject{p

    2022年8月9日
    9
  • MOS管电平转换电路学习

    MOS管电平转换电路学习一个MOSFET电平转换电路引发的问题

    2022年10月19日
    2
  • python3.9多线程_python创建多线程

    python3.9多线程_python创建多线程什么是线程?线程也叫轻量级进程,是操作系统能够进行运算调度的最小单位,它被包涵在进程之中,是进程中的实际运作单位。线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其

    2022年7月28日
    8
  • latex中bibtex生成参考文献_英文参考文献自动生成方法

    latex中bibtex生成参考文献_英文参考文献自动生成方法创建BibTeX文件  BibTex是一种专门用于协调LaTeX的参考文献处理的文件格式,其后缀为.bib。BibTex可以用JabRef,bibtex等软件生成,也可以直接用记事本创建,操作起来非常灵活,这里主要介绍用JabRef来生成BibTex文件。JabRef是一套开放源代码、处理BibTeX格式的文献管理软件,提供了简易操作的界面来编辑BibTeX档案,功能包含从网络上的科学数据库汇…

    2025年8月30日
    5

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号