CloudEvents から Apache Kafka レコードへ、パート I ¶
公開日: 2023-03-08 、 改訂日: 2024-01-17
CloudEvents から Apache Kafka レコードへ、パート I¶
著者: Daniele Zonca、Red Hat のシニアプリンシパルソフトウェアエンジニア、Matthias Weßendorf、Red Hat のプリンシパルソフトウェアエンジニア
このブログ記事では、KafkaSink コンポーネントを使用して、受信した CloudEvents を Apache Kafka トピックに簡単に保存する方法を学びます。
Apache Kafka は非常に多様なユースケースで使用されていますが、特に拡張可能性が限られているサードパーティのコンポーネントがある場合、Kafka プロトコルを採用する必要性が障壁になる可能性があります。
Kafka プロトコルをサポートしていないイベントのプロデューサーがあり、HTTP がより柔軟なオプションになる可能性があります。Strimzi プロジェクトには、HTTP 経由でプロデューサー/コンシューマー API を公開する ブリッジコンポーネントがありますが、Kafka 固有であるため、本質的に同じプロトコル (コンシューマーグループ、オフセットなど) です。
CloudEvents の要件が問題になると思いますか? CloudEvents は、HTTP 形式のバインディングも定義しており、特にバイナリモードでは、ほとんどの HTTP ペイロードはすでに有効な CloudEvents である可能性があります!
Apache Kafka 用の Knative Sink は、CloudEvent イングレス用の Kafka ネイティブ実装であり、構成可能なトピックに Apache Kafka レコードとしてイベントを永続化します。
Apache Kafka トピックの設定¶
KafkaSink
コンポーネントを使用するには、Apache Kafka のトピックと、それへの適切なアクセス権が必要です。この記事では、Strimzi を使用して、ローカルの Apache Kafka インストールを使用します。こちらで説明されているように。Apache Kafka クラスターが Kubernetes 環境で実行されたら、トピックを作成する時間です。このために、Strimzi の KafkaTopic
CRD を使用して、標準的な宣言型 Kubernetes 方式でトピックを作成します。
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: my-topic
namespace: kafka
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 1
replicas: 1
これにより、partitions
と replicas
の両方が 1
に設定されたシンプルなトピックが作成されますが、これは本番環境では推奨されません。
注: Knative Kafka Broker の本番環境対応の構成については、このブログを参照してください。
マニフェストが Kubernetes クラスターに適用されると、次のようにクエリできます
kubectl get kafkatopics.kafka.strimzi.io -n kafka
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY
my-topic my-cluster 1 1 True
KafkaSink コンポーネントの設定¶
Apache Kafka 用の Knative Sink のインストールについては、こちらで説明されています。
次に、ローカルの Strimzi ベースの Apache Kafka クラスター上の my-topic
トピックにバインドする KafkaSink
のインスタンスを作成します
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
name: my-kafka-sink
namespace: default
spec:
topic: my-topic
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
KafkaSink
は、status.address.url
フィールドで定義されたアドレスに HTTP 経由で受信 CloudEvents を受信できる Addressable
タイプです。
kubectl get kafkasinks.eventing.knative.dev
NAME URL AGE READY REASON
my-kafka-sink http://kafka-sink-ingress.knative-eventing.svc.cluster.local/default/my-kafka-sink 13s True
Kn イベントプラグイン¶
この時点で、curl
プログラムがインストールされた Kubernetes クラスター内のポッドを使用して、KafkaSink
の URL
にイベントを送信できます。
ただし、代わりに、コマンドラインからクラウドイベントを管理するために、kn
クライアント CLI とその イベントプラグインを使用しています
kn event send \
--to KafkaSink:eventing.knative.dev/v1alpha1:my-kafka-sink \
--type=dev.knative.blog.post \
-f message="Hello"
上記のコマンドでは、dev.knative.blog.post
を持つ CloudEvents として message
を my-kafka-sink
オブジェクトに送信しています。kn event
プラグインは、この呼び出しから有効な CloudEvents を生成し、参照されたシンクのアドレス指定可能な URL に直接送信します。
kcat によるイベント処理¶
kcat は、以前は kafkacat として知られていたプロジェクトで、Apache Kafka からレコードを生成および消費するためのコマンドラインモードを提供します。
これにより、Apache Kafka クラスターの my-topic
トピックに保存された Apache Kafka レコード (CloudEvent として) を消費できます
kubectl run kcat -ti --image=docker.io/edenhill/kcat:1.7.1 --rm=true --restart=Never -- -C -b my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092 -t my-topic -f '\nHeaders: %h\nMessage value: %s\n\n '
Headers: ce_specversion=1.0,ce_id=ce5026d0-234e-4997-975a-c005f515fedf,ce_source=kn-event/v1.9.0,ce_type=ype=dev.knative.blog.post,content-type=application/json,ce_time=2023-02-13T12:52:20.654526321Z
Message value: {"message":"Hello"}
% Reached end of topic my-topic [0] at offset 2
CloudEvents バイナリモード¶
KafkaSink
は、バイナリコンテンツモードをデフォルトで使用して、受信した CloudEvents を Kafka レコードとして保存することに注意することが重要です。これは、転送やルーティングの最適化のため、より効率的であり、JSON の解析を回避できるためです。バイナリコンテンツモード
を使用すると、すべての CloudEvents 属性と拡張機能が、Kafka レコードのヘッダーとしてマッピングされます。一方、CloudEvent の data
は、Kafka レコードの実際の値に対応します。これは、構造化コンテンツモード
よりも バイナリコンテンツモード
を使用するもう 1 つの利点です。構造化コンテンツモード
は 妨害的 ではないため、CloudEvents を理解しないシステムと互換性があるためです。
展望¶
Knative KafkaSink
コンポーネントによってバックアップされた Kafka トピックに保存されたメッセージは、Apache Kafka コミュニティのより大きなエコシステム内の任意のコンシューマーアプリケーションによって簡単に消費できます。この記事の次の投稿では、Apache Kafka 用の Knative ブローカー実装を使用して受信イベントを保存し、このフィルタリング機能が Apache Kafka 自体に直接組み込まれていないため、CloudEvents メタデータに基づいてルーティングするための Knative Eventing ツールを使用する方法を示します。