giftee Tech Blog

ギフティの開発を支えるメンバーの技術やデザイン、プロダクトマネジメントの情報を発信しています。

SidekiqのAutomatic job retryはどのように実行されるのか

eyecatch

こんにちは、ギフティでエンジニアをしている@gaaakusaunaです。ジムに行くのが寒い季節になってきました。寒いと外に出るのが億劫になりがちですが、その分学習時間が増えました(いいのか悪いのか)。そんな中、日頃業務で愛用しているSidekiqのとある仕組みを深掘る機会があったので、ここに書き記したいと思います。

はじめに

ギフティでは、Railsで非同期処理を行う際にSidekiqを使うケースが多く、私が所属するチームでもSidekiqを用いて実装を行っています。

Sidekiqとは

Sidekiqには、ジョブの実行に失敗した場合に自動でリトライを行うAutomatic job retryという機能があります。 非常に便利な機能ですが、以下のようにperformメソッド内で例外処理を書くと、Automatic job retryが実行されない問題に直面しました。

def perform(hoge_job_id)
  # ジョブ実行時の処理
  Hoge.find(hoge_job_id).hoge!
rescue => e
  # 任意の例外処理
  ExceptionTracker.track(e)
end

ドキュメントにはAutomatic job retryの概要は記載されているものの、なぜこのケースでリトライが動作しないのかという詳細までは書かれていません。 そこで、Sidekiqのソースコードを読み、Automatic job retryの実行ロジックについて調べてみました。

対象読者

  • Sidekiqを使用したことがある方
  • SidekiqのAutomatic job retryの仕組みについて関心がある方

本題

performで例外処理を書いたことで、Automatic job retryが実行されなくなったため、performが呼び出される前の処理に該当する部分のソースコードを追いながら、その仕組みを説明していきます。

そもそもperformはどこで呼ばれているのか

まずはperformがどこで呼ばれているかを把握することにしました。 調べたところ、Sidekiq::Processor#execute_jobperformを呼び出していることがわかりました。

# lib/sidekiq/processor.rb
def execute_job(instance, cloned_args)
  instance.perform(*cloned_args)
end

参照

Sidekiq::Processor#execute_jobが実行されるまで

execute_jobは、performを呼び出す機能しか持っていなかったので、execute_jobが呼び出される前段階でAutomatic job retryが実装されていると予測しました。 そこで、execute_jobが呼び出されるまでの流れを調べたところ、

  1. Sidekiq::Processor#processが呼び出される
  2. Sidekiq::Processor#processSidekiq::Processor#dispatchを呼ぶ
  3. Sidekiq::Processor#dispatchのブロック処理でSidekiq::Middleware::Chain#invokeが呼ばれる
  4. Sidekiq::Middleware::Chain#invokeのブロック処理でSidekiq::Processor#execute_jobが呼ばれる

となっていました。以下は該当箇所のソースコードです。

# lib/sidekiq/processor.rb
def process(uow)
  jobstr = uow.job
  queue = uow.queue_name

  # Treat malformed JSON as a special case: job goes straight to the morgue.
  job_hash = nil
  begin
    job_hash = Sidekiq.load_json(jobstr)
  rescue => ex
    # 例外処理省略
  end

  ack = false
  Thread.handle_interrupt(IGNORE_SHUTDOWN_INTERRUPTS) do
    Thread.handle_interrupt(ALLOW_SHUTDOWN_INTERRUPTS) do
      # `dispatch`を呼ぶ
      dispatch(job_hash, queue, jobstr) do |instance|
        # `invoke`が呼ばれる
        config.server_middleware.invoke(instance, job_hash, queue) do
          # `execute_job`が呼ばれる
          execute_job(instance, job_hash["args"])
        end
      end
      ack = true
    rescue Sidekiq::Shutdown
      # 例外処理省略
    end
  ensure
    if ack
      uow.acknowledge
    end
  end
end

参照

どこにAutomatic job retryのロジックが実装されているのか、execute_jobを呼び出しているSidekiq::Middleware::Chain#invokeから順に見ていきました。

