(精华)转:RDD:创建的几种方式(scala和java)

(精华)转:RDD:创建的几种方式(scala和java)转:https://blog.csdn.net/weixin_38750084/article/details/82769600下面开始初始化sparkspark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉spark如何访问一个集群,而要创建一个SparkContext对象,你首先要创建一个SparkConf对象,该对象访问了你的应用程序的信息比如下面的代码是运行在spark模式下 publicclasssparkTestCon{ …

大家好,又见面了,我是你们的朋友全栈君。

转: https://blog.csdn.net/weixin_38750084/article/details/82769600 

这篇文章非常棒, 用代码实际演示了如何创建RDD; 本文主要转载了 java创建RDD的两种方式, 

 

【方式1】

下面开始初始化spark
spark程序需要做的第一件事情,就是创建一个SparkContext对象,它将告诉spark如何访问一个集群,而要创建一个SparkContext对象,你首先要创建一个SparkConf对象,该对象访问了你的应用程序的信息
比如下面的代码是运行在spark模式下

public class sparkTestCon {
 
    public static void main(String[] args) {
        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "2147480000");     //因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("spark://192.168.52.140:7077", "First Spark App",conf);
        System.out.println(sc);
    }
 
}

下面是运行在本机,把上面的第6行代码改为如下

JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);

RDD的创建有两种方式 
1.引用外部文件系统的数据集(HDFS) 
2.并行化一个已经存在于驱动程序中的集合(并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD)

第一种方式创建 
下面通过代码来理解RDD和怎么操作RDD

package com.tg.spark;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
 * 引用外部文件系统的数据集(HDFS)创建RDD
 *  匿名内部类定义函数传给spark
 * @author 汤高
 *
 */
public class RDDOps {
    //完成对所有行的长度求和
    public static void main(String[] args) {
 
        SparkConf conf=new SparkConf();
        conf.set("spark.testing.memory", "2147480000");     //因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App",conf);
        System.out.println(sc);
 
        //通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件
        JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
 
        //定义lineLengths作为Map转换的结果 由于惰性,不会立即计算lineLengths
        //第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型
        JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
          public Integer call(String s) { 
              System.out.println("每行长度"+s.length());
              return s.length(); }
        });
        //运行reduce  这是一个动作action  这时候,spark才将计算拆分成不同的task,
        //并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序
        int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer a, Integer b) { return a + b; }
        });
 
        System.out.println(totalLength);
        //为了以后复用  持久化到内存...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());
 
 
    }
}
 

如果觉得刚刚那种写法难以理解,可以看看第二种写法

package com.tg.spark;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
/**
 * 引用外部文件系统的数据集(HDFS)创建RDD 
 *  外部类定义函数传给spark
 * @author 汤高
 *
 */
public class RDDOps2 {
    // 完成对所有行的长度求和
    public static void main(String[] args) {
 
        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
        System.out.println(sc);
 
 
        //通过hdfs上的文件定义一个RDD 这个数据暂时还没有加载到内存,也没有在上面执行动作,lines仅仅指向这个文件
        JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
        //定义lineLengths作为Map转换的结果 由于惰性,不会立即计算lineLengths
        JavaRDD<Integer> lineLengths = lines.map(new GetLength());
 
 
        //运行reduce  这是一个动作action  这时候,spark才将计算拆分成不同的task,
                //并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序
        int totalLength = lineLengths.reduce(new Sum());
 
        System.out.println("总长度"+totalLength);
        // 为了以后复用 持久化到内存...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());
 
    }
    //定义map函数
    //第一个参数为传入的内容,第二个参数为函数操作完后返回的结果类型
    static class GetLength implements Function<String, Integer> {
        public Integer call(String s) {
            return s.length();
        }
    }
    //定义reduce函数 
    //第一个参数为内容,第三个参数为函数操作完后返回的结果类型
    static class Sum implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }
}

【方式2】 (java编程推荐)

并行化一个已经存在于驱动程序中的集合(并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD)

package com.tg.spark;
 
import java.util.Arrays;
import java.util.List;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
 
import com.tg.spark.RDDOps2.GetLength;
import com.tg.spark.RDDOps2.Sum;
/**
 * 并行化一个已经存在于驱动程序中的集合创建RDD
 * @author 汤高
 *
 */
