ひとことで言うと#
データが**生成される場所から、分析・活用される場所まで流れる道筋(パイプライン)**を設計すること。散在するデータを収集し、変換・加工し、分析可能な形で格納し、ダッシュボードやモデルに届ける「データの水道管」を構築する。
押さえておきたい用語#
- ETL / ELT
- Extract(抽出)→ Transform(変換)→ Load(格納)の頭文字。ETLは変換してから格納、ELTは格納してから変換する方式のこと。クラウドDWHの普及でELTが主流になりつつある。
- オーケストレーション(Orchestration)
- パイプラインの実行順序・スケジュール・依存関係を管理する仕組みのこと。AirflowやDagsterなどのツールで「毎朝6時にAを実行→完了後にBを実行」のように制御する。
- べき等性(Idempotency)
- 同じ処理を何回実行しても結果が変わらない性質のこと。パイプラインが途中で失敗した際に安全にリトライできるために不可欠。
- データリネージ(Data Lineage)
- あるデータがどのソースから来て、どう変換されたかを追跡できる記録のこと。問題発生時の原因特定やデータの信頼性判断に必要。
データパイプライン設計の全体像#
こんな悩みに効く#
- データがあちこちに散らばっていて、分析のたびに手作業で集めている
- レポート作成に毎回何時間もかかり、自動化したい
- データの鮮度が悪く、先週のデータで今日の判断をしている
基本の使い方#
まず、どこからデータを取り、何に使うのかを明確にする。
データソースの例:
- Webアプリのデータベース(PostgreSQL、MySQLなど)
- SaaSツール(Salesforce、GA4、Slack、HubSpotなど)
- ログデータ(サーバーログ、アプリログ)
- 外部API(天気、株価、SNSデータ)
- スプレッドシート、CSVファイル
利用目的の例:
- 経営ダッシュボード(日次更新)
- マーケティング分析(週次レポート)
- 機械学習モデルの学習データ
- リアルタイムのアラート・通知
利用目的から逆算して設計するのが鉄則。「とりあえず全データを集める」はNG。
データの変換処理の流れを設計する。
ETL(Extract → Transform → Load):
- データを抽出し、変換してから格納する
- データ量が少ない場合やオンプレミス環境に適する
ELT(Extract → Load → Transform):
- データをまず格納し、格納先で変換する
- BigQueryやSnowflakeなどクラウドDWHの処理能力を活かす
- 現在の主流
設計のポイント:
| 設計項目 | 決めること |
|---|---|
| 更新頻度 | リアルタイム?日次?週次? |
| データ量 | GB単位?TB単位? |
| 遅延許容度 | 数秒以内?数時間OK? |
| エラー処理 | 失敗時の再実行方法、通知先 |
| べき等性 | 同じ処理を再実行しても結果が変わらないか |
要件に合ったツールを選ぶ。
データ取り込み(Extract/Load):
- Fivetran、Airbyte、Stitch → SaaSからの取り込みに強い
- Embulk、Apache Kafka → 大規模・リアルタイム処理
データ変換(Transform):
- dbt → SQLベースの変換。現在の主流。バージョン管理可能
- Apache Spark → 大規模データの分散処理
- Pandas/Python → 小規模データの柔軟な変換
データ格納(Storage):
- BigQuery、Snowflake、Redshift → クラウドDWH
- PostgreSQL → 小〜中規模
オーケストレーション(実行管理):
- Airflow、Dagster、Prefect → パイプラインのスケジュール管理・監視
小規模ならFivetran + BigQuery + dbt + Lookerのような組み合わせで始めるのがおすすめ。
パイプラインは構築して終わりではなく、運用し続けるもの。
必要な仕組み:
- データ品質チェック: NULLの割合、値の範囲、レコード数の急変を自動検知
- アラート: パイプラインの失敗、データ遅延をSlack/メールで通知
- リネージュ(データの血統): どのデータがどこから来て、どう変換されたかを追跡可能に
- ドキュメント: テーブル定義、変換ロジック、更新スケジュールを記録
dbtのテスト機能やGreat Expectationsなどのデータ品質ツールを活用する。
具体例#
従業員30人のBtoB SaaS。データ分析はスプレッドシートの手作業で、レポート作成に毎週半日かかっている。
ソースは自社アプリDB(PostgreSQL)、Stripe、HubSpot、GA4の4つ。目的はプロダクト分析・売上ダッシュボード・営業進捗・マーケ分析と整理した。更新頻度は日次で十分、リアルタイムは不要と判断。
ツールは Fivetran(各SaaSからの自動取り込み)+ BigQuery(クラウドDWH)+ dbt(SQL変換)+ Looker Studio(ダッシュボード)の組み合わせで選定。
エンジニア1名・2週間・月額ツールコスト約5万円で完成。レポート作成が毎週4時間→自動更新でゼロになり、データの鮮度も1週間遅れから前日分まで縮まった。
月商2億円のEC企業。実店舗3店とECサイトで在庫を共有しているが、同期が1日1回のバッチ処理。ECで注文が入っても実店舗で売り切れていることが月平均120件発生し、キャンセル対応に追われていた。
各店舗のPOSデータとEC注文DBをKafkaでリアルタイム取り込みし、「入荷 - 販売 - 予約 = 現在庫」を即時計算して在庫マスターに反映するパイプラインを構築。在庫数がマイナスになった瞬間のアラートと、POS連携5分超過時の自動エスカレーションも組み込んだ。
在庫不一致によるキャンセルが月120件→8件に減少。キャンセル対応の月40時間がほぼゼロになり、EC売上も月平均12%向上した。
従業員150名の食品製造業。経営管理部の担当者が毎月初に3日間かけて、生産管理システム・会計システム・Excelの3ソースからデータを手動で集めて月次レポートを作成。ミスも月平均2件発生していた。
初期予算50万円以内という制約の中で、3ソース(CSVエクスポート、API、Google Drive)をPythonスクリプトで結合・集計し、BigQuery(無料枠)→ Looker Studioへ流すパイプラインを構築。オーケストレーションはCloud Scheduler + Cloud Functionsで月額数百円に抑えた。
まず月次レポートの自動化(2週間)→日次生産実績ダッシュボード(1週間)→品質管理データ追加(1週間)と段階的に拡張。レポート作成の3日が不要になり、その工数を分析に回した結果、原材料仕入れタイミングの最適化で年間約300万円のコスト削減につながった。初期投資は委託費込みで約30万円。
やりがちな失敗パターン#
- 最初から完璧なパイプラインを作ろうとする — すべてのデータソースを一度に接続しようとすると、構築に何ヶ月もかかる。最も重要なデータソース1〜2個から始めて、段階的に拡張する
- データ品質の監視を後回しにする — パイプラインが動いていても、データが壊れていたら意味がない。構築と同時にデータ品質テストを組み込む
- 変換ロジックをドキュメント化しない — 「このカラムはどう計算しているの?」に答えられなくなる。dbtのドキュメント機能やデータカタログを活用し、変換ロジックを常に追跡可能にする
- エラーハンドリングを想定しない — ソースシステムの仕様変更やネットワーク障害でパイプラインは必ず壊れる。リトライ・通知・ロールバックの仕組みを最初から設計する
まとめ#
データパイプライン設計は、散在するデータを収集・変換・格納・活用する「データの水道管」を構築すること。最初は最も重要なデータソース1つから逆算して設計する。パイプラインが動き出すと、「先週のデータで今日の判断をしていた」という感覚が消える。