Apache Kafka用Knativeブローカー¶
Apache Kafka用Knativeブローカーは、ネットワークホップを削減し、ブローカーとトリガーAPIモデルのApache Kafkaとのより良い統合を提供するために、ネイティブにApache KafkaをターゲットとするKnativeブローカーAPIの実装です。
注目すべき機能は次のとおりです。
- コントロールプレーン高可用性
- 水平方向にスケーラブルなデータプレーン
- 広範囲に設定可能
- CloudEventsパーティショニング拡張機能に基づいたイベントの順序付き配信
- 互換性マトリックスを参照してください。任意のKafkaバージョンをサポートします。
- 2つのデータプレーンモードをサポートします。名前空間ごとのデータプレーン分離または共有データプレーン。
Knative Kafkaブローカーは、バイナリコンテンツモードを使用して、受信したCloudEventsをKafkaレコードとして保存します。これは、トランスポートやルーティングの最適化により効率的であるため、JSONパースを回避できるためです。バイナリコンテンツモードを使用すると、すべてのCloudEvent属性と拡張機能はKafkaレコードのヘッダーにマップされ、CloudEventのdataはKafkaレコードの実際の値に対応します。これは、CloudEventsを理解していないシステムとの互換性を維持するために、邪魔にならないため、構造化コンテンツモードよりもバイナリコンテンツモードを使用することのもう1つの利点です。
前提条件¶
- Knative Eventingをインストール済みであること。
- Apache Kafkaクラスタにアクセスできること。
ヒント
Kafkaクラスタを設定する必要がある場合は、Strimziクイックスタートページの手順に従って実行できます。
インストール¶
-
次のコマンドを入力してKafkaコントローラーをインストールします。
kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml -
次のコマンドを入力してKafkaブローカーデータプレーンをインストールします。
kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-broker.yaml -
次のコマンドを入力して、
kafka-controller、kafka-broker-receiver、kafka-broker-dispatcherが実行されていることを確認します。kubectl get deployments.apps -n knative-eventing出力例
NAME READY UP-TO-DATE AVAILABLE AGE eventing-controller 1/1 1 1 10s eventing-webhook 1/1 1 1 9s kafka-controller 1/1 1 1 3s kafka-broker-dispatcher 1/1 1 1 4s kafka-broker-receiver 1/1 1 1 5s
Kafkaブローカーの作成¶
Kafkaブローカーオブジェクトは次のようになります。
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
# case-sensitive
eventing.knative.dev/broker.class: Kafka
# Optional annotation to point to an externally managed kafka topic:
# kafka.eventing.knative.dev/external.topic: <topic-name>
name: default
namespace: default
spec:
# Configuration specific to this broker.
config:
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
# Optional dead letter sink, you can specify either:
# - deadLetterSink.ref, which is a reference to a Callable
# - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)
delivery:
deadLetterSink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: dlq-service
Kafkaブローカーの設定¶
spec.configは、次の様な任意のnamespace内の任意のConfigMapを参照する必要があります。
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
# Number of topic partitions
default.topic.partitions: "10"
# Replication factor of topic messages.
default.topic.replication.factor: "3"
# A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
このConfigMapは、クラスタのKnative Eventing SYSTEM_NAMESPACEにインストールされます。必要に応じてグローバル設定を編集できます。異なるnamespaceで異なるnameを持つ別のConfigMapを参照することにより、ブローカーごとにこれらの設定をオーバーライドすることもできます。
注記
default.topic.replication.factorの値は、クラスタ内のKafkaブローカーインスタンスの数以下である必要があります。たとえば、Kafkaブローカーが1つしかない場合は、default.topic.replication.factorの値を1より大きくすることはできません。
Knativeは、使用しているKafkaのバージョンのサポートするトピック設定オプションの完全なセットをサポートしています。これらの設定を行うには、default.topic.config.プレフィックスを使用してキーをconfigmapに追加する必要があります。たとえば、retention.msの値を設定するには、ConfigMapを次のように変更します。
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
# Number of topic partitions
default.topic.partitions: "10"
# Replication factor of topic messages.
default.topic.replication.factor: "3"
# A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
# Here is our retention.ms config
default.topic.config.retention.ms: "3600"
デフォルトのブローカー実装として設定する¶
Knativeデプロイメント内のすべてのブローカーのデフォルト実装としてKafkaブローカーを設定するには、knative-eventing名前空間のconfig-br-defaults ConfigMapを変更してグローバル設定を適用できます。
これにより、metadata.annotations.eventing.knative.dev/broker.classやspec.configなど、個々の設定や名前空間ごとの設定を行う必要がなくなります。
次のYAMLは、Kafkaブローカーをデフォルトの実装として使用するconfig-br-defaults ConfigMapの例です。
apiVersion: v1
kind: ConfigMap
metadata:
name: config-br-defaults
namespace: knative-eventing
data:
default-br-config: |
clusterDefault:
brokerClass: Kafka
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
namespaceDefaults:
namespace1:
brokerClass: Kafka
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
namespace2:
brokerClass: Kafka
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
セキュリティ¶
Apache Kafkaはさまざまなセキュリティ機能をサポートしており、Knativeは次の機能をサポートしています。
セキュリティ機能を有効にするには、broker.spec.configによって参照されるConfigMapで、Secretを参照できます。
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
# Other configurations
# ...
# Reference a Secret called my_secret
auth.secret.ref.name: my_secret
Secret my_secretは、broker.spec.configによって参照されるConfigMapと同じ名前空間(この場合はknative-eventing)に存在する必要があります。
注記
証明書とキーはPEM形式である必要があります。
SASLを使用した認証¶
Knativeは次のSASLメカニズムをサポートしています。
PLAINSCRAM-SHA-256SCRAM-SHA-512
特定のSASLメカニズムを使用するには、<sasl_mechanism>を目的のメカニズムに置き換えます。
暗号化なしでSASLを使用した認証¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_PLAINTEXT \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
SSLを使用した暗号化とSASLを使用した認証¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_SSL \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-file=ca.crt=caroot.pem \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
クライアント認証なしでSSLを使用した暗号化¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-literal=user.skip=true
SSLを使用した認証と暗号化¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-file=user.crt=<my_cert.pem_file_path> \
--from-file=user.key=<my_key.pem_file_path>
注記
システムのルートCAセットを使用するには、ca.crtを省略できます。
独自のトピックを使用する¶
デフォルトでは、Knative Kafkaブローカーは独自の内部トピックを作成しますが、kafka.eventing.knative.dev/external.topicアノテーションを使用して、外部で管理されているトピックを指定することもできます。
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
# case-sensitive
eventing.knative.dev/broker.class: Kafka
kafka.eventing.knative.dev/external.topic: <my-topic-name>
name: default
namespace: default
spec:
# other spec fields ...
注記
外部トピックを使用する場合、Knative Kafkaブローカーはトピックを所有せず、トピックの管理は行いません。これには、トピックのライフサイクルや一般的な有効性も含まれます。トピックへの一般的なアクセスに関するその他の制限が適用される場合があります。アクセス制御リスト(ACL)の使用に関するドキュメントを参照してください。
コンシューマーオフセットコミット間隔¶
Kafkaコンシューマーは、オフセットをコミットすることで、最後に正常に送信されたイベントを追跡します。
Knative Kafkaブローカーは、auto.commit.interval.msミリ秒ごとにオフセットをコミットします。
注記
パフォーマンスへの悪影響を防ぐために、イベントがサブスクライバーに正常に送信されるたびにオフセットをコミットすることはお勧めしません。
この間隔は、knative-eventing名前空間のconfig-kafka-broker-data-plane ConfigMapのパラメーターauto.commit.interval.msを次のように変更して変更できます。
apiVersion: v1
kind: ConfigMap
metadata:
name: config-kafka-broker-data-plane
namespace: knative-eventing
data:
# Some configurations omitted ...
config-kafka-broker-consumer.properties: |
# Some configurations omitted ...
# Commit the offset every 5000 millisecods (5 seconds)
auto.commit.interval.ms=5000
注記
Knative Kafkaブローカーは少なくとも一度の配信を保証しますが、これによりアプリケーションが重複イベントを受信する可能性があります。コミット間隔が長いほど、コンシューマーが再開するときに最後のコミットされたオフセットから再開するため、重複イベントを受信する可能性が高くなります。
Kafkaプロデューサーとコンシューマーの設定¶
Knativeは、ワークロードに合わせて変更できる、利用可能なKafkaプロデューサーとコンシューマーのすべての構成を公開します。
これらの構成は、knative-eventing名前空間内のconfig-kafka-broker-data-plane ConfigMapを変更することで変更できます。
このConfigMapで使用可能な設定に関するドキュメントは、Apache Kafkaウェブサイト、特にプロデューサー設定とコンシューマー設定に掲載されています。
データプレーンコンポーネントのデバッグログを有効にする¶
次のYAMLは、インストール手順中に作成される、データプレーンコンポーネントのデフォルトのログ構成を示しています。
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-config-logging
namespace: knative-eventing
data:
config.xml: |
<configuration>
<appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<root level="INFO">
<appender-ref ref="jsonConsoleAppender"/>
</root>
</configuration>
ログレベルをDEBUGに変更するには、以下の手順に従ってください。
-
以下の
kafka-config-loggingConfigMapを適用するか、ConfigMapkafka-config-loggingのlevel="INFO"をlevel="DEBUG"に置き換えます。apiVersion: v1 kind: ConfigMap metadata: name: kafka-config-logging namespace: knative-eventing data: config.xml: | <configuration> <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="net.logstash.logback.encoder.LogstashEncoder"/> </appender> <root level="DEBUG"> <appender-ref ref="jsonConsoleAppender"/> </root> </configuration> -
次のコマンドを入力して、
kafka-broker-receiverとkafka-broker-dispatcherを再起動します。kubectl rollout restart deployment -n knative-eventing kafka-broker-receiver kubectl rollout restart deployment -n knative-eventing kafka-broker-dispatcher
配信イベントの順序の設定¶
イベントのディスパッチ時に、Kafkaブローカーは異なる配信順序保証をサポートするように設定できます。
Triggerオブジェクトのkafka.eventing.knative.dev/delivery.orderアノテーションを使用して、イベントの配信順序を設定できます。
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: my-service-trigger
annotations:
kafka.eventing.knative.dev/delivery.order: ordered
spec:
broker: my-kafka-broker
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: my-service
サポートされているコンシューマー配信保証は以下のとおりです。
unordered:非順序コンシューマーは、非ブロッキングコンシューマーであり、メッセージを順序付けせずに配信しますが、適切なオフセット管理を維持します。並列消費の需要が高く、明示的な順序付けが不要な場合に役立ちます。クリック分析の処理などがその例です。ordered:順序付きコンシューマーは、パーティションごとのブロッキングコンシューマーであり、CloudEventサブスクライバーからの成功した応答を待つまで、そのパーティションの次のメッセージを配信しません。より厳格な順序付けが必要な場合、またはイベント間に関係またはグループ化がある場合に役立ちます。顧客注文の処理などがその例です。
デフォルトの順序保証はunordered配信です。
データプレーン分離と共有データプレーン¶
Knative Kafka Brokerの実装には、コントロールプレーンとデータプレーンの2つのプレーンがあります。コントロールプレーンは、Kubernetes APIと通信し、カスタムオブジェクトを監視し、データプレーンを管理するコントローラーで構成されています。
データプレーンは、着信イベントをリッスンし、Apache Kafkaと通信し、イベントをイベントシンクに送信するコンポーネントのコレクションです。これがイベントの流れる場所です。Knative Kafka Brokerデータプレーンは、kafka-broker-receiverとkafka-broker-dispatcherデプロイメントで構成されています。
Kafkaブローカークラスを使用する場合、Knative Kafka Brokerは共有データプレーンを使用します。つまり、knative-eventing名前空間内のkafka-broker-receiverとkafka-broker-dispatcherデプロイメントは、クラスタ内のすべてのKafkaブローカーで使用されます。
ただし、KafkaNamespacedがブローカークラスとして設定されている場合、Kafkaブローカーコントローラーは、ブローカーが存在する各名前空間に新しいデータプレーンを作成します。このデータプレーンは、その名前空間内のすべてのKafkaNamespacedブローカーによって使用されます。
これにより、データプレーン間の分離が提供されます。つまり、ユーザー名前空間内のkafka-broker-receiverとkafka-broker-dispatcherデプロイメントはその名前空間内のブローカーに対してのみ使用されます。
注記
個別のデータプレーンという結果として、このセキュリティ機能はより多くのデプロイメントを作成し、より多くのリソースを使用します。このような分離要件がない限り、Kafkaクラスの通常のブローカーを使用することをお勧めします。
KafkaNamespacedブローカーを作成するには、eventing.knative.dev/broker.classアノテーションをKafkaNamespacedに設定する必要があります。
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
# case-sensitive
eventing.knative.dev/broker.class: KafkaNamespaced
name: default
namespace: my-namespace
spec:
config:
# the referenced `configmap` must be in the same namespace with the `Broker` object, in this case `my-namespace`
apiVersion: v1
kind: ConfigMap
name: my-config
# namespace: my-namespace # no need to define, defaults to Broker's namespace
注記
spec.configで指定されたconfigmapは、Brokerオブジェクトと同じ名前空間に**存在する**必要があります。
apiVersion: v1
kind: ConfigMap
metadata:
name: my-config
namespace: my-namespace
data:
...
KafkaNamespacedクラスの最初のBrokerが作成されると、kafka-broker-receiverとkafka-broker-dispatcherのデプロイメントがその名前空間に作成されます。その後、同じ名前空間内のKafkaNamespacedクラスのすべてのブローカーは同じデータプレーンを使用します。名前空間にKafkaNamespacedクラスのブローカーが存在しない場合、その名前空間内のデータプレーンは削除されます。
KafkaNamespacedブローカーの設定¶
Kafkaブローカークラスで使用可能なすべての構成メカニズムは、これらの例外を除いて、KafkaNamespacedクラスのブローカーでも使用できます。
- このページでは、
knative-eventing名前空間内のconfig-kafka-broker-data-planeconfigmapを変更することで、プロデューサーとコンシューマーの設定を行う方法について説明しています。Kafkaブローカーコントローラーは、このconfigmapをユーザー名前空間に伝播するため、現在、名前空間ごとにプロデューサーとコンシューマーの設定を行う方法はありません。knative-eventing名前空間内のconfig-kafka-broker-data-planeConfigMapに設定された値は、ユーザー名前空間でも使用されます。 - 同じ伝播のため、名前空間ごとにコンシューマーオフセットコミット間隔を設定することもできません。
- さらにいくつかのconfigmap(
config-tracingとkafka-config-logging)が伝播されます。つまり、トレーシングとロギングも名前空間ごとに設定できません。 - 同様に、データプレーンデプロイメントは
knative-eventing名前空間からユーザー名前空間に伝播されます。つまり、データプレーンデプロイメントは名前空間ごとに設定できず、knative-eventing名前空間のものと同じになります。
KEDAによるトリガーの自動スケーリングの有効化と設定¶
KEDAを使用してKafkaブローカーを参照するトリガーの自動スケーリングを有効化および設定するには、ここにある手順に従ってください。
追加情報¶
- バグの報告または機能のリクエストを行うには、eventing-kafka-brokerリポジトリで問題を開いてください。