コンテンツへスキップ

Apache Kafka用Knativeブローカー

Apache Kafka用Knativeブローカーは、ネットワークホップを削減し、ブローカーとトリガーAPIモデルのApache Kafkaとのより良い統合を提供するために、ネイティブにApache KafkaをターゲットとするKnativeブローカーAPIの実装です。

注目すべき機能は次のとおりです。

Knative Kafkaブローカーは、バイナリコンテンツモードを使用して、受信したCloudEventsをKafkaレコードとして保存します。これは、トランスポートやルーティングの最適化により効率的であるため、JSONパースを回避できるためです。バイナリコンテンツモードを使用すると、すべてのCloudEvent属性と拡張機能はKafkaレコードのヘッダーにマップされ、CloudEventのdataはKafkaレコードの実際の値に対応します。これは、CloudEventsを理解していないシステムとの互換性を維持するために、邪魔にならないため、構造化コンテンツモードよりもバイナリコンテンツモードを使用することのもう1つの利点です。

前提条件

  1. Knative Eventingをインストール済みであること。
  2. Apache Kafkaクラスタにアクセスできること。

ヒント

Kafkaクラスタを設定する必要がある場合は、Strimziクイックスタートページの手順に従って実行できます。

インストール

  1. 次のコマンドを入力してKafkaコントローラーをインストールします。

    kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml
    
  2. 次のコマンドを入力してKafkaブローカーデータプレーンをインストールします。

    kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-broker.yaml
    
  3. 次のコマンドを入力して、kafka-controllerkafka-broker-receiverkafka-broker-dispatcherが実行されていることを確認します。

    kubectl get deployments.apps -n knative-eventing
    

    出力例

    NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
    eventing-controller            1/1     1            1           10s
    eventing-webhook               1/1     1            1           9s
    kafka-controller               1/1     1            1           3s
    kafka-broker-dispatcher        1/1     1            1           4s
    kafka-broker-receiver          1/1     1            1           5s
    

Kafkaブローカーの作成

Kafkaブローカーオブジェクトは次のようになります。

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: Kafka
    # Optional annotation to point to an externally managed kafka topic:
    # kafka.eventing.knative.dev/external.topic: <topic-name>
  name: default
  namespace: default
spec:
  # Configuration specific to this broker.
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
  # Optional dead letter sink, you can specify either:
  #  - deadLetterSink.ref, which is a reference to a Callable
  #  - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: dlq-service

Kafkaブローカーの設定

spec.configは、次の様な任意のnamespace内の任意のConfigMapを参照する必要があります。

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  # Number of topic partitions
  default.topic.partitions: "10"
  # Replication factor of topic messages.
  default.topic.replication.factor: "3"
  # A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"

このConfigMapは、クラスタのKnative Eventing SYSTEM_NAMESPACEにインストールされます。必要に応じてグローバル設定を編集できます。異なるnamespaceで異なるnameを持つ別のConfigMapを参照することにより、ブローカーごとにこれらの設定をオーバーライドすることもできます。

注記

default.topic.replication.factorの値は、クラスタ内のKafkaブローカーインスタンスの数以下である必要があります。たとえば、Kafkaブローカーが1つしかない場合は、default.topic.replication.factorの値を1より大きくすることはできません。

Knativeは、使用しているKafkaのバージョンのサポートするトピック設定オプションの完全なセットをサポートしています。これらの設定を行うには、default.topic.config.プレフィックスを使用してキーをconfigmapに追加する必要があります。たとえば、retention.msの値を設定するには、ConfigMapを次のように変更します。

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  # Number of topic partitions
  default.topic.partitions: "10"
  # Replication factor of topic messages.
  default.topic.replication.factor: "3"
  # A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
  # Here is our retention.ms config
  default.topic.config.retention.ms: "3600"

デフォルトのブローカー実装として設定する

