YouTube Data APIを使用したデータパイプラインの構築

はじめに

本記事の目的

学習目的で実装をしているデータパイプライン構築で得た知見のアウトプットです。 YouTubeDataAPIはGCPに登録をすれば無料で利用できるAPIで、人気動画の一覧やその動画の視聴数、再生時間などのデータを取得することが可能です。 本記事では作成したパイプラインの全体像と各リソースの役割について解説します。

github.com

本記事で紹介すること/ 紹介しないこと

  • 紹介すること

    • パイプラインを構成する各リソースとその役割について
    • パイプラインの流れとポイントとなるコードについて
  • 紹介しないこと

    • YouTube Data APIの設定方法
    • CI/CDパイプラインについて(別途記事を執筆予定です)
    • GCPリソースの構築方法(Terraformについて)

使用技術とツールの紹介

本記事で使用する主要な技術とツールは以下の通りです。

  • YouTube Data API: YouTubeのデータを取得するためのAPI。
  • Cloud Scheduler: 定期的にジョブを実行するためのスケジューリングサービス。
  • Artifact Registory: コンテナイメージを保存し、管理するためのサービス。
  • Cloud Run: コンテナ化されたアプリケーションを実行するためのサービス。
  • FastAPI: 高速なWebアプリケーションフレームワーク。
  • Secret Manager: APIキーやその他の機密情報を安全に管理するためのサービス。
  • Google Cloud Storage (GCS): データを保存するためのオブジェクトストレージ。
  • Pub/Sub: イベント駆動型のメッセージングサービス。
  • BigQuery: 大規模データの分析を行うためのデータウェアハウス。

アーキテクチャ概要

全体のフローの説明

本データパイプラインは以下のようなフローで動作します。

  1. スケジューラが定期的にCloud Runを起動。
  2. Cloud Run上で動作するアプリケーションがYouTube Data APIを使用してデータを取得。
  3. 取得したデータをJSON形式でGoogle Cloud Storage (GCS)に保存。
  4. GCSにデータが格納されたことをPub/SubトピックへPublishし、サブスクライバーへメッセージを配信。
  5. Pub/Subイベントを受け取ったCloud RunがデータをBigQueryに挿入する処理を実行。

各コンポーネントの役割

  • Cloud Scheduler: 定期的にCloud Runを起動する。
  • Artifact Registory: Cloud Runで使用するコンテナイメージを保存しておく。
  • Cloud Run: YouTube Data APIからデータの取得とGCS, BQへの格納を行う。
  • Secret Manager: YouTube Data APIのAPI KEYを安全に管理し、Cloud Runから利用可能にする。
  • GCS: 取得したデータをJSON形式で保存する。
  • Pub/Sub: GCSへのデータ格納イベントを受信し、Cloud RunのBQ格納処理をトリガーする。
  • BigQuery: データを格納し、分析を可能にする。

手順詳細

ステップ1: スケジューラによるCloud Runの実行

スケジューラの設定

Cloud Schedulerを使用して、定期的にCloud Runを起動するジョブを設定しています。

  • ターゲットをHTTPに設定(Cloud Runのエンドポイントを指定)
  • Cloud Runの実行には認証が必要なため、実行時はCloud Run起動元の権限を付与したサービスアカウントを指定

ステップ2: YouTube Data APIからのデータ取得

# main.py
sc = SecretManagerInterface()
developer_key = sc.get_secret(
    project_id=PROJECT_ID,
    secret_id=SECRET_ID,
    version_id=SECRET_YOUTUBE_API_VERSION
)

youtube = YoutubeApiRequest(
    youtube_api_service_name=YOUTUBE_API_SERVICE_NAME,
    youtube_api_version=YOUTUBE_API_VERSION,
    developer_key=developer_key
)

res = youtube.get_youtube_data()
data = json.dumps(res)

Secret ManagerからAPIキーを取得

APIキーはコード内にハードコーディングせず、シークレットマネージャーに保存しています。

# secretmanager.py
from google.cloud import secretmanager


class SecretManagerInterface:
    def __init__(self) -> None:
        self.client = secretmanager.SecretManagerServiceClient()

    def get_secret(
        self,
        project_id: str,
        secret_id: str,
        version_id: str
    ) -> str:
        path = self.client.secret_version_path(
            project_id,
            secret_id,
            version_id
        )

        response = self.client.access_secret_version(
            request={"name": path}
        )

        secret_value = response.payload.data.decode('UTF-8')

        return secret_value

YouTubeData APIのコール

取得したAPI KEYを使用してYouTubeAPIにリクエストを送ります。

# youtube_api.py
class YoutubeApiRequest:
    def __init__(
        self,
        youtube_api_service_name: str,
        youtube_api_version: str,
        developer_key: str
    ) -> None:
        self.youtube = build(
            youtube_api_service_name,
            youtube_api_version,
            developerKey=developer_key
        )
        dt = datetime.now(pytz.timezone('Asia/Tokyo'))
        self.date_str = dt.strftime('%Y%m%d%H%M')


    def get_youtube_data(self) -> dict:
        request = self.youtube.videos().list(
            part="snippet, contentDetails, statistics",
            chart="mostPopular",
            maxResults=50,
            regionCode="JP"
        )

        res = request.execute()
        return res

