You are currently viewing 余った資源を利用して自分のデータレイクを構築します-初心者向け

余った資源を利用して自分のデータレイクを構築します-初心者向け

こんにちは、シュです。 今回は自分の資源を利用してデータレイクの構築について書きたいと思います。データレイクに関する記事はたくさんあるので、ここでは詳しく紹介しません。私が今使っているドメインサーバーはWeb Diskサービス(WebDav)を提供し、容量に制限がない(利用しないと勿体ない)ようなので、試しにWeb Disk上に独自のデータレイクを構築してみました。簡単に説明すると、エンタープライズレベルのデータレイクソリューションのストレージレイヤーをHDFSまたはS3からWeb Diskに変更し、最新のDeltaフレームワークを使用してデータレイクを処理しました。 Deltaフレームワークは、Databricks社のオープンソースプロジェクトです。Apache Sparkを使用したことがある場合は、Apache Sparkプロジェクトの提案者であるDatabricksについて聞いたことがあるかもしれません。 Deltaフレームワークは、データレイク上に実装が難しいACIDトランザクション機能を簡単に追加することができます。 Delta Lakeレイヤーを既存のHDFSデータレイクに追加するか、Delta Lakeを使用してゼロから直接データを格納することもできます。エンタープライズレベルのプランと比較して、ドメインサーバーでは既に自分が所有し、追加料金を支払う必要がないので、この余ったリソースを使用しない理由がありません。興味のある方は、ぜひ私のように周りに利用可能なリソースがあるかを探してみてください。意外と簡単に見つかるかもしれません。

1. なぜ自分用のデータレイクを構築するの?

では、本当に自分用のデータレイクを構築することが必要なのでしょうか。 私の主張はYesです! 私の動機について話してください:

  1. さまざまな技術を気軽に試すことができます

技術出身の方は自分のPCにローカル環境を作成し、いろんなテスト、技術を試します。

  1. 自分が持っているリソースを最大限に活用します

前述のように、私が使用するドメインサーバーは無制限のストレージスペースを提供し、Web Disk機能をサポートするため、データレイク用にこの容量無制限のWeb DiskをMacにマウントすることはリソース活用の良いソリューションです。

  1. 技術を追求するエンジニアの満足度を向上させます

技術のバックグラウンドを持つ多くの人々は新しいテクノロジーに探究心を持ちます。プログラマーやエンジニアは探究心に満足させるために新しいテクノロジーをチャレンジしたりします。

2. なぜデータレイクを直接に構築しないで、Deltaを使うの?

今日のデータには、Volume(データの量が多い)、Variety(データ種類が多い)、Velocity(データの生成速度が早い)の3V特性があるため、多くの企業には独自のビッグデータプラットフォームがあり、ビッグデータプラットフォームがデータを格納する場所はデータレイクです。データレイクは通常、ビッグデータを格納するためデータベースとして理解できますが、大量のデータを扱うため、データレイクはACIDトランザクション機能などのデータベース本来の一部の特性を破棄し、データ検索の性能が落ちないようにしています。さらに、データレイクは通常、インデックスをサポートしませんので、UPDATE/ MERGEを行う場合、データレイク全体で検索を実行する必要があり、コストがかかり、非効率的です。したがって、データレイクはクエリに特化したテクノロジーであり、まだ多くの技術的な課題がありますが、主に以下の5つの課題があります:

1)読み取りと書き込みを同時に行い、データの整合性を確保する(read consistent data)

2)大きなテーブルから高いスループットでデータを読み取ることができます(Scalable metadata handling)

3)エラーが発生した場合はロールバックできる(rollback) 変更可能(update/delete/merge)

4)オンラインビジネスが中断せずに、過去のデータを再処理できる(replay historical data)

