Spark UDF使用详解及代码示例

Spark UDF使用详解及代码示例本文介绍如何在 SparkSql 和 DataFrame 中使用 UDF 如何利用 UDF 给一个表或者一个 DataFrame 根据需求添加几列 并给出了旧版 Spark1 x 和新版 Spark2 x 完整的代码示例 下面以 Spark2 x 为例给出代码 关于 Spark1 x 创建 DataFrame 可在最后的完整代码里查看 2 SparkSql 用法

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站:https://www.captainai.net/dongkelun

前言

本文介绍如何在Spark Sql和DataFrame中使用UDF,如何利用UDF给一个表或者一个DataFrame根据需求添加几列,并给出了旧版(Spark1.x)和新版(Spark2.x)完整的代码示例。

  • 关于UDF:UDF:User Defined Function,用户自定义函数。

1、创建测试用DataFrame

下面以Spark2.x为例给出代码,关于Spark1.x创建DataFrame可在最后的完整代码里查看。

// 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = spark.createDataFrame(userData).toDF("name", "age") userDF.show 
+-----+---+ | name|age| +-----+---+ | Leo| 16| |Marry| 21| | Jack| 14| | Tom| 18| +-----+---+ 
// 注册一张user表 userDF.createOrReplaceTempView("user") 

2、Spark Sql用法

2.1 通过匿名函数注册UDF

下面的UDF的功能是计算某列的长度,该列的类型为String

2.1.1 注册

  • Spark2.x:
spark.udf.register("strLen", (str: String) => str.length()) 
  • Spark1.x:
sqlContext.udf.register("strLen", (str: String) => str.length()) 

2.2.2 使用

仅以Spark2.x为例

spark.sql("select name,strLen(name) as name_len from user").show 
+-----+--------+ | name|name_len| +-----+--------+ | Leo| 3| |Marry| 5| | Jack| 4| | Tom| 3| +-----+--------+ 

2.2 通过实名函数注册UDF

/ * 根据年龄大小返回是否成年 成年:true,未成年:false */ def isAdult(age: Int) = { 
    if (age < 18) { 
    false } else { 
    true } } 

注册(仅以Spark2.x为例)

spark.udf.register("isAdult", isAdult _) 

至于使用都是一样的

2.3 关于spark.udf和sqlContext.udf

def udf: UDFRegistration = sparkSession.udf 

可以看到调用的是sparkSession的udf,即spark.udf

3、DataFrame用法

DataFrame的udf方法虽然和Spark Sql的名字一样,但是属于不同的类,它在org.apache.spark.sql.functions里,下面是它的用法

3.1注册

import org.apache.spark.sql.functions._ //注册自定义函数(通过匿名函数) val strLen = udf((str: String) => str.length()) //注册自定义函数(通过实名函数) val udf_isAdult = udf(isAdult _) 

3.2 使用

可通过withColumn和select使用,下面的代码已经实现了给user表添加两列的功能

  • 通过看源码,下面的withColumn和select方法Spark2.0.0之后才有的,关于spark1.xDataFrame怎么使用注册好的UDF没有研究
//通过withColumn添加列 userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show //通过select添加列 userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show 

结果均为

+-----+---+--------+-------+ | name|age|name_len|isAdult| +-----+---+--------+-------+ | Leo| 16| 3| false| |Marry| 21| 5| true| | Jack| 14| 4| false| | Tom| 18| 3| true| +-----+---+--------+-------+ 

3.3 withColumn和select的区别

可通过withColumn的源码看出withColumn的功能是实现增加一列,或者替换一个已存在的列,他会先判断DataFrame里有没有这个列名,如果有的话就会替换掉原来的列,没有的话就用调用select方法增加一列,所以如果我们的需求是增加一列的话,两者实现的功能一样,且最终都是调用select方法,但是withColumn会提前做一些判断处理,所以withColumn的性能不如select好。

  • 注:select方法和sql 里的select一样,如果新增的列名在表里已经存在,那么结果里允许出现两列列名相同但数据不一样,大家可以自己试一下。
