Apache Kafka用Knativeソース¶
KafkaSource
は、既存の Apache Kafka トピックに保存されたメッセージを読み取り、それらのメッセージをCloudEventsとして構成済みの sink
にHTTP経由で送信します。KafkaSource
は、トピックパーティションに保存されたメッセージの順序を保持します。これは、同じパーティション内の次のメッセージを配信する前に、sink
からの正常な応答を待つことによって実現します。
KafkaSourceコントローラーのインストール¶
-
次のコマンドを入力して、
KafkaSource
コントローラーをインストールします。kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml
-
次のコマンドを入力して、Kafka Sourceデータプレーンをインストールします。
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-source.yaml
-
次のコマンドを入力して、
kafka-controller
とkafka-source-dispatcher
が実行されていることを確認します。kubectl get deployments.apps,statefulsets.apps -n knative-eventing
出力例
NAME READY UP-TO-DATE AVAILABLE AGE deployment.apps/kafka-controller 1/1 1 1 3s NAME READY AGE statefulset.apps/kafka-source-dispatcher 1/1 3s
オプション:Kafkaトピックの作成¶
注
Kafkaトピックを作成するセクションでは、Apache Kafkaを操作するためにStrimziを使用していることを前提としていますが、同等の操作はApache Kafka CLIまたは他のツールを使用して複製できます。
Strimziを使用している場合
-
KafkaTopic
YAMLファイルを作成します。apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: knative-demo-topic namespace: kafka labels: strimzi.io/cluster: my-cluster spec: partitions: 3 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824
-
次のコマンドを実行して、
KafkaTopic
YAMLファイルをデプロイします。ここで、kubectl apply -f <filename>.yaml
<filename>
はKafkaTopic
YAMLファイルの名前です。出力例
kafkatopic.kafka.strimzi.io/knative-demo-topic created
-
次のコマンドを実行して、
KafkaTopic
が実行されていることを確認します。kubectl -n kafka get kafkatopics.kafka.strimzi.io
出力例
NAME CLUSTER PARTITIONS REPLICATION FACTOR knative-demo-topic my-cluster 3 1
サービスの作成¶
-
event-display
サービスをYAMLファイルとして作成します。apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display namespace: default spec: template: spec: containers: - # This corresponds to # https://github.com/knative/eventing/tree/main/cmd/event_display/main.go image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
-
次のコマンドを実行してYAMLファイルを適用します。
ここで、kubectl apply -f <filename>.yaml
<filename>
は前の手順で作成したファイルの名前です。出力例
service.serving.knative.dev/event-display created
-
次のコマンドを実行して、サービスPodが実行されていることを確認します。
kubectl get pods
Pod名のプレフィックスは
event-display
です。NAME READY STATUS RESTARTS AGE event-display-00001-deployment-5d5df6c7-gv2j4 2/2 Running 0 72s
Kafkaイベントソース¶
-
source/event-source.yaml
をブートストラップサーバー、トピックなどに応じて変更します。apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: kafka-source spec: consumerGroup: knative-group bootstrapServers: - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace topics: - knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
-
イベントソースをデプロイします。
kubectl apply -f event-source.yaml
出力例
kafkasource.sources.knative.dev/kafka-source created
-
KafkaSourceの準備が整っていることを確認します。
kubectl get kafkasource kafka-source
出力例
NAME TOPICS BOOTSTRAPSERVERS READY REASON AGE kafka-source ["knative-demo-topic"] ["my-cluster-kafka-bootstrap.kafka:9092"] True 26h
スケーリング¶
より多くのコンシューマーをスケジュールしたり、より少ないコンシューマーをスケジュールしたりするために、KafkaSourceをスケーリングできます。また、それらを異なるディスパッチャーPodに割り当てることができます。kafkasourceステータスには、status.placementsキーの下にこのような割り当てが表示されます。
次の表記を使用して、kubectlでKafkaSourceをスケーリングできます。
kubectl scale kafkasource -n <ns> <kafkasource-name> --replicas=<number-of-replicas> # e.g. 12 replicas for a topic with 12 partitions
あるいは、GitOpsアプローチを使用している場合は、次の例に示すように consumers
キーを追加してリポジトリにコミットできます。
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
consumers: 12 # Number of replicas
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
KEDAによる自動スケーリング¶
KEDAを使用してKafkaSourceを自動スケーリングできます。この機能を有効にして構成する方法については、こちらの手順をお読みください。
確認¶
-
次の例のように、メッセージ(
{"msg": "This is a test!"}
)をApache Kafkaトピックに生成します。kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic
ヒント
コマンドプロンプトが表示されない場合は、Enterキーを押してみてください。
-
サービスがイベントソースからメッセージを受信したことを確認します。
kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container
出力例
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.kafka.event source: /apis/v1/namespaces/default/kafkasources/kafka-source#my-topic subject: partition:0#564 id: partition:0/offset:564 time: 2020-02-10T18:10:23.861866615Z datacontenttype: application/json Extensions, key: Data, { "msg": "This is a test!" }
配信失敗の処理¶
KafkaSource
は Delivery
仕様を実装しており、イベント配信に失敗した場合に適用されるイベント配信パラメーターを構成できます。
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
delivery:
deadLetterSink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: example-sink
backoffDelay: <duration>
backoffPolicy: <policy-type>
retry: <integer>
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
delivery
APIについては、配信失敗の処理の章で説明しています。
オプション:キーデシリアライザーの指定¶
KafkaSource
がKafkaからメッセージを受信すると、キーを Key
という名前のEvent拡張にダンプし、Kafkaメッセージヘッダーを kafkaheader
で始まる拡張にダンプします。
4つのタイプの中からキーデシリアライザーを指定できます。
- UTF-8エンコードされた文字列の場合は
string
(デフォルト) - 32ビットと64ビットの符号付き整数の場合は
int
- 32ビットと64ビットの浮動小数点数の場合は
float
- Base64エンコードされたバイト配列の場合は
byte-array
キーデシリアライザーを指定するには、次の例に示すように、kafkasources.sources.knative.dev/key-type
ラベルを KafkaSource
定義に追加します。
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
labels:
kafkasources.sources.knative.dev/key-type: int
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
オプション:初期オフセットの指定¶
デフォルトでは、KafkaSource
は各パーティションの最新オフセットからの消費を開始します。最初のオフセットから消費したい場合は、initialOffsetフィールドを earliest
に設定します(例:)。
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
initialOffset: earliest
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
注
initialOffset
の有効な値は earliest
と latest
です。他の値を指定すると、検証エラーが発生します。このフィールドは、そのコンシューマーグループにコミットされたオフセットがない場合にのみ有効になります。
TLSが有効なKafkaブローカーへの接続¶
KafkaSourceはTLSおよびSASL認証方法をサポートしています。TLS認証を有効にするには、次のファイルが必要です。
- CA証明書
- クライアント証明書とキー
KafkaSourceはこれらのファイルがPEM形式であることを想定しています。JKSなどの別の形式の場合は、PEMに変換してください。
-
次のコマンドを実行して、KafkaSourceが設定されるネームスペースにシークレットとして証明書ファイルを作成します。
kubectl create secret generic cacert --from-file=caroot.pem
kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem
-
KafkaSourceを適用します。
bootstrapServers
およびtopics
フィールドを適宜変更します。apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: kafka-source-with-tls spec: net: tls: enable: true cert: secretKeyRef: key: tls.crt name: kafka-secret key: secretKeyRef: key: tls.key name: kafka-secret caCert: secretKeyRef: key: caroot.pem name: cacert consumerGroup: knative-group bootstrapServers: - my-secure-kafka-bootstrap.kafka:443 topics: - knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
KafkaSourceのSASLの有効化¶
Simple Authentication and Security Layer(SASL)は、Apache Kafkaで認証に使用されます。クラスターでSASL認証を使用する場合は、Kafkaクラスターと通信するために、ユーザーがKnativeにクレデンシャルを提供する必要があります。そうしないと、イベントを生成または消費できません。
前提条件¶
- Simple Authentication and Security Layer(SASL)を持つKafkaクラスターにアクセスできること。
手順¶
-
次のコマンドを実行して、KafkaクラスターのSASL情報を使用するシークレットを作成します。
STRIMZI_CRT=$(kubectl -n kafka get secret example-cluster-cluster-ca-cert --template='{{index.data "ca.crt"}}' | base64 --decode )
SASL_PASSWD=$(kubectl -n kafka get secret example-user --template='{{index.data "password"}}' | base64 --decode )
kubectl create secret -n default generic <secret_name> \ --from-literal=ca.crt="$STRIMZI_CRT" \ --from-literal=password="$SASL_PASSWD" \ --from-literal=saslType="SCRAM-SHA-512" \ --from-literal=user="example-user"
-
次のspecオプションが含まれるようにKafkaSourceを作成または変更します。
apiVersion: sources.knative.dev/v1beta1 kind: KafkaSource metadata: name: example-source spec: ... net: sasl: enable: true user: secretKeyRef: name: <secret_name> key: user password: secretKeyRef: name: <secret_name> key: password type: secretKeyRef: name: <secret_name> key: saslType tls: enable: true caCert: secretKeyRef: name: <secret_name> key: ca.crt ...
ここで、
<secret_name>
は前の手順で生成されたシークレットの名前です。
クリーンアップ手順¶
-
Kafkaイベントソースを削除します。
kubectl delete -f source/source.yaml kafkasource.sources.knative.dev
出力例
"kafka-source" deleted
-
event-display
サービスを削除します。kubectl delete -f source/event-display.yaml service.serving.knative.dev
出力例
"event-display" deleted
-
オプション:Apache Kafkaトピックを削除します。
kubectl delete -f kafka-topic.yaml
出力例
kafkatopic.kafka.strimzi.io "knative-demo-topic" deleted