You are currently viewing 利用闲置资源搭建自己的数据湖(Data Lake)-面向初学者

利用闲置资源搭建自己的数据湖(Data Lake)-面向初学者

大家好~,

今天讲一下关于搭建自己的数据湖(Data Lake)。关于数据湖的文章很多,这里就不多介绍了。笔者发现自己的域名服务器提供网盘(WebDav)服务并且容量没有限制(不用太浪费了),所以试着在网盘上搭建了自己数据湖,在此分享。其实简单的说,我把企业级方案的存储层从HDFS或S3等改成了WebDav,然后在上面使用最新的Delta框架来处理数据湖。Delta Lake是Databricks公司的一个开源项目。Databricks是一个整合数据科学与数据工程的一体化平台。使用过Apache Spark的话可能听说过Databricks,它是Apache Spark的项目发起方。Delta 框架简单地讲就是为数据湖提供ACID事务能力。我们可以在现有的基于HDFS的数据湖上追加Delta Lake层,也可以完全从0开始直接使用Delta Lake存储数据。相较于企业级方案,我的域名服务器并不需要我支付额外的费用,这种剩余资源为何不利用起来呢。感兴趣的朋友们可以像我一样挖掘一下身边的可用资源。躺在我们身边的待用资源其实很多。

1. 为什么搭建自己的数据湖(Data Lake

那么真的有必要搭建自己的数据湖吗?笔者的主张是:是的!来说一下我的动机:

  1. 可以方便地做各种技术尝试

搞技术的同学们都会自己的PC上搭建本地环境以做各种测试,技术尝试。

  1. 充分利用自己的资源

如前所述由于我使用的域名服务器提供了无限制的存储空间,又支持网盘功能,所以,将这个无容量限制的网盘Mount到我的Mac上做数据湖存储是一个不错的方案

  1. 提高工程师对于技术追求的满足感

技术背景的很多人都对新技术有着探求心,程序员和工程师门总喜欢尝试一些新的技术,来满足自己

2. 为什么用Delta Lake,而不直接搭建数据湖

由于当今的数据拥有3V特性,即Volume(数据量大),Variety(数据种类多),Velocity(数据产生速度快)。因此很多公司内部都拥有自己的大数据平台,而大数据平台中存储数据的地方正是数据湖。通常可以将数据湖理解为储存大型数据的数据库,但是由于数据量的庞大,数据湖舍弃了数据库的一些特性,比如ACID事务功能,另外数据湖一般不支持索引,为了UPDATE/MERGE一条记录,需要在整个数据湖中进行一次检索,这种Full Scan的代价大,效率低。因此数据湖是一种针对查询进行特化的技术,仍然有很多技术挑战。主要有以下5条挑战:

1)同时读写, 并且要保证数据的一致性(read consistent data)

2)可以高吞吐从大表读数据(Scalable metadata handling)

3)遇到错误写出可以回滚(rollback) 可以删改(update/delete/merge)

4)在线业务不下线的同时可以重新处理历史数据(replay historical data)

5)处理迟到数据(late arriving data) 而无需推迟下阶段的数据处理

很多数据湖的方案费钱费力,将钱和时间大量用到了解决系统局限,而不是去从数据中提取有价值的信息。而Delta框架正是为解决以上问题而生。

3.  准备工作

在开始使用Delta Lake前我们先做一些准备工作。

3.1. 存储空间的准备

在一开始我们提到使用网盘(WebDav)来代替企业级数据湖的HDFS层。所以我们需要先将WebDav加载到自己的机子上。

如果你是一名Bloger,或许你已经有了自己的域名服务器,那么你可以去查一下自己的域名服务器是否有免费的WebDav资源可用。如果你正好使用的是Bluehost,那么跟着我来。

3.1.1 网盘(WebDav)的准备

我们来看一下如何在Bluehost上设置网盘。

Bluehost的网盘功能默认是开启的。我们在主控制页面(cpanel)左侧点击[Advaned],然后点击[Web Disk]打开以下页面。如图输入网盘连接用的用户名(Username)和密码(Password),以及网盘的存储位置(Directory),点击[Create]就创建完成了。

Bluehost提供了FTP,FTPS,SFTP和WebDav的网盘连接协议,FTP,FTPS是经典的文件传输协议,如果你喜欢FTP协议,Bluehost提供了可自定义的FTP账号。但是,相比基于HTTP(S)的WebDav协议,WebDav提供了包括GZIP压缩,多种认证方法等FTP所没有的功能。对于SFTP协议,虽然它可能是功能最强的协议了,但是如果你订阅的不是Bluehost最高级的套餐,将只能使用root来链接,不建议使用。因此,我们使用WebDav协议将网盘Mount到我们的PC上。MacOS上的Mount方法非常简单,打开“Connect to Server”,输入你的网盘地址和端口连接即可。网盘会被Mount到/Volumes/下,比如笔者的网盘目录是:
/Volumes/lunarwoffie.com。