Knativeデプロイメント内のすべてのブローカーのデフォルト実装としてKafkaブローカーを設定するには、knative-eventing名前空間のconfig-br-defaults ConfigMapを変更してグローバル設定を適用できます。

これにより、metadata.annotations.eventing.knative.dev/broker.classspec.configなど、個々の設定や名前空間ごとの設定を行う必要がなくなります。

次のYAMLは、Kafkaブローカーをデフォルトの実装として使用するconfig-br-defaults ConfigMapの例です。

apiVersion: v1
kind: ConfigMap
metadata:
  name: config-br-defaults
  namespace: knative-eventing
data:
  default-br-config: |
    clusterDefault:
      brokerClass: Kafka
      apiVersion: v1
      kind: ConfigMap
      name: kafka-broker-config
      namespace: knative-eventing
    namespaceDefaults:
      namespace1:
        brokerClass: Kafka
        apiVersion: v1
        kind: ConfigMap
        name: kafka-broker-config
        namespace: knative-eventing
      namespace2:
        brokerClass: Kafka
        apiVersion: v1
        kind: ConfigMap
        name: kafka-broker-config
        namespace: knative-eventing

セキュリティ

Apache Kafkaはさまざまなセキュリティ機能をサポートしており、Knativeは次の機能をサポートしています。

セキュリティ機能を有効にするには、broker.spec.configによって参照されるConfigMapで、Secretを参照できます。

apiVersion: v1
kind: ConfigMap
metadata:
   name: kafka-broker-config
   namespace: knative-eventing
data:
   # Other configurations
   # ...

   # Reference a Secret called my_secret
   auth.secret.ref.name: my_secret

Secret my_secretは、broker.spec.configによって参照されるConfigMapと同じ名前空間(この場合はknative-eventing)に存在する必要があります。

注記

証明書とキーはPEM形式である必要があります。

SASLを使用した認証

Knativeは次のSASLメカニズムをサポートしています。

  • PLAIN
  • SCRAM-SHA-256
  • SCRAM-SHA-512

特定のSASLメカニズムを使用するには、<sasl_mechanism>を目的のメカニズムに置き換えます。

暗号化なしでSASLを使用した認証

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_PLAINTEXT \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>

SSLを使用した暗号化とSASLを使用した認証

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_SSL \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-file=ca.crt=caroot.pem \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>

クライアント認証なしでSSLを使用した暗号化

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-literal=user.skip=true

SSLを使用した認証と暗号化

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-file=user.crt=<my_cert.pem_file_path> \
  --from-file=user.key=<my_key.pem_file_path>

注記

システムのルートCAセットを使用するには、ca.crtを省略できます。

独自のトピックを使用する

デフォルトでは、Knative Kafkaブローカーは独自の内部トピックを作成しますが、kafka.eventing.knative.dev/external.topicアノテーションを使用して、外部で管理されているトピックを指定することもできます。

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: Kafka
    kafka.eventing.knative.dev/external.topic: <my-topic-name>
  name: default
  namespace: default
spec:
  # other spec fields ...

注記

外部トピックを使用する場合、Knative Kafkaブローカーはトピックを所有せず、トピックの管理は行いません。これには、トピックのライフサイクルや一般的な有効性も含まれます。トピックへの一般的なアクセスに関するその他の制限が適用される場合があります。アクセス制御リスト(ACL)の使用に関するドキュメントを参照してください。

コンシューマーオフセットコミット間隔

Kafkaコンシューマーは、オフセットをコミットすることで、最後に正常に送信されたイベントを追跡します。

Knative Kafkaブローカーは、auto.commit.interval.msミリ秒ごとにオフセットをコミットします。

注記

パフォーマンスへの悪影響を防ぐために、イベントがサブスクライバーに正常に送信されるたびにオフセットをコミットすることはお勧めしません。

この間隔は、knative-eventing名前空間のconfig-kafka-broker-data-plane ConfigMapのパラメーターauto.commit.interval.msを次のように変更して変更できます。

