コンテンツへスキップ

JobSink:イベント発生時に長時間実行されるバックグラウンドジョブをトリガーします

通常、Knative Serviceと組み合わせたイベント処理は、HTTP接続を維持する必要があるため(そうでないとサービスはスケールダウンされるため)、比較的短い時間(数分)で完了することが期待されます。

長時間実行される接続を維持すると、失敗の可能性が高まるため、リクエストが再試行されると処理を再開する必要があります。

この制限は理想的ではありません。JobSinkは、長時間実行される非同期ジョブとタスクを作成するために使用できるリソースです。

JobSinkは、完全なKubernetes batch/v1ジョブリソースと機能、およびKueueのようなKubernetesジョブキューイングシステムをサポートしています。

前提条件

Knative EventingがインストールされているKubernetesクラスタへのアクセスが必要です。

使用方法

イベントがJobSinkに送信されると、EventingはJobを作成し、受信したイベントをJSONファイルとして/etc/jobsink-event/eventにマウントします。

  1. JobSinkの作成
    apiVersion: sinks.knative.dev/v1alpha1
    kind: JobSink
    metadata:
      name: job-sink-logger
    spec:
      job:
        spec:
          completions: 1
          parallelism: 1
          template:
            spec:
              restartPolicy: Never
              containers:
                - name: main
                  image: docker.io/library/bash:5
                  command: [ "cat" ]
                  args:
                    - "/etc/jobsink-event/event"
    
  2. JobSinkリソースの適用
    kubectl apply -f <job-sink-file.yaml>
    
  3. JobSinkの準備完了の確認
    kubectl get jobsinks.sinks.knative.dev
    
    出力例
    NAME              URL                                                                          AGE   READY   REASON
    job-sink-logger   http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger   5s    True
    
  4. JobSinkのトリガー
    kubectl run curl --image=curlimages/curl --rm=true --restart=Never -ti -- -X POST -v \
       -H "content-type: application/json"  \
       -H "ce-specversion: 1.0" \
       -H "ce-source: my/curl/command" \
       -H "ce-type: my.demo.event" \
       -H "ce-id: 123" \
       -d '{"details":"JobSinkDemo"}' \
       http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger
    
  5. Jobが作成され、イベントが出力されることの確認
    kubectl logs job-sink-loggerszoi6-dqbtq
    
    出力例
    {"specversion":"1.0","id":"123","source":"my/curl/command","type":"my.demo.event","datacontenttype":"application/json","data":{"details":"JobSinkDemo"}}
    

JobSinkのべき等性

JobSinkは、受信した各異なるイベントに対してジョブを作成します。

イベントは、イベントのsource属性とid属性の組み合わせによって一意に識別されます。

同じsource属性とid属性を持つイベントが受信され、既にジョブが存在する場合、別のJob作成されません

イベントファイルの読み取り

任意のCloudEvents JSONデシリアライザーを使用して、ファイルを読み取り、デシリアライズできます。

たとえば、次のスニペットはCloudEvents Go SDKを使用してイベントを読み取り、処理します。

package mytask

import (
    "encoding/json"
    "fmt"
    "os"

    cloudevents "github.com/cloudevents/sdk-go/v2"
)

func handleEvent() error {
    eventBytes, err := os.ReadFile("/etc/jobsink-event/event")
    if err != nil {
        return err
    }

    event := &cloudevents.Event{}
    if err := json.Unmarshal(eventBytes, event); err != nil {
        return err
    }

    // Process event ...
    fmt.Println(event)

    return nil
}

さまざまなイベントソースからのジョブのトリガー

JobSinkは、任意のイベントソースまたはトリガーによってトリガーできます。

たとえば、KafkaSourceを使用してKafkaレコードがKafkaトピックに送信されたときにジョブをトリガーできます。

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - knative-demo-topic
  sink:
    ref:
      apiVersion: sinks.knative.dev/v1alpha1
      kind: JobSink
      name: job-sink-logger

または、Triggerを使用してKnativeブローカーがイベントを受信したときにトリガーできます。

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-job-sink-trigger
spec:
  broker: my-broker
  filter:
    attributes:
      type: dev.knative.foo.bar
      myextension: my-extension-value
    subscriber:
      ref:
        apiVersion: sinks.knative.dev/v1alpha1
        kind: JobSink
        name: job-sink-logger

あるいは、Knativeブローカーのデッドレターシンクとしても使用できます。

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: my-broker
spec:
  # ...

  delivery:
    deadLetterSink:
      ref:
        apiVersion: sinks.knative.dev/v1alpha1
        kind: JobSink
        name: job-sink-logger
    retry: 5
    backoffPolicy: exponential
    backoffDelay: "PT1S"

イベントファイルディレクトリのカスタマイズ

apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-custom-mount-path
spec:
  job:
    spec:
      completions: 1
      parallelism: 1
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "bash" ]
              args:
                - -c
                - echo "Hello world!" && sleep 5

              # The event will be available in a file at `/etc/custom-path/event`
              volumeMounts:
                - name: "jobsink-event"
                  mountPath: "/etc/custom-path"
                  readOnly: true

完了したジョブのクリーンアップ

完了したジョブをクリーンアップするには、spec.job.spec.ttlSecondsAfterFinished: 600フィールドを設定すると、Kubernetesは600秒(10分)後に完了したジョブを削除します。

JobSinkの例

JobSink成功例

apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-success
spec:
  job:
    metadata:
      labels:
        my-label: my-value
    spec:
      completions: 12
      parallelism: 3
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "bash" ]
              args:
                - -c
                - echo "Hello world!" && sleep 5
      backoffLimit: 6
      podFailurePolicy:
        rules:
          - action: FailJob
            onExitCodes:
              containerName: main      # optional
              operator: In             # one of: In, NotIn
              values: [ 42 ]
          - action: Ignore             # one of: Ignore, FailJob, Count
            onPodConditions:
              - type: DisruptionTarget   # indicates Pod disruption

JobSink失敗例

apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
  name: job-sink-failure
spec:
  job:
    metadata:
      labels:
        my-label: my-value
    spec:
      completions: 12
      parallelism: 3
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: main
              image: docker.io/library/bash:5
              command: [ "bash" ]        # example command simulating a bug which triggers the FailJob action
              args:
                - -c
                - echo "Hello world!" && sleep 5 && exit 42
      backoffLimit: 6
      podFailurePolicy:
        rules:
          - action: FailJob
            onExitCodes:
              containerName: main      # optional
              operator: In             # one of: In, NotIn
              values: [ 42 ]
          - action: Ignore             # one of: Ignore, FailJob, Count
            onPodConditions:
              - type: DisruptionTarget   # indicates Pod disruption

サイトトラフィックを理解するために、分析とCookieを使用しています。その目的で、サイトの使用に関する情報はGoogleと共有されます。詳細はこちら。