コンテンツへスキップ

Apache Kafka用Knativeソース

stage version

KafkaSource は、既存の Apache Kafka トピックに保存されたメッセージを読み取り、それらのメッセージをCloudEventsとして構成済みの sink にHTTP経由で送信します。KafkaSource は、トピックパーティションに保存されたメッセージの順序を保持します。これは、同じパーティション内の次のメッセージを配信する前に、sink からの正常な応答を待つことによって実現します。

KafkaSourceコントローラーのインストール

  1. 次のコマンドを入力して、KafkaSource コントローラーをインストールします。

    kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml
    
  2. 次のコマンドを入力して、Kafka Sourceデータプレーンをインストールします。

    kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-source.yaml
    
  3. 次のコマンドを入力して、kafka-controllerkafka-source-dispatcher が実行されていることを確認します。

    kubectl get deployments.apps,statefulsets.apps -n knative-eventing
    

    出力例

    NAME                                           READY   UP-TO-DATE   AVAILABLE   AGE
    deployment.apps/kafka-controller               1/1     1            1           3s
    
    NAME                                       READY   AGE
    statefulset.apps/kafka-source-dispatcher   1/1     3s
    

オプション:Kafkaトピックの作成

Kafkaトピックを作成するセクションでは、Apache Kafkaを操作するためにStrimziを使用していることを前提としていますが、同等の操作はApache Kafka CLIまたは他のツールを使用して複製できます。

Strimziを使用している場合

  1. KafkaTopic YAMLファイルを作成します。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: knative-demo-topic
      namespace: kafka
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 3
      replicas: 1
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    
  2. 次のコマンドを実行して、KafkaTopic YAMLファイルをデプロイします。

    kubectl apply -f <filename>.yaml
    
    ここで、<filename>KafkaTopic YAMLファイルの名前です。

    出力例

    kafkatopic.kafka.strimzi.io/knative-demo-topic created
    

  3. 次のコマンドを実行して、KafkaTopic が実行されていることを確認します。

    kubectl -n kafka get kafkatopics.kafka.strimzi.io
    

    出力例

    NAME                 CLUSTER      PARTITIONS   REPLICATION FACTOR
    knative-demo-topic   my-cluster   3            1
    

サービスの作成

  1. event-display サービスをYAMLファイルとして作成します。

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: default
    spec:
      template:
        spec:
          containers:
            - # This corresponds to
              # https://github.com/knative/eventing/tree/main/cmd/event_display/main.go
              image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
    
  2. 次のコマンドを実行してYAMLファイルを適用します。

    kubectl apply -f <filename>.yaml
    
    ここで、<filename> は前の手順で作成したファイルの名前です。

    出力例

    service.serving.knative.dev/event-display created
    

  3. 次のコマンドを実行して、サービスPodが実行されていることを確認します。

    kubectl get pods
    

    Pod名のプレフィックスは event-display です。

    NAME                                            READY     STATUS    RESTARTS   AGE
    event-display-00001-deployment-5d5df6c7-gv2j4   2/2       Running   0          72s
    

Kafkaイベントソース

  1. source/event-source.yaml をブートストラップサーバー、トピックなどに応じて変更します。

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
    
  2. イベントソースをデプロイします。

    kubectl apply -f event-source.yaml
    

    出力例

    kafkasource.sources.knative.dev/kafka-source created
    

  3. KafkaSourceの準備が整っていることを確認します。

    kubectl get kafkasource kafka-source
    

    出力例

    NAME           TOPICS                   BOOTSTRAPSERVERS                            READY   REASON   AGE
    kafka-source   ["knative-demo-topic"]   ["my-cluster-kafka-bootstrap.kafka:9092"]   True             26h
    

スケーリング

より多くのコンシューマーをスケジュールしたり、より少ないコンシューマーをスケジュールしたりするために、KafkaSourceをスケーリングできます。また、それらを異なるディスパッチャーPodに割り当てることができます。kafkasourceステータスには、status.placementsキーの下にこのような割り当てが表示されます。

次の表記を使用して、kubectlでKafkaSourceをスケーリングできます。

kubectl scale kafkasource -n <ns> <kafkasource-name> --replicas=<number-of-replicas> # e.g. 12 replicas for a topic with 12 partitions

あるいは、GitOpsアプローチを使用している場合は、次の例に示すように consumers キーを追加してリポジトリにコミットできます。

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092 
      consumers: 12    # Number of replicas
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

KEDAによる自動スケーリング

KEDAを使用してKafkaSourceを自動スケーリングできます。この機能を有効にして構成する方法については、こちらの手順をお読みください。

確認

  1. 次の例のように、メッセージ({"msg": "This is a test!"})をApache Kafkaトピックに生成します。

    kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic
    

    ヒント

    コマンドプロンプトが表示されない場合は、Enterキーを押してみてください。

  2. サービスがイベントソースからメッセージを受信したことを確認します。

    kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container
    

    出力例

    ☁️ cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source#my-topic
      subject: partition:0#564
      id: partition:0/offset:564
      time: 2020-02-10T18:10:23.861866615Z
      datacontenttype: application/json
    Extensions,
      key:
    Data,
        {
          "msg": "This is a test!"
        }
    

配信失敗の処理

