Knative Eventing、Functions、および Apache Camel K を使用した AWS S3 イベントの利用 ¶
公開日: 2024-06-14
Knative Eventing、Functions、および Apache Camel K を使用した AWS S3 イベントの利用¶
著者: Matthias Weßendorf、Red Hat のシニアプリンシパルソフトウェアエンジニア
このブログ投稿では、Knative Eventing および Functions を使用して、Knative Function で AWS S3 バケットからイベントを簡単に利用する方法を学びます。このブログ投稿は、Apache Camel K と Knative に関するシリーズの最初の投稿をベースにしています。
サーバーレス関数の一般的なユースケースの 1 つは、外部イベントソースから配信されるイベントに反応することです。この一般的な例として、サーバーレス関数で AWS S3 バケットから通知を受信することが挙げられます。しかし、AWS で実行する代わりに、カスタム Kubernetes セットアップで関数を実行する場合、オンプレミス環境でこれらのイベントをどのように受信できるでしょうか。
インストール¶
Apache Camel K のインストールには、CLI、Kustomize、OLM、Helm など、いくつかの選択肢があります。たとえば、Helm インストールは次のとおりです。
$ helm repo add camel-k https://apache.github.io/camel-k/charts/
$ helm install my-camel-k camel-k/camel-k
Camel K に加えて、ドキュメントに記載されているように、Knative Eventing もインストールする必要があります。
Knative ブローカーインスタンスの作成¶
システムの中核として Knative ブローカーを使用します。これは、イベントプロデューサーとイベントコンシューマーの両方のイベントメッシュとして機能します。
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
namespace: default
name: my-broker
これで、イベントプロデューサーはイベントを送信でき、イベントコンシューマーはイベントを受信できます。
AWS S3 のイベントソースとしての Kamelet の使用¶
上記で作成したブローカーなどの Knative コンポーネントに Kamelet をバインドするには、Pipe
API を使用します。Pipe を使用すると、Kamelet によって記述されたシステムから Knative リソースへ、**または** Knative リソースから Kamelet によって記述された別の (外部) システムへ、データを宣言的に移動できます。
以下は、すぐに使用できる Kamelet
である aws-s3-source
を使用する Pipe
です。
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: aws-s3-source-pipe
annotations:
trait.camel.apache.org/mount.config: "secret:aws-s3-credentials"
spec:
integration:
dependencies:
- "camel:cloudevents"
source:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: aws-s3-source
properties:
bucketNameOrArn: "${aws.s3.bucketNameOrArn}"
accessKey: "${aws.s3.accessKey}"
secretKey: "${aws.s3.secretKey}"
region: "${aws.s3.region}"
dataTypes:
out:
scheme: aws2-s3
format: application-cloudevents
sink:
dataTypes:
in:
scheme: http
format: application-cloudevents
ref:
kind: Broker
apiVersion: eventing.knative.dev/v1
name: my-broker
aws-s3-source
Kamelet は、Pipe
の source
として参照され、参照されたバケットでアクティビティが発生すると、CNCF CloudEvents をアウトバウンドの sink
に送信します。ここでは、CloudEvents を受け入れる Knative ブローカーを使用します。後で、Knative Function をブローカーに接続して、ブローカーからイベントを受信します。
注
AWS S3 プロパティはシークレットに保存され、trait.camel.apache.org/mount.config
アノテーションを介して Pipe
にマウントされます。
コンシューマーとしての Knative Function の作成¶
Knative Function を使用して Knative ブローカーからメッセージを消費するには、単純な Golang 関数を作成する必要があります。ペイロードは CloudEvents として関数に送信されるため、次のコマンドを実行して、組み込みの cloudevents
テンプレートを使用します。
$ func create -l go -t cloudevents s3-logger
これにより、現在のディレクトリの s3-logger
フォルダーに新しいプロジェクトが作成され、Knative Function の Golang ファイルが含まれます。
package function
import (
"context"
"fmt"
"github.com/cloudevents/sdk-go/v2/event"
)
// Handle an event.
func Handle(ctx context.Context, ce event.Event) (*event.Event, error) {
/*
* YOUR CODE HERE
*
* Try running `go test`. Add more test as you code in `handle_test.go`.
*/
fmt.Println("Received event")
fmt.Println(ce) // echo to local output
return &e, nil // echo to caller
}
Handle
関数を修正して、CloudEvent のいくつかの属性を出力できます。
func Handle(ctx context.Context, ce event.Event) (*event.Event, error) {
fmt.Println("Received S3 event notification")
fmt.Println("CloudEvent Subject attribute: " + ce.Subject())
fmt.Println("CloudEvent Source attribute: " + ce.Source())
// Some processing of the payload of the CloudEvent...
return nil, nil
上記の Handle
関数は、subject
属性で表されるファイル名と、受信した CloudEvent の source
属性として保存されるバケットの完全修飾名を出力するだけです。
注
現在、AWS-S3-Source はデータ全体を送信するため、data
プロパティを使用してファイル全体にもアクセスできます。
AWS S3 イベントへの Knative Function のサブスクライブ¶
S3 イベント通知を受信できるようにするには、イベントを提供するブローカーに関数をサブスクライブする必要があります。しかし、どのようなイベントが利用可能かをどのように知るのでしょうか?そのため、システムで利用可能な Knative EventTypes を確認します。
$ kubectl get eventtypes.eventing.knative.dev
NAME TYPE SOURCE SCHEMA REFERENCE NAME REFERENCE KIND DESCRIPTION READY REASON
et-my-broker-53bfa9803446c35c5a612c5a44a1c263 org.apache.camel.event.aws.s3.getObject aws.s3.bucket.<bucketname> my-broker Broker Event Type auto-created by controller True
これで、名前空間に my-broker
ブローカーで org.apache.camel.event.aws.s3.getObject
イベントが利用可能であることがわかりました。
これで、指定されたブローカーからのそのイベントタイプへの関数のサブスクリプションを実行できます。
$ func subscribe --filter type=org.apache.camel.event.aws.s3.getObject --source my-broker
これにより、プロジェクトの func.yaml
メタデータが更新され、関数がデプロイされると、CLI がコンシューマーをビルドおよびデプロイし、一致するフィルター引数を使用して Knative Trigger
を作成します。
$ func deploy
デプロイされると、Knative Function のターミナルで、各 S3 イベントに対して次の出力が表示されます。
Received S3 event notification
CloudEvent Subject attribute: my-file.txt
CloudEvent Source attribute: aws.s3.bucket.<bucketname>
結論¶
Knative Eventing、Functions、および Apache Camel K を使用すると、AWS S3 などのサードパーティのクラウドサービスからの通知を、独自のオンプレミス Kubernetes クラスターで実行されている関数にトリガーできます。