数据仓库 数据集市_实时数仓应用场景

数据仓库 数据集市_实时数仓应用场景#实时数仓项目-数据采集与ODS层配置canal实时采集mysql数据一、mysql开启binlog二、安装配置canal采集数据到kafka三、启动kafka消费者验证ODS层数据处理导入hbase一、flink采集kafka数据配置canal实时采集mysql数据一、mysql开启binlog修改mysql的配置文件(linux:/etc/my.cnf,Windows:\my.ini)log-bin=mysql-bin#开期binlogbinlog-format=ROW#选择ROW

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

配置canal实时采集mysql数据

一、mysql开启binlog

  1. 修改mysql的配置文件(linux:/etc/my.cnf,Windows:\my.ini)
log-bin=mysql-bin # 开期binlog
binlog-format=ROW #选择ROW模式
binglog-do-db=dwshow #dwshow是数据库的名称

binlog-format可以选择statement,row,mixed,区别在于:

模式 区别
statement 记录写操作的语句,节省空间,但可能造成数据不一致
row 记录每次操作后每行记录的变化,占用空间较大
mixed 包含UUID(),udf是row模式极端情况仍旧会有不一致,对于binglong监控不方便
  1. 重启mysql,并创建按canal用户
systemctl restart mysqld
#MySQL重启之后,到下面路径中看有没有mysql-bin.*****文件 
cd /var/lib/mysq
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;

二、安装配置canal采集数据到kafka

  1. canal下载安装包并解压到安装目录
    下载:https://github.com/alibaba/canal/releases
    把下载的Canal.deployer-1.1.4.tar.gz拷贝到linux,解压缩(路径可自行调整)
[root@linux123 ~]# mkdir /opt/modules/canal 
[root@linux123 mysql]# tar -zxf canal.deployer-1.1.4.tar.gz -C /opt/modules/canal
  1. 修改配置conf/canal.properties,配置zk和kafka地址
# 配置zookeeper地址 
canal.zkServers =linux121:2181,linux123:2181 
# tcp, kafka, RocketMQ 
canal.serverMode = kafka 
# 配置kafka地址 
canal.mq.servers =linux121:9092,linux123:9092
  1. 修改配置conf/example/instance.properties
    配置mysql的主机、用户名密码以及监控的kafka主题
# 配置MySQL数据库所在的主机 
  canal.instance.master.address = linux123:3306
 # username/password,配置数据库用户和密码
  canal.instance.dbUsername =canal 
  canal.instance.dbPassword =canal 
# mq config,对应Kafka主题: 
  canal.mq.topic=test
  1. 启动canalsh bin/startup.sh
  2. 关闭canalsh bin/stop.sh

三、启动kafka消费者验证

在这里插入图片描述

ODS层数据处理导入hbase

一、flink采集kafka数据

  1. 编写工具类获取kafka消费者作为flink数据源,需要设置server地址、key和value反序列化器、消费组Id、消费开始的offset
package myUtils

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

class SourceKafka { 
   

  def getKafkaSource(topicName:String): FlinkKafkaConsumer[String] ={ 
   
    val props = new Properties()
    props.setProperty("bootstrap.servers","linux121:9092,linux122:9092,linux123:9092");
    props.setProperty("group.id","consumer-group")
    props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("auto.offset.reset","latest")
    new FlinkKafkaConsumer[String](topicName,new SimpleStringSchema(),props);

  }

}
  1. 从kafka获取数据并写入hbase,从kafka中获取json格式的数据,使用alibaba的fastjson进行解析
package ods

import java.util

import com.alibaba.fastjson.{ 
   JSON, JSONObject}
import models.TableObject
import myUtils.SourceKafka
import org.apache.flink.streaming.api.scala.{ 
   DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.scala._
object KafkaToHbase { 
   

  def main(args: Array[String]): Unit = { 
   
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaConsumer: FlinkKafkaConsumer[String] = new SourceKafka().getKafkaSource("test")

    kafkaConsumer.setStartFromLatest()

    val sourceStream: DataStream[String] = env.addSource(kafkaConsumer)//需要隐式转换

    val mappedStream: DataStream[util.ArrayList[TableObject]] = sourceStream.map(x => { 
   
      val jsonObj: JSONObject = JSON.parseObject(x)
      val database: AnyRef = jsonObj.get("database")
      val table: AnyRef = jsonObj.get("table")
      val typeInfo: AnyRef = jsonObj.get("type")
      val objects = new util.ArrayList[TableObject]()
      jsonObj.getJSONArray("data").forEach(x => { 
   
        println(database.toString + "...." + table.toString + ".." + typeInfo.toString + "..." + x.toString)
        objects.add(TableObject(database.toString, table.toString, typeInfo.toString, x.toString))
      })

      objects


    })

    mappedStream.addSink(new SinkHbase)

    env.execute()



  }

}

  1. 编写flink的hbasesink
    编写hbase连接方法
package myUtils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{ 
   HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.{ 
   Connection, ConnectionFactory}

class ConnHBase { 
   

  def connToHabse:Connection={ 
   

    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum","linux121,linux122,linux123")
    conf.set("hbase.zookeeper.property.clinetPort","2181")
    conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,30000)
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,30000)
    val connection: Connection = ConnectionFactory.createConnection(conf)
    connection

  }

}