apiVersion: v1
kind: ConfigMap
metadata:
  name: config-kafka-broker-data-plane
  namespace: knative-eventing
data:
  # Some configurations omitted ...
  config-kafka-broker-consumer.properties: |
    # Some configurations omitted ...

    # Commit the offset every 5000 millisecods (5 seconds)
    auto.commit.interval.ms=5000

注記

Knative Kafkaブローカーは少なくとも一度の配信を保証しますが、これによりアプリケーションが重複イベントを受信する可能性があります。コミット間隔が長いほど、コンシューマーが再開するときに最後のコミットされたオフセットから再開するため、重複イベントを受信する可能性が高くなります。

Kafkaプロデューサーとコンシューマーの設定

Knativeは、ワークロードに合わせて変更できる、利用可能なKafkaプロデューサーとコンシューマーのすべての構成を公開します。

これらの構成は、knative-eventing名前空間内のconfig-kafka-broker-data-plane ConfigMapを変更することで変更できます。

このConfigMapで使用可能な設定に関するドキュメントは、Apache Kafkaウェブサイト、特にプロデューサー設定コンシューマー設定に掲載されています。

データプレーンコンポーネントのデバッグログを有効にする

次のYAMLは、インストール手順中に作成される、データプレーンコンポーネントのデフォルトのログ構成を示しています。

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-config-logging
  namespace: knative-eventing
data:
  config.xml: |
    <configuration>
      <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
      </appender>
      <root level="INFO">
        <appender-ref ref="jsonConsoleAppender"/>
      </root>
    </configuration>

ログレベルをDEBUGに変更するには、以下の手順に従ってください。

  1. 以下のkafka-config-logging ConfigMapを適用するか、ConfigMap kafka-config-logginglevel="INFO"level="DEBUG"に置き換えます。

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-config-logging
      namespace: knative-eventing
    data:
      config.xml: |
        <configuration>
          <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
          </appender>
          <root level="DEBUG">
            <appender-ref ref="jsonConsoleAppender"/>
          </root>
        </configuration>
    
  2. 次のコマンドを入力して、kafka-broker-receiverkafka-broker-dispatcherを再起動します。

    kubectl rollout restart deployment -n knative-eventing kafka-broker-receiver
    kubectl rollout restart deployment -n knative-eventing kafka-broker-dispatcher
    

配信イベントの順序の設定

イベントのディスパッチ時に、Kafkaブローカーは異なる配信順序保証をサポートするように設定できます。

Triggerオブジェクトのkafka.eventing.knative.dev/delivery.orderアノテーションを使用して、イベントの配信順序を設定できます。

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-service-trigger
  annotations:
     kafka.eventing.knative.dev/delivery.order: ordered
spec:
  broker: my-kafka-broker
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: my-service

サポートされているコンシューマー配信保証は以下のとおりです。

  • unordered:非順序コンシューマーは、非ブロッキングコンシューマーであり、メッセージを順序付けせずに配信しますが、適切なオフセット管理を維持します。並列消費の需要が高く、明示的な順序付けが不要な場合に役立ちます。クリック分析の処理などがその例です。
  • ordered:順序付きコンシューマーは、パーティションごとのブロッキングコンシューマーであり、CloudEventサブスクライバーからの成功した応答を待つまで、そのパーティションの次のメッセージを配信しません。より厳格な順序付けが必要な場合、またはイベント間に関係またはグループ化がある場合に役立ちます。顧客注文の処理などがその例です。

デフォルトの順序保証はunordered配信です。

データプレーン分離と共有データプレーン

Knative Kafka Brokerの実装には、コントロールプレーンとデータプレーンの2つのプレーンがあります。コントロールプレーンは、Kubernetes APIと通信し、カスタムオブジェクトを監視し、データプレーンを管理するコントローラーで構成されています。

