Fix background jobs not using locks like they are supposed to (#13361)
Also: - Fix locks not being removed when jobs go to the dead job queue - Add UI for managing locks to the Sidekiq dashboard - Remove unused Sidekiq workers Fix #13349
This commit is contained in:
		
							parent
							
								
									1fb92037e4
								
							
						
					
					
						commit
						9014367bd8
					
				
					 34 changed files with 23 additions and 152 deletions
				
			
		| 
						 | 
				
			
			@ -4,7 +4,7 @@ class ActivityPub::DistributePollUpdateWorker
 | 
			
		|||
  include Sidekiq::Worker
 | 
			
		||||
  include Payloadable
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'push', unique: :until_executed, retry: 0
 | 
			
		||||
  sidekiq_options queue: 'push', lock: :until_executed, retry: 0
 | 
			
		||||
 | 
			
		||||
  def perform(status_id)
 | 
			
		||||
    @status  = Status.find(status_id)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,7 +3,7 @@
 | 
			
		|||
class ActivityPub::SynchronizeFeaturedCollectionWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'pull', unique: :until_executed
 | 
			
		||||
  sidekiq_options queue: 'pull', lock: :until_executed
 | 
			
		||||
 | 
			
		||||
  def perform(account_id)
 | 
			
		||||
    ActivityPub::FetchFeaturedCollectionService.new.call(Account.find(account_id))
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,9 +0,0 @@
 | 
			
		|||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class AfterRemoteFollowRequestWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'pull', retry: 5
 | 
			
		||||
 | 
			
		||||
  def perform(follow_request_id); end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,9 +0,0 @@
 | 
			
		|||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class AfterRemoteFollowWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'pull', retry: 5
 | 
			
		||||
 | 
			
		||||
  def perform(follow_id); end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,9 +0,0 @@
 | 
			
		|||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class NotificationWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'push', retry: 5
 | 
			
		||||
 | 
			
		||||
  def perform(xml, source_account_id, target_account_id); end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -3,7 +3,7 @@
 | 
			
		|||
class PollExpirationNotifyWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options unique: :until_executed
 | 
			
		||||
  sidekiq_options lock: :until_executed
 | 
			
		||||
 | 
			
		||||
  def perform(poll_id)
 | 
			
		||||
    poll = Poll.find(poll_id)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,9 +0,0 @@
 | 
			
		|||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class ProcessingWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options backtrace: true
 | 
			
		||||
 | 
			
		||||
  def perform(account_id, body); end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -3,7 +3,7 @@
 | 
			
		|||
class PublishScheduledStatusWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options unique: :until_executed
 | 
			
		||||
  sidekiq_options lock: :until_executed
 | 
			
		||||
 | 
			
		||||
  def perform(scheduled_status_id)
 | 
			
		||||
    scheduled_status = ScheduledStatus.find(scheduled_status_id)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,9 +0,0 @@
 | 
			
		|||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class Pubsubhubbub::ConfirmationWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'push', retry: false
 | 
			
		||||
 | 
			
		||||
  def perform(subscription_id, mode, secret = nil, lease_seconds = nil); end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,9 +0,0 @@
 | 
			
		|||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class Pubsubhubbub::DeliveryWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'push', retry: 3, dead: false
 | 
			
		||||
 | 
			
		||||
  def perform(subscription_id, payload); end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,9 +0,0 @@
 | 
			
		|||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class Pubsubhubbub::DistributionWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'push'
 | 
			
		||||
 | 
			
		||||
  def perform(stream_entry_ids); end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,9 +0,0 @@
 | 
			
		|||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class Pubsubhubbub::RawDistributionWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'push'
 | 
			
		||||
 | 
			
		||||
  def perform(xml, source_account_id); end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,9 +0,0 @@
 | 
			
		|||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class Pubsubhubbub::SubscribeWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'push', retry: 10, unique: :until_executed, dead: false
 | 
			
		||||
 | 
			
		||||
  def perform(account_id); end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,9 +0,0 @@
 | 
			
		|||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class Pubsubhubbub::UnsubscribeWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'push', retry: false, unique: :until_executed, dead: false
 | 
			
		||||
 | 
			
		||||
  def perform(account_id); end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -3,7 +3,7 @@
 | 
			
		|||
class RegenerationWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options unique: :until_executed
 | 
			
		||||
  sidekiq_options lock: :until_executed
 | 
			
		||||
 | 
			
		||||
  def perform(account_id, _ = :home)
 | 
			
		||||
    account = Account.find(account_id)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,9 +0,0 @@
 | 
			
		|||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class RemoteProfileUpdateWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'pull'
 | 
			
		||||
 | 
			
		||||
  def perform(account_id, body, resubscribe); end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -3,7 +3,7 @@
 | 
			
		|||
class ResolveAccountWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'pull', unique: :until_executed
 | 
			
		||||
  sidekiq_options queue: 'pull', lock: :until_executed
 | 
			
		||||
 | 
			
		||||
  def perform(uri)
 | 
			
		||||
    ResolveAccountService.new.call(uri)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,9 +0,0 @@
 | 
			
		|||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class SalmonWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options backtrace: true
 | 
			
		||||
 | 
			
		||||
  def perform(account_id, body); end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -3,7 +3,7 @@
 | 
			
		|||
class Scheduler::BackupCleanupScheduler
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options unique: :until_executed, retry: 0
 | 
			
		||||
  sidekiq_options lock: :until_executed, retry: 0
 | 
			
		||||
 | 
			
		||||
  def perform
 | 
			
		||||
    old_backups.reorder(nil).find_each(&:destroy!)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,7 +3,7 @@
 | 
			
		|||
class Scheduler::DoorkeeperCleanupScheduler
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options unique: :until_executed, retry: 0
 | 
			
		||||
  sidekiq_options lock: :until_executed, retry: 0
 | 
			
		||||
 | 
			
		||||
  def perform
 | 
			
		||||
    Doorkeeper::AccessToken.where('revoked_at IS NOT NULL').where('revoked_at < NOW()').delete_all
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,7 +3,7 @@
 | 
			
		|||
class Scheduler::EmailScheduler
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options unique: :until_executed, retry: 0
 | 
			
		||||
  sidekiq_options lock: :until_executed, retry: 0
 | 
			
		||||
 | 
			
		||||
  FREQUENCY      = 7.days.freeze
 | 
			
		||||
  SIGN_IN_OFFSET = 1.day.freeze
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -4,7 +4,7 @@ class Scheduler::FeedCleanupScheduler
 | 
			
		|||
  include Sidekiq::Worker
 | 
			
		||||
  include Redisable
 | 
			
		||||
 | 
			
		||||
  sidekiq_options unique: :until_executed, retry: 0
 | 
			
		||||
  sidekiq_options lock: :until_executed, retry: 0
 | 
			
		||||
 | 
			
		||||
  def perform
 | 
			
		||||
    clean_home_feeds!
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -5,7 +5,7 @@ class Scheduler::IpCleanupScheduler
 | 
			
		|||
 | 
			
		||||
  RETENTION_PERIOD = 1.year
 | 
			
		||||
 | 
			
		||||
  sidekiq_options unique: :until_executed, retry: 0
 | 
			
		||||
  sidekiq_options lock: :until_executed, retry: 0
 | 
			
		||||
 | 
			
		||||
  def perform
 | 
			
		||||
    time_ago = RETENTION_PERIOD.ago
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,7 +3,7 @@
 | 
			
		|||
class Scheduler::MediaCleanupScheduler
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options unique: :until_executed, retry: 0
 | 
			
		||||
  sidekiq_options lock: :until_executed, retry: 0
 | 
			
		||||
 | 
			
		||||
  def perform
 | 
			
		||||
    unattached_media.find_each(&:destroy)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,7 +3,7 @@
 | 
			
		|||
class Scheduler::PgheroScheduler
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options unique: :until_executed, retry: 0
 | 
			
		||||
  sidekiq_options lock: :until_executed, retry: 0
 | 
			
		||||
 | 
			
		||||
  def perform
 | 
			
		||||
    PgHero.capture_space_stats
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,7 +3,7 @@
 | 
			
		|||
class Scheduler::ScheduledStatusesScheduler
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options unique: :until_executed, retry: 0
 | 
			
		||||
  sidekiq_options lock: :until_executed, retry: 0
 | 
			
		||||
 | 
			
		||||
  def perform
 | 
			
		||||
    publish_scheduled_statuses!
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,9 +0,0 @@
 | 
			
		|||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class Scheduler::SubscriptionsCleanupScheduler
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options unique: :until_executed, retry: 0
 | 
			
		||||
 | 
			
		||||
  def perform; end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,9 +0,0 @@
 | 
			
		|||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
class Scheduler::SubscriptionsScheduler
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options unique: :until_executed, retry: 0
 | 
			
		||||
 | 
			
		||||
  def perform; end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -3,7 +3,7 @@
 | 
			
		|||
class Scheduler::TrendingTagsScheduler
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options unique: :until_executed, retry: 0
 | 
			
		||||
  sidekiq_options lock: :until_executed, retry: 0
 | 
			
		||||
 | 
			
		||||
  def perform
 | 
			
		||||
    TrendingTags.update! if Setting.trends
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,7 +3,7 @@
 | 
			
		|||
class Scheduler::UserCleanupScheduler
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options unique: :until_executed, retry: 0
 | 
			
		||||
  sidekiq_options lock: :until_executed, retry: 0
 | 
			
		||||
 | 
			
		||||
  def perform
 | 
			
		||||
    User.where('confirmed_at is NULL AND confirmation_sent_at <= ?', 2.days.ago).reorder(nil).find_in_batches do |batch|
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,7 +3,7 @@
 | 
			
		|||
class VerifyAccountLinksWorker
 | 
			
		||||
  include Sidekiq::Worker
 | 
			
		||||
 | 
			
		||||
  sidekiq_options queue: 'pull', retry: false, unique: :until_executed
 | 
			
		||||
  sidekiq_options queue: 'pull', retry: false, lock: :until_executed
 | 
			
		||||
 | 
			
		||||
  def perform(account_id)
 | 
			
		||||
    account = Account.find(account_id)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -13,6 +13,11 @@ Sidekiq.configure_server do |config|
 | 
			
		|||
  config.server_middleware do |chain|
 | 
			
		||||
    chain.add SidekiqErrorHandler
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  config.death_handlers << lambda do |job, _ex|
 | 
			
		||||
    digest = job['lock_digest']
 | 
			
		||||
    SidekiqUniqueJobs::Digests.delete_by_digest(digest) if digest
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 | 
			
		||||
Sidekiq.configure_client do |config|
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,6 +1,6 @@
 | 
			
		|||
# frozen_string_literal: true
 | 
			
		||||
 | 
			
		||||
require 'sidekiq/web'
 | 
			
		||||
require 'sidekiq_unique_jobs/web'
 | 
			
		||||
require 'sidekiq-scheduler/web'
 | 
			
		||||
 | 
			
		||||
Sidekiq::Web.set :session_secret, Rails.application.secrets[:secret_key_base]
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -91,10 +91,6 @@ RSpec.describe ImportService, type: :service do
 | 
			
		|||
 | 
			
		||||
    let(:csv) { attachment_fixture('mute-imports.txt') }
 | 
			
		||||
 | 
			
		||||
    before do
 | 
			
		||||
      allow(NotificationWorker).to receive(:perform_async)
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    describe 'when no accounts are followed' do
 | 
			
		||||
      let(:import) { Import.create(account: account, type: 'following', data: csv) }
 | 
			
		||||
      it 'follows the listed accounts, including boosts' do
 | 
			
		||||
| 
						 | 
				
			
			@ -135,10 +131,6 @@ RSpec.describe ImportService, type: :service do
 | 
			
		|||
 | 
			
		||||
    let(:csv) { attachment_fixture('new-following-imports.txt') }
 | 
			
		||||
 | 
			
		||||
    before do
 | 
			
		||||
      allow(NotificationWorker).to receive(:perform_async)
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    describe 'when no accounts are followed' do
 | 
			
		||||
      let(:import) { Import.create(account: account, type: 'following', data: csv) }
 | 
			
		||||
      it 'follows the listed accounts, respecting boosts' do
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		
		Reference in a new issue