コンテンツにスキップ

KnativeとRestateを用いたステートフルアプリケーションの構築

公開日: 2024-09-02 、  改訂日: 2024-09-04

KnativeとRestateを用いたステートフルアプリケーションの構築

Building Stateful applications with Knative and Restate

著者: Francesco Guardiani、Restateシニアソフトウェアエンジニア、 Giselle van Dongen、Restateデベロッパーアドボケイト

KnativeはKubernetes上でのサーバーレスアプリケーションの開発と運用に革命をもたらしましたが、依然としてステートフルアプリケーションの構築は非常に困難です。

たとえば、状態を永続化する必要があるアプリケーションを構築したいとします。そのためには、サービスをデータベースに接続する必要があるかもしれず、その際には、リトライ、重複イベント、二重書き込み、その他あらゆる種類の分散システムの問題に対処する必要があります。

別の例として、異なるサービスを呼び出し、最終的にいずれかが失敗した場合に操作を補償する必要があるサービスオーケストレーターを構築したいとします。理想的には、ある操作を別の操作の後に実行し、いずれかが失敗した場合はロールバックを実行する、シーケンシャルなコードを記述したいだけです。しかし実際には、ダウンストリームサービスの呼び出し時のリトライ、オーケストレーターサービスの障害、さらに厄介なダウンストリームサービスの呼び出し時の長い待ち時間などの問題を解決する必要があるため、それほど簡単ではありません。

前述の問題を処理することなく、アプリの状態を埋め込み、複雑なサービス調整をすべてKnativeサービス内で実行できたらどうでしょうか?

Restateの登場

Restate は、ステートフルなサーバーレスアプリケーションを構築するためのオープンソースの耐久性のある実行エンジンです。言い換えれば、通常のRPCサービスのように見えるコードを構築し、コードは永続的に実行されます。つまり、エンジンは実行の進捗状況を保存します。クラッシュ後、エンジンはアプリケーションを透過的に以前の状態に復元し、中断した時点から実行を再開します。

実行の進捗状況を記録するもう1つの側面は、サービスの応答が遅いなどの理由で待ち時間が長い場合、エンジンが自動的に実行を中断してコンピューティングリソースの浪費を防ぐことです。「待ち時間」中は、アプリケーションをゼロにスケールダウンできます!

RestateとKnativeを一緒に使用することで、ステートフルエンティティの開発、マイクロサービスのオーケストレーション、サガパターンの実装、イベントの重複排除を行いながら、作業が必要ない場合はゼロにスケールダウンできます。 Restateは、状態の一貫性、クロスサービス通信、障害回復などの難しい分散システムの問題を処理します。

Restateでは、利用可能なRestate SDKのいずれかを使用してアプリケーションを構築し、Knativeサービスなどを使用してサーバーレス/ステートレスHTTPサーバーとしてデプロイします。現在、RestateはGolang、Java、Kotlin、Typescript、Rust、Pythonをサポートしています。サービスを呼び出すには、サービスに直接ではなくRestateにリクエストを送信します。Restateはクライアントとサービス間の「プロキシ」のように機能します。

Restateエンジンをデプロイするには、さまざまな戦略があります。データベースをデプロイするのと同じように、k8sクラスターにステートフルデプロイメントとしてデプロイするか、Restate Cloudマネージドサービスを使用できます。詳細については、Restateのデプロイ方法をご覧ください。

サインアップフローの例

仕組みを理解するために、KnativeとRestateを一緒に使用してサインアップフローを構築する方法の例を紹介します。サンプルアプリケーションは次のように構成されています。

  • ユーザー情報が保存されるユーザーサービス。
  • 新しいユーザーのサインアップ、確認メールの送信、およびその後のアクティブ化のフローをカプセル化するサインアップサービス。

ユーザーサービス

ユーザーサービスから始めましょう。