参照

Sidekiq::Middleware::Chain#invoke

invokeでは、設定したミドルウェアがある場合traverseで呼び出した後、yieldを実行しています。ない場合即座にyieldを実行しているので、ミドルウェアの設定がない場合は即座に Sidekiq::Processor#execute_jobが呼び出されます。

# lib/sidekiq/middleware/chain.rb
def invoke(*args, &block)
  return yield if empty?


  chain = retrieve
  traverse(chain, 0, args, &block)
end


private

def traverse(chain, index, args, &block)
  if index >= chain.size
    yield
  else
    chain[index].call(*args) do
      traverse(chain, index + 1, args, &block)
    end
  end
end

参照

コードを見た感じ、ここでAutomatic job retryの処理は行われていないので、Sidekiq::Processor#dispatchを見ていきます。

Sidekiq::Processor#dispatch

dispatchでは、ログの設定やジョブのインスタンス作成等の複数の処理を実行した後、yieldを用いてSidekiq::Middleware::Chain#invokeを呼び出しています。

# lib/sidekiq/processor.rb
def dispatch(job_hash, queue, jobstr)
  @job_logger.prepare(job_hash) do
    @retrier.global(jobstr, queue) do
      @job_logger.call(job_hash, queue) do
        stats(jobstr, queue) do
          profile(job_hash) do
            @reloader.call do
              klass = Object.const_get(job_hash["class"])
              instance = klass.new
              instance.jid = job_hash["jid"]
              instance._context = self
              @retrier.local(instance, jobstr, queue) do
                yield instance
              end
            end
          end
        end
      end
    end
  end
end

参照

ここで、dispatchで呼び出されるメソッド群に注目すると、yieldの実行直前に@retrier.localが呼び出されていること、またジョブのインスタンス作成前に@retrier.globalが呼び出されていることがわかります。この@retrierに何が代入されているのか調べたところ、Sidekiq::JobRetryクラスのインスタンスが代入されていました。

# lib/sidekiq/processor.rb
def initialize(capsule, &block)
  # 一部省略
  @retrier = Sidekiq::JobRetry.new(capsule)
end

参照

Automatic job retryのロジックは、このSidekiq::JobRetryクラス内で実装されていそうな気配を感じます。@retrier.local@retrier.globalの実装をそれぞれ見て確かめにいきましょう。

Sidekiq::JobRetryの仕組み

まず、Sidekiq::Processor#dispatch内でjobインスタンス作成前に呼ばれていたglobalの実装を見ていきます。

Sidekiq::JobRetry#global

# lib/sidekiq/job_retry.rb

# The global retry handler requires only the barest of data.
# We want to be able to retry as much as possible so we don't
# require the job to be instantiated.
def global(jobstr, queue)
  yield
rescue Handled => ex
  raise ex
rescue Sidekiq::Shutdown => ey
  # ignore, will be pushed back onto queue during hard_shutdown
  raise ey
rescue Exception => e
  # ignore, will be pushed back onto queue during hard_shutdown
  raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)


  msg = Sidekiq.load_json(jobstr)
  if msg["retry"]
    process_retry(nil, msg, queue, e)
  else
    @capsule.config.death_handlers.each do |handler|
      handler.call(msg, e)
    rescue => handler_ex
      handle_exception(handler_ex, {context: "Error calling death handler", job: msg})
    end
  end


  raise Handled
end

参照

コードより、このメソッドはジョブがインスタンス化される前段階でのエラーハンドリングを行うメソッドとわかりました。barest of dataと記載されていることから、おそらく、Redisから取得したjsonデータが不正な場合にこのハンドラーが呼ばれるのではと考えました。

次に、ジョブのインスタンス作成後に呼び出されるlocalの実装を見ていきます。

Sidekiq::JobRetry#local

# lib/sidekiq/job_retry.rb

