大家好~,
今天讲一下关于搭建自己的数据湖(Data Lake)。关于数据湖的文章很多,这里就不多介绍了。笔者发现自己的域名服务器提供网盘(WebDav)服务并且容量没有限制(不用太浪费了),所以试着在网盘上搭建了自己数据湖,在此分享。其实简单的说,我把企业级方案的存储层从HDFS或S3等改成了WebDav,然后在上面使用最新的Delta框架来处理数据湖。Delta Lake是Databricks公司的一个开源项目。Databricks是一个整合数据科学与数据工程的一体化平台。使用过Apache Spark的话可能听说过Databricks,它是Apache Spark的项目发起方。Delta 框架简单地讲就是为数据湖提供ACID事务能力。我们可以在现有的基于HDFS的数据湖上追加Delta Lake层,也可以完全从0开始直接使用Delta Lake存储数据。相较于企业级方案,我的域名服务器并不需要我支付额外的费用,这种剩余资源为何不利用起来呢。感兴趣的朋友们可以像我一样挖掘一下身边的可用资源。躺在我们身边的待用资源其实很多。
1. 为什么搭建自己的数据湖(Data Lake)
那么真的有必要搭建自己的数据湖吗?笔者的主张是:是的!来说一下我的动机:
- 可以方便地做各种技术尝试
搞技术的同学们都会自己的PC上搭建本地环境以做各种测试,技术尝试。
- 充分利用自己的资源
如前所述由于我使用的域名服务器提供了无限制的存储空间,又支持网盘功能,所以,将这个无容量限制的网盘Mount到我的Mac上做数据湖存储是一个不错的方案
- 提高工程师对于技术追求的满足感
技术背景的很多人都对新技术有着探求心,程序员和工程师门总喜欢尝试一些新的技术,来满足自己
2. 为什么用Delta Lake,而不直接搭建数据湖
由于当今的数据拥有3V特性,即Volume(数据量大),Variety(数据种类多),Velocity(数据产生速度快)。因此很多公司内部都拥有自己的大数据平台,而大数据平台中存储数据的地方正是数据湖。通常可以将数据湖理解为储存大型数据的数据库,但是由于数据量的庞大,数据湖舍弃了数据库的一些特性,比如ACID事务功能,另外数据湖一般不支持索引,为了UPDATE/MERGE一条记录,需要在整个数据湖中进行一次检索,这种Full Scan的代价大,效率低。因此数据湖是一种针对查询进行特化的技术,仍然有很多技术挑战。主要有以下5条挑战:
1)同时读写, 并且要保证数据的一致性(read consistent data)
2)可以高吞吐从大表读数据(Scalable metadata handling)
3)遇到错误写出可以回滚(rollback) 可以删改(update/delete/merge)
4)在线业务不下线的同时可以重新处理历史数据(replay historical data)
5)处理迟到数据(late arriving data) 而无需推迟下阶段的数据处理
很多数据湖的方案费钱费力,将钱和时间大量用到了解决系统局限,而不是去从数据中提取有价值的信息。而Delta框架正是为解决以上问题而生。
3. 准备工作
在开始使用Delta Lake前我们先做一些准备工作。
3.1. 存储空间的准备
在一开始我们提到使用网盘(WebDav)来代替企业级数据湖的HDFS层。所以我们需要先将WebDav加载到自己的机子上。
如果你是一名Bloger,或许你已经有了自己的域名服务器,那么你可以去查一下自己的域名服务器是否有免费的WebDav资源可用。如果你正好使用的是Bluehost,那么跟着我来。
3.1.1 网盘(WebDav)的准备
我们来看一下如何在Bluehost上设置网盘。
Bluehost的网盘功能默认是开启的。我们在主控制页面(cpanel)左侧点击[Advaned],然后点击[Web Disk]打开以下页面。如图输入网盘连接用的用户名(Username)和密码(Password),以及网盘的存储位置(Directory),点击[Create]就创建完成了。
Bluehost提供了FTP,FTPS,SFTP和WebDav的网盘连接协议,FTP,FTPS是经典的文件传输协议,如果你喜欢FTP协议,Bluehost提供了可自定义的FTP账号。但是,相比基于HTTP(S)的WebDav协议,WebDav提供了包括GZIP压缩,多种认证方法等FTP所没有的功能。对于SFTP协议,虽然它可能是功能最强的协议了,但是如果你订阅的不是Bluehost最高级的套餐,将只能使用root来链接,不建议使用。因此,我们使用WebDav协议将网盘Mount到我们的PC上。MacOS上的Mount方法非常简单,打开“Connect to Server”,输入你的网盘地址和端口连接即可。网盘会被Mount到/Volumes/下,比如笔者的网盘目录是:
/Volumes/lunarwoffie.com。
这里有几个坑大家注意一下:
- 不要使用Bluehost提供给你的WebDav地址,你很可能无法连接,正确的地址是 你的域名:2078。这在Bluehost的技术支持文档里有说明。
- 如果你想从命令行操作,很遗憾MacOS从命令行Mount一个WebDav协议的网盘并不容易。
- 不要在Finder中访问你的网盘。由于MacOS特有的.DS_Store文件的存在,造成显示网盘的内容非常慢。因此,使用“Connect to Server”加载完网盘后,立即关闭弹出的Finder窗口。从命令行访问你的网盘。
对于其他OS,操作雷同,在此不再详述。
3.2. Notebook环境的准备
接下来我们要准备操作Delta Lake的环境。由于笔者习惯使用Notebook做数据分析。这里详细讲解一下Notebook的配置方法。当然,Notebook并不是唯一的选择。你可以使用任何自己熟悉的编程环境。另外,想偷懒的朋友们请看3.2.2节如何使用Databricks的Notebook。
3.2.1. 使用Jupyter Notebook的配置方法
熟悉数据科学或数据分析的朋友们肯定对Jupyter Notebook不陌生。由于需要配置Apache Spark在Jupyter上运行,为了简化步骤,我们直接使用Jupyter官方提供的Docker Image。从这个Docker Image启动的Docker容器(Container)直接包含了可以调用Apache Spark的Jupyter Notebook。
3.2.1.1. 下载并安装Docker for Mac
Docker简单地讲就是一种虚拟机,与传统虚拟机不同的是,Docker能够共享宿主的OS内核,主要用来运行程序,比如这次我们用来运行Jupyter Notebook。而传统虚拟机是用来运行一个完整的OS的。
我们从Docker的官方网站上下载Docker for Mac并安装。
启动Docker后,在Mac的右上角出现了Docker图标就启动完成了。
3.2.1.2. 启动一个pyspark-notebook容器(Container)
是的,pyspark-notebook就是我们要使用的Docker Image。我们不需要手动去下载,只要在命令行执行以下命令,Docker会自动去下载并用这个Docker Image来启动一个容器。
$ docker run -d -p 8888:8888 \ -v /Volumes/extend/workspace/github/datascience-notebooks:/home/jovyan/notebook \ -v /Volumes/lunarwoffie.com:/home/jovyan/webmount \ -e JUPYTER_ENABLE_LAB=yes \ --name pyspark-notebook georgezhu/pyspark-notebook \ start-notebook.sh --NotebookApp.token=''
这里为不熟悉Docker的同学们简单地解释一下命令的意思。
- docker run 用来从一个Docker Image启动一个容器
- -d 运行在后台
- -p 端口映射。Docker容器内的Jupyter Notebook默认启动在8888端口,将容器内的8888端口映射给宿主的8888端口。
- -v 容器的Volume映射。使我们从Docker容器内部能够访问宿主的资源。这里我们加载了datascience-notebook这个workspace和刚创建的网盘到容器中。
- -e JUPYTER_ENABLE_LAB=yes 使用Jupyter Lab代替Jupyter Notebook。根据官方解释,Lab在将来会彻底替换掉Notebook。
- –name pyspark-notebook georgezhu/pyspark-notebook 从–name pyspark-notebook georgezhu/pyspark-notebook启动一个name为pyspark-notebook的容器
- start-notebook.sh –NotebookApp.token=” 不使用任何认证登陆Jupyter Notebook
3.2.1.3. 打开Jupyter Notebook并确认Spark可用
http://localhost:8888
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.master("local") \
.appName("trip data visualization") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
3.2.2. 使用Databricks的配置方法
Databricks是Apache Spark的东家,更是Delta Lake的东家。自然Databricks为我们准备好了一切,并且Databricks Community版本是免费的。听上去很棒吧。但是注意了,在Databricks Community中运行Notebook需要启动一个AWS EC2的实例。而这个实例在2小时的空闲后,会被Databricks自动关闭。这时需要重启EC2实例,并从头运行你的Notebook。因此如果你只是想试一下Delta框架,Databricks Community是个不错的选择,否则可以选择订阅Databricks或者使用我们的Jupyter Notebook方案。
上图展示了在Databricks的Notebook中加载Azure的Blob Storage的方法,你可以加载任何存储介质,包括我们自己的云盘。然后就可以在上面操作Delta Lake了。
Databricks的UI操作简单易懂,在此不再详述。
4. 开始在Delta Lake之旅
那么准备都做好了,我们开始真正地搭建自己的Delta Lake。
4.1. 读入数据
我们使用花旗银行开源的Citi Bike数据集。
我将下载使用JC-202003-citibike-tripdata.csv。你可以下载你喜欢的任何文件。然后把文件放到刚才Mount到本地的网盘中。我放到了这个位置:
/Volumes/lunarwoffie.com/filestore/citi-bike/JC-202003-citibike-tripdata.csv
在刚才建立的Notebook中输入以下代码读入数据。注意文件的读取位置,由于我们使用了Docker容器,根据启动Docker容器时设定的Volume映射,load数据文件时要使用映射在Docker容器内的文件位置。
trip_df = spark.read.format('csv') \
.options(header='true', inferSchema='true') \
.load('/home/jovyan/webmount/filestore/citi-bike/JC-202003-citibike-tripdata.csv') \
.withColumnRenamed("start station id", "start_station_id") \
.withColumnRenamed("start station name", "start_station_name") \
.withColumnRenamed("start station latitude", "start_station_latitude") \
.withColumnRenamed("start station longitude", "start_station_longitude") \
.withColumnRenamed("end station id", "end_station_id") \
.withColumnRenamed("end station name", "end_station_name") \
.withColumnRenamed("end station latitude", "end_station_latitude") \
.withColumnRenamed("end station longitude", "end_station_longitude") \
.withColumnRenamed("birth year", "birth_year")
4.2. 查看数据内容
trip_df.show(5)
+------------+--------------------+--------------------+----------------+------------------+----------------------+-----------------------+--------------+-----------------+--------------------+---------------------+------+----------+----------+------+ |tripduration| starttime| stoptime|start_station_id|start_station_name|start_station_latitude|start_station_longitude|end_station_id| end_station_name|end_station_latitude|end_station_longitude|bikeid| usertype|birth_year|gender| +------------+--------------------+--------------------+----------------+------------------+----------------------+-----------------------+--------------+-----------------+--------------------+---------------------+------+----------+----------+------+ | 389|2020-03-01 00:14:...|2020-03-01 00:20:...| 3202| Newport PATH| 40.7272235| -74.0337589| 3203| Hamilton Park| 40.727595966| -74.044247311| 42381|Subscriber| 1992| 1| | 242|2020-03-01 00:48:...|2020-03-01 00:52:...| 3185| City Hall| 40.7177325| -74.043845| 3205|JC Medical Center| 40.71653978099194| -74.0496379137039| 42155|Subscriber| 1991| 1| | 124|2020-03-01 01:08:...|2020-03-01 01:10:...| 3272| Jersey & 3rd| 40.72333158646436| -74.04595255851744| 3278| Monmouth and 6th| 40.72568548362901| -74.04879033565521| 42376|Subscriber| 1987| 0| | 104|2020-03-01 01:22:...|2020-03-01 01:24:...| 3202| Newport PATH| 40.7272235| -74.0337589| 3638| Washington St| 40.7242941| -74.0354826| 42350|Subscriber| 1993| 1| | 228|2020-03-01 01:39:...|2020-03-01 01:43:...| 3194| McGinley Square| 40.7253399253558| -74.06762212514877| 3280| Astor Place| 40.71928220070702| -74.07126188278198| 42235|Subscriber| 1988| 1| +------------+--------------------+--------------------+----------------+------------------+----------------------+-----------------------+--------------+-----------------+--------------------+---------------------+------+----------+----------+------+
4.3. 将数据保存到Delta Lake中
trip_df.write.format("delta").mode("overwrite").save("/home/jovyan/webmount/delta/citi-bike/JC-202003-citibike-tripdata/")
从Delta Lake中读取保存的数据
trip_df = spark.read.format("delta").load("/home/jovyan/webmount/delta/citi-bike/JC-202003-citibike-tripdata/")
4.4. 创建表
display(spark.sql("DROP TABLE IF EXISTS trips"))
display(spark.sql("CREATE TABLE trips USING DELTA LOCATION '/home/jovyan/webmount/delta/citi-bike/JC-202003-citibike-tripdata/'"))
好了,欢迎从数据仓库回到数据库的世界,现在我们可以用我们熟知的一切数据库知识来操作数据仓库了。
4.5. 简单的数据分析
计算总骑行时间最长的5辆citi-bike
total_trip_df = trip_df.select("bikeid", "tripduration").groupBy('bikeid').agg(sum("tripduration"))
total_trip_df = total_trip_df.orderBy(total_trip_df["sum(tripduration)"].desc())
total_trip_df.show(5)
结果如下:
+------+-----------------+ |bikeid|sum(tripduration)| +------+-----------------+ | 42153| 52738| | 42267| 24485| | 42229| 13866| | 42547| 9593| | 42176| 8393| +------+-----------------+
5. 注意点
如果你在中国国内,Bluehost的连接速度会非常慢,请使用国内的服务供应商。
6. 总结
在这篇文章中我们尝试利用自己的闲置资源干点什么。我们在域名服务器提供的网盘功能上,使用最新的Delta框架搭建了自己的数据湖。