Fundamentals of Data Engineering 輪読会

Materialized Views, Federation, and Query Virtualization

Views

Viewは単なるクエリであり実体をもたない。そのためViewからselectするときは、そのViewと結合させて新しいクエリを作成して実行する。(つまりViewはテーブルではない)

Viewはデータベースにおいて複数のroleを果たしている

  • セキュリティを担保する役割

    • ユーザーのロールに応じて、アクセスさせたくない列や行をもとのテーブルからフィルターしたViewを提供する(所感: 聞く話しではあるが管理が複雑になりそう。この方法ではなくDWHに備わっている列レベルのアクセス制御を使うほうが今はよさそう。)
  • 重複データを排除して提供する役割

    • インサートオンリーならば各レコードの最新を返すようにすれば重複削除できる
  • 一般的なデータアクセスパターンを提供すること

    • 例えば5つのテーブルをジョインして分析用のワイドテーブル(View)を作る
    • アナリストは毎回テーブルをjoinするクエリを書かなくてもViewからクエリすればよい

Materialized Views

通常はマテビューと呼ばれている。マテリアルという名の通りViewとは違い実体を持っている。

通常のビューの問題点は事前計算しないことである。つまりビューに対してクエリを実行する場合毎回ビューのクエリが再実行されてしまう。(例えば5つのテーブルを結合しているビューの場合は実行のたびにJoinがはしる)

マテリアルビューでは全部 or 一部の計算を事前に行う。例えば5つのテーブルをjoinするマテビューの場合は、もとのテーブルに変更が入ったときにマテビューも更新される。そのためユーザーがマテビューにくえりするときは事前に計算されたデータにアクセスできる。

note: この本では特に記載されていませんが、マテビューに関してはjoinに関する制約(単一テーブルだけとか, Inner joinだけとか)があることがあるので任意のビューをマテビューに置き換えれる...というようなわけではないです。その場合はマテビューの比較対象はビューではなくテーブルになると思います。

データベースによっては、クエリが直接マテビューを参照していなかったとしても重要な役割を果たしていることがある(!!) 多くのクエリオプティマイザはマテビューに 似た クエリを識別し、クエリを 事前計算されているマテビューから参照する ようなクエリに書き換える。(私が知らなかっただけでクラウドのDWHには実装されているよう)

Composable materialized views

一般にマテビューはComposableではない。つまりマテビューをもとにして他のマテビューを作成することはできない。一方で最近そのようなことを可能にするツールが出てきた(すごい!)

Databricksのlive tablesではデータの到着後、各々のデータが更新される。データのフローに基づいて後続のテーブルも更新される。

cf . Delta Live Tables

cf. Databricks SQLでのマテリアライズドビューの使用

redshiftには同様の機能はなさそうだけど他のもの(BigQuery)であるかは不明

Federated queries

Federated queriesとは外部のデータソース(RDDMSやストレージ)をまたいで、OLAPのデータベース間でクエリを実行するような仕組みである。例えばPostgreSQLMySQLのデータを統合してクエリを行うことができる

例えばBigQueryのデータをAthenaのデータと結合したクエリ(つまりクラウドベンダーをまたがって)を行うことができる

他の例としてはSnowflakeはs3のバケットを外部テーブルとして定義できる。ただs3からデータをテーブルとしてloadするという意味ではない。外部テーブルに対するクエリが実行されるときにS3のファイルを読み込んでデータを処理する。(逆に言えば毎回データ転送がはしる。)

いくつかのOLAPシステムではfederated queriesに対してマテビューを作成できる。これをもちいれば元のネイティブなテーブルなみの速度が出るうえに外部データソースが変更されるたびにマテビューも更新される。

Data Virtualization

Data Virtualizationとはfederated queriesと関係がある。しかし典型的にはデータを処理たりクエリを実行するものであり、内部にデータを保存するものではない。TrinolやPrestoが有名な例である。外部テーブルを扱うようなクエリエンジンはData Virtualizationのエンジンと考えることができる。重要なのはサポートされている外部ソースと、性能である。

また重要な概念としてクエリプッシュダウンがある。たとえばsnowflakeからのデータをmysqlからのデータとjoinしてfilterするとする。クエリプッシュダウンはソースデータシステムに可能なかぎりの仕事を移そうとする。クエリエンジンは

  • virtulationレイヤから計算をオフロードする
  • networkを経由するデータの量を減らす
select * 
from snowflake join mysql
on snowflake.user_id = mysql.user_id
where snowflake.user_created_at > '2024-01-01' and mysql.user.county = 'JP'

というクエリを考える。

愚直に実行する場合

  • snowflakemysqlからテーブルの全レコードをロード
  • user_idでジョイン
  • created_atとcountryでフィルター

とテーブルのロードが重い。そうではなく最後のフィルターの部分をもとのソースシステム側にさせて計算コストとロードする量を減らす。

  • snowflakeからcreated_atが条件をみたすデータをロード
  • mysqlからcountryが条件をみたすデータをロード
  • user_idでジョイン

フィルターが事前に実行されるためネットワークを経由するデータ量が減り負荷が減る。

Data仮想化はデータが複数にまたがっているような組織においては有効にはたらくが無計画に使うべきではない。とはいうのは仮想化したところでクエリは本番DBに影響を与えるからだ。例えばTrinoではデータを保存しないので、実行のたびに毎回MySQLからデータを抽出することになる。

代替手段としてデータ仮想化はデータ取り込みや処理のパイプラインとして使うことができる。つまりTrinoを深夜に一回実行しMySQLからデータを抽出してS3に保存し、そのデータを下流で参照する。そうすれば本番DBをまもることができる

データ仮想化をサイロ化されたデータを抽象化によって統合する技術のように考えることもできる。これはデータメッシュの考え方とも適合する。つまり各々がそれぞれのデータソースに対して責任を持って管理する。