# The local retry support means that any errors that occur within
# this block can be associated with the given job instance.
# This is required to support the `sidekiq_retries_exhausted` block.
#
# Note that any exception from the block is wrapped in the Skip
# exception so the global block does not reprocess the error.  The
# Skip exception is unwrapped within Sidekiq::Processor#process before
# calling the handle_exception handlers.
def local(jobinst, jobstr, queue)
  yield
rescue Handled => ex
  raise ex
rescue Sidekiq::Shutdown => ey
  # ignore, will be pushed back onto queue during hard_shutdown
  raise ey
rescue Exception => e
  # ignore, will be pushed back onto queue during hard_shutdown
  raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)


  msg = Sidekiq.load_json(jobstr)
  if msg["retry"].nil?
    msg["retry"] = jobinst.class.get_sidekiq_options["retry"]
  end


  raise e unless msg["retry"]
  process_retry(jobinst, msg, queue, e)
  # We've handled this error associated with this job, don't
  # need to handle it at the global level
  raise Handled
end

参照

コードより、ジョブがインスタンス化された後のエラーハンドリングを行うメソッドと分かりました。Sidekiq::Middleware::Chain#invokeを除くと、localSidekiq::Processor#execute_jobが呼び出される直前に実行されるため、 このlocalSidekiq::Processor#execute_jobでの例外をキャッチしていそうです。

そして、上記二つのメソッドは、process_retryというメソッドを呼んでいることがわかります。

def global(jobstr, queue)
  msg = Sidekiq.load_json(jobstr)
  if msg["retry"]
    process_retry(nil, msg, queue, e)
  else
 # 以下省略
end

def local(jobinst, jobstr, queue)
  msg = Sidekiq.load_json(jobstr)
  if msg["retry"].nil?
    msg["retry"] = jobinst.class.get_sidekiq_options["retry"]
  end
end

メソッド名から、ここでAutomatic job retryのロジックが実装されている予感がします。早速みてみましょう。

Sidekiq::JobRetry#process_retry

# lib/sidekiq/job_retry.rb

def process_retry(jobinst, msg, queue, exception)
# 一部省略
  return retries_exhausted(jobinst, msg, exception) if count >= max_retry_attempts

  jitter = rand(10 * (count + 1))
  retry_at = Time.now.to_f + delay + jitter
  payload = Sidekiq.dump_json(msg)
  redis do |conn|
    conn.zadd("retry", retry_at.to_s, payload)
  end
end

参照

一部実装を省略していますが、最初にリトライ上限回数のチェックをした後、ジョブのpayloadをredisにリトライキューとして追加していることがわかりました。なので、このメソッドでAutomatic job retryが実行されることがわかりました。

考察

コードリーディングで分かったことを以下にまとめました。

Automatic job retryの実行ロジックについて

  1. Automatic job retryのロジックは、Sidekiq::JobRetry#process_retryに実装されている
  2. Sidekiq::JobRetry#localSidekiq::JobRetry#globalの二つのエラーハンドラーメソッドが、Sidekiq::JobRetry#process_retryを呼び出す

performが呼び出される流れ

  1. performは、Sidekiq::Processor#execute_jobによって呼び出される
  2. Sidekiq::Processor#execute_jobが呼ばれるまでに、Sidekiq::Processor#process -> Sidekiq::Processor#dispatch -> Sidekiq::Middleware::Chain#invokeと処理が走る
  3. Sidekiq::Processor#dispatch内で、Sidekiq::JobRetry#global -> Sidekiq::JobRetry#localの順でハンドラが登録される
  4. Sidekiq::Processor#execute_job実行時の例外は Sidekiq::JobRetry#localでキャッチされる

上記より、performで例外処理をかくとAutomatic job retryが実行されない理由は、Sidekiq::JobRetry#localが例外をキャッチする前にperformで例外を処理してしまうため、Sidekiq::JobRetry#process_retryが呼び出されないことと考えることができます。

デバッグと検証

上記の考察が本当に正しいのか、performで例外処理を書く場合と書かない場合での比較検証を行い確かめてみました。