構築するには、Restate *仮想オブジェクト*を作成します。これは、RPCハンドラーのセットとそれに関連付けられたK/Vストアをカプセル化する抽象概念です。仮想オブジェクトは、ハンドラーの1つを呼び出すときに提供するキーでアドレス指定できます。さらに、仮想オブジェクトにはキーごとに固有のロックがあります。つまり、Restateは、特定のキーに対して**最大1つのリクエスト**が同時に実行されるようにし、追加のリクエストは**キーごとの**キューに入れられます。

ユーザーデータを取得するためのハンドラーを定義することから始めましょう

// Struct to encapsulate the user service logic
type userObject struct{}

// User struct definition, ser/deserializeable with json
type User struct {
    Name     string `json:"name"`
    Surname  string `json:"surname"`
    Password string `json:"password"`
}

func (t *userObject) Get(ctx restate.ObjectSharedContext) (User, error) {
    return restate.Get[User](ctx, "user")
}

各Restateハンドラーは、`Context`、Restateが開発者に公開するさまざまな機能をカプセル化するインターフェースで呼び出されます。このコンテキストは、ハンドラーのタイプによって異なります。

この場合、`restate.Get`を使用します。これは、Restateの仮想オブジェクトK/Vストアから値を読み取ります。

次に、ユーザーを*初期化*するためのハンドラーを定義できます

// Initialize will initialize the user object
func (t *userObject) Initialize(ctx restate.ObjectContext, user User) error {
    // Check if the user doesn't exist first
    usr, err := restate.Get[*User](ctx, "user")
    if err != nil {
        return err
    }
    if usr != nil {
        return restate.TerminalError(fmt.Errorf("the user was already initialized"))
    }

    // Store the user
    restate.Set(ctx, "user", user)

    // Store the unactivated status
    restate.Set(ctx, "activated", false)

    return nil
}

`restate.Get`と同様に、`restate.Set`を使用して仮想オブジェクトK/Vストアに書き込むことができます。

最後に、ユーザーが初期化された後に*アクティブ化*するためのハンドラー

// Activate will signal the user is activated
func (t *userObject) Activate(ctx restate.ObjectContext) error {
    // Check if the user exists first
    usr, err := restate.Get[*User](ctx, "user")
    if err != nil {
        return err
    }
    if usr == nil {
        return restate.TerminalError(fmt.Errorf("the user doesn't exist"))
    }

    // Store the activated status
    restate.Set(ctx, "activated", false)

    return nil
}

これで、サインアップサービスを実装する準備が整いました。

サインアップサービス

サインアップサービスには、サインアップを調整する単一のハンドラーがあります

func (t *signupService) Signup(ctx restate.Context, newUser NewUser) (string, error) {
    // Initialize the newUser first
    user := User{
        Name:     newUser.Name,
        Surname:  newUser.Surname,
        Password: newUser.Password,
    }
    _, err := restate.Object[restate.Void](ctx, "User", newUser.Username, "Initialize").Request(user)
    if err != nil {
        return "", err
    }

    // Prepare an awakeable to await the email activation
    awakeable := restate.Awakeable[restate.Void](ctx)

    // Send the activation email
    _, err = restate.Run[restate.Void](ctx, func(ctx restate.RunContext) (restate.Void, error) {
        return restate.Void{}, sendEmail(newUser.Username, awakeable.Id())
    })
    if err != nil {
        return "", err
    }

    // Await the activation
    _, err = awakeable.Result()
    if err != nil {
        return "", err
    }

    // Activate the user
    _, err = restate.Object[restate.Void](ctx, "User", newUser.Username, "Activate").Request(user)
    if err != nil {
        return "", err
    }

    return fmt.Sprintf("The new user %s is signed up and activated", newUser.Username), nil
}

`restate.Call`を使用して、他のRestateサービスを呼び出すことができます。これらのリクエストは、 dokładnie raz 実行されることが保証されています。

`restate.Awakeable`を使用すると、発生する任意のイベントを待つことができます。Awakeable IDを提供するRestateにHTTPリクエストを送信するだけでリクエストを完了できます。この例では、メールにはAwakeable IDを含むリンクが埋め込まれており、ユーザーが確認ボタンをクリックすると完了します。

