mastodon/lib/mastodon/sidekiq_middleware.rb

77 lines
2.3 KiB
Ruby

# frozen_string_literal: true
class Mastodon::SidekiqMiddleware
BACKTRACE_LIMIT = 3
def call(_worker_class, job, _queue, &block)
setup_query_log_tags(job) do
Chewy.strategy(:mastodon, &block)
end
rescue Mastodon::HostValidationError
# Do not retry
rescue => e
clean_up_elasticsearch_connections!
limit_backtrace_and_raise(e)
ensure
clean_up_sockets!
end
private
def limit_backtrace_and_raise(exception)
exception.set_backtrace(exception.backtrace.first(BACKTRACE_LIMIT)) unless ENV['BACKTRACE']
raise exception
end
def clean_up_sockets!
clean_up_redis_socket!
clean_up_statsd_socket!
end
# This is a hack to immediately free up unused Elasticsearch connections.
#
# Indeed, Chewy creates one `Elasticsearch::Client` instance per thread,
# and each such client manages its long-lasting connection to
# Elasticsearch.
#
# As far as I know, neither `chewy`, `elasticsearch-transport` or even
# `faraday` provide a reliable way to immediately close a connection, and
# rely on the underlying object to be garbage-collected instead.
#
# Furthermore, `sidekiq` creates a new thread each time a job throws an
# exception, meaning that each failure will create a new connection, and
# the old one will only be closed on full garbage collection.
def clean_up_elasticsearch_connections!
return unless Chewy.enabled? && Chewy.current[:chewy_client].present?
Chewy.client.transport.transport.connections.each do |connection|
# NOTE: This bit of code is tailored for the HTTPClient Faraday adapter
connection.connection.app.instance_variable_get(:@client)&.reset_all
end
Chewy.current.delete(:chewy_client)
rescue
nil
end
def clean_up_redis_socket!
RedisConnection.pool.checkin if Thread.current[:redis]
Thread.current[:redis] = nil
end
def clean_up_statsd_socket!
Thread.current[:statsd_socket]&.close
Thread.current[:statsd_socket] = nil
end
def setup_query_log_tags(job, &block)
if Rails.configuration.active_record.query_log_tags_enabled
# If `wrapped` is set, this is an `ActiveJob` which is already in the execution context
sidekiq_job_class = job['wrapped'].present? ? nil : job['class'].to_s
ActiveSupport::ExecutionContext.set(sidekiq_job_class: sidekiq_job_class, &block)
else
yield
end
end
end