検証内容

performで例外処理を書かない場合

  1. perform内で例外が発生した際に、Sidekiq::JobRetry#process_retryが呼ばれるか
  2. ジョブのリトライが実行されているか

performで例外処理をかく場合

  1. perform内で例外が発生した際に、Sidekiq::JobRetry#process_retryが呼ばれないか
  2. ジョブのリトライが実行されないか

検証環境

以下の環境で検証しました。

  • Sidekiq "7.3.2"
  • Rails "7.1.3.4"
  • Ruby "3.2.3"

検証コード

以下検証に用いたサンプルコードです。

# 検証用のジョブ
class HogeJob
  include Sidekiq::Job

  # performで例外処理を書かない場合
  def perform
    Rails.logger.info("start hoge job")
    Hoge.hoge!
    Rails.logger.info("finish hoge job")
  end

  # performで例外処理を書く場合
  # def perform
  #   Rails.logger.info("start hoge job")
  #   Hoge.hoge!
  #   Rails.logger.info("finish hoge job")
  # rescue => e
  #   puts "error_handled: #{e.message}"
  # end
end

# ジョブで実行される処理
class Hoge < ApplicationModel
  TestError = Class.new(StandardError)

  def self.hoge!
    raise TestError, "Test Error"
  end
end
# Sidekiq側のデバッグ
# lib/sidekiq/job_retry.rb

def local(jobinst, jobstr, queue)
  puts "JobRetry.localが呼ばれた"
  # 以下省略
end


def process_retry(jobinst, msg, queue, exception)
  puts "JobRetry.process_retryが呼ばれた"
  puts "jobinst: #{jobinst}"
  # 以下省略
end

結果

1. performで例外処理を書かない場合

以下のログが出力されました。

2024-12-14T02:23:35.691Z pid=25488 tid=c24 class=HogeJob jid=60c6c2861ce986ae8f029c5b INFO: start
JobRetry.localが呼ばれた
2024-12-14T02:23:35.744Z pid=25488 tid=c24 class=HogeJob jid=60c6c2861ce986ae8f029c5b INFO: start hoge job
JobRetry.process_retryが呼ばれた
jobinst: #<HogeJob:0x000000010bfe09a0>

このことから、performで例外処理を書かない場合は、Sidekiq::JobRetry#localで例外がキャッチされ、Sidekiq::JobRetry#process_retryが呼び出されることが確認できました。

最後に、肝心のリトライが実行されているのか、Sidekiqのダッシュボードから確認しました。

sidekiq_dashboard

無事、リトライが実行されていることを確認できました。

2. performで例外処理を書く場合

以下のログが出力されました。

2024-12-16T15:07:48.976Z pid=93451 tid=1osv class=HogeJob jid=52c04d85b7beb09304b15249 INFO: start
JobRetry.localが呼ばれた
2024-12-16T15:07:48.986Z pid=93451 tid=1osv class=HogeJob jid=52c04d85b7beb09304b15249 INFO: start hoge job
error_handled: Test Error

このことから、performで例外処理をかく場合は、Sidekiq::JobRetry#localで例外がキャッチされる前にperformで例外がキャッチされるため、Sidekiq::JobRetry#process_retryが呼び出されないことが確認できました。

sidekiq_dashboard

そして、リトライが実行されていないことがダッシュボードからも確認できました。

まとめ

コードリーディングと検証を踏まえて、Automatic job retryのロジックはSidekiq::JobRetry#process_retryに実装されていて、Sidekiq::JobRetry#localSidekiq::JobRetry#globalSidekiq::JobRetry#process_retryを呼び出していることがわかりました。そのため、performで例外処理を書くとAutomatic job retryが実行されない理由は、Sidekiq::JobRetry#localperform内の例外をキャッチできないことにあるとわかりました。

最後に、Sidekiqの仕組みに少し詳しくなれたことは、使う側としてとても大きな財産になりました。これからも、疲れない程度に(ここ大事!)興味のあるトピックは深掘っていきたいと思います。