ETL的开发过程[通俗易懂]

ETL的开发过程[通俗易懂]在生产环境中,使用shell脚本完成一次etl操作1.定义一个etl函数,里面传入json行数据,用json.loads加载行数据,并对行数据进行判断,如果没有行数据,或data字段没有在行数据里,就直接返回空的结果,否则就继续往下执行2.接着获取行里的数据,用for循环判断,如果包含某个值,我就将变量赋值取出,装在集合容器里3.设置sparksession会话,并ena…

大家好,又见面了,我是你们的朋友全栈君。

在生产环境中, 使用shell脚本完成一次etl操作

1.定义一个etl函数, 里面传入json行数据, 用json.loads加载行数据,并对行数据进行判断,如果没有行数据,或data字段没有在行数据里, 就直接返回空的结果, 否则就继续往下执行

2.接着获取行里的数据, 用for循环判断, 如果包含某个值, 我就将变量赋值取出, 装在集合容器里

3.设置sparksession会话, 并enableHiveSupport, 我用的是hiveonspark模式,

4.初始化rdd, 从大数据emr集群中(也可能是从实时系统kafka读取数据)加载数据到rdd , 然后用自己自定义的etl解析过滤

5.将rdd转为df, createDateFream()要传两个参数,一个是rdd,一个是schema信息

6.将df创建临时表 createOrReplaceTemView()

7.将临时表表的数据加载到hive表中, 完成整个ETL操作

ETL常用场景:

1.清洗nginx日志信息, 预处理日志文件(每小时将上报的日志拉取到本机,hdfs命令上传集群),并清洗存入hive

2.每小时清洗用户表信息,

3.后处理清洗商户信息,

4.清洗并合并设备状态信息,

5.每小时清洗每日设备分成, 清洗并合并积分流水表信息, 每小时清洗支付宝订单表信息等,

def etl(row_str):
	result = []
	try:
		row = json.loads(row_str)
		if(not row) or ('data' not in row):
		return result
		
		获取行
		base = { 
   }
		for r_k in row:
			r_v = row[r_k]
			if r_k != 'data':
				r_k=r_k.lower()
				base[r_k]=r_k
				print(base)
		获取data
		for data in row['data']:
			base_data = base.copy()
			if data:
				for d_k in data:
					d_v = data[d_k]
					if d_k != 'list':
						d_k = d_k.lower()
						base_data[d_k] = d_v
						print(base_data)
		获取list
		 for list_ in data['list']:
                    if list_:
                        # print(list_)
                        list_data = base_data.copy()
                        # list_data.update(list_)
                        for l_k in list_:
                            l_v = list_[l_k]
                            l_k = l_k.lower()
                            list_data[l_k] = l_v
                        # print(list_data)
                        result += [list_data]
                        # print(result)
      except Exception as e:
      	print(e)
      	pass
      retuen result
      
  设置会话
  spark = SparkSession.builder.appName("程序名" % statdate分区日期)
  .enableHiveSupport()
  .getOrCreate()
  
  初始化rdd
  rawLogRDD = spark.sparkContext.textfile("hdfs://emr-cluster/ld_log")
  
  etl解析
  etllogRDD = rawLogRDD.flatMap(etl)
  可以进行测试打印
  for record in etlLogRDD.collect():
  	print(record)
  	
  	将rdd 转为df
  	sampleDF = spark.sql("select * from dept limit 1")
  	etlLogSchema = sampleDF.schema
  	etlLogSchema.__dict__['fields'] = etlLogSchema.__dict__['fields'][:-1]
etlLogSchema.__dict__['names'] = etlLogSchema.__dict__['names'][:-1]
etlLogDF = spark.createDataFrame(etlLogRDD,etlLogSchema)
测试:etlLogDF.printSchema()
etlLogDF.show()
exit()
创建临时表
etl.LogDF.createOrReplaceTmpView("etl_log")
写入分区表
spark.sql("alter table dept drop if exist partition(statdate='%s')" ) % statdate)
spark.sql("insert overwrite table dept partition(statdate='%s') select * from etl_log " % statdate)
		
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/142667.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • vue-router详解[通俗易懂]

    vue-router详解[通俗易懂]一、前言要学习vue-router就要先知道这里的路由是什么?为什么我们不能像原来一样直接用<a></a>标签编写链接哪?vue-router如何使用?常见路由操作有哪些?等等这些问题,就是本篇要探讨的主要问题vue-router二、vue-router是什么这里的路由并不是指我们平时所说的硬件路由器,这里的路由就是SPA(单页应用)的路径管理器。再通俗的说,vue-rou…

    2022年7月11日
    11
  • MySQL按天,按周,按月,按时间段统计【转载】

    MySQL按天,按周,按月,按时间段统计【转载】

    2021年5月15日
    134
  • Linux 修改文件权限

    Linux 修改文件权限目录:1、介绍:2、权限说明3、用户4、权限设置5、权限设置格式:(1)增加权限:(2)撤销权限(3)无任何权限1、介绍:​ Linux系统中,每个文件或目录都有访问许可权限,用它来确定以何种方式对文件或目录进行访问和操作。在Linux中,如果要对文件的权限进行修改,那么可在终端中使用chmod命令对其文件的权限进行修改,但是chmod命令修改文件权限有两种方式:1、字母法,2、数字法2、权限说明​ (1)只读:表示只允许读取内容,而禁止其对该文件做其他任何操作​ 字母法:‘r’

    2022年9月11日
    0
  • 光纤交换机配置zone[通俗易懂]

    光纤交换机配置zone[通俗易懂]B系列SAN交换机-Zoning配置信息型本文档提供了BrocadeSAN交换机zoning配置的分步骤指南。详细信息新建zone配置:a. 新建一个zoning配置,用cfgcreate命令。格式:cfgcreate“cfgname”,“zonename1;zonename2;…”举例:cfgcreate“cfg1”,“zone1;zone2”b. 新建一个zone,用zonecreate命令。格式:zonecreate“zone

    2022年5月22日
    34
  • mybatis map foreach_while的三个用法

    mybatis map foreach_while的三个用法MyBatis循环Map今天遇到一个比较特殊的业务,需要对传入的Map数据在映射文件中进行遍历,在之前的学习中,我们也知道MyBatis有默认对集合的操作list和array,但是没有默认的map,所有不能直接写collection=“map”,如果这么处理,它会当成是根据map.get(“map”)获取传递value只,大部分情况下是一个map中是不会有“map”这个key的,于是就是报错。如果你想用map标识来获取参数map,就需要保证传入的Map参数有@Param(“map”

    2022年8月30日
    0
  • 机房效果图制作|简易制作教程赘述

    机房效果图制作|简易制作教程赘述首先看图,这个图是人视角度,两侧显露出来的空间很大,注重表现的是两侧的机柜,包含列头柜,精密空调及上方的冷通道。第一步:客户肯定得提供图纸类的资料,如CAD图纸,或是手绘的平面布置图等。这里面需要包含机房的数量,排列组合为止或是模块化设置。第二步:拿到资料,和客户沟通之后,首先要明白客户表现的是什么效果,哪个地方是侧重点等。第三步:沟通完了就需要进行下一步,就是如果客户提供了CAD图纸

    2022年5月5日
    76

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号