public class RDDOps3 {
    // 完成对所有数求和
    public static void main(String[] args) {
 
        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
        JavaSparkContext sc = new JavaSparkContext("local", "First Spark App", conf);
        System.out.println(sc);
 
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        //并行集合,是通过对于驱动程序中的集合调用JavaSparkContext.parallelize来构建的RDD
        JavaRDD<Integer> distData = sc.parallelize(data);
 
        JavaRDD<Integer> lineLengths = distData.map(new GetLength());
 
        // 运行reduce 这是一个动作action 这时候,spark才将计算拆分成不同的task,
        // 并运行在独立的机器上,每台机器运行他自己的map部分和本地的reducation,并返回结果集给去驱动程序
        int totalLength = lineLengths.reduce(new Sum());
 
        System.out.println("总和" + totalLength);
        // 为了以后复用 持久化到内存...
        lineLengths.persist(StorageLevel.MEMORY_ONLY());
 
    }
 
    // 定义map函数
    static class GetLength implements Function<Integer, Integer> {
 
        @Override
        public Integer call(Integer a) throws Exception {
 
            return a;
        }
    }
 
    // 定义reduce函数
    static class Sum implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer a, Integer b) {
            return a + b;
        }
    }
}

注意:上面的写法是基于jdk1.7或者更低版本 
基于jdk1.8有更简单的写法 
下面是官方文档的说明

所以如果要完成上面第一种创建方式,在jdk1.8中可以简单的这么写

JavaRDD<String> lines = sc.textFile("hdfs://master:9000/testFile/README.md");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);


List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

主要不同就是在jdk1.7中我们要自己写一个函数传到map或者reduce方法中,而在jdk1.8中可以直接在map或者reduce方法中写lambda表达式

参考原文:https://blog.csdn.net/tanggao1314/article/details/51570452/

 

扩展:

SparkContext的parallelize的参数

通过调用SparkContext的parallelize方法,在一个已经存在的Scala集合上创建的(一个Seq对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。

var data = [1, 2, 3, 4, 5]  
var distData = sc.parallelize(data)  

在一个Spark程序的开始部分,有好多是用sparkContext的parallelize制作RDD的,是ParallelCollectionRDD,创建一个并行集合。

例如sc.parallelize(0 until numMappers, numMappers)

创建并行集合的一个重要参数,是slices的数目(例子中是numMappers),它指定了将数据集切分为几份。

在集群模式中,Spark将会在一份slice上起一个Task。典型的,你可以在集群中的每个cpu上,起2-4个Slice (也就是每个cpu分配2-4个Task)。

一般来说,Spark会尝试根据集群的状况,来自动设定slices的数目。当让,也可以手动的设置它,通过parallelize方法的第二个参数。(例如:sc.parallelize(data, 10)).

参考:https://blog.csdn.net/caoli98033/article/details/41777065

 

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/136308.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 在IDEA中创建maven项目

    在IDEA中创建maven项目在IDEA中创建maven项目  现在的JavaWeb项目中,绝大多数都是采用的maven结构的项目,而对于maven支持的最好的IDE开发工具为IDEA,所以说我就以在IDEA上为例来进行maven开发的讲解。

    2022年6月22日
    54
  • mybatis trim标签

    mybatis trim标签mybatis的trim标签一般用于动态sql语句中去除多余的and关键字,逗号等以下是trim标签中涉及到的属性:属性描述prefix给sql语句拼接的前缀suffix给sql语句拼接的后缀prefixOverrides去除前缀suffixOverrides去除后缀案例trim实现insert into ()values()<insert id=”insert” parameterType=”InfoStudent”> .

    2022年8月8日
    5
  • Maven项目缺少Maven Dependencies解决方法总结

    Maven项目缺少Maven Dependencies解决方法总结

    2021年4月9日
    129
  • std::vector初始化[通俗易懂]

    std::vector初始化[通俗易懂]#include<iostream>#include<stdint.h>#include<vector>usingnamespacestd;intmain(){ std::vector<uint8_t>temp0(0,0); cout<<“vectorsize:”<<temp0.size()<<endl; std::vector<uint8_t>temp1(.

    2022年9月16日
    0
  • Dijkstra算法

    Dijkstra算法

    2021年12月5日
    57
  • 基于LM331的频率电压转换电路「建议收藏」

    基于LM331的频率电压转换电路「建议收藏」常用的模拟信号的传输方式有电压传输、电流传输和频率传输,其中电压传输的方式最为简单方便,成本最低,但是电压信号在传输的过程中最容易受到干扰,并且传输过程会有损耗,因此不适合远距离传输。将电压转换为电流或者频率后,可以进行远距离传输,且抗干扰能力强,其中电流传输的抗干扰能力最好,传输距离最远,但是成本较高,而频率在距离超过100m时波形会失真,抗干扰能力介于电压和电流之间。本文主要介绍基于LM331的频率-电压转换电路,关于LM331的简介和电压-频率转换电路可以参考《基于LM331的电压频率转换电路》。

    2022年5月5日
    144

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号