优秀的数据工程师,怎么用 Spark 在 TiDB 上做 OLAP 分析[通俗易懂]

优秀的数据工程师,怎么用 Spark 在 TiDB 上做 OLAP 分析[通俗易懂]优秀的数据工程师,怎么用 Spark 在 TiDB 上做 OLAP 分析

大家好,又见面了,我是你们的朋友全栈君。

作者:RickyHuo

本文转载自公众号「大道至简bigdata」

原文链接优秀的数据工程师,怎么用 Spark 在 TiDB 上做 OLAP 分析

TiDB 是一款定位于在线事务处理/在线分析处理的融合型数据库产品,实现了一键水平伸缩,强一致性的多副本数据安全,分布式事务,实时 OLAP 等重要特性。 TiSpark 是 PingCAP 为解决用户复杂 OLAP 需求而推出的产品。它借助 Spark 平台,同时融合 TiKV 分布式集群的优势。直接使用 TiSpark 完成 OLAP 操作需要了解 Spark,还需要一些开发工作。那么,有没有一些开箱即用的工具能帮我们更快速地使用 TiSpark 在 TiDB 上完成 OLAP 分析呢? 目前开源社区上有一款工具 Waterdrop,可以基于 Spark,在 TiSpark 的基础上快速实现 TiDB 数据读取和 OLAP 分析。项目地址: github.com/Interesting…

使用 Waterdrop 操作 TiDB

在我们线上有这么一个需求,从 TiDB 中读取某一天的网站访问数据,统计每个域名以及服务返回状态码的访问次数,最后将统计结果写入 TiDB 另外一个表中。 我们来看看 Waterdrop 是如何实现这么一个功能的。

Waterdrop

Waterdrop 是一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在 Spark 之上。Waterdrop 拥有着非常丰富的插件,支持从 TiDB、Kafka、HDFS、Kudu 中读取数据,进行各种各样的数据处理,然后将结果写入 TiDB、ClickHouse、Elasticsearch 或者 Kafka 中。

准备工作

1. TiDB 表结构介绍

Input(存储访问日志的表)

CREATE TABLE access_log (
    domain VARCHAR(255),
    datetime VARCHAR(63),
    remote_addr VARCHAR(63),
    http_ver VARCHAR(15),
    body_bytes_send INT,
    status INT,
    request_time FLOAT,
    url TEXT
)
+-----------------+--------------+------+------+---------+-------+
| Field           | Type         | Null | Key  | Default | Extra |
+-----------------+--------------+------+------+---------+-------+
| domain          | varchar(255) | YES  |      | NULL    |       |
| datetime        | varchar(63)  | YES  |      | NULL    |       |
| remote_addr     | varchar(63)  | YES  |      | NULL    |       |
| http_ver        | varchar(15)  | YES  |      | NULL    |       |
| body_bytes_send | int(11)      | YES  |      | NULL    |       |
| status          | int(11)      | YES  |      | NULL    |       |
| request_time    | float        | YES  |      | NULL    |       |
| url             | text         | YES  |      | NULL    |       |
+-----------------+--------------+------+------+---------+-------+
复制代码

Output(存储结果数据的表)

CREATE TABLE access_collect (
    date VARCHAR(23),
    domain VARCHAR(63),
    status INT,
    hit INT
)
+--------+-------------+------+------+---------+-------+
| Field  | Type        | Null | Key  | Default | Extra |
+--------+-------------+------+------+---------+-------+
| date   | varchar(23) | YES  |      | NULL    |       |
| domain | varchar(63) | YES  |      | NULL    |       |
| status | int(11)     | YES  |      | NULL    |       |
| hit    | int(11)     | YES  |      | NULL    |       |
+--------+-------------+------+------+---------+-------+
复制代码

2. 安装 Waterdrop

有了 TiDB 输入和输出表之后, 我们需要安装 Waterdrop,安装十分简单,无需配置系统环境变量

  1. 准备 Spark 环境

  2. 安装 Waterdrop

  3. 配置 Waterdrop

以下是简易步骤,具体安装可以参照 Quick Start。

# 下载安装Spark
cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.1.0/spark-2.1.0-bin-hadoop2.7.tgz
wget
# 下载安装Waterdrop
https://github.com/InterestingLab/waterdrop/releases/download/v1.2.0/waterdrop-1.2.0.zip
unzip waterdrop-1.2.0.zip
cd waterdrop-1.2.0

