コンテンツにスキップ

CloudEventsからApache Kafkaレコードへ、パートII

公開日:2023年4月3日、改訂日:2024年1月17日

CloudEventsからApache Kafkaレコードへ、パートII

著者:Daniele Zonca、Red Hatシニアプリンシパルソフトウェアエンジニア、Matthias Weßendorf、Red Hatシニアプリンシパルソフトウェアエンジニア

このブログ記事では、受信CloudEventsをApache Kafkaトピックに簡単に保存し、コンテンツベースのイベントルーティングにKnative BrokerおよびTrigger APIを使用する方法を学習します。

この投稿のパート1では、KnativeがCloudEventsをApache Kafkaトピックに取り込んでさらに処理するためにどのように役立つかを説明しました。記事では、kcat CLIのようなApache Kafkaエコシステムの標準ツールを使用したCloudEvents Kafkaレコードの処理を示しました。さらに、投稿では、Knativeがデフォルトで使用するCloudEventsのバイナリコンテンツモードの**利点**についても説明しました。この記事では、イベントルーティングにKnative BrokerおよびTrigger APIを活用することで、取り込まれたCloudEventsを処理するための異なるアプローチを示します。

Apache KafkaとKnative Brokerのセットアップ

Apache Kafka用Knative Brokerを使用するには、最初にApache Kafkaをインストールする必要があります。この投稿では、Strimziを搭載したローカルApache Kafkaインストールを使用しています(こちらで説明されています)。記事では、ローカル開発環境用にApache Kafka用Knative Brokerをインストールする方法についても説明しています。

注記

Apache Kafka用Knative Brokerの本番環境対応構成については、このブログを参照してください。

Knative Brokerコンポーネントのセットアップ

上記の記 事では、すべてのKnative BrokerKafkaクラスの形式になるように構成されているため、Apache Kafkaの新しいブローカーを作成するのは非常に簡単です。

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: my-demo-kafka-broker
  annotations:
    eventing.knative.dev/broker.class: Kafka
spec: {}

Brokerアドレス指定可能タイプであり、status.address.urlフィールドで定義されたアドレスを介してHTTP経由で受信CloudEventsを受信できます。

kubectl get brokers.eventing.knative.dev
NAME                   URL                                                                                           AGE   READY   REASON
my-demo-kafka-broker   http://kafka-broker-ingress.knative-eventing.svc.cluster.local/default/my-demo-kafka-broker   7s    True    

注記

ブローカーは、クラスタ内の上記のURLで到達可能です。それを行うために、Ingressを作成(および保護)することもできます。開発の場合は、knコマンドラインを使用してイベントを送信することもできます。Kn Event Pluginセクションを参照してください。

ただし、Apache Kafkaトピックに関する情報は表示されません。理由は、Broker実装で使用されるトピックは実装の詳細と見なされるためです。実際のブローカーオブジェクトを見てみましょう。

kubectl get brokers.eventing.knative.dev my-demo-kafka-broker -o yaml 
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: my-demo-kafka-broker
  namespace: default
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
status:
  address:
    url: http://kafka-broker-ingress.knative-eventing.svc.cluster.local/default/my-demo-kafka-broker
  annotations:
    bootstrap.servers: my-cluster-kafka-bootstrap.kafka:9092
    default.topic: knative-broker-default-my-demo-kafka-broker
    default.topic.partitions: "10"
    default.topic.replication.factor: "1"

上記はYAML表現の簡略版ですが、spec.configに注意してください。これは、クラスタ内のすべてのKafka対応Knative Brokerのデフォルト構成を指します。 kafka-broker-config ConfigMapは、パーティションレプリケーション係数などのノブを定義することで、基盤となるトピックの概念を構成します。ただし、ブローカーのstatusには、トピックの名前が表示されます:knative-broker-default-my-demo-kafka-broker。名前は規則knative-broker-<namespace>-<broker-name>に従います。

注記

デフォルトでは、Knative Kafka Brokerは独自の内部トピックを作成しますが、このアクションは一部の環境では制限される場合があります。この場合やその他の同様のユースケースでは、独自のトピックを持ち込むことができます。

コンシューマーアプリケーションのセットアップ

CloudEventsを取り込むためのHTTPエンドポイントとして機能するBrokerができたので、CloudEventsを受信*および*処理するアプリケーションを定義します。

apiVersion: v1
kind: Pod
metadata:
  name: log-receiver
  labels:
    app: log-receiver
spec:
  containers:
  - name: log-receiver
    image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
    imagePullPolicy: Always
    ports:
    - containerPort: 8080
      protocol: TCP
      name: log-receiver
---
apiVersion: v1
kind: Service
metadata:
  name: log-receiver
spec:
  selector:
    app: log-receiver
  ports:
    - port: 80
      protocol: TCP
      targetPort: log-receiver
      name: http

