コンテンツにスキップ

Apache Kafka用Knative Sink

KafkaSinkは、受信したCloudEventを設定可能なApache Kafkaトピックに永続化するApache KafkaネイティブのSink実装です。このページでは、KnativeのKafkaSinkをインストールおよび設定する方法について説明します。

前提条件

Knative EventingがインストールされたKubernetesクラスターにアクセスできる必要があります。

インストール

  1. Kafkaコントローラーをインストールします

    kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml
    
  2. KafkaSinkデータプレーンをインストールします

    kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-sink.yaml
    
  3. kafka-controllerkafka-sink-receiverのデプロイメントが実行されていることを確認します

    kubectl get deployments.apps -n knative-eventing
    

    出力例

    NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
    eventing-controller            1/1     1            1           10s
    eventing-webhook               1/1     1            1           9s
    kafka-controller               1/1     1            1           3s
    kafka-sink-receiver            1/1     1            1           5s
    

KafkaSinkの例

KafkaSinkオブジェクトは次のようになります。

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: mytopic
  bootstrapServers:
   - my-cluster-kafka-bootstrap.kafka:9092

出力トピックコンテンツモード

CloudEvent仕様では、CloudEventを転送するための2つのモード(構造化モードとバイナリモード)が定義されています。

「構造化モードメッセージ」とは、イベントがスタンドアロンのイベント形式を使用して完全にエンコードされ、メッセージ本文に格納されるメッセージのことです。

構造化コンテンツモードでは、イベントメタデータとデータがペイロード内に一緒に保持されるため、同じイベントを複数のルーティングホップや複数のプロトコルにわたって簡単に転送できます。

「バイナリモードメッセージ」とは、イベントデータがメッセージ本文に格納され、イベント属性がメッセージメタデータの一部として格納されるメッセージのことです。

バイナリコンテンツモードは、あらゆる形状のイベントデータに対応でき、トランスコードの必要がなく、効率的な転送が可能です。

contentModeが指定されたKafkaSinkオブジェクトは次のようになります。

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: mytopic
  bootstrapServers:
   - my-cluster-kafka-bootstrap.kafka:9092

  # CloudEvent content mode of Kafka messages sent to the topic.
  # Possible values:
  # - structured
  # - binary
  #
  # default: binary.
  #
  # CloudEvent spec references:
  # - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#message
  # - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md#33-structured-content-mode
  # - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md#32-binary-content-mode
  contentMode: binary # or structured

セキュリティ

Knativeは、以下のApache Kafkaセキュリティ機能をサポートしています

セキュリティ機能の有効化

セキュリティ機能を有効にするには、KafkaSink仕様でシークレットを参照できます

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
   name: my-kafka-sink
   namespace: default
spec:
   topic: mytopic
   bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
   auth:
     secret:
       ref:
         name: my_secret

注意

シークレットmy_secretは、KafkaSinkと同じ名前空間に存在する必要があります。証明書と鍵はPEM形式である必要があります。

SASLを使用した認証

Knativeは、次のSASLメカニズムをサポートしています

  • PLAIN
  • SCRAM-SHA-256
  • SCRAM-SHA-512

特定のSASLメカニズムを使用するには、<sasl_mechanism>を任意のメカニズムに置き換えてください。

暗号化なしのSASLを使用した認証

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_PLAINTEXT \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>

SASLを使用した認証とSSLを使用した暗号化

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_SSL \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-file=ca.crt=caroot.pem \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>

クライアント認証なしのSSLを使用した暗号化

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-literal=user.skip=true

SSLを使用した認証と暗号化

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-file=user.crt=<my_cert.pem_file_path> \
  --from-file=user.key=<my_key.pem_file_path>

注意

ca.crtを省略して、フォールバックを有効にし、システムのルートCAセットを使用できます。

Kafkaプロデューサーの設定

Kafkaプロデューサーは、Apache Kafkaクラスターにイベントを送信する役割を担うコンポーネントです。クラスター内のKafkaプロデューサーの設定を変更するには、knative-eventing名前空間のconfig-kafka-sink-data-plane ConfigMapを変更します。

このConfigMapで使用可能な設定に関するドキュメントは、Apache Kafka Webサイト、特にプロデューサー設定で入手できます。

データプレーンコンポーネントのデバッグログの有効化

データプレーンコンポーネントのデバッグログを有効にするには、kafka-config-logging ConfigMapのログレベルをDEBUGに変更します。

  1. 次の内容を含むYAMLファイルとしてkafka-config-logging ConfigMapを作成します

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-config-logging
      namespace: knative-eventing
    data:
      config.xml: |
        <configuration>
          <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
          </appender>
          <root level="DEBUG">
            <appender-ref ref="jsonConsoleAppender"/>
          </root>
        </configuration>
    
  2. 次のコマンドを実行してYAMLファイルを適用します

    kubectl apply -f <filename>.yaml
    
    ここで、<filename>は前の手順で作成したファイルの名前です。

  3. kafka-sink-receiverを再起動します

    kubectl rollout restart deployment -n knative-eventing kafka-sink-receiver
    

当社は、サイトのトラフィックを理解するために分析とCookieを使用しています。お客様のサイトの利用に関する情報は、その目的のためにGoogleと共有されます。詳細はこちら。