コンテンツへスキップ

Knative Eventing、Functions、および Apache Camel K を使用した AWS S3 イベントの利用

公開日: 2024-06-14

Knative Eventing、Functions、および Apache Camel K を使用した AWS S3 イベントの利用

著者: Matthias Weßendorf、Red Hat のシニアプリンシパルソフトウェアエンジニア

このブログ投稿では、Knative Eventing および Functions を使用して、Knative Function で AWS S3 バケットからイベントを簡単に利用する方法を学びます。このブログ投稿は、Apache Camel K と Knative に関するシリーズの最初の投稿をベースにしています。

サーバーレス関数の一般的なユースケースの 1 つは、外部イベントソースから配信されるイベントに反応することです。この一般的な例として、サーバーレス関数で AWS S3 バケットから通知を受信することが挙げられます。しかし、AWS で実行する代わりに、カスタム Kubernetes セットアップで関数を実行する場合、オンプレミス環境でこれらのイベントをどのように受信できるでしょうか。

インストール

Apache Camel K のインストールには、CLI、Kustomize、OLM、Helm など、いくつかの選択肢があります。たとえば、Helm インストールは次のとおりです。

$ helm repo add camel-k https://apache.github.io/camel-k/charts/
$ helm install my-camel-k camel-k/camel-k

Camel K に加えて、ドキュメントに記載されているように、Knative Eventing もインストールする必要があります。

Knative ブローカーインスタンスの作成

システムの中核として Knative ブローカーを使用します。これは、イベントプロデューサーとイベントコンシューマーの両方のイベントメッシュとして機能します。

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  namespace: default
  name: my-broker

これで、イベントプロデューサーはイベントを送信でき、イベントコンシューマーはイベントを受信できます。

AWS S3 のイベントソースとしての Kamelet の使用

上記で作成したブローカーなどの Knative コンポーネントに Kamelet をバインドするには、Pipe API を使用します。Pipe を使用すると、Kamelet によって記述されたシステムから Knative リソース、**または** Knative リソースから Kamelet によって記述された別の (外部) システムへ、データを宣言的に移動できます。

以下は、すぐに使用できる Kamelet である aws-s3-source を使用する Pipe です。

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
  name: aws-s3-source-pipe
  annotations:
    trait.camel.apache.org/mount.config: "secret:aws-s3-credentials"
spec:
  integration:
    dependencies:
    - "camel:cloudevents"
  source:
    ref:
      kind: Kamelet
      apiVersion: camel.apache.org/v1
      name: aws-s3-source
    properties:
      bucketNameOrArn: "${aws.s3.bucketNameOrArn}"
      accessKey: "${aws.s3.accessKey}"
      secretKey: "${aws.s3.secretKey}"
      region: "${aws.s3.region}"
    dataTypes:
      out:
        scheme: aws2-s3
        format: application-cloudevents
  sink:
    dataTypes:
      in:
        scheme: http
        format: application-cloudevents
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: my-broker

aws-s3-source Kamelet は、Pipesource として参照され、参照されたバケットでアクティビティが発生すると、CNCF CloudEvents をアウトバウンドの sink に送信します。ここでは、CloudEvents を受け入れる Knative ブローカーを使用します。後で、Knative Function をブローカーに接続して、ブローカーからイベントを受信します。

AWS S3 プロパティはシークレットに保存され、trait.camel.apache.org/mount.config アノテーションを介して Pipe にマウントされます。

コンシューマーとしての Knative Function の作成

Knative Function を使用して Knative ブローカーからメッセージを消費するには、単純な Golang 関数を作成する必要があります。ペイロードは CloudEvents として関数に送信されるため、次のコマンドを実行して、組み込みの cloudevents テンプレートを使用します。

$ func create -l go -t cloudevents s3-logger

これにより、現在のディレクトリの s3-logger フォルダーに新しいプロジェクトが作成され、Knative Function の Golang ファイルが含まれます。

package function

import (
    "context"
    "fmt"

    "github.com/cloudevents/sdk-go/v2/event"
)

// Handle an event.
func Handle(ctx context.Context, ce event.Event) (*event.Event, error) {
    /*
     * YOUR CODE HERE
     *
     * Try running `go test`.  Add more test as you code in `handle_test.go`.
     */

    fmt.Println("Received event")
    fmt.Println(ce) // echo to local output
    return &e, nil // echo to caller
}

Handle 関数を修正して、CloudEvent のいくつかの属性を出力できます。

func Handle(ctx context.Context, ce event.Event) (*event.Event, error) {
    fmt.Println("Received S3 event notification")
    fmt.Println("CloudEvent Subject attribute: " + ce.Subject())
    fmt.Println("CloudEvent Source attribute:  " + ce.Source())

  // Some processing of the payload of the CloudEvent...

    return nil, nil

上記の Handle 関数は、subject 属性で表されるファイル名と、受信した CloudEvent の source 属性として保存されるバケットの完全修飾名を出力するだけです。

現在、AWS-S3-Source はデータ全体を送信するため、data プロパティを使用してファイル全体にもアクセスできます。

AWS S3 イベントへの Knative Function のサブスクライブ

S3 イベント通知を受信できるようにするには、イベントを提供するブローカーに関数をサブスクライブする必要があります。しかし、どのようなイベントが利用可能かをどのように知るのでしょうか?そのため、システムで利用可能な Knative EventTypes を確認します。

$ kubectl get eventtypes.eventing.knative.dev 
NAME                                            TYPE                                      SOURCE                       SCHEMA   REFERENCE NAME   REFERENCE KIND   DESCRIPTION                             READY   REASON
et-my-broker-53bfa9803446c35c5a612c5a44a1c263   org.apache.camel.event.aws.s3.getObject   aws.s3.bucket.<bucketname>            my-broker        Broker           Event Type auto-created by controller   True

これで、名前空間に my-broker ブローカーで org.apache.camel.event.aws.s3.getObject イベントが利用可能であることがわかりました。

これで、指定されたブローカーからのそのイベントタイプへの関数のサブスクリプションを実行できます。

$ func subscribe --filter type=org.apache.camel.event.aws.s3.getObject --source my-broker

これにより、プロジェクトの func.yaml メタデータが更新され、関数がデプロイされると、CLI がコンシューマーをビルドおよびデプロイし、一致するフィルター引数を使用して Knative Trigger を作成します。

$ func deploy

デプロイされると、Knative Function のターミナルで、 S3 イベントに対して次の出力が表示されます。

Received S3 event notification

CloudEvent Subject attribute: my-file.txt
CloudEvent Source attribute:  aws.s3.bucket.<bucketname>

結論

Knative Eventing、Functions、および Apache Camel K を使用すると、AWS S3 などのサードパーティのクラウドサービスからの通知を、独自のオンプレミス Kubernetes クラスターで実行されている関数にトリガーできます。

当サイトでは、サイトのトラフィックを把握するために、アナリティクスとクッキーを使用しています。サイトの利用状況に関する情報は、その目的のためにGoogleと共有されます。詳細はこちら。