继承RichSinkFunction编写flink到hbase的下沉器

package ods

import models.{ 
   AreaInfo, DataInfo, TableObject}
import org.apache.flink.streaming.api.functions.sink.{ 
   RichSinkFunction, SinkFunction}
import java.util

import com.alibaba.fastjson.JSON
import myUtils.ConnHBase
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ 
   Connection, Delete, Put, Table}

class SinkHbase  extends  RichSinkFunction[util.ArrayList[TableObject]]{ 
   


  var connection:Connection= _
  var hbtable:Table =_

  override def invoke(value: util.ArrayList[TableObject], context: SinkFunction.Context[_]): Unit = { 
   
    value.forEach(x=>{ 
   
      println(x.toString)
      val database: String = x.dataBase
      val tableName: String = x.tableName
      val typeInfo: String = x.typeInfo
      hbtable = connection.getTable(TableName.valueOf(tableName))


      if(database.equalsIgnoreCase("dwshow")&&tableName.equalsIgnoreCase("lagou_trade_orders")){ 
   
        if(typeInfo.equalsIgnoreCase("insert")){ 
   
          value.forEach(x=>{ 
   
            val info: DataInfo = JSON.parseObject(x.dataInfo, classOf[DataInfo])
            insertTradeOrders(hbtable,info);

          })
        }else if(typeInfo.equalsIgnoreCase("update")){ 
   
          value.forEach(x=>{ 
   
            val info: DataInfo = JSON.parseObject(x.dataInfo, classOf[DataInfo])
            insertTradeOrders(hbtable,info)
          })
        }else if(typeInfo.equalsIgnoreCase("delete")){ 
   
          value.forEach(x=>{ 
   
            val info: DataInfo = JSON.parseObject(x.dataInfo, classOf[DataInfo])
            deleteTradeOrders(hbtable,info)
          })
        }
      }


      if(database.equalsIgnoreCase("dwshow")&&tableName.equalsIgnoreCase("lagou_area")){ 
   


        value.forEach(x=>{ 
   
          val info: AreaInfo = JSON.parseObject(x.dataInfo, classOf[AreaInfo])

          if(typeInfo.equalsIgnoreCase("insert")){ 
   
              insertArea(hbtable,info)
          }else if(typeInfo.equalsIgnoreCase("update")){ 
   
            insertArea(hbtable,info)
          }else if(typeInfo.equalsIgnoreCase("delete")){ 
   
            deleteArea(hbtable,info)
          }

        })
      }


    })
  }

  override def open(parameters: Configuration): Unit = { 
   
    connection = new ConnHBase().connToHabse
  }

  override def close(): Unit = { 
   

    if(hbtable!=null){ 
   
      hbtable.close()
    }
    if(connection!=null){ 
   
      connection.close()
    }

  }

  def insertTradeOrders(hbTable:Table,dataInfo:DataInfo)={ 
   

    val put = new Put(dataInfo.orderId.getBytes)
    put.addColumn("f1".getBytes,"modifiedTime".getBytes,dataInfo.modifiedTime.getBytes())
    put.addColumn("f1".getBytes,"orderNo".getBytes,dataInfo.orderNo.getBytes())
    put.addColumn("f1".getBytes,"isPay".getBytes,dataInfo.isPay.getBytes())
    put.addColumn("f1".getBytes,"tradeSrc".getBytes,dataInfo.tradeSrc.getBytes())
    put.addColumn("f1".getBytes,"payTime".getBytes,dataInfo.payTime.getBytes())
    put.addColumn("f1".getBytes,"productMoney".getBytes,dataInfo.productMoney.getBytes())
    put.addColumn("f1".getBytes,"totalMoney".getBytes,dataInfo.totalMoney.getBytes())
    put.addColumn("f1".getBytes,"dataFlag".getBytes,dataInfo.dataFlag.getBytes())
    put.addColumn("f1".getBytes,"userId".getBytes,dataInfo.userId.getBytes())
    put.addColumn("f1".getBytes,"areaId".getBytes,dataInfo.areaId.getBytes())
    put.addColumn("f1".getBytes,"createTime".getBytes,dataInfo.createTime.getBytes())
    put.addColumn("f1".getBytes,"payMethod".getBytes,dataInfo.payMethod.getBytes())
    put.addColumn("f1".getBytes,"isRefund".getBytes,dataInfo.isRefund.getBytes())
    put.addColumn("f1".getBytes,"tradeType".getBytes,dataInfo.tradeType.getBytes())
    put.addColumn("f1".getBytes,"status".getBytes,dataInfo.status.getBytes())

    hbTable.put(put)

  }


  def deleteTradeOrders(hbtable:Table,dataInfo: DataInfo)={ 
   
    val delete = new Delete(dataInfo.orderId.getBytes());
    hbtable.delete(delete)
  }