vim config/waterdrop-env.sh
# 指定Spark安装路径
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.1.0-bin-hadoop2.7}
复制代码

实现 Waterdrop 处理流程

我们仅需要编写一个 Waterdrop 配置文件即可完成数据的读取、处理、写入。

Waterdrop 配置文件由四个部分组成,分别是 SparkInputFilterOutputInput 部分用于指定数据的输入源,Filter 部分用于定义各种各样的数据处理、聚合,Output 部分负责将处理之后的数据写入指定的数据库或者消息队列。

整个处理流程为 Input -> Filter -> Output,整个流程组成了 Waterdrop 的处理流程(Pipeline)。

以下是一个具体配置,此配置来源于线上实际应用,但是为了演示有所简化。

Input (TiDB)

这里部分配置定义输入源,如下是从 TiDB 一张表中读取数据。

input {
    tidb {
        database = "nginx"
        pre_sql = "select * from nginx.access_log"
        table_name = "spark_nginx_input"
    }
}
复制代码

Filter

在 Filter 部分,这里我们配置一系列的转化, 大部分数据分析的需求,都是在 Filter 完成的。Waterdrop 提供了丰富的插件,足以满足各种数据分析需求。这里我们通过 SQL 插件完成数据的聚合操作。

filter {
    sql {
        table_name = "spark_nginx_log"
        sql = "select count(*) as hit, domain, status, substring(datetime, 1, 10) as date from spark_nginx_log where substring(datetime, 1, 10)='2019-01-20' group by domain, status, substring(datetime, 1, 10)"
    }
}
复制代码

Output (TiDB)

最后, 我们将处理后的结果写入 TiDB 另外一张表中。TiDB Output 是通过 JDBC 实现的。

output {
    tidb {
        url = "jdbc:mysql://127.0.0.1:4000/nginx?useUnicode=true&characterEncoding=utf8"
        table = "access_collect"
        user = "username"
        password = "password"
        save_mode = "append"
    }
}
复制代码

Spark

这一部分是 Spark 的相关配置,主要配置 Spark 执行时所需的资源大小以及其他 Spark 配置。

我们的 TiDB Input 插件是基于 TiSpark 实现的,而 TiSpark 依赖于 TiKV 集群和 Placement Driver (PD)。因此我们需要指定 PD 节点信息以及 TiSpark 相关配置 spark.tispark.pd.addressesspark.sql.extensions

spark {
  spark.app.name = "Waterdrop-tidb"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  # Set for TiSpark
  spark.tispark.pd.addresses = "localhost:2379"
  spark.sql.extensions = "org.apache.spark.sql.TiExtensions"
}
复制代码

运行 Waterdrop

我们将上述四部分配置组合成我们最终的配置文件 conf/tidb.conf

spark {
    spark.app.name = "Waterdrop-tidb"
    spark.executor.instances = 2
    spark.executor.cores = 1
    spark.executor.memory = "1g"
    # Set for TiSpark
    spark.tispark.pd.addresses = "localhost:2379"
    spark.sql.extensions = "org.apache.spark.sql.TiExtensions"
}
input {
    tidb {
        database = "nginx"
        pre_sql = "select * from nginx.access_log"
        table_name = "spark_table"
    }
}
filter {
    sql {
        table_name = "spark_nginx_log"
        sql = "select count(*) as hit, domain, status, substring(datetime, 1, 10) as date from spark_nginx_log where substring(datetime, 1, 10)='2019-01-20' group by domain, status, substring(datetime, 1, 10)"
    }
}
output {
    tidb {
        url = "jdbc:mysql://127.0.0.1:4000/nginx?useUnicode=true&characterEncoding=utf8"
        table = "access_collect"
        user = "username"
        password = "password"
        save_mode = "append"
    }
}
复制代码

执行命令,指定配置文件,运行 Waterdrop ,即可实现我们的数据处理逻辑。

  • Local

./bin/start-waterdrop.sh --config config/tidb.conf --deploy-mode client --master 'local[2]'

  • yarn-client

./bin/start-waterdrop.sh --config config/tidb.conf --deploy-mode client --master yarn

  • yarn-cluster

./bin/start-waterdrop.sh --config config/tidb.conf --deploy-mode cluster -master yarn

如果是本机测试验证逻辑,用本地模式(Local)就可以了,一般生产环境下,都是使用 yarn-client 或者 yarn-cluster 模式。