`restate.Run`を使用すると、任意のコードを実行し、結果をメモすることができます。クラッシュした場合、Restateはそのコードチャンクを再実行せず、保存された結果を読み込んで後続の操作に使用します。

HTTPサービスを開始し、Knativeでデプロイする

HTTPを使用してサービスを公開するには

func main() {
    // Read PORT env injected by Knative Serving
    port := os.Getenv("PORT")
    if port == "" {
        port = "9080"
    }
    bindAddress := fmt.Sprintf(":%s", port)

    // Bind services to the Restate HTTP/2 server
    srv := server.NewRestate().
        Bind(restate.Reflect(&userObject{})).
        Bind(restate.Reflect(&signupService{}))

    // Start HTTP/2 server
    if err := srv.Start(context.Background(), bindAddress); err != nil {
        slog.Error("application exited unexpectedly", "err", err.Error())
        os.Exit(1)
    }
}

これで、`ko`などのツールを使用してコンテナイメージをビルドできます

$ ko build main.go -B

そして、`kn`でデプロイします

$ kn service create signup \
  --image $MY_IMAGE_REGISTRY/main.go \
  --port h2c:8080

リクエストを送信する前に、Restateに新しいサービスデプロイメントについて通知する必要があります

$ restate deployments register http://signup.default.svc

これで完了です!リクエストを送信する準備が整いました

$ curl http://localhost:8080/Signup/Signup --json '{"username": "slinkydeveloper", "name": "Francesco", "surname": "Guardiani", "password": "Pizza-without-pineapple"}'

注:コード例の一部は簡潔にするために省略されています。詳細と`kind`を使用してローカルで実行する方法については、完全な例をご覧ください。

私たちはあなたをサポートします

仮に、SignupフローにおけるsendEmail関数が、初回サインアップ時に失敗したとしましょう。何が起きるでしょうか?

Restateを使用しない場合、ループ内でsendEmailの実行を数回リトライする必要があります。しかし、sendEmailのリトライ中に、**サインアップサービスがクラッシュしたり停止したり**したらどうなるでしょうか?その場合、サインアップの進捗状況を追跡できなくなり、ユーザーが次回F5キーを押したときに、以前のサインアップの状態を再構築したり破棄したりするためのロジックが必要になります。

Restateを使用すると、sendEmailが失敗した場合、自動的にリトライされます。また、以前に実行されたすべての操作(この場合はUser/Initializeハンドラーの呼び出し)は再実行されず、結果値のみが復元されます。

これは、RestateのDurable Execution Engineのおかげです。このエンジンはアプリケーションの進捗状況を記録し、クラッシュが発生した場合には、最後に中断された時点から再開します。さらに、Restateは、進捗が不可能な場合(例えば、長時間のスリープ中や、別のサービスからの応答を待機している場合など)に実行を中断することができます。しかも、ビジネスロジックを異なるハンドラーのシーケンスに分割することなく、これらすべてを実現します。そうです、お察しの通り、**待機中にKnativeサービスをゼロにスケールダウンできるのです!**

次へ

この記事では、Restateを使用してステートフルエンティティとシンプルなオーケストレーションフローを構築し、Knativeにデプロイする方法を説明しました。

RestateとKnativeを組み合わせることで、ステートフルアプリケーションの開発の容易さとサーバーレスアプリケーションの利点を両立させることができます。

RestateとKnativeを組み合わせることで、さらに多くのことが実現できます。ワークフローサガステートフルイベント処理 (Knative Eventingも組み合わせる!) など、アイデアは尽きません。Restateの例で、何が構築できるかを理解してください: https://github.com/restatedev/examples

サイトのトラフィックを理解するために、アナリティクスとCookieを使用しています。お客様のサイトの使用に関する情報は、その目的のためにGoogleと共有されます。詳細はこちら