ユニファ開発者ブログ

ユニファ株式会社プロダクトデベロップメント本部メンバーによるブログです。

SQS + Shoryukenのメッセージ重複配信問題にMiddlewareで対応する

まだ鍋シーズン序盤だというのに、ごま豆乳鍋を3日に1回は食べているWebエンジニアの本間です。

最近、Railsアプリケーションのバックグラウンドジョブのworkerとして Amazon SQS, Shoryuken, Active Job を組み合わせて使うことが増えました。 過去にも以下のようなエントリを書いてます。

tech.unifa-e.com

今回は、Shoryukenを使ったバックグラウンド処理を実運用で使用するにあたって1つ悩ましい問題があり、それをMiddlewareを使って解決した話をご紹介します。

メッセージ重複配信問題

Shoryukenは、ジョブストアにAmazon SQSを使用しています。 Amazon SQSのキューには、「標準キュー」と「FIFOキュー」の2種類があります。 多くの場合、1秒あたりのAPI呼び出し回数に制限のない「標準キュー」を選ぶケースが多いかと思います。

その「標準キュー」の特徴の一つに、「1つの同じメッセージが重複配信される可能性がある」というものがあります。 公式ドキュメント に下記のような記載があります。

少なくとも 1 回の配信 : ...メッセージコピーをもう一度受け取る場合があります。アプリケーションがべき等になるよう設計する必要があります...

特に対策を入れない場合、まれに1つのジョブが2回以上実行されてしまう可能性があります。 ジョブがべき等でなく、アプリケーションが重複配信を許容できない場合、なんらかの対策が必要になります。 (「FIFOキュー使えば?」と思われるかもしれませんが、FIFOキューはAPIの1秒間あたりの呼び出し回数に制限があるため、今回は対象外としています)

いくつか対策が考えられますが、今回はShoryukenのMiddlewareとRedisを使って排他制御の仕組みを実現して対応してみました。

対策詳細

今回、テストに使用したコードは以下のリポジトリにて公開しています。 ローカルでもテストしてみたい方はcloneして試していただければと思います。

https://bitbucket.org/unifa-public/shoryuken_prevent_dup_job_exec_test/src/master/

Shoryuken Middleware

Shoryuken Middlewareは、公式の下記ドキュメントに記載があります。

https://github.com/phstc/shoryuken/wiki/Middleware

Rack Middlewareと同じ仕組みで、ShoryukenでJobを実行する前後に共通の処理を差し込むことができます。

公式でデフォルトで組み込まれているMiddlewareもあります。下記コードで確認できます。

https://github.com/phstc/shoryuken/blob/master/lib/shoryuken/options.rb#L164-L175

デフォルトのMiddlewareの中では Shoryuken::Middleware::Server::AutoDelete が重要です。 他のMiddlewareはActiveJobの場合はほぼ関係ないのですが、このMiddlewareはActiveJobの場合、有効になっています。 このMiddlewareは、「ActiveJobがエラーをraiseしない場合、SQSのメッセージを自動で削除する」という処理を行います。 この挙動はジョブのリトライに大きな影響があるため、このMiddlewareとの関係も考えて実装する必要があります。

今回、下記のような流れで排他制御する Middleware::Lock を作ろうと思います。

f:id:ryu39:20201127155601p:plain

排他制御

今回のメインパートです。排他制御を行うMiddlewareを以下のように実装しました。

# lib/shoryuken/middleware/server/lock.rb

module Shoryuken::Middleware::Server
  class Lock
    include ::Shoryuken::Util

    class << self
      attr_accessor :redis_url
    end

    def call(_worker, queue, sqs_msg, _body)
      redis = ::Redis.new(url: self.class.redis_url)

      lock_key = "shoryuken_lock/#{queue}/#{sqs_msg.message_id}"

      locked = redis.set(lock_key, 'lock', ex: 60, nx: true)
      unless locked
        logger.info("Cannot get lock. lock_key: #{lock_key}")
        return
      end
      logger.debug("Locked. lock_key: #{lock_key}")

      begin
        yield
      rescue ::StandardError => _e
        redis.del(lock_key)
        logger.debug("Unlocked. lock_key: #{lock_key}")
        raise
      else
        redis.set(lock_key, 'processed', ex: 60)
        logger.debug("Expand lock expiration. lock_key: #{lock_key}")
      end
    end
  end
end

上から順に説明していくと、まずRedisを使って排他制御を行うためRedisのClientを生成しています。(この部分は毎回newするのではなく、newしたものを使いまわせるとよいかもしれません)

排他制御のロックのキーに、キューのURLとSQSのメッセージIDを含めています。 これにより、「同一キューの同一メッセージ」のみを排他制御の対象にしています。

Redisの :nx オプション付きのsetを使って排他制御をしています。 :nx オプションを使った排他制御は、すでに他の方がよいエントリを書いてくださっているため省略します。 また、予期せずロックが残り続けてしまうことを避けるため :ex オプションでタイムアウトを指定しています。(今回は60秒) ここは、SQSのvisibility timeoutと値を合わせるとよいかもしれません。

ロックが取得できない場合、ジョブは実行せずそのままreturnしています。 ロックが取得できた場合、yieldでジョブを実行しています。

もしジョブ実行中に予期せぬエラーが発生した場合、ロックを削除しています。 これはSQSでvisibility timeout経過後、自動でリトライが実行されますが、その時にロックが残っているとリトライがスキップされしまうためです。

またジョブが正常終了した場合、ロックの有効期限を延長しています。 これはジョブが正常終了した後にメッセージが重複配信された場合でも、実行をスキップするために対応しています。

このようなジョブ正常終了後の重複配信を防ぐ場合、ロックはずっと残しておくのが正しいです。 ただ、ずっと残しておくとRedis内のロックが溜まり続けてしまいます。 ロックが溜まり続けるといつか予期せぬ問題が発生するかもしれないため、「これぐらい待てばもう重複配信されないだろう」という期間を有効期限にセットし、ロックが溜まり続けないようにしています。 ここの有効期限は実運用を行って調整する必要があるかもしれません。

Middlewareの差し込み

Middlewareを実装しただけでは使用されません。 initializerで実装したMiddlewareを差し込む必要があります。

# config/initializers/shoryuken.rb

::Shoryuken.configure_server do |config|
  # snip...
  
  require 'shoryuken/middleware/server/lock'
  ::Shoryuken::Middleware::Server::Lock.redis_url = 'redis://localhost:6379/0'
  config.server_middleware do |chain|
    chain.insert_before(::Shoryuken::Middleware::Server::AutoDelete,
                        ::Shoryuken::Middleware::Server::Lock)
  end
end

Shoryuken::Middleware::Server::AutoDelete の前に差し込んでいます。 これで当初想定したフローで排他制御ができたと思います。

また、エラー発生時のフローは以下のようになります。

f:id:ryu39:20201127155620p:plain

エラー発生時にロックを開放しているので、問題なくリトライできるはずです。

公開リポジトリにローカルで簡単にテストした結果を貼り付けてあるので、よかったらご覧ください。

https://bitbucket.org/unifa-public/shoryuken_prevent_dup_job_exec_test/src/master/#markdown-header-test-exclusive-lock

問題点

考えられるこの手法の問題点は以下になります。

  • (こういった現象が発生するか不明ですが)非常に大きな時間差で重複配信されたメッセージに対応できない。
  • 排他制御が不要なJobだったとしても、強制的に排他制御されてしまう。
  • Redisへのアクセスが毎回発生するため、その分パフォーマンスが低下する。場合によっては、Redisが処理性能のボトルネックになる可能性がある。

これらの問題点が許容できるのであれば、有力な選択肢の一つになるかなと思います。

まとめ

ShoryukenのMiddlewareとRedisを使うことで、それほど大きな実装の負荷なく重複メッセージへの対応を行うことができました。 Middlewareの機能は今回紹介した排他制御だけではなく、ロギングやエラー処理などにも応用が効きますので、気になる方はチェックしてもらえればと思います。

それでは、最後までご覧いただきありがとうございました。

We are hiring!!!

現在、ユニファでは技術職を絶賛募集中です! 今回の記事のようなことを業務でやりたい方は、下記ページからご応募お待ちしています。 unifa-e.com