检查结果

mysql> select * from access_collect;
+------------+--------+--------+------+
| date       | domain | status | hit  |
+------------+--------+--------+------+
| 2019-01-20 | b.com  |    200 |   63 |
| 2019-01-20 | a.com  |    200 |   85 |
+------------+--------+--------+------+
2 rows in set (0.21 sec)
复制代码

总结

在这篇文章中,我们介绍了如何使用 Waterdrop 从 TiDB 中读取数据,做简单的数据处理之后写入 TiDB 另外一个表中。仅通过一个配置文件便可快速完成数据的导入,无需编写任何代码。

除了支持 TiDB 数据源之外,Waterdrop 同样支持 Elasticsearch,Kafka,Kudu, ClickHouse 等数据源。

与此同时,我们正在研发一个重要功能,就是在 Waterdrop 中,利用 TiDB 的事务特性,实现从 Kafka 到 TiDB 流式数据处理,并且支持端(Kafka)到端(TiDB)的 Exactly-Once 数据一致性

希望了解 Waterdrop 和 TiDB,ClickHouse、Elasticsearch、Kafka 结合使用的更多功能和案例,可以直接进入项目主页:github.com/Interesting… ,或者联系项目负责人: Garyelephan(微信: garyelephant)、RickyHuo (微信: chodomatte1994)。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/106966.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • [关系图谱] 一.Gephi通过共现矩阵构建知网作者关系图谱

    [关系图谱] 一.Gephi通过共现矩阵构建知网作者关系图谱作者最近研究人物关系图谱,准备发表相关的文章,原本是用PythonNetworkx库绘制,但效果不太理想;故改为Gephi软件,发现其非常好看,特分享几篇文章供大家交流学习,希望对您有所帮助,尤其是引文分析、社交网络、主题分布等方向的同学。后续的文章将尽可能的使用Markdown语法撰写了。参考文章:【python数据挖掘课程】十七.社交网络Networkx库分析人物关系(初识篇)…

    2022年6月26日
    49
  • Python网络爬虫精要

    Python网络爬虫精要

    2021年11月8日
    40
  • 什么是语义分割_词法分析语法分析语义分析

    什么是语义分割_词法分析语法分析语义分析文章目录引言1混淆矩阵2语义分割PA:像素准确率CPA:类别像素准确率MPA:类别平均像素准确率IoU:交并比MIoU:平均交并比(改进,先求IoU,再求MIoU,这里有误)3综合实例步骤一:输入真实、预测图片步骤二:求出混淆矩阵步骤三:评价指标计算PACPAMPAIoUMIoU4测试代码参考引言语义分割是像素级别的分类,其常用评价指标:像素准确率(PixelAccuracy,PA…

    2022年8月21日
    6
  • lspci 指令_plsr指令

    lspci 指令_plsr指令1.指令名称lspci2.指令简介lspci是一个用于显示系统中所有PCI/PCIe总线及设备信息的工具。默认情况下,它只显示设备的最简要信息。通过传入指定参数,可以输出更详细的信息,或者可以按照特定格式输出,以便于用其他程序进行解析。有些PCI/PCIe的设备信息需要在root权限下才能获取到。3.参数先来看看不加任何参数时输出的结果。bryan@bryan-pc:~$lspci00:00.0Hostbridge:IntelCorporation8.

    2025年10月14日
    3
  • JavaWeb-过滤器Filter学习(四)敏感词过滤实例

    JavaWeb-过滤器Filter学习(四)敏感词过滤实例通过Filter来实现留言板的敏感词过滤…思路很简单,我们这里的敏感词是直接先放进去的,实际项目中,肯定是存在数据库中。在Filter过滤器中,我们先拿到用户提交的留言,如果出现了敏感词,我们就用*号来替换。代码演示:index.jsp:<%@pagelanguage="java"import="java.util.*"pageEncoding="UTF-8"%><%@taglibur

    2022年5月27日
    35
  • SQL基础语句大全

    SQL基础语句大全SQL基础语句大全此文章基本涵盖SQL的基础应用语句你好!这是本人在大学自学Java时记录的SQL基础语句,希望可以对自学的小白们给与一定帮助,有错误也欢迎大家可以帮助纠正。数据类型1.整数:int和bigintbigint等效Java中的long2.浮点数:double(m,d)m总长度d小数长度eg:double(5,3)26.789decimal是一个超高…

    2022年5月1日
    43

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号