こんにちは、ギフティでエンジニアをしている@gaaakusaunaです。ジムに行くのが寒い季節になってきました。寒いと外に出るのが億劫になりがちですが、その分学習時間が増えました(いいのか悪いのか)。そんな中、日頃業務で愛用しているSidekiqのとある仕組みを深掘る機会があったので、ここに書き記したいと思います。
はじめに
ギフティでは、Railsで非同期処理を行う際にSidekiqを使うケースが多く、私が所属するチームでもSidekiqを用いて実装を行っています。
Sidekiqには、ジョブの実行に失敗した場合に自動でリトライを行うAutomatic job retryという機能があります。
非常に便利な機能ですが、以下のようにperform
メソッド内で例外処理を書くと、Automatic job retryが実行されない問題に直面しました。
def perform(hoge_job_id) # ジョブ実行時の処理 Hoge.find(hoge_job_id).hoge! rescue => e # 任意の例外処理 ExceptionTracker.track(e) end
ドキュメントにはAutomatic job retryの概要は記載されているものの、なぜこのケースでリトライが動作しないのかという詳細までは書かれていません。 そこで、Sidekiqのソースコードを読み、Automatic job retryの実行ロジックについて調べてみました。
対象読者
- Sidekiqを使用したことがある方
- SidekiqのAutomatic job retryの仕組みについて関心がある方
本題
perform
で例外処理を書いたことで、Automatic job retryが実行されなくなったため、perform
が呼び出される前の処理に該当する部分のソースコードを追いながら、その仕組みを説明していきます。
そもそもperform
はどこで呼ばれているのか
まずはperform
がどこで呼ばれているかを把握することにしました。
調べたところ、Sidekiq::Processor#execute_job
がperform
を呼び出していることがわかりました。
# lib/sidekiq/processor.rb def execute_job(instance, cloned_args) instance.perform(*cloned_args) end
Sidekiq::Processor#execute_job
が実行されるまで
execute_job
は、perform
を呼び出す機能しか持っていなかったので、execute_job
が呼び出される前段階でAutomatic job retryが実装されていると予測しました。
そこで、execute_job
が呼び出されるまでの流れを調べたところ、
Sidekiq::Processor#process
が呼び出されるSidekiq::Processor#process
がSidekiq::Processor#dispatch
を呼ぶSidekiq::Processor#dispatch
のブロック処理でSidekiq::Middleware::Chain#invoke
が呼ばれるSidekiq::Middleware::Chain#invoke
のブロック処理でSidekiq::Processor#execute_job
が呼ばれる
となっていました。以下は該当箇所のソースコードです。
# lib/sidekiq/processor.rb def process(uow) jobstr = uow.job queue = uow.queue_name # Treat malformed JSON as a special case: job goes straight to the morgue. job_hash = nil begin job_hash = Sidekiq.load_json(jobstr) rescue => ex # 例外処理省略 end ack = false Thread.handle_interrupt(IGNORE_SHUTDOWN_INTERRUPTS) do Thread.handle_interrupt(ALLOW_SHUTDOWN_INTERRUPTS) do # `dispatch`を呼ぶ dispatch(job_hash, queue, jobstr) do |instance| # `invoke`が呼ばれる config.server_middleware.invoke(instance, job_hash, queue) do # `execute_job`が呼ばれる execute_job(instance, job_hash["args"]) end end ack = true rescue Sidekiq::Shutdown # 例外処理省略 end ensure if ack uow.acknowledge end end end
どこにAutomatic job retryのロジックが実装されているのか、execute_job
を呼び出しているSidekiq::Middleware::Chain#invoke
から順に見ていきました。
Sidekiq::Middleware::Chain#invoke
invoke
では、設定したミドルウェアがある場合traverse
で呼び出した後、yield
を実行しています。ない場合即座にyield
を実行しているので、ミドルウェアの設定がない場合は即座に Sidekiq::Processor#execute_job
が呼び出されます。
# lib/sidekiq/middleware/chain.rb def invoke(*args, &block) return yield if empty? chain = retrieve traverse(chain, 0, args, &block) end private def traverse(chain, index, args, &block) if index >= chain.size yield else chain[index].call(*args) do traverse(chain, index + 1, args, &block) end end end
コードを見た感じ、ここでAutomatic job retryの処理は行われていないので、Sidekiq::Processor#dispatch
を見ていきます。
Sidekiq::Processor#dispatch
dispatch
では、ログの設定やジョブのインスタンス作成等の複数の処理を実行した後、yield
を用いてSidekiq::Middleware::Chain#invoke
を呼び出しています。
# lib/sidekiq/processor.rb def dispatch(job_hash, queue, jobstr) @job_logger.prepare(job_hash) do @retrier.global(jobstr, queue) do @job_logger.call(job_hash, queue) do stats(jobstr, queue) do profile(job_hash) do @reloader.call do klass = Object.const_get(job_hash["class"]) instance = klass.new instance.jid = job_hash["jid"] instance._context = self @retrier.local(instance, jobstr, queue) do yield instance end end end end end end end end
ここで、dispatch
で呼び出されるメソッド群に注目すると、yield
の実行直前に@retrier.local
が呼び出されていること、またジョブのインスタンス作成前に@retrier.global
が呼び出されていることがわかります。この@retrier
に何が代入されているのか調べたところ、Sidekiq::JobRetry
クラスのインスタンスが代入されていました。
# lib/sidekiq/processor.rb def initialize(capsule, &block) # 一部省略 @retrier = Sidekiq::JobRetry.new(capsule) end
Automatic job retryのロジックは、このSidekiq::JobRetry
クラス内で実装されていそうな気配を感じます。@retrier.local
と@retrier.global
の実装をそれぞれ見て確かめにいきましょう。
Sidekiq::JobRetry
の仕組み
まず、Sidekiq::Processor#dispatch
内でjobインスタンス作成前に呼ばれていたglobal
の実装を見ていきます。
Sidekiq::JobRetry#global
# lib/sidekiq/job_retry.rb # The global retry handler requires only the barest of data. # We want to be able to retry as much as possible so we don't # require the job to be instantiated. def global(jobstr, queue) yield rescue Handled => ex raise ex rescue Sidekiq::Shutdown => ey # ignore, will be pushed back onto queue during hard_shutdown raise ey rescue Exception => e # ignore, will be pushed back onto queue during hard_shutdown raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e) msg = Sidekiq.load_json(jobstr) if msg["retry"] process_retry(nil, msg, queue, e) else @capsule.config.death_handlers.each do |handler| handler.call(msg, e) rescue => handler_ex handle_exception(handler_ex, {context: "Error calling death handler", job: msg}) end end raise Handled end
コードより、このメソッドはジョブがインスタンス化される前段階でのエラーハンドリングを行うメソッドとわかりました。barest of data
と記載されていることから、おそらく、Redisから取得したjsonデータが不正な場合にこのハンドラーが呼ばれるのではと考えました。
次に、ジョブのインスタンス作成後に呼び出されるlocal
の実装を見ていきます。
Sidekiq::JobRetry#local
# lib/sidekiq/job_retry.rb # The local retry support means that any errors that occur within # this block can be associated with the given job instance. # This is required to support the `sidekiq_retries_exhausted` block. # # Note that any exception from the block is wrapped in the Skip # exception so the global block does not reprocess the error. The # Skip exception is unwrapped within Sidekiq::Processor#process before # calling the handle_exception handlers. def local(jobinst, jobstr, queue) yield rescue Handled => ex raise ex rescue Sidekiq::Shutdown => ey # ignore, will be pushed back onto queue during hard_shutdown raise ey rescue Exception => e # ignore, will be pushed back onto queue during hard_shutdown raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e) msg = Sidekiq.load_json(jobstr) if msg["retry"].nil? msg["retry"] = jobinst.class.get_sidekiq_options["retry"] end raise e unless msg["retry"] process_retry(jobinst, msg, queue, e) # We've handled this error associated with this job, don't # need to handle it at the global level raise Handled end
コードより、ジョブがインスタンス化された後のエラーハンドリングを行うメソッドと分かりました。Sidekiq::Middleware::Chain#invoke
を除くと、local
がSidekiq::Processor#execute_job
が呼び出される直前に実行されるため、
このlocal
がSidekiq::Processor#execute_job
での例外をキャッチしていそうです。
そして、上記二つのメソッドは、process_retry
というメソッドを呼んでいることがわかります。
def global(jobstr, queue) msg = Sidekiq.load_json(jobstr) if msg["retry"] process_retry(nil, msg, queue, e) else # 以下省略 end def local(jobinst, jobstr, queue) msg = Sidekiq.load_json(jobstr) if msg["retry"].nil? msg["retry"] = jobinst.class.get_sidekiq_options["retry"] end end
メソッド名から、ここでAutomatic job retryのロジックが実装されている予感がします。早速みてみましょう。
Sidekiq::JobRetry#process_retry
# lib/sidekiq/job_retry.rb def process_retry(jobinst, msg, queue, exception) # 一部省略 return retries_exhausted(jobinst, msg, exception) if count >= max_retry_attempts jitter = rand(10 * (count + 1)) retry_at = Time.now.to_f + delay + jitter payload = Sidekiq.dump_json(msg) redis do |conn| conn.zadd("retry", retry_at.to_s, payload) end end
一部実装を省略していますが、最初にリトライ上限回数のチェックをした後、ジョブのpayload
をredisにリトライキューとして追加していることがわかりました。なので、このメソッドでAutomatic job retryが実行されることがわかりました。
考察
コードリーディングで分かったことを以下にまとめました。
Automatic job retryの実行ロジックについて
- Automatic job retryのロジックは、
Sidekiq::JobRetry#process_retry
に実装されている Sidekiq::JobRetry#local
とSidekiq::JobRetry#global
の二つのエラーハンドラーメソッドが、Sidekiq::JobRetry#process_retry
を呼び出す
perform
が呼び出される流れ
perform
は、Sidekiq::Processor#execute_job
によって呼び出されるSidekiq::Processor#execute_job
が呼ばれるまでに、Sidekiq::Processor#process
->Sidekiq::Processor#dispatch
->Sidekiq::Middleware::Chain#invoke
と処理が走るSidekiq::Processor#dispatch
内で、Sidekiq::JobRetry#global
->Sidekiq::JobRetry#local
の順でハンドラが登録されるSidekiq::Processor#execute_job
実行時の例外はSidekiq::JobRetry#local
でキャッチされる
上記より、perform
で例外処理をかくとAutomatic job retryが実行されない理由は、Sidekiq::JobRetry#local
が例外をキャッチする前にperform
で例外を処理してしまうため、Sidekiq::JobRetry#process_retry
が呼び出されないことと考えることができます。
デバッグと検証
上記の考察が本当に正しいのか、perform
で例外処理を書く場合と書かない場合での比較検証を行い確かめてみました。
検証内容
perform
で例外処理を書かない場合
perform
内で例外が発生した際に、Sidekiq::JobRetry#process_retry
が呼ばれるか- ジョブのリトライが実行されているか
perform
で例外処理をかく場合
perform
内で例外が発生した際に、Sidekiq::JobRetry#process_retry
が呼ばれないか- ジョブのリトライが実行されないか
検証環境
以下の環境で検証しました。
- Sidekiq "7.3.2"
- Rails "7.1.3.4"
- Ruby "3.2.3"
検証コード
以下検証に用いたサンプルコードです。
# 検証用のジョブ class HogeJob include Sidekiq::Job # performで例外処理を書かない場合 def perform Rails.logger.info("start hoge job") Hoge.hoge! Rails.logger.info("finish hoge job") end # performで例外処理を書く場合 # def perform # Rails.logger.info("start hoge job") # Hoge.hoge! # Rails.logger.info("finish hoge job") # rescue => e # puts "error_handled: #{e.message}" # end end # ジョブで実行される処理 class Hoge < ApplicationModel TestError = Class.new(StandardError) def self.hoge! raise TestError, "Test Error" end end
# Sidekiq側のデバッグ # lib/sidekiq/job_retry.rb def local(jobinst, jobstr, queue) puts "JobRetry.localが呼ばれた" # 以下省略 end def process_retry(jobinst, msg, queue, exception) puts "JobRetry.process_retryが呼ばれた" puts "jobinst: #{jobinst}" # 以下省略 end
結果
1. performで例外処理を書かない場合
以下のログが出力されました。
2024-12-14T02:23:35.691Z pid=25488 tid=c24 class=HogeJob jid=60c6c2861ce986ae8f029c5b INFO: start JobRetry.localが呼ばれた 2024-12-14T02:23:35.744Z pid=25488 tid=c24 class=HogeJob jid=60c6c2861ce986ae8f029c5b INFO: start hoge job JobRetry.process_retryが呼ばれた jobinst: #<HogeJob:0x000000010bfe09a0>
このことから、performで例外処理を書かない場合は、Sidekiq::JobRetry#local
で例外がキャッチされ、Sidekiq::JobRetry#process_retry
が呼び出されることが確認できました。
最後に、肝心のリトライが実行されているのか、Sidekiqのダッシュボードから確認しました。
無事、リトライが実行されていることを確認できました。
2. performで例外処理を書く場合
以下のログが出力されました。
2024-12-16T15:07:48.976Z pid=93451 tid=1osv class=HogeJob jid=52c04d85b7beb09304b15249 INFO: start JobRetry.localが呼ばれた 2024-12-16T15:07:48.986Z pid=93451 tid=1osv class=HogeJob jid=52c04d85b7beb09304b15249 INFO: start hoge job error_handled: Test Error
このことから、performで例外処理をかく場合は、Sidekiq::JobRetry#local
で例外がキャッチされる前にperform
で例外がキャッチされるため、Sidekiq::JobRetry#process_retry
が呼び出されないことが確認できました。
そして、リトライが実行されていないことがダッシュボードからも確認できました。
まとめ
コードリーディングと検証を踏まえて、Automatic job retryのロジックはSidekiq::JobRetry#process_retry
に実装されていて、Sidekiq::JobRetry#local
とSidekiq::JobRetry#global
がSidekiq::JobRetry#process_retry
を呼び出していることがわかりました。そのため、perform
で例外処理を書くとAutomatic job retryが実行されない理由は、Sidekiq::JobRetry#local
がperform
内の例外をキャッチできないことにあるとわかりました。
最後に、Sidekiqの仕組みに少し詳しくなれたことは、使う側としてとても大きな財産になりました。これからも、疲れない程度に(ここ大事!)興味のあるトピックは深掘っていきたいと思います。