CloudEventsからApache Kafkaレコードへ、パートII ¶
公開日:2023年4月3日、改訂日:2024年1月17日
CloudEventsからApache Kafkaレコードへ、パートII¶
著者:Daniele Zonca、Red Hatシニアプリンシパルソフトウェアエンジニア、Matthias Weßendorf、Red Hatシニアプリンシパルソフトウェアエンジニア
このブログ記事では、受信CloudEventsをApache Kafkaトピックに簡単に保存し、コンテンツベースのイベントルーティングにKnative BrokerおよびTrigger APIを使用する方法を学習します。
この投稿のパート1では、KnativeがCloudEventsをApache Kafkaトピックに取り込んでさらに処理するためにどのように役立つかを説明しました。記事では、kcat
CLIのようなApache Kafkaエコシステムの標準ツールを使用したCloudEvents Kafkaレコードの処理を示しました。さらに、投稿では、Knativeがデフォルトで使用するCloudEventsのバイナリコンテンツモードの**利点**についても説明しました。この記事では、イベントルーティングにKnative BrokerおよびTrigger APIを活用することで、取り込まれたCloudEventsを処理するための異なるアプローチを示します。
Apache KafkaとKnative Brokerのセットアップ¶
Apache Kafka用Knative Brokerを使用するには、最初にApache Kafkaをインストールする必要があります。この投稿では、Strimziを搭載したローカルApache Kafkaインストールを使用しています(こちらで説明されています)。記事では、ローカル開発環境用にApache Kafka用Knative Brokerをインストールする方法についても説明しています。
注記
Apache Kafka用Knative Brokerの本番環境対応構成については、このブログを参照してください。
Knative Brokerコンポーネントのセットアップ¶
上記の記 事では、すべてのKnative Broker
がKafka
クラスの形式になるように構成されているため、Apache Kafkaの新しいブローカーを作成するのは非常に簡単です。
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: my-demo-kafka-broker
annotations:
eventing.knative.dev/broker.class: Kafka
spec: {}
Broker
はアドレス指定可能
タイプであり、status.address.url
フィールドで定義されたアドレスを介してHTTP経由で受信CloudEventsを受信できます。
kubectl get brokers.eventing.knative.dev
NAME URL AGE READY REASON
my-demo-kafka-broker http://kafka-broker-ingress.knative-eventing.svc.cluster.local/default/my-demo-kafka-broker 7s True
注記
ブローカーは、クラスタ内の上記のURLで到達可能です。それを行うために、Ingress
を作成(および保護)することもできます。開発の場合は、kn
コマンドラインを使用してイベントを送信することもできます。Kn Event Pluginセクションを参照してください。
ただし、Apache Kafkaトピックに関する情報は表示されません。理由は、Broker実装で使用されるトピックは実装の詳細と見なされるためです。実際のブローカーオブジェクトを見てみましょう。
kubectl get brokers.eventing.knative.dev my-demo-kafka-broker -o yaml
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
eventing.knative.dev/broker.class: Kafka
name: my-demo-kafka-broker
namespace: default
spec:
config:
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
status:
address:
url: http://kafka-broker-ingress.knative-eventing.svc.cluster.local/default/my-demo-kafka-broker
annotations:
bootstrap.servers: my-cluster-kafka-bootstrap.kafka:9092
default.topic: knative-broker-default-my-demo-kafka-broker
default.topic.partitions: "10"
default.topic.replication.factor: "1"
上記はYAML表現の簡略版ですが、spec.config
に注意してください。これは、クラスタ内のすべてのKafka対応Knative Brokerのデフォルト構成を指します。 kafka-broker-config
ConfigMapは、パーティション
やレプリケーション係数
などのノブを定義することで、基盤となるトピックの概念を構成します。ただし、ブローカーのstatus
には、トピックの名前が表示されます:knative-broker-default-my-demo-kafka-broker
。名前は規則knative-broker-<namespace>-<broker-name>
に従います。
注記
デフォルトでは、Knative Kafka Brokerは独自の内部トピックを作成しますが、このアクションは一部の環境では制限される場合があります。この場合やその他の同様のユースケースでは、独自のトピックを持ち込むことができます。
コンシューマーアプリケーションのセットアップ¶
CloudEventsを取り込むためのHTTPエンドポイントとして機能するBroker
ができたので、CloudEventsを受信*および*処理するアプリケーションを定義します。
apiVersion: v1
kind: Pod
metadata:
name: log-receiver
labels:
app: log-receiver
spec:
containers:
- name: log-receiver
image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
imagePullPolicy: Always
ports:
- containerPort: 8080
protocol: TCP
name: log-receiver
---
apiVersion: v1
kind: Service
metadata:
name: log-receiver
spec:
selector:
app: log-receiver
ports:
- port: 80
protocol: TCP
targetPort: log-receiver
name: http
ここでは、CloudEventsを受信するHTTPサーバーを指す単純なPod
とそのService
を定義します。ご覧のとおり、これはKafka固有のコンシューマーでは*なく*、*任意*の言語の*任意*のHTTP Webサーバーを使用して、Apache KafkaトピックからのCloudEventsを処理できます。
コンシューマーアプリケーションの開発者は、Kafkaコンシューマーアプリケーションをプログラムする方法について詳しく知る必要はありません。Apache Kafka用Broker実装を備えたKnativeは、コンシューマーアプリケーションのHTTPプロキシとして機能することで、これを抽象化します。これにより、これらの焦点を絞った自己完結型のコンシューマーアプリケーションのエンジニアリング作業が劇的に簡素化されます。
Apache Kafkaを使用したメッセージルーティングルールの定義¶
Apache Kafkaのトピックは、同じ境界付けられたコンテキスト(ドメイン駆動設計原則を適用している場合)を参照する可能性のあるさまざまなタイプのイベントを含めるために使用されることがよくあります。これは、各コンシューマーがすべてのイベントを受信して、そのサブセットのみをフィルタリングおよび処理することを意味します。
これは、Apache Kafkaプロトコルの欠点の1つです。レコードのルーティング用の直接フィルターAPIはありません。イベントを処理またはフィルタリングして、別の宛先または他のKafkaトピックにルーティングするには、本格的なKafkaコンシューマークライアントを実装する必要があります。または、Kafka Streamsなどの追加ライブラリの使用が必要です。
ご想像の通り、これは非常に一般的なパターンであり、Knative Eventing はこれを API の一部として組み込んでいます。Trigger
API は、CloudEvent のメタデータに基づいてルーティングを行うための強力なフィルタセットを定義します。
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: log-trigger
spec:
broker: my-demo-kafka-broker
filter:
attributes:
type: <cloud-event-type>
<ce-extension>: <ce-extension-value>
subscriber:
ref:
apiVersion: v1
kind: Service
name: log-receiver
ここでは、一連のfilter
ルールを定義する Trigger を見ていきます。これらのルールが一致する場合、Kafka トピックからの CloudEvent は HTTP を使用して、参照されている Web サーバーアプリケーションにルーティングされます。また、Knative の*実験的機能*として、Trigger
API の filters
フィールドを使用して SQL ライクなフィルタリングを可能にするものがあります。これは、CloudEvents Subscriptions APIを実装しています。
注記
CloudEvent のメタデータ属性と拡張機能に対して、Trigger
にフィルタ属性を適用することを強くお勧めします。フィルタが提供されない場合、発生するすべての CloudEvent は参照されているサブスクライバーにルーティングされます。これは、ブローカー内のすべてのイベントのロガーを明示的に用意したい場合を除き、悪いアプリケーション設計です。
Knative Broker for Apache Kafka によって実行される Trigger
の場合、Trigger
に kafka.eventing.knative.dev/delivery.order
アノテーションを使用することで、配信されるイベントの順序を設定することも可能です。
Kn Event Plugin¶
イベントを送信する場合、HTTP を使用して CloudEvent を Broker に取り込むため、Apache Kafka Producer API を使用する必要はありません。1 つの選択肢として、Kubernetes クラスタ内に curl
プログラムがインストールされた Pod
を使用し、Broker
の URL
にイベントを送信することができます。しかし、ここでは、コマンドラインからクラウドイベントを管理するための kn
クライアント CLI とその event plugin を使用します。
kn event send \
--to Broker:eventing.knative.dev/v1:my-demo-kafka-broker \
--type=dev.knative.blog.post \
-f message="Hello"
上記のコマンドでは、message
を dev.knative.blog.post
タイプの CloudEvent として my-demo-kafka-broker
オブジェクトに送信しています。kn event
プラグインはこの呼び出しから有効な CloudEvent を生成し、参照されているコンポーネント(この例では Broker
)のアドレス指定可能な URL に直接送信します。
結論¶
この例では、イベントの送信から受信までの単純なフローを示しました。メッセージは Knative Broker の背後にある Kafka トピックに永続化されます。そこから、標準の Apache Kafka API を使用して消費することもできます。しかし、Knative が提供する抽象化は、イベント駆動型アプリケーションの開発プロセスを簡素化します。過剰な設定なしに、メタデータに基づいてイベントをフィルタリングおよびルーティングすることも可能です。
さらに、Trigger
/Filter
の採用は、すべてのコンシューマーで同じパターンを再実装することを避ける方法であるだけでなく、コンシューマーが必要な場合にのみ呼び出されるため、メッセージ処理全体をより効率的にします。Knative Service である場合は、ゼロスケーリングも可能です。