/ * Returns a new Dataset by adding a column or replacing the existing column that has * the same name. * * @group untypedrel * @since 2.0.0 */ def withColumn(colName: String, col: Column): DataFrame = { 
    val resolver = sparkSession.sessionState.analyzer.resolver val output = queryExecution.analyzed.output val shouldReplace = output.exists(f => resolver(f.name, colName)) if (shouldReplace) { 
    val columns = output.map { 
    field => if (resolver(field.name, colName)) { 
    col.as(colName) } else { 
    Column(field) } } select(columns : _*) } else { 
    select(Column("*"), col.as(colName)) } } 

4、完整代码

下面的代码的功能是使用UDF给user表添加两列:name_len、isAdult,每个输出结果都是一样的

+-----+---+--------+-------+ | name|age|name_len|isAdult| +-----+---+--------+-------+ | Leo| 16| 3| false| |Marry| 21| 5| true| | Jack| 14| 4| false| | Tom| 18| 3| true| +-----+---+--------+-------+ 

代码:

package com.dkl.leanring.spark.sql import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession / * Spark Sql 用户自定义函数示例 */ object UdfDemo { 
    def main(args: Array[String]): Unit = { 
    oldUdf newUdf newDfUdf oldDfUdf } / * 根据年龄大小返回是否成年 成年:true,未成年:false */ def isAdult(age: Int) = { 
    if (age < 18) { 
    false } else { 
    true } } / * 旧版本(Spark1.x)Spark Sql udf示例 */ def oldUdf() { 
    //spark 初始化 val conf = new SparkConf() .setMaster("local") .setAppName("oldUdf") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ // 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = sc.parallelize(userData).toDF("name", "age") // 注册一张user表 userDF.registerTempTable("user") // 注册自定义函数(通过匿名函数) sqlContext.udf.register("strLen", (str: String) => str.length()) sqlContext.udf.register("isAdult", isAdult _) // 使用自定义函数 sqlContext.sql("select *,strLen(name)as name_len,isAdult(age) as isAdult from user").show //关闭 sc.stop() } / * 新版本(Spark2.x)Spark Sql udf示例 */ def newUdf() { 
    //spark初始化 val spark = SparkSession.builder().appName("newUdf").master("local").getOrCreate() // 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = spark.createDataFrame(userData).toDF("name", "age") // 注册一张user表 userDF.createOrReplaceTempView("user") //注册自定义函数(通过匿名函数) spark.udf.register("strLen", (str: String) => str.length()) //注册自定义函数(通过实名函数) spark.udf.register("isAdult", isAdult _) spark.sql("select *,strLen(name) as name_len,isAdult(age) as isAdult from user").show //关闭 spark.stop() } / * 新版本(Spark2.x)DataFrame udf示例 */ def newDfUdf() { 
    val spark = SparkSession.builder().appName("newDfUdf").master("local").getOrCreate() // 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = spark.createDataFrame(userData).toDF("name", "age") import org.apache.spark.sql.functions._ //注册自定义函数(通过匿名函数) val strLen = udf((str: String) => str.length()) //注册自定义函数(通过实名函数) val udf_isAdult = udf(isAdult _) //通过withColumn添加列 userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show //通过select添加列 userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show //关闭 spark.stop() } / * 旧版本(Spark1.x)DataFrame udf示例 * 注意,这里只是用的Spark1.x创建sc的和df的语法,其中注册udf在Spark1.x也是可以使用的的 * 但是withColumn和select方法Spark2.0.0之后才有的,关于spark1.xDataFrame怎么使用注册好的UDF没有研究 */ def oldDfUdf() { 
    //spark 初始化 val conf = new SparkConf() .setMaster("local") .setAppName("oldDfUdf") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ // 构造测试数据,有两个字段、名字和年龄 val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18)) //创建测试df val userDF = sc.parallelize(userData).toDF("name", "age") import org.apache.spark.sql.functions._ //注册自定义函数(通过匿名函数) val strLen = udf((str: String) => str.length()) //注册自定义函数(通过实名函数) val udf_isAdult = udf(isAdult _) //通过withColumn添加列 userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show //通过select添加列 userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show //关闭 sc.stop() } } 

Spark UDF使用详解及代码示例

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/203906.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月19日 下午9:19
下一篇 2026年3月19日 下午9:19


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号