ここでは、CloudEventsを受信するHTTPサーバーを指す単純なPodとそのServiceを定義します。ご覧のとおり、これはKafka固有のコンシューマーでは*なく*、*任意*の言語の*任意*のHTTP Webサーバーを使用して、Apache KafkaトピックからのCloudEventsを処理できます。

コンシューマーアプリケーションの開発者は、Kafkaコンシューマーアプリケーションをプログラムする方法について詳しく知る必要はありません。Apache Kafka用Broker実装を備えたKnativeは、コンシューマーアプリケーションのHTTPプロキシとして機能することで、これを抽象化します。これにより、これらの焦点を絞った自己完結型のコンシューマーアプリケーションのエンジニアリング作業が劇的に簡素化されます。

Apache Kafkaを使用したメッセージルーティングルールの定義

Apache Kafkaのトピックは、同じ境界付けられたコンテキスト(ドメイン駆動設計原則を適用している場合)を参照する可能性のあるさまざまなタイプのイベントを含めるために使用されることがよくあります。これは、各コンシューマーがすべてのイベントを受信して、そのサブセットのみをフィルタリングおよび処理することを意味します。

これは、Apache Kafkaプロトコルの欠点の1つです。レコードのルーティング用の直接フィルターAPIはありません。イベントを処理またはフィルタリングして、別の宛先または他のKafkaトピックにルーティングするには、本格的なKafkaコンシューマークライアントを実装する必要があります。または、Kafka Streamsなどの追加ライブラリの使用が必要です。

ご想像の通り、これは非常に一般的なパターンであり、Knative Eventing はこれを API の一部として組み込んでいます。Trigger API は、CloudEvent のメタデータに基づいてルーティングを行うための強力なフィルタセットを定義します。

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: log-trigger
spec:
  broker: my-demo-kafka-broker
  filter:
    attributes:
      type: <cloud-event-type>
      <ce-extension>: <ce-extension-value>
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: log-receiver

ここでは、一連のfilterルールを定義する Trigger を見ていきます。これらのルールが一致する場合、Kafka トピックからの CloudEvent は HTTP を使用して、参照されている Web サーバーアプリケーションにルーティングされます。また、Knative の*実験的機能*として、Trigger API の filters フィールドを使用して SQL ライクなフィルタリングを可能にするものがあります。これは、CloudEvents Subscriptions APIを実装しています。

注記

CloudEvent のメタデータ属性と拡張機能に対して、Trigger にフィルタ属性を適用することを強くお勧めします。フィルタが提供されない場合、発生するすべての CloudEvent は参照されているサブスクライバーにルーティングされます。これは、ブローカー内のすべてのイベントのロガーを明示的に用意したい場合を除き、悪いアプリケーション設計です。

Knative Broker for Apache Kafka によって実行される Trigger の場合、Triggerkafka.eventing.knative.dev/delivery.order アノテーションを使用することで、配信されるイベントの順序を設定することも可能です。

Kn Event Plugin

イベントを送信する場合、HTTP を使用して CloudEvent を Broker に取り込むため、Apache Kafka Producer API を使用する必要はありません。1 つの選択肢として、Kubernetes クラスタ内に curl プログラムがインストールされた Pod を使用し、BrokerURL にイベントを送信することができます。しかし、ここでは、コマンドラインからクラウドイベントを管理するための kn クライアント CLI とその event plugin を使用します。

kn event send \
  --to Broker:eventing.knative.dev/v1:my-demo-kafka-broker \
  --type=dev.knative.blog.post \
  -f message="Hello"

上記のコマンドでは、messagedev.knative.blog.post タイプの CloudEvent として my-demo-kafka-broker オブジェクトに送信しています。kn event プラグインはこの呼び出しから有効な CloudEvent を生成し、参照されているコンポーネント(この例では Broker)のアドレス指定可能な URL に直接送信します。

結論

この例では、イベントの送信から受信までの単純なフローを示しました。メッセージは Knative Broker の背後にある Kafka トピックに永続化されます。そこから、標準の Apache Kafka API を使用して消費することもできます。しかし、Knative が提供する抽象化は、イベント駆動型アプリケーションの開発プロセスを簡素化します。過剰な設定なしに、メタデータに基づいてイベントをフィルタリングおよびルーティングすることも可能です。

さらに、Trigger/Filter の採用は、すべてのコンシューマーで同じパターンを再実装することを避ける方法であるだけでなく、コンシューマーが必要な場合にのみ呼び出されるため、メッセージ処理全体をより効率的にします。Knative Service である場合は、ゼロスケーリングも可能です。

サイトのトラフィックを理解するために、アナリティクスと Cookie を使用しています。お客様のサイトの使用に関する情報は、その目的のために Google と共有されます。詳細はこちら。