Apache Kafka用Knativeブローカー¶
Apache Kafka用Knativeブローカーは、ネットワークホップを削減し、ブローカーとトリガーAPIモデルのApache Kafkaとのより良い統合を提供するために、ネイティブにApache KafkaをターゲットとするKnativeブローカーAPIの実装です。
注目すべき機能は次のとおりです。
- コントロールプレーン高可用性
- 水平方向にスケーラブルなデータプレーン
- 広範囲に設定可能
- CloudEventsパーティショニング拡張機能に基づいたイベントの順序付き配信
- 互換性マトリックスを参照してください。任意のKafkaバージョンをサポートします。
- 2つのデータプレーンモードをサポートします。名前空間ごとのデータプレーン分離または共有データプレーン。
Knative Kafkaブローカーは、バイナリコンテンツモードを使用して、受信したCloudEventsをKafkaレコードとして保存します。これは、トランスポートやルーティングの最適化により効率的であるため、JSONパースを回避できるためです。バイナリコンテンツモード
を使用すると、すべてのCloudEvent属性と拡張機能はKafkaレコードのヘッダーにマップされ、CloudEventのdata
はKafkaレコードの実際の値に対応します。これは、CloudEventsを理解していないシステムとの互換性を維持するために、邪魔にならないため、構造化コンテンツモード
よりもバイナリコンテンツモード
を使用することのもう1つの利点です。
前提条件¶
- Knative Eventingをインストール済みであること。
- Apache Kafkaクラスタにアクセスできること。
ヒント
Kafkaクラスタを設定する必要がある場合は、Strimziクイックスタートページの手順に従って実行できます。
インストール¶
-
次のコマンドを入力してKafkaコントローラーをインストールします。
kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml
-
次のコマンドを入力してKafkaブローカーデータプレーンをインストールします。
kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-broker.yaml
-
次のコマンドを入力して、
kafka-controller
、kafka-broker-receiver
、kafka-broker-dispatcher
が実行されていることを確認します。kubectl get deployments.apps -n knative-eventing
出力例
NAME READY UP-TO-DATE AVAILABLE AGE eventing-controller 1/1 1 1 10s eventing-webhook 1/1 1 1 9s kafka-controller 1/1 1 1 3s kafka-broker-dispatcher 1/1 1 1 4s kafka-broker-receiver 1/1 1 1 5s
Kafkaブローカーの作成¶
Kafkaブローカーオブジェクトは次のようになります。
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
# case-sensitive
eventing.knative.dev/broker.class: Kafka
# Optional annotation to point to an externally managed kafka topic:
# kafka.eventing.knative.dev/external.topic: <topic-name>
name: default
namespace: default
spec:
# Configuration specific to this broker.
config:
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
# Optional dead letter sink, you can specify either:
# - deadLetterSink.ref, which is a reference to a Callable
# - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)
delivery:
deadLetterSink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: dlq-service
Kafkaブローカーの設定¶
spec.config
は、次の様な任意のnamespace
内の任意のConfigMap
を参照する必要があります。
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
# Number of topic partitions
default.topic.partitions: "10"
# Replication factor of topic messages.
default.topic.replication.factor: "3"
# A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
このConfigMap
は、クラスタのKnative Eventing SYSTEM_NAMESPACE
にインストールされます。必要に応じてグローバル設定を編集できます。異なるnamespace
で異なるname
を持つ別のConfigMap
を参照することにより、ブローカーごとにこれらの設定をオーバーライドすることもできます。
注記
default.topic.replication.factor
の値は、クラスタ内のKafkaブローカーインスタンスの数以下である必要があります。たとえば、Kafkaブローカーが1つしかない場合は、default.topic.replication.factor
の値を1
より大きくすることはできません。
Knativeは、使用しているKafkaのバージョンのサポートするトピック設定オプションの完全なセットをサポートしています。これらの設定を行うには、default.topic.config.
プレフィックスを使用してキーをconfigmapに追加する必要があります。たとえば、retention.ms
の値を設定するには、ConfigMap
を次のように変更します。
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
# Number of topic partitions
default.topic.partitions: "10"
# Replication factor of topic messages.
default.topic.replication.factor: "3"
# A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
# Here is our retention.ms config
default.topic.config.retention.ms: "3600"
デフォルトのブローカー実装として設定する¶
Knativeデプロイメント内のすべてのブローカーのデフォルト実装としてKafkaブローカーを設定するには、knative-eventing
名前空間のconfig-br-defaults
ConfigMapを変更してグローバル設定を適用できます。
これにより、metadata.annotations.eventing.knative.dev/broker.class
やspec.config
など、個々の設定や名前空間ごとの設定を行う必要がなくなります。
次のYAMLは、Kafkaブローカーをデフォルトの実装として使用するconfig-br-defaults
ConfigMapの例です。
apiVersion: v1
kind: ConfigMap
metadata:
name: config-br-defaults
namespace: knative-eventing
data:
default-br-config: |
clusterDefault:
brokerClass: Kafka
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
namespaceDefaults:
namespace1:
brokerClass: Kafka
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
namespace2:
brokerClass: Kafka
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
セキュリティ¶
Apache Kafkaはさまざまなセキュリティ機能をサポートしており、Knativeは次の機能をサポートしています。
セキュリティ機能を有効にするには、broker.spec.config
によって参照されるConfigMap
で、Secret
を参照できます。
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
# Other configurations
# ...
# Reference a Secret called my_secret
auth.secret.ref.name: my_secret
Secret
my_secret
は、broker.spec.config
によって参照されるConfigMap
と同じ名前空間(この場合はknative-eventing
)に存在する必要があります。
注記
証明書とキーはPEM
形式である必要があります。
SASLを使用した認証¶
Knativeは次のSASLメカニズムをサポートしています。
PLAIN
SCRAM-SHA-256
SCRAM-SHA-512
特定のSASLメカニズムを使用するには、<sasl_mechanism>
を目的のメカニズムに置き換えます。
暗号化なしでSASLを使用した認証¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_PLAINTEXT \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
SSLを使用した暗号化とSASLを使用した認証¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_SSL \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-file=ca.crt=caroot.pem \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
クライアント認証なしでSSLを使用した暗号化¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-literal=user.skip=true
SSLを使用した認証と暗号化¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-file=user.crt=<my_cert.pem_file_path> \
--from-file=user.key=<my_key.pem_file_path>
注記
システムのルートCAセットを使用するには、ca.crt
を省略できます。
独自のトピックを使用する¶
デフォルトでは、Knative Kafkaブローカーは独自の内部トピックを作成しますが、kafka.eventing.knative.dev/external.topic
アノテーションを使用して、外部で管理されているトピックを指定することもできます。
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
# case-sensitive
eventing.knative.dev/broker.class: Kafka
kafka.eventing.knative.dev/external.topic: <my-topic-name>
name: default
namespace: default
spec:
# other spec fields ...
注記
外部トピックを使用する場合、Knative Kafkaブローカーはトピックを所有せず、トピックの管理は行いません。これには、トピックのライフサイクルや一般的な有効性も含まれます。トピックへの一般的なアクセスに関するその他の制限が適用される場合があります。アクセス制御リスト(ACL)の使用に関するドキュメントを参照してください。
コンシューマーオフセットコミット間隔¶
Kafkaコンシューマーは、オフセットをコミットすることで、最後に正常に送信されたイベントを追跡します。
Knative Kafkaブローカーは、auto.commit.interval.ms
ミリ秒ごとにオフセットをコミットします。
注記
パフォーマンスへの悪影響を防ぐために、イベントがサブスクライバーに正常に送信されるたびにオフセットをコミットすることはお勧めしません。
この間隔は、knative-eventing
名前空間のconfig-kafka-broker-data-plane
ConfigMap
のパラメーターauto.commit.interval.ms
を次のように変更して変更できます。
apiVersion: v1
kind: ConfigMap
metadata:
name: config-kafka-broker-data-plane
namespace: knative-eventing
data:
# Some configurations omitted ...
config-kafka-broker-consumer.properties: |
# Some configurations omitted ...
# Commit the offset every 5000 millisecods (5 seconds)
auto.commit.interval.ms=5000
注記
Knative Kafkaブローカーは少なくとも一度の配信を保証しますが、これによりアプリケーションが重複イベントを受信する可能性があります。コミット間隔が長いほど、コンシューマーが再開するときに最後のコミットされたオフセットから再開するため、重複イベントを受信する可能性が高くなります。
Kafkaプロデューサーとコンシューマーの設定¶
Knativeは、ワークロードに合わせて変更できる、利用可能なKafkaプロデューサーとコンシューマーのすべての構成を公開します。
これらの構成は、knative-eventing
名前空間内のconfig-kafka-broker-data-plane
ConfigMap
を変更することで変更できます。
このConfigMap
で使用可能な設定に関するドキュメントは、Apache Kafkaウェブサイト、特にプロデューサー設定とコンシューマー設定に掲載されています。
データプレーンコンポーネントのデバッグログを有効にする¶
次のYAMLは、インストール手順中に作成される、データプレーンコンポーネントのデフォルトのログ構成を示しています。
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-config-logging
namespace: knative-eventing
data:
config.xml: |
<configuration>
<appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<root level="INFO">
<appender-ref ref="jsonConsoleAppender"/>
</root>
</configuration>
ログレベルをDEBUG
に変更するには、以下の手順に従ってください。
-
以下の
kafka-config-logging
ConfigMap
を適用するか、ConfigMap
kafka-config-logging
のlevel="INFO"
をlevel="DEBUG"
に置き換えます。apiVersion: v1 kind: ConfigMap metadata: name: kafka-config-logging namespace: knative-eventing data: config.xml: | <configuration> <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="net.logstash.logback.encoder.LogstashEncoder"/> </appender> <root level="DEBUG"> <appender-ref ref="jsonConsoleAppender"/> </root> </configuration>
-
次のコマンドを入力して、
kafka-broker-receiver
とkafka-broker-dispatcher
を再起動します。kubectl rollout restart deployment -n knative-eventing kafka-broker-receiver kubectl rollout restart deployment -n knative-eventing kafka-broker-dispatcher
配信イベントの順序の設定¶
イベントのディスパッチ時に、Kafkaブローカーは異なる配信順序保証をサポートするように設定できます。
Trigger
オブジェクトのkafka.eventing.knative.dev/delivery.order
アノテーションを使用して、イベントの配信順序を設定できます。
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: my-service-trigger
annotations:
kafka.eventing.knative.dev/delivery.order: ordered
spec:
broker: my-kafka-broker
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: my-service
サポートされているコンシューマー配信保証は以下のとおりです。
unordered
:非順序コンシューマーは、非ブロッキングコンシューマーであり、メッセージを順序付けせずに配信しますが、適切なオフセット管理を維持します。並列消費の需要が高く、明示的な順序付けが不要な場合に役立ちます。クリック分析の処理などがその例です。ordered
:順序付きコンシューマーは、パーティションごとのブロッキングコンシューマーであり、CloudEventサブスクライバーからの成功した応答を待つまで、そのパーティションの次のメッセージを配信しません。より厳格な順序付けが必要な場合、またはイベント間に関係またはグループ化がある場合に役立ちます。顧客注文の処理などがその例です。
デフォルトの順序保証はunordered
配信です。
データプレーン分離と共有データプレーン¶
Knative Kafka Brokerの実装には、コントロールプレーンとデータプレーンの2つのプレーンがあります。コントロールプレーンは、Kubernetes APIと通信し、カスタムオブジェクトを監視し、データプレーンを管理するコントローラーで構成されています。
データプレーンは、着信イベントをリッスンし、Apache Kafkaと通信し、イベントをイベントシンクに送信するコンポーネントのコレクションです。これがイベントの流れる場所です。Knative Kafka Brokerデータプレーンは、kafka-broker-receiver
とkafka-broker-dispatcher
デプロイメントで構成されています。
Kafka
ブローカークラスを使用する場合、Knative Kafka Brokerは共有データプレーンを使用します。つまり、knative-eventing
名前空間内のkafka-broker-receiver
とkafka-broker-dispatcher
デプロイメントは、クラスタ内のすべてのKafkaブローカーで使用されます。
ただし、KafkaNamespaced
がブローカークラスとして設定されている場合、Kafkaブローカーコントローラーは、ブローカーが存在する各名前空間に新しいデータプレーンを作成します。このデータプレーンは、その名前空間内のすべてのKafkaNamespaced
ブローカーによって使用されます。
これにより、データプレーン間の分離が提供されます。つまり、ユーザー名前空間内のkafka-broker-receiver
とkafka-broker-dispatcher
デプロイメントはその名前空間内のブローカーに対してのみ使用されます。
注記
個別のデータプレーンという結果として、このセキュリティ機能はより多くのデプロイメントを作成し、より多くのリソースを使用します。このような分離要件がない限り、Kafka
クラスの通常のブローカーを使用することをお勧めします。
KafkaNamespaced
ブローカーを作成するには、eventing.knative.dev/broker.class
アノテーションをKafkaNamespaced
に設定する必要があります。
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
# case-sensitive
eventing.knative.dev/broker.class: KafkaNamespaced
name: default
namespace: my-namespace
spec:
config:
# the referenced `configmap` must be in the same namespace with the `Broker` object, in this case `my-namespace`
apiVersion: v1
kind: ConfigMap
name: my-config
# namespace: my-namespace # no need to define, defaults to Broker's namespace
注記
spec.config
で指定されたconfigmap
は、Broker
オブジェクトと同じ名前空間に**存在する**必要があります。
apiVersion: v1
kind: ConfigMap
metadata:
name: my-config
namespace: my-namespace
data:
...
KafkaNamespaced
クラスの最初のBroker
が作成されると、kafka-broker-receiver
とkafka-broker-dispatcher
のデプロイメントがその名前空間に作成されます。その後、同じ名前空間内のKafkaNamespaced
クラスのすべてのブローカーは同じデータプレーンを使用します。名前空間にKafkaNamespaced
クラスのブローカーが存在しない場合、その名前空間内のデータプレーンは削除されます。
KafkaNamespaced
ブローカーの設定¶
Kafka
ブローカークラスで使用可能なすべての構成メカニズムは、これらの例外を除いて、KafkaNamespaced
クラスのブローカーでも使用できます。
- このページでは、
knative-eventing
名前空間内のconfig-kafka-broker-data-plane
configmapを変更することで、プロデューサーとコンシューマーの設定を行う方法について説明しています。Kafkaブローカーコントローラーは、このconfigmapをユーザー名前空間に伝播するため、現在、名前空間ごとにプロデューサーとコンシューマーの設定を行う方法はありません。knative-eventing
名前空間内のconfig-kafka-broker-data-plane
ConfigMap
に設定された値は、ユーザー名前空間でも使用されます。 - 同じ伝播のため、名前空間ごとにコンシューマーオフセットコミット間隔を設定することもできません。
- さらにいくつかのconfigmap(
config-tracing
とkafka-config-logging
)が伝播されます。つまり、トレーシングとロギングも名前空間ごとに設定できません。 - 同様に、データプレーンデプロイメントは
knative-eventing
名前空間からユーザー名前空間に伝播されます。つまり、データプレーンデプロイメントは名前空間ごとに設定できず、knative-eventing
名前空間のものと同じになります。
KEDAによるトリガーの自動スケーリングの有効化と設定¶
KEDAを使用してKafkaブローカーを参照するトリガーの自動スケーリングを有効化および設定するには、ここにある手順に従ってください。
追加情報¶
- バグの報告または機能のリクエストを行うには、eventing-kafka-brokerリポジトリで問題を開いてください。