forked from cybrespace/mastodon
		
	Fix race condition when receiving an ActivityPub Create multiple times (#4930)
* Fix race condition when receiving an ActivityPub Create multiple times * Use a RedisLock to avoid concurrent processing of a same Create activity
This commit is contained in:
		
							parent
							
								
									bdcc9e2ceb
								
							
						
					
					
						commit
						4a73615193
					
				
					 1 changed files with 23 additions and 14 deletions
				
			
		|  | @ -4,26 +4,31 @@ class ActivityPub::Activity::Create < ActivityPub::Activity | ||||||
|   def perform |   def perform | ||||||
|     return if delete_arrived_first?(object_uri) || unsupported_object_type? |     return if delete_arrived_first?(object_uri) || unsupported_object_type? | ||||||
| 
 | 
 | ||||||
|     status = find_existing_status |     RedisLock.acquire(lock_options) do |lock| | ||||||
| 
 |       if lock.acquired? | ||||||
|     return status unless status.nil? |         @status = find_existing_status | ||||||
| 
 |         process_status if @status.nil? | ||||||
|     ApplicationRecord.transaction do |       end | ||||||
|       status = Status.create!(status_params) |  | ||||||
| 
 |  | ||||||
|       process_tags(status) |  | ||||||
|       process_attachments(status) |  | ||||||
|     end |     end | ||||||
| 
 | 
 | ||||||
|     resolve_thread(status) |     @status | ||||||
|     distribute(status) |  | ||||||
|     forward_for_reply if status.public_visibility? || status.unlisted_visibility? |  | ||||||
| 
 |  | ||||||
|     status |  | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   private |   private | ||||||
| 
 | 
 | ||||||
|  |   def process_status | ||||||
|  |     ApplicationRecord.transaction do | ||||||
|  |       @status = Status.create!(status_params) | ||||||
|  | 
 | ||||||
|  |       process_tags(@status) | ||||||
|  |       process_attachments(@status) | ||||||
|  |     end | ||||||
|  | 
 | ||||||
|  |     resolve_thread(@status) | ||||||
|  |     distribute(@status) | ||||||
|  |     forward_for_reply if @status.public_visibility? || @status.unlisted_visibility? | ||||||
|  |   end | ||||||
|  | 
 | ||||||
|   def find_existing_status |   def find_existing_status | ||||||
|     status   = status_from_uri(object_uri) |     status   = status_from_uri(object_uri) | ||||||
|     status ||= Status.find_by(uri: @object['atomUri']) if @object['atomUri'].present? |     status ||= Status.find_by(uri: @object['atomUri']) if @object['atomUri'].present? | ||||||
|  | @ -182,4 +187,8 @@ class ActivityPub::Activity::Create < ActivityPub::Activity | ||||||
|     return unless @json['signature'].present? && reply_to_local? |     return unless @json['signature'].present? && reply_to_local? | ||||||
|     ActivityPub::RawDistributionWorker.perform_async(Oj.dump(@json), replied_to_status.account_id) |     ActivityPub::RawDistributionWorker.perform_async(Oj.dump(@json), replied_to_status.account_id) | ||||||
|   end |   end | ||||||
|  | 
 | ||||||
|  |   def lock_options | ||||||
|  |     { redis: Redis.current, key: "create:#{@object['id']}" } | ||||||
|  |   end | ||||||
| end | end | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		
		Reference in a new issue