ステップ3: 取得データをJSON形式で保存

取得したデータは生データのまま一度GCSへ格納します。 GCSへ格納後、ファイルのパスをPubSubにPublishします。

# main.py
gcs = GcsInterface(
    project_id=PROJECT_ID,
    bucket_name=BUCKET_NAME,
)
file_path = f"most_popular/{youtube.date_str}_popular.json"
logger.info(file_path)
gcs.upload_json(file_path, data)

pubsub = PubSubInterface(
    project_id=PROJECT_ID,
    topic_name=TOPIC_NAME
)
pubsub.publish(file_path)

Google Cloud Storage(GCS)への保存

# gcs.py
class GcsInterface:
    def __init__(
        self,
        project_id: str,
        bucket_name: str,
    ) -> None:
        self.client = storage.Client(project=project_id)
        self.bucket_name = bucket_name
        self.bucket = self.client.bucket(bucket_name)

    def upload_json(self, gcs_path: str, contents: str) -> None:
        blob = self.bucket.blob(gcs_path)

        try:
            blob.upload_from_string(contents, content_type="application/json")
        except Exception as e:
            raise Exception(e)

PubSubへメッセージをPublish

# pubsub.py
class PubSubInterface:
    def __init__(
        self,
        project_id: str,
        topic_name: str,
    ) -> None:
        self.publisher = pubsub_v1.PublisherClient()
        self.topic_path = self.publisher.topic_path(project_id, topic_name)

    def publish(self, data: str):
        logger.info(data)
        encode_data = data.encode("utf-8")
        self.publisher.publish(self.topic_path, data=encode_data)

ステップ4: GCSの格納をトリガーにメッセージを配信

Subscriptionの設定

  • PubSubトピックにメッセージが送信された時にCloudRunの/invoke/loadにリクエストを送信するサブスクリプションを設定
  • CloudRunの起動元権限が付与されているサービスアカウントを使用

ステップ5: データをBigQueryへ挿入

GCSのファイルを取得し、BQに取り込みやすい形に整形してから格納します。

@app.post("/invoke/load")
async def invoke_load_to_bq(background_tasks: BackgroundTasks, request: PubsubRequest) -> None:
    file_path = check_pubsub_message(request)
    background_tasks.add_task(load_bq, file_path)
    return {"message": "Message received and processing started BQ Load."}


async def load_bq(file_path):
    gcs = GcsInterface(project_id=PROJECT_ID, bucket_name=BUCKET_NAME)
    bq = BqInterface(project_id=PROJECT_ID)
    blob = gcs.get_file(file_path)

    dataset_name = "videos"
    table_name = "most_popular"

    if blob is not None:
        with blob.open(mode="r", encoding='utf-8') as f:
            data = json.load(f)

            created_at = extract_datetime_from_file_path(file_path)
            extract_data = extract_most_popular(data, created_at)

    logger.info("Insert Table Data")
    bq.insert_table_data(
        dataset_name=dataset_name,
        table_name=table_name,
        data=extract_data
    )

GCSからファイルを取得

PubSubからpushされたメッセージにGCSのパスが含まれているので、取得したパスのファイルをblob形式で取得します。

BQへデータの挿入

取得したデータをBQにinsertします。

from google.cloud import bigquery
from gcp.config.bq_schema import MOST_POPULAR_TABLE_SCHEMA

class BqInterface:
    def __init__(self, project_id: str) -> None:
        self.client = bigquery.Client(project=project_id)

    def insert_table_data(
        self,
        dataset_name: str,
        table_name: str,
        data: list[dict]
    ) -> None:
        job_config = bigquery.LoadJobConfig(
            schema=MOST_POPULAR_TABLE_SCHEMA,
        )

        table_id = f"{self.client.project}.{dataset_name}.{table_name}"
        table = self.client.get_table(table_id)
        job = self.client.load_table_from_json(
            json_rows=data,
            destination=table,
            job_config=job_config,
        )
        job.result()

まとめ

今後の拡張ポイント

  • ルーティングの作成: FastAPIでルーティングを作成し、PubSubのメッセージにpathパラメータを含めることでリクエストに応じた処理ができるようにする
  • BIツールの統合: BQに蓄積したデータを加工しBIツールで可視化を行う
  • 通知機能: 異常データが検出された場合に通知を行う機能を追加する。

感想

データエンジニアリングの勉強がしたい → いい感じのデータがない → とりあえずパイプラインでも組むか。というモチベーションで始めたのですが、色んな機能を盛り込みすぎて気づいたら半年近く経っていました。 今後は上記の拡張ポイントの実装を進めつつ、私のデータエンジニアリングの学習の基盤として利用していきたいです。

参考資料

公式ドキュメントへのリンク