Google Cloud 上の Knative を使用したイベント駆動イメージ処理と BigQuery 処理パイプライン ¶
公開日: 2020-07-14 , 改訂日: 2024-01-17
Google Cloud 上の Knative を使用したイベント駆動イメージ処理と BigQuery 処理パイプライン¶
作成者: メテ・アタメル、ソフトウェアエンジニア&Google の開発者アドボケート
このブログ記事では、私が最近 Knative Eventing で構築した 2 つのイベント駆動処理パイプラインの概要を説明します。その中で、イベントソース、カスタムイベント、および Knative によって提供されるその他のコンポーネントについて説明します。これらはイベント駆動アーキテクチャの開発を大幅に簡素化します。
これらのパイプラインはどちらも GitHub でソースコード、構成、詳しい手順とともに、私の Knative チュートリアル の一部として利用できます。
使用した Knative コンポーネント¶
これらのサンプルパイプラインを作成するときは、開発を大幅に簡素化したいくつかの Knative コンポーネントを利用しました。具体的には
- イベントソースは、クラスター内の外部イベントを読み取ることを可能にします。Knative-GCP Sources は、さまざまな Google Cloud ソースからイベントを読み取る準備ができた多数のイベントソースを提供します。
- ブローカーとトリガーは、イベントのルーティング方法をプロデューサーまたはコンシューマーが知らなくてもイベント配信を提供します。
- カスタムイベントとイベント返信: Knative では、すべてのイベントは CloudEvents なので、イベントやそれらを読み書きするためのさまざまな SDK に標準形式を使用すると便利です。Knative はカスタムイベントとイベント返信をサポートしています。どのサービスもイベントを受信し、いくつかの処理を行い、新しいデータを含むカスタムイベントを作成してブローカーに返信して、他のサービスがそのカスタムイベントを読み取れるようにできます。これは、各サービスが少し作業してメッセージを次のサービスに渡すパイプラインでは便利です。
画像処理パイプライン¶
この画像処理パイプラインの例では、ユーザーは Google Cloud のストレージバケットに画像をアップロードし、さまざまな Knative サービスによって画像を処理し、処理された画像を出力バケットに保存します。
パイプラインの要件を 2 つ定義しました
- アップロードされた画像がパイプラインで送信される前にフィルタリングされます。たとえば、アダルト向けテーマや暴力的な画像は許可されません。
- パイプラインには、必要に応じて追加または削除できる任意の数の処理サービスを含めることができます。
アーキテクチャ¶
このセクションでは、画像処理パイプラインのアーキテクチャについて説明します。パイプラインは、Google Cloud 上の Google Kubernetes Engine (GKE) にデプロイされます。
- 画像は入力の Cloud Storage バケットに保存されています。
- Cloud Storage 更新イベントは CloudStorageSource により Knative に読み込まれます。
- フィルター サービスによって、Cloud Storage イベントが受信されます。Vision API を使用して画像の安全性を判断するか、画像をフィルターする必要があります。画像が安全な場合、フィルター サービスは
dev.knative.samples.fileuploaded
タイプのカスタム CloudEvent を作成して、ブローカーに返します。 - リサイザーサービスによって
fileuploaded
イベントが受信されてから、ImageSharp ライブラリを使用して画像がリサイズされます。次に、サービスによってリサイズされた画像が出力バケットに保存され、dev.knative.samples.fileresized
タイプのカスタム CloudEvent が作成されて、イベントがブローカーに返されます。 - ウォーターマークサービスによって
fileresized
イベントが受信されてから、ImageSharp ライブラリを使用して画像にウォーターマークが追加されて、画像が出力バケットに保存されます。 - ラベラーによって
fileuploaded
イベントが受信されてから、Vision API を使用して画像からラベルが抽出され、ラベルが出力バケットに保存されます。
パイプラインのテスト¶
パイプラインをテストするために、お気に入りのビーチ(リオデジャネイロのイパネマ)の写真をバケットにアップロードしました
数秒後、出力バケットに 3 つのファイルが表示されました
gsutil ls gs://knative-atamel-images-output
gs://knative-atamel-images-output/beach-400x400-watermark.jpeg
gs://knative-atamel-images-output/beach-400x400.png
gs://knative-atamel-images-output/beach-labels.txt
テキストファイルで 空、水域、海、自然、海岸、水、夕日、水平線、雲、海岸
のラベルと、リサイズされてウォーターマークが付けられた画像を見ることができます
BigQuery 処理パイプライン¶
このパイプラインの例はスケジュール駆動パイプラインで、英国とキプロスの COVID-19 症例の日次数をクエリして見つけます。BigQuery の公開 COVID-19 データセットを使用してデータを取得し、グラフを生成し、1 日に 1 回、各国のグラフを含むメールを自分に送信します。
アーキテクチャ¶
パイプラインのアーキテクチャを以下に示します。
- 2 か国(英国とキプロス)について 2 つの
CloudSchedulerSources
を設定して、QueryRunner
サービスを 1 日に 1 回呼び出します。 - QueryRunner サービスは UK とキプロスのスケジュールイベントを受信し、BigQuery の公開 COVID-19 データセットを使用してそれぞれの COVID-19 のケースを照会し、その結果を別の BigQuery テーブルに保存します。これが完了すると、QueryRunner サービスは
dev.knative.samples.querycompleted
タイプのカスタム CloudEvent を返します。 - ChartCreator サービスは
querycompleted
CloudEvent を受信し、Matplotlib
を使用して BigQuery データからチャートを作成し、それを Cloud Storage バケットに保存します。 - 通知サービスは、CloudStorageSource を経由してバケットから
com.google.cloud.storage.object.finalize
CloudEvent を受信し、SendGrid を使用してユーザーにメール通知を送信します。
パイプラインをテストする¶
CloudSchedulerSource は CloudScheduler ジョブを作成します
gcloud scheduler jobs list
ID LOCATION SCHEDULE (TZ) TARGET_TYPE STATE
cre-scheduler-2bcb33d8-3165-4eca-9428-feb99bc320e2 europe-west1 0 16 * * * (UTC) Pub/Sub ENABLED
cre-scheduler-714c0b82-c441-42f4-8f99-0e2eac9a5869 europe-west1 0 17 * * * (UTC) Pub/Sub ENABLED
ジョブをトリガーします
gcloud scheduler jobs run cre-scheduler-2bcb33d8-3165-4eca-9428-feb99bc320e2
数分で、以下のようなチャートを含むメールを受け取ることになります
これで投稿は終わりです。すでに述べたように、詳細な手順が必要な場合は、image-processing-pipeline と bigquery-processing-pipeline を自分の Knative チュートリアル の一部として確認できます。
質問やコメントがある場合は、Twitter @meteatamel までお気軽にご連絡ください。
Mete Atamel - 開発者アドボケイト、Google Cloud