Apache Kafka用Knative Sink¶
KafkaSink
は、受信したCloudEventを設定可能なApache Kafkaトピックに永続化するApache KafkaネイティブのSink実装です。このページでは、KnativeのKafkaSink
をインストールおよび設定する方法について説明します。
前提条件¶
Knative EventingがインストールされたKubernetesクラスターにアクセスできる必要があります。
インストール¶
-
Kafkaコントローラーをインストールします
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml
-
KafkaSinkデータプレーンをインストールします
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-sink.yaml
-
kafka-controller
とkafka-sink-receiver
のデプロイメントが実行されていることを確認しますkubectl get deployments.apps -n knative-eventing
出力例
NAME READY UP-TO-DATE AVAILABLE AGE eventing-controller 1/1 1 1 10s eventing-webhook 1/1 1 1 9s kafka-controller 1/1 1 1 3s kafka-sink-receiver 1/1 1 1 5s
KafkaSinkの例¶
KafkaSinkオブジェクトは次のようになります。
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
name: my-kafka-sink
namespace: default
spec:
topic: mytopic
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
出力トピックコンテンツモード¶
CloudEvent仕様では、CloudEventを転送するための2つのモード(構造化モードとバイナリモード)が定義されています。
「構造化モードメッセージ」とは、イベントがスタンドアロンのイベント形式を使用して完全にエンコードされ、メッセージ本文に格納されるメッセージのことです。
構造化コンテンツモードでは、イベントメタデータとデータがペイロード内に一緒に保持されるため、同じイベントを複数のルーティングホップや複数のプロトコルにわたって簡単に転送できます。
「バイナリモードメッセージ」とは、イベントデータがメッセージ本文に格納され、イベント属性がメッセージメタデータの一部として格納されるメッセージのことです。
バイナリコンテンツモードは、あらゆる形状のイベントデータに対応でき、トランスコードの必要がなく、効率的な転送が可能です。
contentMode
が指定されたKafkaSinkオブジェクトは次のようになります。
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
name: my-kafka-sink
namespace: default
spec:
topic: mytopic
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
# CloudEvent content mode of Kafka messages sent to the topic.
# Possible values:
# - structured
# - binary
#
# default: binary.
#
# CloudEvent spec references:
# - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#message
# - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md#33-structured-content-mode
# - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md#32-binary-content-mode
contentMode: binary # or structured
セキュリティ¶
Knativeは、以下のApache Kafkaセキュリティ機能をサポートしています
セキュリティ機能の有効化¶
セキュリティ機能を有効にするには、KafkaSink仕様でシークレットを参照できます
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
name: my-kafka-sink
namespace: default
spec:
topic: mytopic
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
auth:
secret:
ref:
name: my_secret
注意
シークレットmy_secret
は、KafkaSinkと同じ名前空間に存在する必要があります。証明書と鍵はPEM
形式である必要があります。
SASLを使用した認証¶
Knativeは、次のSASLメカニズムをサポートしています
PLAIN
SCRAM-SHA-256
SCRAM-SHA-512
特定のSASLメカニズムを使用するには、<sasl_mechanism>
を任意のメカニズムに置き換えてください。
暗号化なしのSASLを使用した認証¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_PLAINTEXT \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
SASLを使用した認証とSSLを使用した暗号化¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_SSL \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-file=ca.crt=caroot.pem \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
クライアント認証なしのSSLを使用した暗号化¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-literal=user.skip=true
SSLを使用した認証と暗号化¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-file=user.crt=<my_cert.pem_file_path> \
--from-file=user.key=<my_key.pem_file_path>
注意
ca.crt
を省略して、フォールバックを有効にし、システムのルートCAセットを使用できます。
Kafkaプロデューサーの設定¶
Kafkaプロデューサーは、Apache Kafkaクラスターにイベントを送信する役割を担うコンポーネントです。クラスター内のKafkaプロデューサーの設定を変更するには、knative-eventing
名前空間のconfig-kafka-sink-data-plane
ConfigMapを変更します。
このConfigMapで使用可能な設定に関するドキュメントは、Apache Kafka Webサイト、特にプロデューサー設定で入手できます。
データプレーンコンポーネントのデバッグログの有効化¶
データプレーンコンポーネントのデバッグログを有効にするには、kafka-config-logging
ConfigMapのログレベルをDEBUG
に変更します。
-
次の内容を含むYAMLファイルとして
kafka-config-logging
ConfigMapを作成しますapiVersion: v1 kind: ConfigMap metadata: name: kafka-config-logging namespace: knative-eventing data: config.xml: | <configuration> <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="net.logstash.logback.encoder.LogstashEncoder"/> </appender> <root level="DEBUG"> <appender-ref ref="jsonConsoleAppender"/> </root> </configuration>
-
次のコマンドを実行してYAMLファイルを適用します
ここで、kubectl apply -f <filename>.yaml
<filename>
は前の手順で作成したファイルの名前です。 -
kafka-sink-receiver
を再起動しますkubectl rollout restart deployment -n knative-eventing kafka-sink-receiver