ユニファ開発者ブログ

ユニファ株式会社プロダクトデベロップメント本部メンバーによるブログです。

Amazon SQSとShoryukenを使ったバックグラウンド処理を検討してみる

Webエンジニアのほんまです。

弊社ではインフラの管理コストを極力減らすため、AWSのマネージドサービスへの利用を推奨しています。(例:RDS -> Aurora)

その中の一つに「バックグラウンドジョブに使うキューストアに Amazon SQS を使う」というものがあります。 SQSを使うことで急な負荷増減やデータ消失への対応といった悩ましい問題から解放されます。

Ruby on Rails で Amazon SQSをストアとするバックグラウンドジョブフレームワークだと Shoryuken がよく使われている印象です。 今回、Amazon SQS と Shoryuken でバッググラウンド処理を動かす方法、そして本番運用を見据えて検討が必要な事項を調査したのでシェアしたいと思います。

動かしてみる

早速 Amazon SQS と Shoryuken を使ったバックグラウンド処理を動かしてみます。

事前準備として、AWSアカウントを作成し、API呼び出し用の以下の情報を発行しておきます。

  • アクセスキーID
  • シークレットアクセスキー

キューの作成

まずはAmazon SQSにキューを作成します。 「ジョブストア用のキュー homma-tech-blog-test 」を「エラー用のデッドレターキュー(DLQ) homma-tech-blog-test-dlq 」の2つのキューを作成しました。

f:id:ryu39:20200603113237p:plain

またジョブストア用のキューの設定画面を開き、DLQ用の設定を追加しています。

f:id:ryu39:20200603113422p:plain

Railsアプリケーションの作成

まずはRailsアプリケーションを作成します。

$ rails new sqs_shoryuken_test

$ cd sqs_shoryuken_test

次に aws-sdk-sqsshoryuken gemをインストールします。 Gemfile に以下の2行を追加し、 bundle install を実行します。

diff --git a/Gemfile b/Gemfile
index 0026f33..195c31a 100644
--- a/Gemfile
+++ b/Gemfile
@@ -52,3 +52,6 @@ end

 # Windows does not include zoneinfo files, so bundle the tzinfo-data gem
 gem 'tzinfo-data', platforms: [:mingw, :mswin, :x64_mingw, :jruby]
+
+gem 'aws-sdk-sqs'
+gem 'shoryuken'

Shoryuken の設定ファイル( config/shoryuken.yml )を追加します。 queues の設定で、あらかじめ作成しておいたSQSのキュー名を指定するのがポイント。

delay: 25
concurrency: 3
queues:
  - homma-tech-blog-test

ActiveJob のバックエンドに Shoryuken を指定します。

diff --git a/config/application.rb b/config/application.rb
index d47f880..2bc7ac6 100644
--- a/config/application.rb
+++ b/config/application.rb
@@ -15,5 +15,7 @@ module SqsShoryukenTest
     # Application configuration can go into files in config/initializers
     # -- all .rb files in that directory are automatically loaded after loading
     # the framework and any gems in your application.
+
+    config.active_job.queue_adapter = :shoryuken
   end
 end

ここまでで最低限の設定は完了です。 あと、テスト用のActiveJobを1つ追加します。 queue_as でキューの名前を指定しないといけないことに注意です。

class PutStdoutJob < ::ApplicationJob
  queue_as :'homma-tech-blog-test'

  def perform(message)
    puts message
  end
end

ここまでくればRails側からSQSにJobを追加できるようになっています。 AWS関係の環境変数を設定後、Railsコンソールを使って追加してみます。

$ export AWS_REGION=ap-northeast-1
$ export AWS_ACCESS_KEY_ID=your_aws_access_key_id
$ export AWS_SECRET_ACCESS_KEY=your_aws_secret_access_key

$ bin/rails c
Running via Spring preloader in process xxxxx
Loading development environment (Rails 6.0.3.1)

irb(main):001:0> PutStdoutJob.perform_later("Hello, SQS and Shoryuken!!")
Enqueued PutStdoutJob (....)

上記の操作の後、SQSの管理画面を見るとメッセージが1つ追加されていることがわかります。

f:id:ryu39:20200603143059p:plain

ワーカープロセスの起動

次はShoryukenプロセスを立ち上げて、SQSに追加したジョブを取得し、実行してみます。

$ bundle exec shoryuken -C config/shoryuken.yml -R
# :
# => Hello, SQS and Shoryuken!!
# :

ジョブをpushする時に指定したメッセージが表示され、無事ジョブが実行できたことが確認できます。

また、この後SQSの管理画面を確認するとキューのメッセージ数が0になっており、Shoryuken が取得したメッセージがSQSから削除されています。

エラー処理の確認

続けてエラー時の挙動も確認しておきます。 常にエラーをraiseするActiveJobクラスを作成し、これをSQS経由でShoryukenに実行させてみます。

class ErrorJob < ::ApplicationJob
  queue_as :'homma-tech-blog-test'

  def perform(*_args)
    puts 'Error!'
    raise 'Error!'
  end
end
# ErrorJob を push
$ rails runner 'ErrorJob.perform_later'

# Shoryuken で 処理開始
$ bundle exec shoryuken -C config/shoryuken.yml -R
# :
# => Error!
# => Error!
# => Error!
# => Error!
# => Error!(これで最後)

約30秒おきに、計5回(リトライは4回)実行されています。 5回実行しても成功しなかったメッセージはDLQに移動しています。

f:id:ryu39:20200603145544p:plain

DLQの設定をしている場合、エラー発生時は自動で数回リトライが実行され、それでも成功しない場合、DLQにメッセージを移動するという挙動になります(DLQの設定をしない場合、メッセージの有効期限まで永遠にリトライされる)。 この挙動は ActiveJob や Shoryuken は全く関与しておらず、SQS自体に備わっている機能です。

「約30秒」や「計5回」という値は、キューの設定(Default Visibility Timeout, Maximum Receives)で変更可能です。

これで Amazon SQS と Shoryuken を使ったバックグラウンドジョブ処理の大まかな流れが理解できました。 次は、これらを本番で運用する際に、検討しなければいけない事項をまとめて紹介します。

要検討事項

SQS と Shoryuken で分けて考えてみます。

Amazon SQS

キューの種類

Amazon SQSには2種類のキューがあります。

Standard Queueはほぼ制限なしのスループットが実現可能ですが、「メッセージの順序が保証されない」「1つのメッセージが、2回以上配信される恐れがある」という問題があります。 一方、FIFO Queueは順序、および1メッセージは必ず1回だけの配信が保証されますが、最大1秒間に300回までのAPI呼び出しとパフォーマンス面で若干の不安があります。

実行するバックグランド処理の要件に合わせて、「どちらのキューを使うか?」という検討が必要です。

基本的には「Standard Queue」で問題ないと思いますが、順序やメッセージ配信数が厳密に保証されないといけない場合、「FIFO Queue」を使うのが良いと思います。 また、後述しますが Shoryuken は複数のキューをフェッチできるため、2種類のキューを組み合わせて使用することもできます。

Default Visibility Timeout

Default Visibility Timeout(デフォルト可視性タイムアウト)の値は、SQSのエラー&リトライ処理を考える上で非常に重要です。 詳細は 公式ページ を確認してもらうとして、以下の2つの条件を満たす値をセットする必要があります。

  • 間違ってリトライされないように十分長い時間
  • エラー発生時のリトライ開始が遅すぎないぐらいの十分短い時間

デフォルトは30秒になっています。 どうしても都合のよい時間が見つからない場合、 Shoryuken の auto_visibility_timeout の機能を使ってもよいかもしれません。

https://github.com/phstc/shoryuken/wiki/Worker-options#auto_visibility_timeout

この機能を使用した場合、もし時間内に処理が成功しなくても、 Shoryuken が自動で Visibility Timeout を延長してくれます。 ただし、複雑な挙動をしていると考えられるため、正しく動作しないかもしれないです。

デッドレターキュー

エラー時の挙動で確認したデッドレターキュー(DLQ)の機能を使うかどうか、使った場合の運用に関しても検討が必要です。

特別な理由がない限り、DLQは使用すべきだと思います。 使用しない場合、メッセージの有効期限まで永遠にリトライ処理が繰り返される可能性があります。 これによりシステムのスループットが下がったり、余計なコストが発生する可能性があります。

DLQを使用する場合、以下の事項もあらかじめ検討しておく必要があります。

  • DLQの有効期限、maxReceiveCount はどうするか?
  • DLQにメッセージが移動されたことをどうやって検知するか?
  • DLQに移動されたメッセージをどのように扱うか?

遅延実行

SQSでの遅延実行(未来の時間を指定して実行)には制限があります。 Standard Queue のみ Message timer の機能を使うことで最大15分間の遅延実行はできますが、それを超えた値を指定することはできません。

irb(main):001:0> PutStdoutJob.set(wait: 1.minute).perform_later("1 minute later") # => OK
irb(main):002:0> PutStdoutJob.set(wait: 15.minutes).perform_later("15 minutes later") # => OK
irb(main):003:0> PutStdoutJob.set(wait: 16.minutes).perform_later("16 minutes later") # => NG(raise error)
RuntimeError (The maximum allowed delay is 15 minutes)

もし15分より大きな値で遅延実行させたい場合、SQS & Shoryuken の組み合わせでは解決できないため、何らかのスケジューラーを間に入れるか、SQSを使わないようにする必要があります。

その他の制限事項

  • メッセージの保持期間は最大14日間
  • 1メッセージのサイズは最大256KB

Shoryuken

マルチスレッドベース

Shoryuken は1プロセス複数スレッドで動作します。 そのため、2点ほど注意することがあります。

  • 同じオブジェクトを複数スレッドで共有する
  • 2コア以上のCPUがあっても使いきれない

前者はマルチスレッドプログラミングでよくある問題です。 同じオブジェクトを複数スレッドで同時に使うことで、予期せぬ問題が発生する可能性があります。 基本的には、複数スレッドで同じオブジェクトを使わない(スレッド毎にインスタンスを生成)ようにし、どうしても必要な場合、排他処理を入れるのが良いと思います。

後者もRubyではよく知られた問題です。 基本的には1コアCPUのサーバーで動かし、処理性能をあげたい場合、サーバーの台数を増やす方法が管理も容易でオススメです。 2コア以上のCPUがあるサーバーで動かさないといけない場合、コア数に合わせて Shoryuken のプロセスも増やした方がよいです。

ActiveJob

ActiveJob を使うかどうかも検討が必要です。

使う場合、以下のメリットがあります。

  • バックエンドのフレームワークを切り替えが容易
  • Railsエンジニアが見慣れた書式、追加の学習が不要

一方、使わない場合、以下のメリットがあります。

  • JobごとにShoryuken独自のオプションが指定可能

重複や順不同を考慮した実装

特にキューに Standard Queue を選択した場合、1つのメッセージが重複して配信されたり、メッセージの順序が入力時と異なるなどの点を考慮して実装が必要です。

FIFO Queueの場合でも、Visibility Timeout 内で処理が終わらずメッセージが重複実行される可能性があるため、実装時に考慮しておいた方が良さそうです。

リトライ処理

リトライ処理に関しては、SQSのVisibility Timeoutを使ったリトライに任せて、アプリケーション側では何もしないのが無難です。 アプリケーション側でリトライ処理をすると、予期せぬ二重実行が発生する可能性があります。

リトライ間隔は、特に設定しなければSQSのDefault Visibility Timeoutの値になります。 exponential backoff(リトライ回数に応じて、次回の実行開始タイミングを遅らせる機能)がShoryukenにあるので、必要があれば設定して使うことができます。

キュー戦略

Shoryuken は複数キューのpollingができます。

例えば、以下のように指定することで Shoryuken は2つのキューを交互にフェッチするようになります。

queues:
  - queue      # Standard queue
  - queue.fifo # FIFO queue

queue_as でどちらのキューにpushするか指定できるので、厳密に実行したいジョブをFIFO Queueに、それ以外をStandard Queueにpushすることができます。

また、以下のようにキュー名とフェッチの優先度を指定することもできます。

queues:
  - [high, 9]
  - [middle, 3]
  - [low, 1]

上記の例だと low と比較して high は9倍、 middle は3倍の頻度でフェッチされます。 low より highmiddle が単位時間の処理数が上がるため、優先度をつけたジョブ処理を実施することができます。

ただし、 high が0件になるまで middlelow に手をつけない、ということは、単体の Shoryuken プロセスではできません。 high のみを専用に処理する Shoryuken プロセスを別で立ち上げるなどの代替が必要です。

「どういったキューを作成し」「どういった優先度でフェッチするか」は柔軟な設定が可能であるため、アプリケーションの要件に合わせて検討する必要があります。

その他

その他、細かい設定として以下の検討が必要です。

上記の値はAPIの呼び出し数に大きく影響するため、無駄なコストを抑えたい場合、しっかり検討する必要があります。

まとめ

今回、Amazon SQS と Shoryuken を使ったバッググラウンド処理に関して検討してみました。

これまで使っていた Redis & resque/sidekiq と異なる検討が必要な部分がいくつかあることがわかりました。 キューストアにSQSを採用するとインフラ管理コストの面でメリットが大きいので、これらの事項をしっかり検討しつつ、積極的に採用していきたいです。

以上になります。最後までご覧いただきありがとうございました。