データプレーンは、着信イベントをリッスンし、Apache Kafkaと通信し、イベントをイベントシンクに送信するコンポーネントのコレクションです。これがイベントの流れる場所です。Knative Kafka Brokerデータプレーンは、kafka-broker-receiverkafka-broker-dispatcherデプロイメントで構成されています。

Kafkaブローカークラスを使用する場合、Knative Kafka Brokerは共有データプレーンを使用します。つまり、knative-eventing名前空間内のkafka-broker-receiverkafka-broker-dispatcherデプロイメントは、クラスタ内のすべてのKafkaブローカーで使用されます。

ただし、KafkaNamespacedがブローカークラスとして設定されている場合、Kafkaブローカーコントローラーは、ブローカーが存在する各名前空間に新しいデータプレーンを作成します。このデータプレーンは、その名前空間内のすべてのKafkaNamespacedブローカーによって使用されます。

これにより、データプレーン間の分離が提供されます。つまり、ユーザー名前空間内のkafka-broker-receiverkafka-broker-dispatcherデプロイメントはその名前空間内のブローカーに対してのみ使用されます。

注記

個別のデータプレーンという結果として、このセキュリティ機能はより多くのデプロイメントを作成し、より多くのリソースを使用します。このような分離要件がない限り、Kafkaクラスの通常のブローカーを使用することをお勧めします。

KafkaNamespacedブローカーを作成するには、eventing.knative.dev/broker.classアノテーションをKafkaNamespacedに設定する必要があります。

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: KafkaNamespaced
  name: default
  namespace: my-namespace
spec:
  config:
     # the referenced `configmap` must be in the same namespace with the `Broker` object, in this case `my-namespace`
    apiVersion: v1
    kind: ConfigMap
    name: my-config
    # namespace: my-namespace # no need to define, defaults to Broker's namespace

注記

spec.configで指定されたconfigmapは、Brokerオブジェクトと同じ名前空間に**存在する**必要があります。

apiVersion: v1
kind: ConfigMap
metadata:
  name: my-config
  namespace: my-namespace
data:
  ...

KafkaNamespacedクラスの最初のBrokerが作成されると、kafka-broker-receiverkafka-broker-dispatcherのデプロイメントがその名前空間に作成されます。その後、同じ名前空間内のKafkaNamespacedクラスのすべてのブローカーは同じデータプレーンを使用します。名前空間にKafkaNamespacedクラスのブローカーが存在しない場合、その名前空間内のデータプレーンは削除されます。

KafkaNamespacedブローカーの設定

Kafkaブローカークラスで使用可能なすべての構成メカニズムは、これらの例外を除いて、KafkaNamespacedクラスのブローカーでも使用できます。

  • このページでは、knative-eventing名前空間内のconfig-kafka-broker-data-plane configmapを変更することで、プロデューサーとコンシューマーの設定を行う方法について説明しています。Kafkaブローカーコントローラーは、このconfigmapをユーザー名前空間に伝播するため、現在、名前空間ごとにプロデューサーとコンシューマーの設定を行う方法はありません。knative-eventing名前空間内のconfig-kafka-broker-data-plane ConfigMapに設定された値は、ユーザー名前空間でも使用されます。
  • 同じ伝播のため、名前空間ごとにコンシューマーオフセットコミット間隔を設定することもできません。
  • さらにいくつかのconfigmap(config-tracingkafka-config-logging)が伝播されます。つまり、トレーシングとロギングも名前空間ごとに設定できません。
  • 同様に、データプレーンデプロイメントはknative-eventing名前空間からユーザー名前空間に伝播されます。つまり、データプレーンデプロイメントは名前空間ごとに設定できず、knative-eventing名前空間のものと同じになります。

KEDAによるトリガーの自動スケーリングの有効化と設定

KEDAを使用してKafkaブローカーを参照するトリガーの自動スケーリングを有効化および設定するには、ここにある手順に従ってください。

追加情報

サイトトラフィックを理解するために、分析とCookieを使用しています。サイトの使用に関する情報は、その目的でGoogleと共有されます。詳細情報。