Skip to content

Instantly share code, notes, and snippets.

@smoyte
Created February 4, 2014 17:45
Show Gist options
  • Select an option

  • Save smoyte/8808653 to your computer and use it in GitHub Desktop.

Select an option

Save smoyte/8808653 to your computer and use it in GitHub Desktop.

Revisions

  1. smoyte created this gist Feb 4, 2014.
    219 changes: 219 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,219 @@
    module Jobs

    def self.queued
    Sidekiq::Stats.new.enqueued
    end

    def self.last_job_performed_at
    Sidekiq.redis do |r|
    int = r.get('last_job_perform_at')
    int ? Time.at(int.to_i) : nil
    end
    end

    def self.num_email_retry_jobs
    Sidekiq::RetrySet.new.select { |job| job.klass =~ /Email$/ }.size
    end

    class Base

    class Instrumenter

    def self.stats
    Thread.current[:db_stats] ||= Stats.new
    end

    class Stats
    attr_accessor :query_count, :duration_ms

    def initialize
    @query_count = 0
    @duration_ms = 0
    end
    end

    def call(name, start, finish, message_id, values)
    stats = Instrumenter.stats
    stats.query_count += 1
    stats.duration_ms += (((finish - start).to_f) * 1000).to_i
    end
    end

    include Sidekiq::Worker

    def initialize
    @db_duration = 0
    end

    def log(*args)
    puts args
    args.each do |arg|
    Rails.logger.info "#{Time.now.to_formatted_s(:db)}: [#{self.class.name.upcase}] #{arg}"
    end
    true
    end

    def self.delayed_perform(opts={})
    self.new.perform(opts)
    end

    def execute(opts={})
    raise "Overwrite me!"
    end

    def last_db_duration
    @db_duration || 0
    end

    def ensure_db_instrumented
    @@instrumented ||= begin
    ActiveSupport::Notifications.subscribe('sql.active_record', Instrumenter.new)
    true
    end
    end

    def perform(*args)
    ensure_db_instrumented
    opts = args.extract_options!.with_indifferent_access

    if SiteSetting.queue_jobs?
    Sidekiq.redis do |r|
    r.set('last_job_perform_at', Time.now.to_i)
    end
    end

    if opts.delete(:sync_exec)
    if opts.has_key?(:current_site_id) && opts[:current_site_id] != RailsMultisite::ConnectionManagement.current_db
    raise ArgumentError.new("You can't connect to another database when executing a job synchronously.")
    else
    return execute(opts)
    end
    end


    dbs =
    if opts[:current_site_id]
    [opts[:current_site_id]]
    else
    RailsMultisite::ConnectionManagement.all_dbs
    end

    total_db_time = 0
    dbs.each do |db|
    begin
    thread_exception = nil
    # NOTE: This looks odd, in fact it looks crazy but there is a reason
    # A bug in therubyracer means that under certain conditions running in a fiber
    # can cause the whole v8 context to corrupt so much that it will hang sidekiq
    #
    # If you are brave and want to try to fix this either in celluloid or therubyracer, the repro is:
    #
    # 1. Create a big Discourse db: (you can start from script/profile_db_generator.rb)
    # 2. Queue a ton of jobs, eg: User.pluck(:id).each{|id| Jobs.enqueue(:user_email, type: :digest, user_id: id)};
    # 3. Run sidekiq
    #
    # The issue only happens in Ruby 2.0 for some reason, you start getting V8::Error with no context
    #
    # See: https://github.com/cowboyd/therubyracer/issues/206
    #
    # The restricted stack space of fibers opens a bunch of risks up, by avoiding them altogether
    # we can mitigate giving up a very marginal amount of throughput
    #
    # Ideally we could just tell sidekiq to avoid fibers

    t = Thread.new do
    begin
    RailsMultisite::ConnectionManagement.establish_connection(db: db)
    I18n.locale = SiteSetting.default_locale
    execute(opts)
    rescue => e
    thread_exception = e
    ensure
    ActiveRecord::Base.connection_handler.clear_active_connections!
    total_db_time += Instrumenter.stats.duration_ms
    end
    end
    t.join

    raise thread_exception if thread_exception
    end
    end

    ensure
    ActiveRecord::Base.connection_handler.clear_active_connections!
    @db_duration = total_db_time
    end

    end

    class Scheduled < Base
    include Sidetiq::Schedulable
    end

    def self.enqueue(job_name, opts={})
    klass = "Jobs::#{job_name.to_s.camelcase}".constantize

    # Unless we want to work on all sites
    unless opts.delete(:all_sites)
    opts[:current_site_id] ||= RailsMultisite::ConnectionManagement.current_db
    end

    # If we are able to queue a job, do it
    if SiteSetting.queue_jobs?
    if opts[:delay_for].present?
    klass.delay_for(opts.delete(:delay_for)).delayed_perform(opts)
    else
    Sidekiq::Client.enqueue(klass, opts)
    end
    else
    # Otherwise execute the job right away
    opts.delete(:delay_for)
    opts[:sync_exec] = true
    klass.new.perform(opts)
    end

    end

    def self.enqueue_in(secs, job_name, opts={})
    enqueue(job_name, opts.merge!(delay_for: secs))
    end

    def self.enqueue_at(datetime, job_name, opts={})
    enqueue_in( [(datetime - Time.zone.now).to_i, 0].max, job_name, opts )
    end

    def self.cancel_scheduled_job(job_name, params={})
    jobs = scheduled_for(job_name, params)
    return false if jobs.empty?
    jobs.each { |job| job.delete }
    true
    end

    def self.scheduled_for(job_name, params={})
    job_class = "Jobs::#{job_name.to_s.camelcase}"
    Sidekiq::ScheduledSet.new.select do |scheduled_job|
    if scheduled_job.klass == 'Sidekiq::Extensions::DelayedClass'
    job_args = YAML.load(scheduled_job.args[0])
    job_args_class, _, (job_args_params, *) = job_args
    if job_args_class.to_s == job_class && job_args_params
    matched = true
    params.each do |key, value|
    unless job_args_params[key] == value
    matched = false
    break
    end
    end
    matched
    else
    false
    end
    else
    false
    end
    end
    end
    end

    # Require all jobs
    Dir["#{Rails.root}/app/jobs/regular/*.rb"].each {|file| require_dependency file }
    Dir["#{Rails.root}/app/jobs/scheduled/*.rb"].each {|file| require_dependency file }