JobSink:イベント発生時に長時間実行されるバックグラウンドジョブをトリガーします¶
通常、Knative Serviceと組み合わせたイベント処理は、HTTP接続を維持する必要があるため(そうでないとサービスはスケールダウンされるため)、比較的短い時間(数分)で完了することが期待されます。
長時間実行される接続を維持すると、失敗の可能性が高まるため、リクエストが再試行されると処理を再開する必要があります。
この制限は理想的ではありません。JobSink
は、長時間実行される非同期ジョブとタスクを作成するために使用できるリソースです。
JobSink
は、完全なKubernetes batch/v1ジョブリソースと機能、およびKueueのようなKubernetesジョブキューイングシステムをサポートしています。
前提条件¶
Knative EventingがインストールされているKubernetesクラスタへのアクセスが必要です。
使用方法¶
イベントがJobSink
に送信されると、EventingはJob
を作成し、受信したイベントをJSONファイルとして/etc/jobsink-event/event
にマウントします。
JobSink
の作成apiVersion: sinks.knative.dev/v1alpha1 kind: JobSink metadata: name: job-sink-logger spec: job: spec: completions: 1 parallelism: 1 template: spec: restartPolicy: Never containers: - name: main image: docker.io/library/bash:5 command: [ "cat" ] args: - "/etc/jobsink-event/event"
JobSink
リソースの適用kubectl apply -f <job-sink-file.yaml>
JobSink
の準備完了の確認出力例kubectl get jobsinks.sinks.knative.dev
NAME URL AGE READY REASON job-sink-logger http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger 5s True
JobSink
のトリガーkubectl run curl --image=curlimages/curl --rm=true --restart=Never -ti -- -X POST -v \ -H "content-type: application/json" \ -H "ce-specversion: 1.0" \ -H "ce-source: my/curl/command" \ -H "ce-type: my.demo.event" \ -H "ce-id: 123" \ -d '{"details":"JobSinkDemo"}' \ http://job-sink.knative-eventing.svc.cluster.local/default/job-sink-logger
Job
が作成され、イベントが出力されることの確認出力例kubectl logs job-sink-loggerszoi6-dqbtq
{"specversion":"1.0","id":"123","source":"my/curl/command","type":"my.demo.event","datacontenttype":"application/json","data":{"details":"JobSinkDemo"}}
JobSinkのべき等性¶
JobSink
は、受信した各異なるイベントに対してジョブを作成します。
イベントは、イベントのsource
属性とid
属性の組み合わせによって一意に識別されます。
同じsource
属性とid
属性を持つイベントが受信され、既にジョブが存在する場合、別のJob
は作成されません。
イベントファイルの読み取り¶
任意のCloudEvents JSONデシリアライザーを使用して、ファイルを読み取り、デシリアライズできます。
たとえば、次のスニペットはCloudEvents Go SDKを使用してイベントを読み取り、処理します。
package mytask
import (
"encoding/json"
"fmt"
"os"
cloudevents "github.com/cloudevents/sdk-go/v2"
)
func handleEvent() error {
eventBytes, err := os.ReadFile("/etc/jobsink-event/event")
if err != nil {
return err
}
event := &cloudevents.Event{}
if err := json.Unmarshal(eventBytes, event); err != nil {
return err
}
// Process event ...
fmt.Println(event)
return nil
}
さまざまなイベントソースからのジョブのトリガー¶
JobSink
は、任意のイベントソースまたはトリガーによってトリガーできます。
たとえば、KafkaSource
を使用してKafkaレコードがKafkaトピックに送信されたときにジョブをトリガーできます。
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
spec:
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
topics:
- knative-demo-topic
sink:
ref:
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
name: job-sink-logger
または、Trigger
を使用してKnativeブローカーがイベントを受信したときにトリガーできます。
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: my-job-sink-trigger
spec:
broker: my-broker
filter:
attributes:
type: dev.knative.foo.bar
myextension: my-extension-value
subscriber:
ref:
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
name: job-sink-logger
あるいは、Knativeブローカーのデッドレターシンクとしても使用できます。
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: my-broker
spec:
# ...
delivery:
deadLetterSink:
ref:
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
name: job-sink-logger
retry: 5
backoffPolicy: exponential
backoffDelay: "PT1S"
イベントファイルディレクトリのカスタマイズ¶
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
name: job-sink-custom-mount-path
spec:
job:
spec:
completions: 1
parallelism: 1
template:
spec:
restartPolicy: Never
containers:
- name: main
image: docker.io/library/bash:5
command: [ "bash" ]
args:
- -c
- echo "Hello world!" && sleep 5
# The event will be available in a file at `/etc/custom-path/event`
volumeMounts:
- name: "jobsink-event"
mountPath: "/etc/custom-path"
readOnly: true
完了したジョブのクリーンアップ¶
完了したジョブをクリーンアップするには、spec.job.spec.ttlSecondsAfterFinished: 600
フィールドを設定すると、Kubernetesは600秒(10分)後に完了したジョブを削除します。
JobSinkの例¶
JobSink成功例¶
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
name: job-sink-success
spec:
job:
metadata:
labels:
my-label: my-value
spec:
completions: 12
parallelism: 3
template:
spec:
restartPolicy: Never
containers:
- name: main
image: docker.io/library/bash:5
command: [ "bash" ]
args:
- -c
- echo "Hello world!" && sleep 5
backoffLimit: 6
podFailurePolicy:
rules:
- action: FailJob
onExitCodes:
containerName: main # optional
operator: In # one of: In, NotIn
values: [ 42 ]
- action: Ignore # one of: Ignore, FailJob, Count
onPodConditions:
- type: DisruptionTarget # indicates Pod disruption
JobSink失敗例¶
apiVersion: sinks.knative.dev/v1alpha1
kind: JobSink
metadata:
name: job-sink-failure
spec:
job:
metadata:
labels:
my-label: my-value
spec:
completions: 12
parallelism: 3
template:
spec:
restartPolicy: Never
containers:
- name: main
image: docker.io/library/bash:5
command: [ "bash" ] # example command simulating a bug which triggers the FailJob action
args:
- -c
- echo "Hello world!" && sleep 5 && exit 42
backoffLimit: 6
podFailurePolicy:
rules:
- action: FailJob
onExitCodes:
containerName: main # optional
operator: In # one of: In, NotIn
values: [ 42 ]
- action: Ignore # one of: Ignore, FailJob, Count
onPodConditions:
- type: DisruptionTarget # indicates Pod disruption