5)遅れてきたデータを次のデータ処理プロセスに伸ばすことなくすぐに処理できる(late arriving data

データレイクソリューションの多くはコストと手間がかかり、データから貴重な情報を抽出するのではなく、システムの制限問題を解決するために多くのお金と時間を費やしてしまっています。 Deltaフレームワークは、上記の問題を解決するために生まれました。

3.  事前準備

Delta Lakeの使用する前に、いくつかの準備作業を行います。

3.1. Web Diskストレージの準備

最初に、エンタープライズデータレイクのHDFSレイヤーを置き換えるWeb Diskについて説明しました。 では、最初にWeb Diskを自分のPCにロードする必要があります。 もしあなたもブロガーなら、おそらくすでに自分自身のドメインサーバーを持っているでしょう、ぜひそのドメインサーバーに利用可能な無料のWeb Diskがあるかをチェックしてください。私はBluehostのサービスを利用しているので、以降Bluehostを使用することを前提で進みます。もちろん私の知る限りでは、他のドメインサーバーのプロバイダーのほとんども無料のWeb Diskを提供しているので、他のプロバイダーを使用している場合でもご安心ください。

3.1.1 Web Disk(WebDav)の準備

BluehostでWeb Diskを設定する方法を見てみましょう。

BluehostのWeb Disk機能はデフォルトで有効になっています。 メインコントロールページ(cpanel)の左側にある[Advaned]ボタンをクリックし、[Web Disk]をクリックしてWeb Diskで設定ページを開きます。 図のようにWeb Disk接続用のユーザー名(Username)とパスワード(Password)、ストレージの場所(Directory)を入力し、[Create]ボタンをクリックするとWeb Disk作成完了です。

Bluehostは、FTP、FTPS、SFTPおよびWebDavの接続プロトコルを提供しています。FTPおよびFTPSは、少し古い的なファイル転送プロトコルですが今でも広く使われています。FTPプロトコルを使いたい場合は、Bluehostがカスタマイズ可能なFTPアカウントを提供していますが、HTTP(S)に基づくWebDavプロトコルと比較して、WebDavはGZIP圧縮や複数の認証方法など、FTPに含まれていない機能を有するので、WebDavを使った方がいいでしょう。SFTPプロトコルが最も強力なプロトコルかもしれませんが、Bluehostの上位プランでないと、ルートユーザのみが接続できるので、お勧めできません。したがって、WebDavプロトコルを使用して、Web DiskをPCにマウントします。 MacOSでのマウント方法は非常に簡単です。[Connect to Server]を開き、接続するWeb Diskのアドレスとポートを入力し接続すると、/Volumesの下にマウントされます。たとえば、私の場合、Web Diskのローカルディレクトリは次のとおりです:
/Volumes/lunarwoffie.com

いくつかの罠がありますので、ご注意ください:

  1. Bluehostのcpanel上で表示されたWebDavのURLで接続しても繋がらないので、正しいURLは ドメイン:2078。これはBluehost公式ドキュメントに記載がありますが、画面上の表示が見違ったままのようです。
  2. MacOSを使う場合、残念ながら、端末でコマンドを使ってWeb DiskをMountするのが困難です。
  3. MacOSのFinderでMountしたストレージにアクセスしないでください。MacOSには.DS_Storeという特殊な隠しファイルが存在するため、Finderからのアクセスが非常に遅くなる可能性があります。Mount後、端末でストレージにアクセスしてください。Web Diskのローカルディレクトリは前述の通り:/Volumes/<domain name> になります。

3.2. Notebook環境の準備

次に、Delta Lakeを操作する環境を準備します。Notebookはデータ分析をするのに広くつくわれるツールなので、ここでは、DeltaをNotebookで動かす設定について詳しく説明します。もちろん、Notebookが唯一の選択肢ではありません。 使い慣れたプログラミング環境であればどれでも使用できます。また、設定したくない方は3.2.2節のDatabricks Notebookの使用方法を参照してください。

3.2.1. Jupyter Notebookの設定方法

データサイエンスやデータ分析に詳しい方は、Jupyter Notebookを使ったことがあるでしょう。Apache SparkをJupyterで実行するように設定する必要があるため、Jupyterが提供するSparkが設定済みの公式Dockerイメージを使用します。 このDockerイメージから起動されたDockerコンテナー(Container)には、Apache Sparkを呼び出すことができるJupyter Notebookが直接含まれています。

3.2.1.1. Docker for Macのインストール

Dockerは一種の仮想マシンです。従来の仮想マシンとは異なり、DockerはホストのOSカーネルを共有できます、主にプログラムの実行に使用されます。たとえば、今回はJupyter Notebookの実行に使用します。 従来の仮想マシンは、OS自体を実行するために使用されます。 ではDockerのサイトからDocker for Mac并をダウンロードしてインストールしましょう。

3.2.1.2. pyspark-notebookのコンテナー(Container)を起動

はい、pyspark-notebookはDocker Image名です。手動でダウンロードする必要がありません。次のコマンドを実行すれば、Dockerが自動的にダウンロードしてコンテナーを起動してくれます。

$ docker run -d -p 8888:8888 \
-v /Volumes/extend/workspace/github/datascience-notebooks:/home/jovyan/notebook \
-v /Volumes/lunarwoffie.com:/home/jovyan/webmount \
-e JUPYTER_ENABLE_LAB=yes \
--name pyspark-notebook georgezhu/pyspark-notebook \
start-notebook.sh --NotebookApp.token=''

Docker初心者の方のためにコマンドの意味を簡単に説明します:

  1. docker run Dockerイメージからコンテナーを起動するためのコマンド
  2. -d バックグラウンドで実行
  3. -p ポートマッピング。 Dockerコンテナ内のJupyter Notebookはデフォルトでポート8888で起動されるので、コンテナのポート8888をホストのポート8888にマップし、ホストからJupyter Notebookにアクセスできるようにします。
  4. -v コンテナのボリュームマッピング。 Dockerコンテナー内からホストのリソースにアクセスできます。 ここで、datascience-notebookワークスペースと先ほどMountしたWeb Diskをコンテナーにマップします。
  5. -e JUPYTER_ENABLE_LAB=yes Jupyter Notebookの代わりにJupyter Labを使用します。 公式説明によると、Labは将来的にNotebookを完全に置き換える予定です。
  6. –name pyspark-notebook georgezhu/pyspark-notebook –name pyspark-notebook georgezhu / pyspark-notebookからpyspark-notebookという名前のコンテナーを起動します。
  7. start-notebook.sh –NotebookApp.token=” 認証なしでJupyter  Notebookにアクセスできるようにします

3.2.1.3.Sparkが使えることを確認

ブラウザーで以下のURLにアクセスします:

http://localhost:8888

新規Notebookを作成し,次のソースコードでSparkSessionを初期化してみてください。変数名はsparkです。 Databricksの場合はSparkSessionを初期化する必要がありません。利用可能なspark変数は準備できました。

from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local") \
        .appName("trip data visualization") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()

3.2.2. Databricksを使う場合の設定方法

DatabricksはApache Sparkの提案者であり、Delta Lakeの提案者です。 当然、DatabricksプラットフォームにはSparkのすべてが備わっており、しかもDatabricks Community版は無料で使えます。素晴らしい。ただし、Databricks Community版でNotebookを実行するには、AWS EC2のインスタンスを起動する必要があることに注意してください。 そして、このインスタンスは、2時間のアイドル時間後にDatabricksによって自動的に閉じられます。 このとき、EC2インスタンスを再起動し、Notebookを最初から実行する必要があります。 なので、Deltaフレームワークを試してみたい場合は、Databricks Community版が適しています。それ以外の場合は、Databricksのプランにお金を払うか、Jupyter Notebookの案を使いましょう。

上の画像は、Databricks NotebookにAzure Blob StorageをMountする方法を示しています。自分のWeb Diskを含む任意のストレージをMountできます。 その後、Delta Lakeを操作できます。

4. ではDelta Lakeを使ってみよう

すべての準備ができましたので、Deltaフレームワークを使って、Web Diskを操作してみましょう。

4.1. データを読み込む

オープンデータのCiti Bikeを扱います使います。

上記サイトからJC-202003-citibike-tripdata.csvをダウンロードして使用します(好きなファイルをダウンロードしてください)。次に、ローカルにMountしたWeb Diskにファイルをおきます。 私の場合以下に置きました:

/Volumes/lunarwoffie.com/filestore/citi-bike/JC-202003-citibike-tripdata.csv

先ほど作成したNotebookに次のコードを入力して、データを読み取ります。 ファイルの読み取り位置に注意してください。Dockerコンテナーを使用しているので、Dockerコンテナーの起動時に設定されたボリュームマッピングに従って、Dockerコンテナー内にマップされたファイルの場所からデータファイルを読み込む必要があります。

trip_df = spark.read.format('csv') \
        .options(header='true', inferSchema='true') \
        .load('/home/jovyan/webmount/filestore/citi-bike/JC-202003-citibike-tripdata.csv') \
        .withColumnRenamed("start station id", "start_station_id") \
        .withColumnRenamed("start station name", "start_station_name") \
        .withColumnRenamed("start station latitude", "start_station_latitude") \
        .withColumnRenamed("start station longitude", "start_station_longitude") \
        .withColumnRenamed("end station id", "end_station_id") \
        .withColumnRenamed("end station name", "end_station_name") \
        .withColumnRenamed("end station latitude", "end_station_latitude") \
        .withColumnRenamed("end station longitude", "end_station_longitude") \
        .withColumnRenamed("birth year", "birth_year")

4.2. データの中身をのぞいてみる

trip_df.show(5)

結果は以下の通り:

+------------+--------------------+--------------------+----------------+------------------+----------------------+-----------------------+--------------+-----------------+--------------------+---------------------+------+----------+----------+------+
|tripduration|           starttime|            stoptime|start_station_id|start_station_name|start_station_latitude|start_station_longitude|end_station_id| end_station_name|end_station_latitude|end_station_longitude|bikeid|  usertype|birth_year|gender|
+------------+--------------------+--------------------+----------------+------------------+----------------------+-----------------------+--------------+-----------------+--------------------+---------------------+------+----------+----------+------+
|         389|2020-03-01 00:14:...|2020-03-01 00:20:...|            3202|      Newport PATH|            40.7272235|            -74.0337589|          3203|    Hamilton Park|        40.727595966|        -74.044247311| 42381|Subscriber|      1992|     1|
|         242|2020-03-01 00:48:...|2020-03-01 00:52:...|            3185|         City Hall|            40.7177325|             -74.043845|          3205|JC Medical Center|   40.71653978099194|    -74.0496379137039| 42155|Subscriber|      1991|     1|
|         124|2020-03-01 01:08:...|2020-03-01 01:10:...|            3272|      Jersey & 3rd|     40.72333158646436|     -74.04595255851744|          3278| Monmouth and 6th|   40.72568548362901|   -74.04879033565521| 42376|Subscriber|      1987|     0|
|         104|2020-03-01 01:22:...|2020-03-01 01:24:...|            3202|      Newport PATH|            40.7272235|            -74.0337589|          3638|    Washington St|          40.7242941|          -74.0354826| 42350|Subscriber|      1993|     1|
|         228|2020-03-01 01:39:...|2020-03-01 01:43:...|            3194|   McGinley Square|      40.7253399253558|     -74.06762212514877|          3280|      Astor Place|   40.71928220070702|   -74.07126188278198| 42235|Subscriber|      1988|     1|
+------------+--------------------+--------------------+----------------+------------------+----------------------+-----------------------+--------------+-----------------+--------------------+---------------------+------+----------+----------+------+

4.3. データをDelta Lakeに保存する

次のソースコードでCSVファイルのデータをDelta Lakeに保存します。

trip_df.write.format("delta").mode("overwrite").save("/home/jovyan/webmount/delta/citi-bike/JC-202003-citibike-tripdata/")

Delta Lakeから保存されたデータを読み取ります。

trip_df = spark.read.format("delta").load("/home/jovyan/webmount/delta/citi-bike/JC-202003-citibike-tripdata/")

4.4. テーブルを作成

display(spark.sql("DROP TABLE IF EXISTS trips"))
display(spark.sql("CREATE TABLE trips USING DELTA LOCATION '/home/jovyan/webmount/delta/citi-bike/JC-202003-citibike-tripdata/'"))

はい、データベースの世界へお帰り!ここから、データベースの知識でデータレイクを操作しましょう。

4.5. 簡単なデータ分析

総騎乗時間が最も長い5つのciti-bikeを計算する

total_trip_df = trip_df.select("bikeid", "tripduration").groupBy('bikeid').agg(sum("tripduration"))
total_trip_df = total_trip_df.orderBy(total_trip_df["sum(tripduration)"].desc())
total_trip_df.show(5)

結果は以下の通り:

+------+-----------------+
|bikeid|sum(tripduration)|
+------+-----------------+
| 42153|            52738|
| 42267|            24485|
| 42229|            13866|
| 42547|             9593|
| 42176|             8393|
+------+-----------------+

5. 最後に

この記事では、利用されていないリソースを使用して、最新のDeltaフレームワークを使って、ドメインサーバー(Bluehost社)のWeb Diskを通して自分のデータレイクを構築しました。