  def insertArea(hbTable:Table,areaInfo: AreaInfo)={ 
   
    val put = new Put(areaInfo.id.getBytes())
    put.addColumn("f1".getBytes(),"name".getBytes(),areaInfo.name.getBytes())
    put.addColumn("f1".getBytes(),"pid".getBytes(),areaInfo.pid.getBytes())
    put.addColumn("f1".getBytes(),"sname".getBytes(),areaInfo.sname.getBytes())
    put.addColumn("f1".getBytes(),"level".getBytes(),areaInfo.level.getBytes())
    put.addColumn("f1".getBytes(),"citycode".getBytes(),areaInfo.citycode.getBytes())
    put.addColumn("f1".getBytes(),"yzcode".getBytes(),areaInfo.yzcode.getBytes())
    put.addColumn("f1".getBytes(),"mername".getBytes(),areaInfo.mername.getBytes())
    put.addColumn("f1".getBytes(),"Lng".getBytes(),areaInfo.Lng.getBytes())
    put.addColumn("f1".getBytes(),"Lat".getBytes(),areaInfo.Lat.getBytes())
    put.addColumn("f1".getBytes(),"pinyin".getBytes(),areaInfo.pinyin.getBytes())
    hbTable.put(put)

  }

  def deleteArea(hbTable:Table,areaInfo: AreaInfo)={ 
   
    val delete = new Delete(areaInfo.id.getBytes())
    hbTable.delete(delete)
  }










}

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/188734.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • Spring AOP中的JDK和CGLib动态代理哪个效率更高?

    Spring AOP中的JDK和CGLib动态代理哪个效率更高?一、背景今天有小伙伴面试的时候被问到:SpringAOP中JDK和CGLib动态代理哪个效率更高?二、基本概念首先,我们知道SpringAOP的底层实现有两种方式:一种是JDK动态代理,另一种是CGLib的方式。自Java1.3以后,Java提供了动态代理技术,允许开发者在运行期创建接口的代理实例,后来这项技术被用到了Spring的很多地方。JDK动态代理主要涉及…

    2022年5月1日
    43
  • Tasker使用企业微信api推送消息到普通微信「建议收藏」

    Tasker使用企业微信api推送消息到普通微信「建议收藏」注册https://work.weixin.qq.com/wework_admin/register_wx注册成功进入管理后台—>我的企业—>微工作台—>邀请关注*使用普通微信关注后才能接收消息应用与小程序—>创建应用*可见范围可以选整个企业企业ID我的企业—>…

    2022年5月23日
    61
  • Python基础知识总结[通俗易懂]

    Python基础知识总结[通俗易懂]一、Python的优缺点优点:1.简单,易学,免费,开源2.高级语言,解释型语言3.可移植性,可拓展性,可读性4.面向对象,丰富的库缺点:1.执行效率慢2.GIL锁限制并发

    2022年7月5日
    22
  • 头部公司的Robotaxi何时能拿掉安全员?

    头部公司的Robotaxi何时能拿掉安全员?今天想聊聊Robotaxi。聊这个话题的起因是今年7月上旬,汽车之心走访了上海、广州和深圳三地,深入体验了滴滴、小马智行、文远知行、元戎启行和AutoX这5家自动驾驶公司的…

    2022年5月5日
    52
  • 数据库 部分函数依赖 完全函数依赖 传递函数依赖 第一范式、第二范式、第三范式、BCNF范式区别

    数据库 部分函数依赖 完全函数依赖 传递函数依赖 第一范式、第二范式、第三范式、BCNF范式区别数据库部分函数依赖完全函数依赖传递函数依赖第一范式、第二范式、第三范式、BCNF范式区别在理解函数依赖之前,先来看一下函数依赖分析:在关系中,包括在任何候选码中的属性称为主属性;不包括在任何候选码中的属性称为非主属性。函数依赖只分析关系中的非主属性对主属性之间的依赖关系,并不分析主属性对主键(码)的依赖关系。具体关于部分函数依赖和完全函数依赖的定义,网上有很…

    2022年5月23日
    53
  • 最新最全的微信小程序入门学习教程,微信小程序零基础入门到精通

    最新最全的微信小程序入门学习教程,微信小程序零基础入门到精通从今天开始就来带领大家学习微信小程序了,只要你跟着我一步步来,相信你也可以上线一款属于自己的微信小程序一,认识小程序微信⼩程序,简称⼩程序,英⽂名MiniProgramMiniProgram,是⼀种不需要下载安装即可使⽤的应⽤,它实现了应⽤“触⼿可及”的梦想,⽤⼾扫⼀扫或搜⼀下即可打开应⽤1-1,微信小程序的优势1.微信有海量⽤⼾,⽽且粘性很⾼,在微信⾥开发产品更容易触达⽤⼾;2.推⼴app或公众号的成本太⾼。3.开发适配成本低。4.容易⼩规模试错,然后快速迭代。5.跨平台。

    2022年6月25日
    51

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号