Skip to content

Instantly share code, notes, and snippets.

@dany1468
Created May 24, 2017 09:28
Show Gist options
  • Select an option

  • Save dany1468/b35f4192d4597632ff236ae8d4797b24 to your computer and use it in GitHub Desktop.

Select an option

Save dany1468/b35f4192d4597632ff236ae8d4797b24 to your computer and use it in GitHub Desktop.
Sidekiq 5 で変わった ActiveRecord のコネクション管理の方法と ActiveSupport::Executor/Reloader について ref: http://qiita.com/dany1468/items/dc133630b37d60ea6aa5
module ActionCable
class Engine < Rails::Engine
initializer "action_cable.set_work_hooks" do |app|
ActiveSupport.on_load(:action_cable) do
ActionCable::Server::Worker.set_callback :work, :around, prepend: true do |_, inner|
app.executor.wrap do
# If we took a while to get the lock, we may have been halted
# in the meantime. As we haven't started doing any real work
# yet, we should pretend that we never made it off the queue.
unless stopping?
inner.call
end
end
end
wrap = lambda do |_, inner|
app.executor.wrap(&inner)
end
ActionCable::Channel::Base.set_callback :subscribe, :around, prepend: true, &wrap
ActionCable::Channel::Base.set_callback :unsubscribe, :around, prepend: true, &wrap
# class_unload で server restart
app.reloader.before_class_unload do
ActionCable.server.restart
end
end
end
module Sidekiq
module Middleware
module Server
class RetryJobs
def call(worker, msg, queue)
yield
rescue Sidekiq::Shutdown
# ignore, will be pushed back onto queue during hard_shutdown
raise
rescue Exception => e
# ignore, will be pushed back onto queue during hard_shutdown
raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)
raise e unless msg['retry']
attempt_retry(worker, msg, queue, e)
end
require "active_support/execution_wrapper"
module ActiveSupport
class Executor < ExecutionWrapper
end
end
module ActiveSupport
class ExecutionWrapper
# callback 登録用のメソッド
def self.to_run(*args, &block)
def self.to_complete(*args, &block)
def self.register_hook(hook, outer: false)
def self.run!
def self.wrap
def self.active?
def run!
def complete!
end
module ActiveSupport
class ExecutionWrapper
define_callbacks :run
define_callbacks :complete
def self.to_run(*args, &block)
set_callback(:run, *args, &block)
end
def self.to_complete(*args, &block)
set_callback(:complete, *args, &block)
end
RunHook = Struct.new(:hook) do # :nodoc:
def before(target)
hook_state = target.send(:hook_state)
hook_state[hook] = hook.run
end
end
CompleteHook = Struct.new(:hook) do # :nodoc:
def before(target)
hook_state = target.send(:hook_state)
if hook_state.key?(hook)
hook.complete hook_state[hook]
end
end
alias after before
end
def self.register_hook(hook, outer: false)
if outer
to_run RunHook.new(hook), prepend: true
to_complete :after, CompleteHook.new(hook)
else
to_run RunHook.new(hook)
to_complete CompleteHook.new(hook)
end
end
private
def hook_state
@_hook_state ||= {}
end
module ActiveSupport
class ExecutionWrapper
Null = Object.new # :nodoc:
def Null.complete! # :nodoc:
end
def self.run!
if active?
Null # Null を返すと手動で complete! される場合にも何もしない
else
new.tap do |instance|
success = nil
begin
instance.run! # インスタンスメソッドの run! を実行
success = true
ensure
# 失敗した場合のみ complete! まで実行する。
# 成功の場合は利用側で手動で返却したインスタンスに対して complete! することを期待します。
instance.complete! unless success
end
end
end
end
def self.wrap
return yield if active?
instance = run! # self.run! を実行
begin
yield
ensure
instance.complete!
end
end
class << self # :nodoc:
attr_accessor :active
end
def self.inherited(other) # :nodoc:
super
other.active = Concurrent::Hash.new
end
self.active = Concurrent::Hash.new
def self.active? # :nodoc:
@active[Thread.current]
end
def run! # :nodoc:
self.class.active[Thread.current] = true
run_callbacks(:run)
end
def complete!
run_callbacks(:complete)
ensure
self.class.active.delete Thread.current
end
module ActiveSupport
class Reloader < ExecutionWrapper
# callback 登録用のメソッド
def self.to_prepare(*args, &block)
def self.before_class_unload(*args, &block)
def self.after_class_unload(*args, &block)
def self.reload!
def self.run!
def self.wrap
def self.check!
def self.reloaded!
def self.prepare!
def require_unload_lock!
def release_unload_lock!
def run!
def class_unload!(&block)
def complete!
module ActiveSupport
class Reloader < ExecutionWrapper
define_callbacks :prepare
define_callbacks :class_unload
def self.to_prepare(*args, &block)
set_callback(:prepare, *args, &block)
end
def self.before_class_unload(*args, &block)
set_callback(:class_unload, *args, &block)
end
def self.after_class_unload(*args, &block)
set_callback(:class_unload, :after, *args, &block)
end
to_run(:after) { self.class.prepare! }
module ActiveSupport
class Reloader < ExecutionWrapper
def self.run!
if check!
super
else
Null
end
end
# Run the supplied block as a work unit, reloading code as needed
def self.wrap
executor.wrap do
super
end
end
class_attribute :executor
class_attribute :check
self.executor = Executor
self.check = lambda { false }
def self.check!
@should_reload ||= check.call
end
def run!
super
release_unload_lock!
end
def complete!
super
self.class.reloaded!
ensure
release_unload_lock!
end
# Acquire the ActiveSupport::Dependencies::Interlock unload lock,
# ensuring it will be released automatically
def require_unload_lock!
unless @locked
ActiveSupport::Dependencies.interlock.start_unloading
@locked = true
end
end
# Release the unload lock if it has been previously obtained
def release_unload_lock!
if @locked
@locked = false
ActiveSupport::Dependencies.interlock.done_unloading
end
end
def initialize
super
@locked = false
end
# ===========
module ActiveSupport
class ExecutionWrapper
def self.run!
if active?
Null
else
new.tap do |instance|
success = nil
begin
instance.run!
success = true
ensure
instance.complete! unless success
end
end
end
end
def self.wrap
return yield if active?
instance = run!
begin
yield
ensure
instance.complete!
end
end
Reloader.wrap
-> ExecutionWrapper.wrap -> (active? pass)
-> Reloader.run! -> (check! pass)
-> ExecutionWrapper.run!
-> Reloader#run!
-> ExecutionWrapper#run! # ここで active? == true
-> Reloader#release_unload_lock!
-> Reloader#complete!
-> ExecutionWrapper#complete! # ここで active? == false
-> Reloader.reloaded!
-> Reloadder#release_unload_lock!
callback = lambda do
ActiveSupport::DescendantsTracker.clear
ActiveSupport::Dependencies.clear
end
module ActiveRecord
class QueryCache
def self.run
caching_pool = ActiveRecord::Base.connection_pool
caching_was_enabled = caching_pool.query_cache_enabled
caching_pool.enable_query_cache!
[caching_pool, caching_was_enabled]
end
def self.complete((caching_pool, caching_was_enabled))
caching_pool.disable_query_cache! unless caching_was_enabled
ActiveRecord::Base.connection_handler.connection_pool_list.each do |pool|
pool.release_connection if pool.active_connection? && !pool.connection.transaction_open?
end
end
def self.install_executor_hooks(executor = ActiveSupport::Executor)
# self が run と complete に反応できるので登録可能
executor.register_hook(self)
end
module ActionView
class Digestor
module PerExecutionDigestCacheExpiry
def self.before(target)
ActionView::LookupContext::DetailsKey.clear
end
end
module Sidekiq
class Processor
def process(work)
jobstr = work.job
queue = work.queue_name
@reloader.call do
ack = false
begin
job = Sidekiq.load_json(jobstr)
klass = job['class'.freeze].constantize
worker = klass.new
worker.jid = job['jid'.freeze]
stats(worker, job, queue) do
Sidekiq.server_middleware.invoke(worker, job, queue) do
ack = true
execute_job(worker, cloned(job['args'.freeze]))
end
end
ack = true
rescue Sidekiq::Shutdown
ack = false
rescue Exception => ex
handle_exception(ex, job || { :job => jobstr })
raise
ensure
work.acknowledge if ack
end
end
end
module Sidekiq
class Rails < ::Rails::Engine
config.after_initialize do
if ::Rails::VERSION::MAJOR >= 5
# The reloader also takes care of ActiveRecord but is incompatible with
# the ActiveRecord middleware so make sure it's not in the chain already.
if defined?(Sidekiq::Middleware::Server::ActiveRecord) && Sidekiq.server_middleware.exists?(Sidekiq::Middleware::Server::ActiveRecord)
raise ArgumentError, "You are using the Sidekiq ActiveRecord middleware and the new Rails 5 reloader which are incompatible. Please remove the ActiveRecord middleware from your Sidekiq middleware configuration."
elsif ::Rails.application.config.cache_classes
# The reloader API has proven to be troublesome under load in production.
# We won't use it at all when classes are cached, see #3154
Sidekiq.logger.debug { "Autoload disabled in #{::Rails.env}, Sidekiq will not reload changed classes" }
else
Sidekiq.options[:reloader] = Sidekiq::Rails::Reloader.new
end
end
end
module Sidekiq
class Processor
def process(work)
jobstr = work.job
queue = work.queue_name
ack = false
begin # @reloader.call が begin の内部に入った。
@reloader.call do
job = Sidekiq.load_json(jobstr)
klass = job['class'.freeze].constantize
worker = klass.new
worker.jid = job['jid'.freeze]
# 略
worker = nil
begin
reload do
# 開発中に修正する可能性のあるリロードしたいクラスがこれ。
# なので、reload はこの外側である必要がある。
worker = job.constantize.new
# middleware の呼び出しがここ。
# 一番外側に Retry middleware を配置したとしても、reload 自体のエラーはこの外側なので
# どうしようも無い。
execute_middleware(worker, ...) do
execute_job
end
end
rescue => ex
# 提案としては、こんな感じで reload よりさらに外側でリトライできるようになればいいよねってこと。
retry_failures(job_hash, worker, ex)
end
module Sidekiq
class JobRetry
class Skip < ::RuntimeError; end
include Sidekiq::Util
DEFAULT_MAX_RETRY_ATTEMPTS = 25
def initialize(options = {})
@max_retries = Sidekiq.options.merge(options).fetch(:max_retries, DEFAULT_MAX_RETRY_ATTEMPTS)
end
# The global retry handler requires only the barest of data.
# We want to be able to retry as much as possible so we don't
# require the worker to be instantiated.
def global(msg, queue)
yield
rescue Skip
raise
rescue Sidekiq::Shutdown
# ignore, will be pushed back onto queue during hard_shutdown
raise
rescue Exception => e
# ignore, will be pushed back onto queue during hard_shutdown
raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)
raise e unless msg['retry']
# 🏁 ここは第一引数(worker) をあえて nil にしている
attempt_retry(nil, msg, queue, e)
end
# The local retry support means that any errors that occur within
# this block can be associated with the given worker instance.
# This is required to support the `sidekiq_retries_exhausted` block.
def local(worker, msg, queue)
yield
rescue Skip
raise
rescue Sidekiq::Shutdown
# ignore, will be pushed back onto queue during hard_shutdown
raise
rescue Exception => e
# ignore, will be pushed back onto queue during hard_shutdown
raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)
if msg['retry'] == nil
msg['retry'] = worker.class.get_sidekiq_options['retry']
end
raise e unless msg['retry']
attempt_retry(worker, msg, queue, e)
# We've handled this error associated with this job, don't
# need to handle it at the global level
raise Skip
end
module ActiveSupport
class FileUpdateChecker
# Executes the given block and updates the latest watched files and
# timestamp.
def execute
@last_watched = watched
@last_update_at = updated_at(@last_watched)
@block.call
ensure
@watched = nil
@updated_at = nil
end
module Rails
class Application
module Finisher
initializer :configure_executor_for_concurrency do |app|
if config.allow_concurrency == false
# User has explicitly opted out of concurrent request
# handling: presumably their code is not threadsafe
app.executor.register_hook(MutexHook.new, outer: true)
elsif config.allow_concurrency == :unsafe
# Do nothing, even if we know this is dangerous. This is the
# historical behavior for true.
else
# Default concurrency setting: enabled, but safe
unless config.cache_classes && config.eager_load
# Without cache_classes + eager_load, the load interlock
# is required for proper operation
app.executor.register_hook(InterlockHook, outer: true)
end
end
end
class MutexHook
def initialize(mutex = Mutex.new)
@mutex = mutex
end
def run
@mutex.lock
end
def complete(_state)
@mutex.unlock
end
end
module InterlockHook
def self.run
ActiveSupport::Dependencies.interlock.start_running
end
def self.complete(_state)
ActiveSupport::Dependencies.interlock.done_running
end
end
module ActiveSupport
module Dependencies
class Interlock
def initialize # :nodoc:
@lock = ActiveSupport::Concurrency::ShareLock.new
end
def running
@lock.sharing do
yield
end
end
# 以下はいずれも #attempt_retry の内部で呼ばれるメソッド
def retries_exhausted(worker, msg, exception)
logger.debug { "Retries exhausted for job" }
begin
# worker が nil だと Sidekiq.default_retries_exhausted が使われる
block = worker && worker.sidekiq_retries_exhausted_block || Sidekiq.default_retries_exhausted
block.call(msg, exception) if block
rescue => e
handle_exception(e, { context: "Error calling retries_exhausted for #{msg['class']}", job: msg })
end
send_to_morgue(msg) unless msg['dead'] == false
end
def delay_for(worker, count, exception)
# worker が nil だとデフォルトの `seconds_to_delay` が使われる
worker && worker.sidekiq_retry_in_block? && retry_in(worker, count, exception) || seconds_to_delay(count)
end
def seconds_to_delay(count)
(count ** 4) + 15 + (rand(30)*(count+1))
end
# 一部コメントは削除しています
module Sidekiq
class Processor
def process(work)
jobstr = work.job
queue = work.queue_name
ack = false
begin
job_hash = nil
begin
job_hash = Sidekiq.load_json(jobstr)
rescue => ex
Sidekiq.logger.error { "Pushing job to dead queue due to invalid JSON: #{ex}" }
send_to_morgue(jobstr)
ack = true
raise
end
ack = true
# 🏁 ここで dispatch メソッドが呼び出される
dispatch(job_hash, queue) do |worker|
# ここで middleware の呼び出しや worker.perform の実行を行う
Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
execute_job(worker, cloned(job_hash['args'.freeze]))
end
end
rescue Sidekiq::Shutdown
ack = false
rescue Exception => ex
handle_exception(ex, { :context => "Job raised exception", :job => job_hash, :jobstr => jobstr })
raise
ensure
work.acknowledge if ack
end
end
def dispatch(job_hash, queue)
pristine = cloned(job_hash)
# 外側のリトライ
@retrier.global(job_hash, queue) do
@logging.call(job_hash, queue) do
stats(pristine, queue) do
# Rails 5 requires a Reloader to wrap code execution. In order to
# constantize the worker and instantiate an instance, we have to call
# the Reloader. It handles code loading, db connection management, etc.
# Effectively this block denotes a "unit of work" to Rails.
# ここで reloader が入る
@reloader.call do
# 開発時に修正される Worker クラスの生成は reloader の内部。
klass = job_hash['class'.freeze].constantize
worker = klass.new
worker.jid = job_hash['jid'.freeze]
# 内側のリトライ。これまで Retry middleware が担っていたもの。
@retrier.local(worker, job_hash, queue) do
# この yield 呼び出し先で middleware の実行や worker.perform も実行される。
# よって、ActiveRecord の利用もあるが、reloader の内部なので、reloader を
# 抜ける時にコネクションはプールに返される。
yield worker
end
end
end
end
end
end
def execute_job(worker, cloned_args)
worker.perform(*cloned_args)
end
def send_to_morgue(msg)
now = Time.now.to_f
Sidekiq.redis do |conn|
conn.multi do
conn.zadd('dead', now, msg)
conn.zremrangebyscore('dead', '-inf', now - DeadSet.timeout)
conn.zremrangebyrank('dead', 0, -DeadSet.max_jobs)
end
end
end
class Reloader
def call
ActiveSupport::Dependencies.interlock.running do
begin
ActionDispatch::Reloader.prepare! if do_reload_now = reload_dependencies?
yield
ensure
ActionDispatch::Reloader.cleanup! if do_reload_now
end
end
end
private
def reload_dependencies?
@app.config.reload_classes_only_on_change != true || @app.reloaders.any?(&:updated?)
end
module ActionView
class Railtie < Rails::Engine
initializer "action_view.per_request_digest_cache" do |app|
ActiveSupport.on_load(:action_view) do
unless ActionView::Resolver.caching?
app.executor.to_run ActionView::Digestor::PerExecutionDigestCacheExpiry
end
end
end
class SampleHook
def self.run
'run_result' # ここで戻したものが hook_state を通して complete に渡る
end
def self.complete(run_result)
end
end
module ActiveSupport
class Reloader < ExecutionWrapper
def class_unload!(&block) # :nodoc:
require_unload_lock!
run_callbacks(:class_unload, &block)
end
# Acquire the ActiveSupport::Dependencies::Interlock unload lock,
# ensuring it will be released automatically
def require_unload_lock!
unless @locked
ActiveSupport::Dependencies.interlock.start_unloading
@locked = true
end
end
"{\"enqueued_at\":1494408578.771217,\"class\":\"SampleWorker\",\"queue\":\"default\",\"created_at\":1494408578.771217,\"args\":[123456,\"sample_args\"],\"jid\":\"73af9c9f8c5f453facdc0a1c\",\"retry\":1}"
class Reloader
class Reloader
def initialize(app = ::Rails.application)
Sidekiq.logger.debug "Enabling Rails 5+ live code reloading, so hot!" unless app.config.cache_classes
@app = app
end
def call
@app.reloader.wrap do
yield
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment