こんにちは、寒さ&リモート勤務で運動量が激減しているWebエンジニアの本間です。 平日は1日1000歩を切ってるので、なんとかしたい...。最近引っ越しして近くに白鳥がいる池があるので、朝散歩しようかなーと思ってたりなかったり。
さて、ユニファでは非同期処理にAmazon SQSを使うことが多くなってきており、開発に利用したりいくつかブログも書いたことがありました。
しかし、利用するのは毎回標準キューのみでFIFOキューは利用したことがありませんでした。 今回、FIFOキューに関して調査する機会があり、実際にコードを書いたりして挙動を確かめることができたので、その内容を紹介しようと思います。
比較
まずはドキュメントベースで標準キューとFIFOキューの比較をしてみます。
項目 | 標準キュー | FIFOキュー | 説明 |
---|---|---|---|
コスト | ○ | × | 標準キューと比較してFIFOキューは$0.1(2-3割)割高 |
同一メッセージの重複配信 | × | ○ | 標準キューは「少なくとも1回の配信」と重複配信の可能性あり。一方、FIFOキューは1回だけの配信を保証。 |
メッセージの順序保証 | × | ○ | 標準キューはできるだけ順序を維持するが、順序保証なし。一方、FIFOキューは到達順での配信が保証される。 |
送信時の重複排除 | × | ○ | 標準キューには送信時の重複排除の機能がないのに対し、FIFOキューではメッセージ重複排除 IDを利用して重複排除可能。 |
スループット | ○ | × | 標準キューは1秒あたりほぼ無限のAPIコールをサポート。一方、FIFOキューは1秒あたり300APIコールまでサポート。高スループットFIFOキューにすることで1秒あたり1500-3000APIまで引き上げ可能。 |
その他の制限 | ○ | × | FIFOキューは一部のAWSサービスと互換性がない場合あり、メッセージごとの遅延は利用不可。 |
機能面で特徴的なのは「同一メッセージの重複配信」、「メッセージの順序保証」、「送信時の重複排除」あたりでしょうか。 この辺を実際に動かしながら、確認していこうと思います。
詳細
FIFOキューには、標準キューにはない 新しいパラメーター がいくつか存在します。 その中でAPI呼び出し時に指定する以下の3つのパラメーターに着目して、それぞれの用途を調査してみます。
- メッセージグループID
- メッセージ重複排除ID
- 受信リクエスト試行 ID
メッセージグループID
FIFOキューにメッセージを送信するためには、メッセージグループIDは必須のパラメーターになっています。 メッセージグループIDの説明は以下になります。
メッセージが特定のメッセージグループに属することを指定するタグ。同じメッセージグループに属するメッセージは、常に、メッセージグループに対する厳密な順序で 1 つずつ処理されます(ただし、異なるメッセージグループに属するメッセージは、順序どおりに処理されない場合があります)。
複数のメッセージで同じIDを指定するか、異なるIDにするかで挙動が変わってくるようです。
そこで、メッセージグループIDを全て別々にしたケースと同じにしたケースで動きを確認してみようと思います。
(コードは全てRubyで書いてます。 aws-sdk-sqs
gemをインストールしておいてください)
まずメッセージグループIDを全て別々にしたケースです。
# AWSの認証関連の情報(例: AWS_PROFILE)を環境変数にセット require 'aws-sdk-sqs' require 'securerandom' require 'logger' QUEUE_URL = 'https://sqs.ap-northeast-1.amazonaws.com/aws_acccount_id/sqs_queue_name.fifo' logger = Logger.new(STDOUT) client = Aws::SQS::Client.new({ region: 'ap-northeast-1' }) # 30メッセージを別々のメッセージグループIDで送信 30.times do |i| message = { queue_url: QUEUE_URL, message_body: "test-#{i}", message_group_id: SecureRandom.uuid, message_deduplication_id: SecureRandom.uuid, } client.send_message(message) end # 5スレッドで並列にpolling、1メッセージにつき1秒sleep exit_from_loop = false threads = [*1..5].map do |i| Thread.new do thread_name = "thread-#{i}" logger2 = Logger.new(STDOUT) client = Aws::SQS::Client.new({ region: 'ap-northeast-1' }) loop do resp = client.receive_message({ queue_url: QUEUE_URL, wait_time_seconds: 5, }) unless resp.messages.empty? logger2.info(thread_name) { resp.messages[0].body } sleep 1.0 client.delete_message({ queue_url: QUEUE_URL, receipt_handle: resp.messages[0].receipt_handle, }) end break if exit_from_loop end logger2.info(thread_name) { "exit." } end end # 全てのメッセージを処理したら exit_threads = true
実行してみると、標準出力にはこんな感じで出力されました。
I, [2022-01-28T11:55:56.982842 #44341] INFO -- thread-5: test-0 I, [2022-01-28T11:55:56.983863 #44341] INFO -- thread-1: test-1 I, [2022-01-28T11:55:56.992731 #44341] INFO -- thread-4: test-3 I, [2022-01-28T11:55:56.993424 #44341] INFO -- thread-2: test-2 I, [2022-01-28T11:55:57.002554 #44341] INFO -- thread-3: test-4 I, [2022-01-28T11:55:58.052149 #44341] INFO -- thread-4: test-5 I, [2022-01-28T11:55:58.052831 #44341] INFO -- thread-1: test-6 : I, [2022-01-28T11:56:02.330709 #44341] INFO -- thread-5: test-28 I, [2022-01-28T11:56:02.332263 #44341] INFO -- thread-3: test-29 I, [2022-01-28T11:56:02.362004 #44341] INFO -- thread-4: test-26
5行ほぼ同時に出力→1秒待つ→5行出力→...、という形で出力されました。 表示されたログを見る限り、5スレッドで並列に処理できていそうです。 また、順序は厳密に保証されているわけではなく、ある程度ばらつきがありそうです。
次にメッセージグループIDを全て同じにした場合を試してみます。
# ここまで一緒 # 30メッセージを同じのメッセージグループIDで送信 30.times do |i| message = { queue_url: QUEUE_URL, message_body: "test-#{i}", message_group_id: "same_message_group_id", # <- ここを固定 message_deduplication_id: SecureRandom.uuid, } client.send_message(message) end # ここから一緒
I, [2022-01-28T11:57:26.358427 #44341] INFO -- thread-3: test-0 I, [2022-01-28T11:57:27.435053 #44341] INFO -- thread-1: test-1 I, [2022-01-28T11:57:28.492766 #44341] INFO -- thread-5: test-2 I, [2022-01-28T11:57:29.542155 #44341] INFO -- thread-4: test-3 I, [2022-01-28T11:57:30.584310 #44341] INFO -- thread-2: test-4 I, [2022-01-28T11:57:31.619432 #44341] INFO -- thread-3: test-5 I, [2022-01-28T11:57:32.684819 #44341] INFO -- thread-1: test-6 : I, [2022-01-28T11:57:54.605926 #44341] INFO -- thread-5: test-27 I, [2022-01-28T11:57:55.647280 #44341] INFO -- thread-4: test-28 I, [2022-01-28T11:57:56.690512 #44341] INFO -- thread-2: test-29
1行出力→1秒待つ→1行出力→...、となりました。このことから並列ではなく、直列に処理していそうです。 また先頭のメッセージが処理完了(削除APIが呼び出される)されるまでは、次のメッセージも処理されないことがわかります。 さらに順序も保証されており、メッセージを追加した順に処理されていることがわかりました。
以上から、 FIFOになるのは「同一のメッセージグループIDを指定した場合のみ」ということがわかりました。 メッセージグループIDが別々の場合、並列処理可能ですが、順序にはある程度ぱらつきが出るようです。
メッセージ重複排除ID
次にメッセージ重複排除IDの挙動を確かめてみます。 こちらもメッセージ送信時にセットするパラメーターですが、任意のパラメーターになります。
送信されたメッセージの重複排除に使用されるトークン。特定のメッセージ重複除外 ID を持つメッセージが正常に送信された場合、同じメッセージ重複除外 ID で送信されたメッセージは正常に受け入れられますが、5 分間の重複排除の間は配信されません。 メッセージの重複排除は、個々のメッセージグループではなく、キュー全体に適用されます。 Amazon SQS は、メッセージが受信され、削除されても、メッセージの重複除外 ID を追跡し続けます。
これを利用することで、複数のproducerから同一のメッセージが重複して登録することを防ぐことができるようです。
# AWSの認証関連の情報(例: AWS_PROFILE)を環境変数にセット require 'aws-sdk-sqs' require 'securerandom' require 'logger' QUEUE_URL = 'https://sqs.ap-northeast-1.amazonaws.com/aws_acccount_id/sqs_queue_name.fifo' logger = Logger.new(STDOUT) client = Aws::SQS::Client.new({ region: 'ap-northeast-1' }) # 30メッセージを同じメッセージグループID、メッセージ重複排除IDで送信。 30.times do |i| message = { queue_url: QUEUE_URL, message_body: "test-#{i}", message_group_id: "same_message_group_id", message_deduplication_id: "same_message_deduplication_id", } client.send_message(message) end # メッセージ数を取得 resp = client. get_queue_attributes({ queue_url: QUEUE_URL, attribute_names: ["ApproximateNumberOfMessages"], }) logger.info(resp.attributes["ApproximateNumberOfMessages"])
実行結果はこちら
I, [2022-01-28T16:33:09.177503 #49041] INFO -- : 1
30回APIを呼び出しましたが、登録されているメッセージ数は「1」だけです。 メッセージ重複排除IDにより、重複呼び出しが排除されていることを確認できました。
なお、このパラメーターを指定しない場合、メッセージBodyのSHA-256ハッシュ値が利用されるようです。 そのため、このパラメーターを指定せず、かつ偶然同じBodyのメッセージがSQSに届いた場合、意図せず重複排除されてしまう可能性があります。 メッセージ重複排除IDは、特に理由がなければ指定しておいた方が安全そうです。
また、メッセージ重複排除IDが有効なのは5分間だけで、それ以上経過した場合は重複登録できてしまうため注意が必要です。
受信リクエスト試行ID
これまで紹介した2つのパラメーターと異なり、受信リクエスト試行IDはメッセージ受信API時に指定するパラメーターです。
受信リクエスト試行 ID は、の重複排除に使用されるトークンです。ReceiveMessageを呼び出します。 SDK と Amazon SQS の間に接続性の問題が生じるような長期のネットワーク停止中は、受信リクエスト試行 ID を指定して、SDK オペレーションが失敗した場合に同じ受信リクエスト試行 ID を使用して再試行するのがベストプラクティスです。
説明がわかりづらいかもしれません。APIドキュメントの説明の方がわかりやすかったので、リンクを設置しておきます。
可視性タイムアウトが「30秒」のFIFOキューで、このパラメーターの有無の違いを見てみます。 まず、受信リクエスト試行IDなしで試してみます。
require 'aws-sdk-sqs' require 'securerandom' require 'logger' QUEUE_URL = 'https://sqs.ap-northeast-1.amazonaws.com/aws_acccount_id/sqs_queue_name.fifo' logger = Logger.new(STDOUT) client = Aws::SQS::Client.new({ region: 'ap-northeast-1' }) # 同じメッセージグループIDで、メッセージを3つ送信 3.times do |i| message = { queue_url: QUEUE_URL, message_body: "test-#{i}", message_group_id: "same_message_group_id", message_deduplication_id: SecureRandom.uuid, } client.send_message(message) end # メッセージを1つ受信 resp = client.receive_message({ queue_url: QUEUE_URL, }) logger.info(resp.messages.length) logger.info(resp.messages[0]&.body) # ここでエラーが起きたと仮定。削除APIを呼び出さない。 # エラーのリトライで再度メッセージを1つ受信 resp = client.receive_message({ queue_url: QUEUE_URL, }) # エラーが起きたメッセージも、後続のメッセージもいずれも取得できない logger.info(resp.messages.length) logger.info(resp.messages[0]&.body)
上記を実行すると以下のように出力されます。
I, [2022-01-28T15:11:27.101653 #47930] INFO -- : 1 I, [2022-01-28T15:11:27.101713 #47930] INFO -- : test-0 I, [2022-01-28T15:11:34.095674 #47930] INFO -- : 0 I, [2022-01-28T15:11:34.095727 #47930] INFO -- : nil
見ての通り、可視性タイムアウトの30秒を経過するまでは、3つのメッセージ全てが存在しないように見えています。 このようにすることで、1通目のメッセージが処理される前に、2通目のメッセージが処理されることを防いでくれます。 その後、可視性タイムアウトの30秒経過後、1通目のメッセージが再度取得できるようになり、処理のリトライが可能になります。
この挙動は、タイムアウトが短い時間の場合、特に問題ないと思います。 しかし、タイムアウトに長い時間を設定している場合、リトライまでの時間が長くなり、かつその間、同一のメッセージグループIDのメッセージは処理待ちで滞留してしまうことになります。 これを避けるため、可視性タイムアウトを経過する前にリトライを行うために利用するのが「受信リクエスト試行ID」になります。
受信リクエスト試行IDありのテストコードです。
# メッセージ送信までは同じ # 受信リクエスト試行ID receive_request_attempt_id = SecureRandom.uuid # 受信リクエスト試行ID付きで、メッセージを1つ受信 resp = client.receive_message({ queue_url: QUEUE_URL, receive_request_attempt_id: receive_request_attempt_id, # <- 追加 }) logger.info(resp.messages.length) logger.info(resp.messages[0]&.body) # ここでエラーが起きたと仮定。削除APIを呼び出さない。 # エラーのリトライで、受信リクエスト試行ID付きで再度メッセージを1つ受信 resp = client.receive_message({ queue_url: QUEUE_URL, receive_request_attempt_id: receive_request_attempt_id, # <- 追加 }) # ちゃんとメッセージが取得できている! logger.info(resp.messages.length) logger.info(resp.messages[0]&.body) # 削除も可能! client.delete_message({ queue_url: QUEUE_URL, receipt_handle: resp.messages[0].receipt_handle, }) # 2通目のメッセージ取得可能 receive_request_attempt_id = SecureRandom.uuid resp = client.receive_message({ queue_url: QUEUE_URL, receive_request_attempt_id: receive_request_attempt_id, }) logger.info(resp.messages.length) logger.info(resp.messages[0]&.body)
I, [2022-01-28T15:22:47.072771 #48120] INFO -- : 1 I, [2022-01-28T15:22:47.072849 #48120] INFO -- : test-0 I, [2022-01-28T15:22:55.002062 #48120] INFO -- : 1 I, [2022-01-28T15:22:55.002146 #48120] INFO -- : test-0 I, [2022-01-28T15:23:17.471713 #48120] INFO -- : 1 I, [2022-01-28T15:23:17.471775 #48120] INFO -- : test-1
これが受信リクエスト試行IDの利用方法になります。 ピンポイントな利用方法になりますが、リトライまでの時間やそれまでのメッセージの滞留にシビアなシステムを構築しないといけない場合、このパラメーターを利用できるかもしれません。
まとめ
今回、Amazon SQSのFIFOキューに関して、実際に使って挙動を確認してみました。
常にFIFOになるのではなく、「同一のメッセージグループIDが指定された場合のみ」ということがわかりました。 メッセージグループIDが異なる場合、標準キューと同じ感覚で並列処理することも確認できました。 常にFIFOキューとして扱いたい場合、常に同じメッセージグループIDを指定すればOKですし、この仕様は扱いやすいなと感じました。
今までSQSといえば特に考えずに標準キューを選んでいましたが、今回の調査でFIFOキューも候補に入れられるようになりました。適材適所でそれぞれを使い分けていきたいです。
以上になります、参考になったら幸いです。
ユニファでは、RubyもAWSも触りたい仲間を募集中です! ご興味のある方は是非、気軽にご連絡ください!
最後までご覧いただき、ありがとうございました。