はじめに
本記事の目的
学習目的で実装をしているデータパイプライン構築で得た知見のアウトプットです。 YouTubeDataAPIはGCPに登録をすれば無料で利用できるAPIで、人気動画の一覧やその動画の視聴数、再生時間などのデータを取得することが可能です。 本記事では作成したパイプラインの全体像と各リソースの役割について解説します。
本記事で紹介すること/ 紹介しないこと
紹介すること
- パイプラインを構成する各リソースとその役割について
- パイプラインの流れとポイントとなるコードについて
紹介しないこと
- YouTube Data APIの設定方法
- CI/CDパイプラインについて(別途記事を執筆予定です)
- GCPリソースの構築方法(Terraformについて)
使用技術とツールの紹介
本記事で使用する主要な技術とツールは以下の通りです。
- YouTube Data API: YouTubeのデータを取得するためのAPI。
- Cloud Scheduler: 定期的にジョブを実行するためのスケジューリングサービス。
- Artifact Registory: コンテナイメージを保存し、管理するためのサービス。
- Cloud Run: コンテナ化されたアプリケーションを実行するためのサービス。
- FastAPI: 高速なWebアプリケーションフレームワーク。
- Secret Manager: APIキーやその他の機密情報を安全に管理するためのサービス。
- Google Cloud Storage (GCS): データを保存するためのオブジェクトストレージ。
- Pub/Sub: イベント駆動型のメッセージングサービス。
- BigQuery: 大規模データの分析を行うためのデータウェアハウス。
アーキテクチャ概要
全体のフローの説明
本データパイプラインは以下のようなフローで動作します。
- スケジューラが定期的にCloud Runを起動。
- Cloud Run上で動作するアプリケーションがYouTube Data APIを使用してデータを取得。
- 取得したデータをJSON形式でGoogle Cloud Storage (GCS)に保存。
- GCSにデータが格納されたことをPub/SubトピックへPublishし、サブスクライバーへメッセージを配信。
- Pub/Subイベントを受け取ったCloud RunがデータをBigQueryに挿入する処理を実行。
各コンポーネントの役割
- Cloud Scheduler: 定期的にCloud Runを起動する。
- Artifact Registory: Cloud Runで使用するコンテナイメージを保存しておく。
- Cloud Run: YouTube Data APIからデータの取得とGCS, BQへの格納を行う。
- Secret Manager: YouTube Data APIのAPI KEYを安全に管理し、Cloud Runから利用可能にする。
- GCS: 取得したデータをJSON形式で保存する。
- Pub/Sub: GCSへのデータ格納イベントを受信し、Cloud RunのBQ格納処理をトリガーする。
- BigQuery: データを格納し、分析を可能にする。
手順詳細
ステップ1: スケジューラによるCloud Runの実行
スケジューラの設定
Cloud Schedulerを使用して、定期的にCloud Runを起動するジョブを設定しています。
- ターゲットをHTTPに設定(Cloud Runのエンドポイントを指定)
- Cloud Runの実行には認証が必要なため、実行時はCloud Run起動元の権限を付与したサービスアカウントを指定
ステップ2: YouTube Data APIからのデータ取得
# main.py
sc = SecretManagerInterface()
developer_key = sc.get_secret(
project_id=PROJECT_ID,
secret_id=SECRET_ID,
version_id=SECRET_YOUTUBE_API_VERSION
)
youtube = YoutubeApiRequest(
youtube_api_service_name=YOUTUBE_API_SERVICE_NAME,
youtube_api_version=YOUTUBE_API_VERSION,
developer_key=developer_key
)
res = youtube.get_youtube_data()
data = json.dumps(res)
Secret ManagerからAPIキーを取得
APIキーはコード内にハードコーディングせず、シークレットマネージャーに保存しています。
# secretmanager.py from google.cloud import secretmanager class SecretManagerInterface: def __init__(self) -> None: self.client = secretmanager.SecretManagerServiceClient() def get_secret( self, project_id: str, secret_id: str, version_id: str ) -> str: path = self.client.secret_version_path( project_id, secret_id, version_id ) response = self.client.access_secret_version( request={"name": path} ) secret_value = response.payload.data.decode('UTF-8') return secret_value
YouTubeData APIのコール
取得したAPI KEYを使用してYouTubeAPIにリクエストを送ります。
# youtube_api.py class YoutubeApiRequest: def __init__( self, youtube_api_service_name: str, youtube_api_version: str, developer_key: str ) -> None: self.youtube = build( youtube_api_service_name, youtube_api_version, developerKey=developer_key ) dt = datetime.now(pytz.timezone('Asia/Tokyo')) self.date_str = dt.strftime('%Y%m%d%H%M') def get_youtube_data(self) -> dict: request = self.youtube.videos().list( part="snippet, contentDetails, statistics", chart="mostPopular", maxResults=50, regionCode="JP" ) res = request.execute() return res
ステップ3: 取得データをJSON形式で保存
取得したデータは生データのまま一度GCSへ格納します。 GCSへ格納後、ファイルのパスをPubSubにPublishします。
# main.py gcs = GcsInterface( project_id=PROJECT_ID, bucket_name=BUCKET_NAME, ) file_path = f"most_popular/{youtube.date_str}_popular.json" logger.info(file_path) gcs.upload_json(file_path, data) pubsub = PubSubInterface( project_id=PROJECT_ID, topic_name=TOPIC_NAME ) pubsub.publish(file_path)
Google Cloud Storage(GCS)への保存
# gcs.py class GcsInterface: def __init__( self, project_id: str, bucket_name: str, ) -> None: self.client = storage.Client(project=project_id) self.bucket_name = bucket_name self.bucket = self.client.bucket(bucket_name) def upload_json(self, gcs_path: str, contents: str) -> None: blob = self.bucket.blob(gcs_path) try: blob.upload_from_string(contents, content_type="application/json") except Exception as e: raise Exception(e)
PubSubへメッセージをPublish
# pubsub.py class PubSubInterface: def __init__( self, project_id: str, topic_name: str, ) -> None: self.publisher = pubsub_v1.PublisherClient() self.topic_path = self.publisher.topic_path(project_id, topic_name) def publish(self, data: str): logger.info(data) encode_data = data.encode("utf-8") self.publisher.publish(self.topic_path, data=encode_data)
ステップ4: GCSの格納をトリガーにメッセージを配信
Subscriptionの設定
- PubSubトピックにメッセージが送信された時にCloudRunの/invoke/loadにリクエストを送信するサブスクリプションを設定
- CloudRunの起動元権限が付与されているサービスアカウントを使用
ステップ5: データをBigQueryへ挿入
GCSのファイルを取得し、BQに取り込みやすい形に整形してから格納します。
@app.post("/invoke/load") async def invoke_load_to_bq(background_tasks: BackgroundTasks, request: PubsubRequest) -> None: file_path = check_pubsub_message(request) background_tasks.add_task(load_bq, file_path) return {"message": "Message received and processing started BQ Load."} async def load_bq(file_path): gcs = GcsInterface(project_id=PROJECT_ID, bucket_name=BUCKET_NAME) bq = BqInterface(project_id=PROJECT_ID) blob = gcs.get_file(file_path) dataset_name = "videos" table_name = "most_popular" if blob is not None: with blob.open(mode="r", encoding='utf-8') as f: data = json.load(f) created_at = extract_datetime_from_file_path(file_path) extract_data = extract_most_popular(data, created_at) logger.info("Insert Table Data") bq.insert_table_data( dataset_name=dataset_name, table_name=table_name, data=extract_data )
GCSからファイルを取得
PubSubからpushされたメッセージにGCSのパスが含まれているので、取得したパスのファイルをblob形式で取得します。
BQへデータの挿入
取得したデータをBQにinsertします。
from google.cloud import bigquery from gcp.config.bq_schema import MOST_POPULAR_TABLE_SCHEMA class BqInterface: def __init__(self, project_id: str) -> None: self.client = bigquery.Client(project=project_id) def insert_table_data( self, dataset_name: str, table_name: str, data: list[dict] ) -> None: job_config = bigquery.LoadJobConfig( schema=MOST_POPULAR_TABLE_SCHEMA, ) table_id = f"{self.client.project}.{dataset_name}.{table_name}" table = self.client.get_table(table_id) job = self.client.load_table_from_json( json_rows=data, destination=table, job_config=job_config, ) job.result()
まとめ
今後の拡張ポイント
- ルーティングの作成: FastAPIでルーティングを作成し、PubSubのメッセージにpathパラメータを含めることでリクエストに応じた処理ができるようにする
- BIツールの統合: BQに蓄積したデータを加工しBIツールで可視化を行う
- 通知機能: 異常データが検出された場合に通知を行う機能を追加する。
感想
データエンジニアリングの勉強がしたい → いい感じのデータがない → とりあえずパイプラインでも組むか。というモチベーションで始めたのですが、色んな機能を盛り込みすぎて気づいたら半年近く経っていました。 今後は上記の拡張ポイントの実装を進めつつ、私のデータエンジニアリングの学習の基盤として利用していきたいです。