Streaming Transformations and Processing

ストリーミングにおける変換とクエリの境界はあいまいでもう少し説明が必要

basics

note

クエリとはデータを取得したり操作する方法全般が該当する。(SELECT, UPDATE, INSERT, DELETEなど。)変換はデータを使いやすい形にデータを変形することを指す。バッチで行うような処理をストリーミングで行うと本番環境にかかる負荷が大きいので、バッチとストリーミングはわけて議論される。本章の内容はクエリと変換は明示的に区別できないという内容なので両社の違いがわからない....なのが普通なので問題ない。

ストリーミングのおけるクエリは動的に実行されデータの現在の姿を提示する一方で変換はデータは下流で用いるためにデータを変換することである。

例えば、IoTをデータソースとしたストリームを処理することを考える。各イベントにはデバイスIDとイベントの中身が含まれている。このデータを他のデータベースに含まれているデバイスメタデータと突き合わせてenrichすることを考える。

実行のイメージ

// イベントのデータ
{
"device_id": "hoge",
"time": "2024-04-01 15:00:00",
"temperature": 23.0 
}
// 外部データベースにあるデバイスのデータ
{
"device_id" : "hoge",
"prefecture": "大阪",
"located_at": "1990-10-01"
}
// をdevice_idでjoinする
{
"device_id": "hoge",
"time": "2024-04-01 15:00:00",
"temperature": 23.0 
"prefecture": "大阪",
"located_at": "1990-10-01"
}

ストリームプロセスエンジンは、外部のデータベースからdeviceIDをキーにメタデータを取得して元のイベントデータに追記して新しいイベントを作成する。そのあと別のストリームに投げる。ライブクエリやトリガーされたメトリクスはエンリッチしたストリームに対して実行される(例えば大阪の温度を測定して24度を超えたらアラームを出す(リアルタイム性が求められる))

Transformations and queries are continuum

バッチでも曖昧であったクエリと変換の境界はストリーミングにおいてはさらにあいまいになる。例えばロールアップ集計をして次のストリームに送信したときこれはクエリ?変換?

後にもっと良い表現ができるかもしれないが今はクエリと変換という言葉を使おう。

Streaming DAGs

note: DAG(有効非巡回グラフ)とは ループしない ような向き付きグラフ。

ストリームでのenrichmentとjoinと密接に関連するものとしてStreaming DAGがある。以前説明したオーケストレーションというものはもともとはバッチの概念である。しかし、複数のストリームでエンリッチしたりマージしたりスプリットをリアルタイムで行いたい場合はどうすればよいだろうか?

Streaming DAGが有用なシンプルな例を考えてみよう。webサイトのクリックストリームとIoTデータを結合することを考える。(具体的にどんなIoTのデータを結合させようとしているのか例が思いつかない...WebサイトみているときにiPhoneのデータを送ったりみたいな悪いことができるのだろうか?)

データを結合することによってユーザーの動向をみることができる。また各データストリームは標準のフォーマットに前処理しておく必要がある。(Combined streamに対してクエリするので)

このような処理はストリーミングストア(ex. Kafka)とストリーミングプロセッサ(ex. Flink)を組み合わせれば、これまでも可能であった。これは複雑なRube Goldberg Machineを作成するに匹敵するようなことだった。

Note: Rube Goldberg Machine ピタゴラスイッチのようなもの ja.wikipedia.org

Pulsarは抽象化によって劇的にこれらのDAGの扱いを単純化した。複数のシステムにまたがったフローを管理するのではなく、エンジニアはStreaming DAGをコードとして単一システムで管理できるようになった。

cf. Pulsarについて qiita.com

Micro-batch versus true streaming

note: マイクロバッチはバッチの間隔を非常に短くしたもののこと。ニアリアルタイムでデータを処理することができ、ストリーミングとかかれることもある(と思う)。真のストリーミングはイベント単位で処理すること。

マイクロバッチと真のストリーミング間の論争はこれまで行われてきた。基本的には各フレームワークユースケース、パフォーマンス要件や能力を理解することが重要である。

マイクロバッチはバッチ指向のフレームワークをストリーミングに適用することである。実行間隔は2分かもしれないし1秒かもしれない(どの間隔からがマイクロバッチなのかという定義はない)。いくつかのフレームワーク(Spark Streaming)ではこのユースケースのもとに設計されており、適切にリソースを割り当てればバッチ頻度が高くてもよいパフォーマンスを出す。

真のストリーミングとは(ex. Beam, Flink)とは一度に一つのイベントを処理するように設計されている。しかし、これはオーバーヘッドが大きい。注意しないいけないのは真のストリーミングシステムであっても、多くの処理はバッチで行われる。基本的なデータを追加するだけのenrichのプロセスであれば、一度に一つのイベントを低レイテンシで処理できるであろう。一方でwindowをつかってメトリクスを計算するような処理においては数秒、数分の時間が必要になるであろう。

マイクロバッチと真のストリーミング、どちらかを使うべきな状況とは?これに対する一般的な答えはない。ユースケースにおいてはマイクロバッチが有効なことがあるだろうし、Sparkの専門家がチームにいるなら、Spark Streaming(マイクロバッチ)の導入が素早く行えるだろう。

ドメインの専門知識と実機テストを代替するものはない。専門家に意見を聞くのがよいし、また今はクラウド上でテストで行うことができる。(マイクロバッチとストリーミングの両方を扱ったことがあってアーキテクチャレビューできる専門家って日本に何人いるんだろう...)

ベンダはチェリーピックで非現実的な設定なサンプルを用意することで悪名高い。

de facto haphazardly warrant blurry but what if we ~ amount to ~ rather than the term is used to ~ dismiss