这里有几个坑大家注意一下:

  1. 不要使用Bluehost提供给你的WebDav地址,你很可能无法连接,正确的地址是 你的域名:2078。这在Bluehost的技术支持文档里有说明。
  2. 如果你想从命令行操作,很遗憾MacOS从命令行Mount一个WebDav协议的网盘并不容易。
  3. 不要在Finder中访问你的网盘。由于MacOS特有的.DS_Store文件的存在,造成显示网盘的内容非常慢。因此,使用“Connect to Server”加载完网盘后,立即关闭弹出的Finder窗口。从命令行访问你的网盘。

对于其他OS,操作雷同,在此不再详述。

3.2. Notebook环境的准备

接下来我们要准备操作Delta Lake的环境。由于笔者习惯使用Notebook做数据分析。这里详细讲解一下Notebook的配置方法。当然,Notebook并不是唯一的选择。你可以使用任何自己熟悉的编程环境。另外,想偷懒的朋友们请看3.2.2节如何使用Databricks的Notebook。

3.2.1. 使用Jupyter Notebook的配置方法

熟悉数据科学或数据分析的朋友们肯定对Jupyter Notebook不陌生。由于需要配置Apache Spark在Jupyter上运行,为了简化步骤,我们直接使用Jupyter官方提供的Docker Image。从这个Docker Image启动的Docker容器(Container)直接包含了可以调用Apache Spark的Jupyter Notebook。

3.2.1.1. 下载并安装Docker for Mac

Docker简单地讲就是一种虚拟机,与传统虚拟机不同的是,Docker能够共享宿主的OS内核,主要用来运行程序,比如这次我们用来运行Jupyter Notebook。而传统虚拟机是用来运行一个完整的OS的。

我们从Docker的官方网站上下载Docker for Mac并安装。

启动Docker后,在Mac的右上角出现了Docker图标就启动完成了。

3.2.1.2. 启动一个pyspark-notebook容器(Container)

是的,pyspark-notebook就是我们要使用的Docker Image。我们不需要手动去下载,只要在命令行执行以下命令,Docker会自动去下载并用这个Docker Image来启动一个容器。

$ docker run -d -p 8888:8888 \
-v /Volumes/extend/workspace/github/datascience-notebooks:/home/jovyan/notebook \
-v /Volumes/lunarwoffie.com:/home/jovyan/webmount \
-e JUPYTER_ENABLE_LAB=yes \
--name pyspark-notebook georgezhu/pyspark-notebook \
start-notebook.sh --NotebookApp.token=''

这里为不熟悉Docker的同学们简单地解释一下命令的意思。

  1. docker run 用来从一个Docker Image启动一个容器
  2. -d 运行在后台
  3. -p 端口映射。Docker容器内的Jupyter Notebook默认启动在8888端口,将容器内的8888端口映射给宿主的8888端口。
  4. -v 容器的Volume映射。使我们从Docker容器内部能够访问宿主的资源。这里我们加载了datascience-notebook这个workspace和刚创建的网盘到容器中。
  5. -e JUPYTER_ENABLE_LAB=yes 使用Jupyter Lab代替Jupyter Notebook。根据官方解释,Lab在将来会彻底替换掉Notebook。
  6. –name pyspark-notebook georgezhu/pyspark-notebook 从–name pyspark-notebook georgezhu/pyspark-notebook启动一个name为pyspark-notebook的容器
  7. start-notebook.sh –NotebookApp.token=” 不使用任何认证登陆Jupyter Notebook

3.2.1.3. 打开Jupyter Notebook并确认Spark可用

在浏览器中打开以下地址:
http://localhost:8888
新建一份Notebook,我们试一下Apache Spark是否可用。 用以下代码尝试建立一个SparkSession,变量名为spark。在Databricks中不需要初始化SparkSession,一个可用的spark变量已近准备好。
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local") \
        .appName("trip data visualization") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

3.2.2. 使用Databricks的配置方法

Databricks是Apache Spark的东家,更是Delta Lake的东家。自然Databricks为我们准备好了一切,并且Databricks Community版本是免费的。听上去很棒吧。但是注意了,在Databricks Community中运行Notebook需要启动一个AWS EC2的实例。而这个实例在2小时的空闲后,会被Databricks自动关闭。这时需要重启EC2实例,并从头运行你的Notebook。因此如果你只是想试一下Delta框架,Databricks Community是个不错的选择,否则可以选择订阅Databricks或者使用我们的Jupyter Notebook方案。

上图展示了在Databricks的Notebook中加载Azure的Blob Storage的方法,你可以加载任何存储介质,包括我们自己的云盘。然后就可以在上面操作Delta Lake了。

Databricks的UI操作简单易懂,在此不再详述。

4. 开始在Delta Lake之旅

那么准备都做好了,我们开始真正地搭建自己的Delta Lake。

4.1. 读入数据

我们使用花旗银行开源的Citi Bike数据集

我将下载使用JC-202003-citibike-tripdata.csv。你可以下载你喜欢的任何文件。然后把文件放到刚才Mount到本地的网盘中。我放到了这个位置:

/Volumes/lunarwoffie.com/filestore/citi-bike/JC-202003-citibike-tripdata.csv

在刚才建立的Notebook中输入以下代码读入数据。注意文件的读取位置,由于我们使用了Docker容器,根据启动Docker容器时设定的Volume映射,load数据文件时要使用映射在Docker容器内的文件位置。

trip_df = spark.read.format('csv') \
        .options(header='true', inferSchema='true') \
        .load('/home/jovyan/webmount/filestore/citi-bike/JC-202003-citibike-tripdata.csv') \
        .withColumnRenamed("start station id", "start_station_id") \
        .withColumnRenamed("start station name", "start_station_name") \
        .withColumnRenamed("start station latitude", "start_station_latitude") \
        .withColumnRenamed("start station longitude", "start_station_longitude") \
        .withColumnRenamed("end station id", "end_station_id") \
        .withColumnRenamed("end station name", "end_station_name") \
        .withColumnRenamed("end station latitude", "end_station_latitude") \
        .withColumnRenamed("end station longitude", "end_station_longitude") \
        .withColumnRenamed("birth year", "birth_year")

4.2. 查看数据内容

trip_df.show(5)
结果如下:
+------------+--------------------+--------------------+----------------+------------------+----------------------+-----------------------+--------------+-----------------+--------------------+---------------------+------+----------+----------+------+
|tripduration|           starttime|            stoptime|start_station_id|start_station_name|start_station_latitude|start_station_longitude|end_station_id| end_station_name|end_station_latitude|end_station_longitude|bikeid|  usertype|birth_year|gender|
+------------+--------------------+--------------------+----------------+------------------+----------------------+-----------------------+--------------+-----------------+--------------------+---------------------+------+----------+----------+------+
|         389|2020-03-01 00:14:...|2020-03-01 00:20:...|            3202|      Newport PATH|            40.7272235|            -74.0337589|          3203|    Hamilton Park|        40.727595966|        -74.044247311| 42381|Subscriber|      1992|     1|
|         242|2020-03-01 00:48:...|2020-03-01 00:52:...|            3185|         City Hall|            40.7177325|             -74.043845|          3205|JC Medical Center|   40.71653978099194|    -74.0496379137039| 42155|Subscriber|      1991|     1|
|         124|2020-03-01 01:08:...|2020-03-01 01:10:...|            3272|      Jersey & 3rd|     40.72333158646436|     -74.04595255851744|          3278| Monmouth and 6th|   40.72568548362901|   -74.04879033565521| 42376|Subscriber|      1987|     0|
|         104|2020-03-01 01:22:...|2020-03-01 01:24:...|            3202|      Newport PATH|            40.7272235|            -74.0337589|          3638|    Washington St|          40.7242941|          -74.0354826| 42350|Subscriber|      1993|     1|
|         228|2020-03-01 01:39:...|2020-03-01 01:43:...|            3194|   McGinley Square|      40.7253399253558|     -74.06762212514877|          3280|      Astor Place|   40.71928220070702|   -74.07126188278198| 42235|Subscriber|      1988|     1|
+------------+--------------------+--------------------+----------------+------------------+----------------------+-----------------------+--------------+-----------------+--------------------+---------------------+------+----------+----------+------+

4.3. 将数据保存到Delta Lake

我们用一下代码将CSV保存到Delta Lake中
trip_df.write.format("delta").mode("overwrite").save("/home/jovyan/webmount/delta/citi-bike/JC-202003-citibike-tripdata/")
  从Delta Lake中读取保存的数据
trip_df = spark.read.format("delta").load("/home/jovyan/webmount/delta/citi-bike/JC-202003-citibike-tripdata/")

4.4. 创建表

display(spark.sql("DROP TABLE IF EXISTS trips"))
display(spark.sql("CREATE TABLE trips USING DELTA LOCATION '/home/jovyan/webmount/delta/citi-bike/JC-202003-citibike-tripdata/'"))

好了,欢迎从数据仓库回到数据库的世界,现在我们可以用我们熟知的一切数据库知识来操作数据仓库了。

4.5. 简单的数据分析

计算总骑行时间最长的5辆citi-bike

total_trip_df = trip_df.select("bikeid", "tripduration").groupBy('bikeid').agg(sum("tripduration"))
total_trip_df = total_trip_df.orderBy(total_trip_df["sum(tripduration)"].desc())
total_trip_df.show(5)

结果如下:

+------+-----------------+
|bikeid|sum(tripduration)|
+------+-----------------+
| 42153|            52738|
| 42267|            24485|
| 42229|            13866|
| 42547|             9593|
| 42176|             8393|
+------+-----------------+

5. 注意点

如果你在中国国内,Bluehost的连接速度会非常慢,请使用国内的服务供应商。

6. 总结

在这篇文章中我们尝试利用自己的闲置资源干点什么。我们在域名服务器提供的网盘功能上,使用最新的Delta框架搭建了自己的数据湖。