コンテンツへスキップ

CloudEvents から Apache Kafka レコードへ、パート I

公開日: 2023-03-08 、  改訂日: 2024-01-17

CloudEvents から Apache Kafka レコードへ、パート I

著者: Daniele Zonca、Red Hat のシニアプリンシパルソフトウェアエンジニア、Matthias Weßendorf、Red Hat のプリンシパルソフトウェアエンジニア

このブログ記事では、KafkaSink コンポーネントを使用して、受信した CloudEvents を Apache Kafka トピックに簡単に保存する方法を学びます。

Apache Kafka は非常に多様なユースケースで使用されていますが、特に拡張可能性が限られているサードパーティのコンポーネントがある場合、Kafka プロトコルを採用する必要性が障壁になる可能性があります。

Kafka プロトコルをサポートしていないイベントのプロデューサーがあり、HTTP がより柔軟なオプションになる可能性があります。Strimzi プロジェクトには、HTTP 経由でプロデューサー/コンシューマー API を公開する ブリッジコンポーネントがありますが、Kafka 固有であるため、本質的に同じプロトコル (コンシューマーグループ、オフセットなど) です。

CloudEvents の要件が問題になると思いますか? CloudEvents は、HTTP 形式のバインディングも定義しており、特にバイナリモードでは、ほとんどの HTTP ペイロードはすでに有効な CloudEvents である可能性があります!

Apache Kafka 用の Knative Sink は、CloudEvent イングレス用の Kafka ネイティブ実装であり、構成可能なトピックに Apache Kafka レコードとしてイベントを永続化します。

Apache Kafka トピックの設定

KafkaSink コンポーネントを使用するには、Apache Kafka のトピックと、それへの適切なアクセス権が必要です。この記事では、Strimzi を使用して、ローカルの Apache Kafka インストールを使用します。こちらで説明されているように。Apache Kafka クラスターが Kubernetes 環境で実行されたら、トピックを作成する時間です。このために、Strimzi の KafkaTopic CRD を使用して、標準的な宣言型 Kubernetes 方式でトピックを作成します。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1

これにより、partitionsreplicas の両方が 1 に設定されたシンプルなトピックが作成されますが、これは本番環境では推奨されません

注: Knative Kafka Broker の本番環境対応の構成については、このブログを参照してください。

マニフェストが Kubernetes クラスターに適用されると、次のようにクエリできます

kubectl get kafkatopics.kafka.strimzi.io -n kafka
NAME       CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
my-topic   my-cluster   1            1                    True

KafkaSink コンポーネントの設定

Apache Kafka 用の Knative Sink のインストールについては、こちらで説明されています。

次に、ローカルの Strimzi ベースの Apache Kafka クラスター上の my-topic トピックにバインドする KafkaSink のインスタンスを作成します

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: my-topic
  bootstrapServers:
   - my-cluster-kafka-bootstrap.kafka:9092

KafkaSink は、status.address.url フィールドで定義されたアドレスに HTTP 経由で受信 CloudEvents を受信できる Addressable タイプです。

kubectl get kafkasinks.eventing.knative.dev
NAME            URL                                                                                  AGE   READY   REASON
my-kafka-sink   http://kafka-sink-ingress.knative-eventing.svc.cluster.local/default/my-kafka-sink   13s   True

Kn イベントプラグイン

この時点で、curl プログラムがインストールされた Kubernetes クラスター内のポッドを使用して、KafkaSinkURL にイベントを送信できます。

ただし、代わりに、コマンドラインからクラウドイベントを管理するために、kn クライアント CLI とその イベントプラグインを使用しています

kn event send \
  --to KafkaSink:eventing.knative.dev/v1alpha1:my-kafka-sink \
  --type=dev.knative.blog.post \
  -f message="Hello"

上記のコマンドでは、dev.knative.blog.post を持つ CloudEvents として messagemy-kafka-sink オブジェクトに送信しています。kn event プラグインは、この呼び出しから有効な CloudEvents を生成し、参照されたシンクのアドレス指定可能な URL に直接送信します。

kcat によるイベント処理

kcat は、以前は kafkacat として知られていたプロジェクトで、Apache Kafka からレコードを生成および消費するためのコマンドラインモードを提供します。

これにより、Apache Kafka クラスターの my-topic トピックに保存された Apache Kafka レコード (CloudEvent として) を消費できます

kubectl run kcat -ti --image=docker.io/edenhill/kcat:1.7.1 --rm=true --restart=Never -- -C -b my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092 -t my-topic -f '\nHeaders: %h\nMessage value: %s\n\n '
上記のコマンドは、次のように Kafka レコードのすべてのヘッダーとその値を印刷します

Headers: ce_specversion=1.0,ce_id=ce5026d0-234e-4997-975a-c005f515fedf,ce_source=kn-event/v1.9.0,ce_type=ype=dev.knative.blog.post,content-type=application/json,ce_time=2023-02-13T12:52:20.654526321Z
Message value: {"message":"Hello"}

% Reached end of topic my-topic [0] at offset 2

CloudEvents バイナリモード

KafkaSink は、バイナリコンテンツモードをデフォルトで使用して、受信した CloudEvents を Kafka レコードとして保存することに注意することが重要です。これは、転送やルーティングの最適化のため、より効率的であり、JSON の解析を回避できるためです。バイナリコンテンツモードを使用すると、すべての CloudEvents 属性と拡張機能が、Kafka レコードのヘッダーとしてマッピングされます。一方、CloudEvent の data は、Kafka レコードの実際の値に対応します。これは、構造化コンテンツモードよりも バイナリコンテンツモードを使用するもう 1 つの利点です。構造化コンテンツモード妨害的 ではないため、CloudEvents を理解しないシステムと互換性があるためです。

展望

Knative KafkaSink コンポーネントによってバックアップされた Kafka トピックに保存されたメッセージは、Apache Kafka コミュニティのより大きなエコシステム内の任意のコンシューマーアプリケーションによって簡単に消費できます。この記事の次の投稿では、Apache Kafka 用の Knative ブローカー実装を使用して受信イベントを保存し、このフィルタリング機能が Apache Kafka 自体に直接組み込まれていないため、CloudEvents メタデータに基づいてルーティングするための Knative Eventing ツールを使用する方法を示します。

分析とクッキーを使用して、サイトトラフィックを把握します。サイトの使用に関する情報は、その目的のために Google と共有されます。 詳細はこちら。