KafkaSourceDelivery 仕様を実装しており、イベント配信に失敗した場合に適用されるイベント配信パラメーターを構成できます。

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
      topics:
      - knative-demo-topic
      delivery:
        deadLetterSink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: example-sink
        backoffDelay: <duration>
        backoffPolicy: <policy-type>
        retry: <integer>
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

delivery APIについては、配信失敗の処理の章で説明しています。

オプション:キーデシリアライザーの指定

KafkaSource がKafkaからメッセージを受信すると、キーを Key という名前のEvent拡張にダンプし、Kafkaメッセージヘッダーを kafkaheader で始まる拡張にダンプします。

4つのタイプの中からキーデシリアライザーを指定できます。

  • UTF-8エンコードされた文字列の場合は string (デフォルト)
  • 32ビットと64ビットの符号付き整数の場合は int
  • 32ビットと64ビットの浮動小数点数の場合は float
  • Base64エンコードされたバイト配列の場合は byte-array

キーデシリアライザーを指定するには、次の例に示すように、kafkasources.sources.knative.dev/key-type ラベルを KafkaSource 定義に追加します。

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
labels:
  kafkasources.sources.knative.dev/key-type: int
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
  ref:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: event-display

オプション:初期オフセットの指定

デフォルトでは、KafkaSource は各パーティションの最新オフセットからの消費を開始します。最初のオフセットから消費したい場合は、initialOffsetフィールドを earliest に設定します(例:)。

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
consumerGroup: knative-group
initialOffset: earliest
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
  ref:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: event-display

initialOffset の有効な値は earliestlatest です。他の値を指定すると、検証エラーが発生します。このフィールドは、そのコンシューマーグループにコミットされたオフセットがない場合にのみ有効になります。

TLSが有効なKafkaブローカーへの接続

KafkaSourceはTLSおよびSASL認証方法をサポートしています。TLS認証を有効にするには、次のファイルが必要です。

  • CA証明書
  • クライアント証明書とキー

KafkaSourceはこれらのファイルがPEM形式であることを想定しています。JKSなどの別の形式の場合は、PEMに変換してください。

  1. 次のコマンドを実行して、KafkaSourceが設定されるネームスペースにシークレットとして証明書ファイルを作成します。

    kubectl create secret generic cacert --from-file=caroot.pem
    
    kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem
    
  2. KafkaSourceを適用します。bootstrapServers および topics フィールドを適宜変更します。

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
     name: kafka-source-with-tls
    spec:
     net:
       tls:
         enable: true
         cert:
           secretKeyRef:
             key: tls.crt
             name: kafka-secret
         key:
           secretKeyRef:
             key: tls.key
             name: kafka-secret
         caCert:
           secretKeyRef:
             key: caroot.pem
             name: cacert
     consumerGroup: knative-group
     bootstrapServers:
     - my-secure-kafka-bootstrap.kafka:443
     topics:
     - knative-demo-topic
     sink:
       ref:
         apiVersion: serving.knative.dev/v1
         kind: Service
         name: event-display
    

KafkaSourceのSASLの有効化

Simple Authentication and Security Layer(SASL)は、Apache Kafkaで認証に使用されます。クラスターでSASL認証を使用する場合は、Kafkaクラスターと通信するために、ユーザーがKnativeにクレデンシャルを提供する必要があります。そうしないと、イベントを生成または消費できません。

前提条件

  • Simple Authentication and Security Layer(SASL)を持つKafkaクラスターにアクセスできること。

手順

  1. 次のコマンドを実行して、KafkaクラスターのSASL情報を使用するシークレットを作成します。

    STRIMZI_CRT=$(kubectl -n kafka get secret example-cluster-cluster-ca-cert --template='{{index.data "ca.crt"}}' | base64 --decode )
    
    SASL_PASSWD=$(kubectl -n kafka get secret example-user --template='{{index.data "password"}}' | base64 --decode )
    
    kubectl create secret -n default generic <secret_name> \
        --from-literal=ca.crt="$STRIMZI_CRT" \
        --from-literal=password="$SASL_PASSWD" \
        --from-literal=saslType="SCRAM-SHA-512" \
        --from-literal=user="example-user"
    
  2. 次のspecオプションが含まれるようにKafkaSourceを作成または変更します。

    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: example-source
    spec:
    ...
      net:
        sasl:
          enable: true
          user:
            secretKeyRef:
              name: <secret_name>
              key: user
          password:
            secretKeyRef:
              name: <secret_name>
              key: password
          type:
            secretKeyRef:
              name: <secret_name>
              key: saslType
        tls:
          enable: true
          caCert:
            secretKeyRef:
              name: <secret_name>
              key: ca.crt
    ...
    

    ここで、<secret_name> は前の手順で生成されたシークレットの名前です。

クリーンアップ手順

  1. Kafkaイベントソースを削除します。

    kubectl delete -f source/source.yaml kafkasource.sources.knative.dev
    

    出力例

    "kafka-source" deleted
    

  2. event-display サービスを削除します。

    kubectl delete -f source/event-display.yaml service.serving.knative.dev
    

    出力例

    "event-display" deleted
    

  3. オプション:Apache Kafkaトピックを削除します。

    kubectl delete -f kafka-topic.yaml
    

    出力例

    kafkatopic.kafka.strimzi.io "knative-demo-topic" deleted
    

当サイトでは、サイトのトラフィックを把握するために分析と Cookie を使用しています。お客様のサイト利用に関する情報は、その目的のために Google と共有されます。詳細はこちら