コンテンツにスキップ

ビットコイン取引データを使用したKnative Eventing入門

公開日:2020年6月22日、  改訂日:2024年1月17日

ビットコイン取引データを使用したKnative Eventing入門

著者:IBMのソフトウェアエンジニア、Johana Saladas

私は、**イベントプロデューサー**と**イベントコンシューマー**を使用してクラウドネイティブのイベントエコシステムを簡単にデプロイできるシステムであるKnative Eventingを調べてきました。このデモの作業のほとんどはバージョン0.11で行われ、バージョン0.13でも実行しましたが、現在はバージョン0.15でも動作します。

このデモは、最初のKnativeコミュニティミートアップで発表されたため、ビデオバージョンをここで視聴することもできます。{{< youtube sGi_LuAaaT0 >}}

イベント駆動型アーキテクチャの主な利点のいくつかを調べるために、簡単なデモを作成しました。たとえば、

  • プッシュベースのメッセージング。
  • プロデューサーとコンシューマーの分離。
  • データが移動中にビジネスロジックを適用する。
  • データサイエンス向けのリアルタイムイベントストリーム — ミリ秒単位の意思決定(例:不正検出)。

この記事では、基本的なKnativeイベントコンポーネントである**Broker、Trigger、プロデューサー**、および**コンシューマー**を使用して、イベントのシナリオ例を実行する方法を紹介します。このデモでは、リアルタイムのストリーミングイベント、インストリーム変換、およびプッシュベースのフロントエンドの動作を示します。これを基礎として、可能性をさらに探求して構築することができます。

このシナリオでは、イベントストリームの例としてビットコイン取引イベントを使用します。イベントは、UIフロントエンドを介してリアルタイムで表示され、別のサービスによって合計取引値に基づいてサイズに分類されます。

下の図は、デプロイするものの計画を示しています。

Diagram of this example Knative Eventing scenario

このデモには、こちらのgithubリポジトリがあります。すべての個々のサービスのソースコードは、githubリポジトリで入手できます。Dockerハブから事前に構築されたイメージを使用する場合は、yamlディレクトリ内のファイルのみが必要になります。

ステップ1:名前空間を作成し、Brokerを作成する

kubectl apply -f 001-namespace.yaml

最初のステップは、kubernetes名前空間を作成し、knative-eventing-injection = enabledラベルを自動的に追加する001-namespace.yamlをデプロイすることです。これにより、Knative Eventing **broker**が作成されます。

**Broker**は、イベントソースまたは**プロデューサー**からイベントが送信される場所です。メッセージングチャネルとしてバッキングされる場合があります。デフォルトではインメモリですが、他のもの(Kafkaチャネルなど)にすることもできます。ここから、関心のあるサービスが利用できます。

ステップ2:ビットコインイベントソースをデプロイする

kubectl apply -f 010-deployment.yaml
私たちの**イベントソース**は、blockchain.info websocketから(未確認のビットコイン取引)メッセージを取得し、新しいCloudEventを作成するアプリケーションです。これは私たちのイベントのプロデューサーであり、他のサービスがサブスクライブする*可能性のある*イベントを*生成*します。

私たちの場合、**sink**変数を使用して、イベントを送信する場所を指示します。シンクは、デプロイメントの環境変数として渡されます。この場合は、Brokerアドレスです。Broker URLを取得するには、次のコマンドを使用します

kubectl get broker -n knative-eventing-websocket-source
イベントソースをデプロイしたら、ログを取得して実行されていることを確認できます
kubectl —namespace knative-eventing-websocket-source logs -l app=wseventsource — tail=100 -f

ステップ3:ビットコインイベントにevent-displayをサブスクライブする

kubectl apply -f 040-trigger.yaml

**Trigger**は、特定の属性に一致するイベントを選択して、指定されたサービスに配信するフィルターを提供します。このシナリオでは、3つのトリガーが指定されています。トリガーを設定するために、サブスクライブするサービスをまだデプロイする必要はありません。

ステップ4:コンシューマーサービスをデプロイする

私たちのシナリオでは、*複数*の**コンシューマー**があります。これらは、イベントに関心のある(または関心のない)サービスです。私たちには3つのコンシューマーサービスがあります

  • **event-display** — 「wss://ws.blockchain.info/inv」からのイベントにサブスクライブされたKubernetesデプロイメント。このサービスは、これらのイベントを取得し、UIフロントエンドを介してリアルタイムで表示します。このサービスはKubernetesサービスです。
    kubectl apply -f 050-kubernetesdeploy.yaml
    kubectl apply -f 060-kubernetesservice.yaml
    

このサービスをデプロイしたら、Webブラウザでlocalhost:31234にアクセスすると、UIにビットコイン取引イベントがリアルタイムでレンダリングされているのがわかります。このUIがリアルタイムで更新されているのがわかります

  • **classifier** — 「wss://ws.blockchain.info/inv」からのイベントにサブスクライブされています。このサービスはイベントを取得し、各トランザクション値を分類します。アプリケーションコードでは、新しい**type**と**source**を持つ新しいCloudEventが作成されます。これらの新しいイベントは、Knativeイベントエコシステムに 다시 放出されます。

ロジックはシンプルに保たれており、*small*と*large*の2つのサイズクラスしかありません。これは、データサイエンスまたはMLプロセスの一部として発生する可能性のある、インストリーム変換またはモデリングアプリケーションの例を表しています。このタイプのアーキテクチャは、不正検出、異常検出、および速度が重要なその他の意思決定に使用できます。

kubectl apply -f 031-classifier-service.yaml

  • **test-display** — **type**と**source**が「classifier」であるイベントにサブスクライブされたKnativeサービス。
    kubectl apply -f 030-test-display-service.yaml
    

このサービスは、トランザクションのサイズを含むイベントにサブスクライブするため、ログを表示するときにこれらを確認できます

kubectl logs -l serving.knative.dev/service=test-display -c user-container — tail=100 -n knative-eventing-websocket-source -f
Our test-display service consumes the size reply that is emitted by the classifier

参考文献

デモを試してみて、私に結果を教えてください!

詳細を知りたい場合は、Knativeドキュメントの他の例を確認できます。「Knativeクックブック」は、Knative ServingとEventingについて詳しく学ぶためのもう1つの優れたリソースです。

私たちは、サイトのトラフィックを理解するために、アナリティクスとCookieを使用しています。お客様のサイトのご利用に関する情報は、その目的のためにGoogleと共有されます。詳細はこちら