diff --git a/Gemfile b/Gemfile index 138566780..1ad600224 100644 --- a/Gemfile +++ b/Gemfile @@ -159,3 +159,6 @@ end gem 'concurrent-ruby', require: false gem 'connection_pool', require: false + +gem 'xorcist', '~> 1.1' +gem 'pluck_each', '~> 0.1.3' diff --git a/Gemfile.lock b/Gemfile.lock index aea592da1..915b45512 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -403,6 +403,9 @@ GEM pghero (2.7.2) activerecord (>= 5) pkg-config (1.4.4) + pluck_each (0.1.3) + activerecord (> 3.2.0) + activesupport (> 3.0.0) posix-spawn (0.3.15) premailer (1.14.2) addressable @@ -662,6 +665,7 @@ GEM websocket-extensions (>= 0.1.0) websocket-extensions (0.1.5) wisper (2.0.1) + xorcist (1.1.2) xpath (3.2.0) nokogiri (~> 1.8) @@ -748,6 +752,7 @@ DEPENDENCIES pg (~> 1.2) pghero (~> 2.7) pkg-config (~> 1.4) + pluck_each (~> 0.1.3) posix-spawn premailer-rails private_address_check (~> 0.5) @@ -796,3 +801,4 @@ DEPENDENCIES webmock (~> 3.9) webpacker (~> 5.2) webpush + xorcist (~> 1.1) diff --git a/app/controllers/activitypub/followers_synchronizations_controller.rb b/app/controllers/activitypub/followers_synchronizations_controller.rb new file mode 100644 index 000000000..525031105 --- /dev/null +++ b/app/controllers/activitypub/followers_synchronizations_controller.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +class ActivityPub::FollowersSynchronizationsController < ActivityPub::BaseController + include SignatureVerification + include AccountOwnedConcern + + before_action :require_signature! + before_action :set_items + before_action :set_cache_headers + + def show + expires_in 0, public: false + render json: collection_presenter, + serializer: ActivityPub::CollectionSerializer, + adapter: ActivityPub::Adapter, + content_type: 'application/activity+json' + end + + private + + def uri_prefix + signed_request_account.uri[/http(s?):\/\/[^\/]+\//] + end + + def set_items + @items = @account.followers.where(Account.arel_table[:uri].matches(uri_prefix + '%', false, true)).pluck(:uri) + end + + def collection_presenter + ActivityPub::CollectionPresenter.new( + id: account_followers_synchronization_url(@account), + type: :ordered, + items: @items + ) + end +end diff --git a/app/controllers/activitypub/inboxes_controller.rb b/app/controllers/activitypub/inboxes_controller.rb index 0a561e7f0..fdb60d590 100644 --- a/app/controllers/activitypub/inboxes_controller.rb +++ b/app/controllers/activitypub/inboxes_controller.rb @@ -11,6 +11,7 @@ class ActivityPub::InboxesController < ActivityPub::BaseController def create upgrade_account + process_collection_synchronization process_payload head 202 end @@ -52,6 +53,19 @@ class ActivityPub::InboxesController < ActivityPub::BaseController DeliveryFailureTracker.reset!(signed_request_account.inbox_url) end + def process_collection_synchronization + raw_params = request.headers['Collection-Synchronization'] + return if raw_params.blank? || ENV['DISABLE_FOLLOWERS_SYNCHRONIZATION'] == 'true' + + # Re-using the syntax for signature parameters + tree = SignatureParamsParser.new.parse(raw_params) + params = SignatureParamsTransformer.new.apply(tree) + + ActivityPub::PrepareFollowersSynchronizationService.new.call(signed_request_account, params) + rescue Parslet::ParseFailed + Rails.logger.warn 'Error parsing Collection-Synchronization header' + end + def process_payload ActivityPub::ProcessingWorker.perform_async(signed_request_account.id, body, @account&.id) end diff --git a/app/lib/activitypub/tag_manager.rb b/app/lib/activitypub/tag_manager.rb index 3f98dad2e..3f2ae1106 100644 --- a/app/lib/activitypub/tag_manager.rb +++ b/app/lib/activitypub/tag_manager.rb @@ -40,6 +40,10 @@ class ActivityPub::TagManager end end + def uri_for_username(username) + account_url(username: username) + end + def generate_uri_for(_target) URI.join(root_url, 'payloads', SecureRandom.uuid) end diff --git a/app/models/account.rb b/app/models/account.rb index 5acc8d621..59d338f5a 100644 --- a/app/models/account.rb +++ b/app/models/account.rb @@ -352,6 +352,12 @@ class Account < ApplicationRecord shared_inbox_url.presence || inbox_url end + def synchronization_uri_prefix + return 'local' if local? + + @synchronization_uri_prefix ||= uri[/http(s?):\/\/[^\/]+\//] + end + class Field < ActiveModelSerializers::Model attributes :name, :value, :verified_at, :account, :errors diff --git a/app/models/concerns/account_interactions.rb b/app/models/concerns/account_interactions.rb index 6a0ad5aa9..e2c4b8acf 100644 --- a/app/models/concerns/account_interactions.rb +++ b/app/models/concerns/account_interactions.rb @@ -243,6 +243,26 @@ module AccountInteractions .where('users.current_sign_in_at > ?', User::ACTIVE_DURATION.ago) end + def remote_followers_hash(url_prefix) + Rails.cache.fetch("followers_hash:#{id}:#{url_prefix}") do + digest = "\x00" * 32 + followers.where(Account.arel_table[:uri].matches(url_prefix + '%', false, true)).pluck_each(:uri) do |uri| + Xorcist.xor!(digest, Digest::SHA256.digest(uri)) + end + digest.unpack('H*')[0] + end + end + + def local_followers_hash + Rails.cache.fetch("followers_hash:#{id}:local") do + digest = "\x00" * 32 + followers.where(domain: nil).pluck_each(:username) do |username| + Xorcist.xor!(digest, Digest::SHA256.digest(ActivityPub::TagManager.instance.uri_for_username(username))) + end + digest.unpack('H*')[0] + end + end + private def remove_potential_friendship(other_account, mutual = false) diff --git a/app/models/follow.rb b/app/models/follow.rb index 0b4ddbf3f..55a9da792 100644 --- a/app/models/follow.rb +++ b/app/models/follow.rb @@ -41,8 +41,10 @@ class Follow < ApplicationRecord before_validation :set_uri, only: :create after_create :increment_cache_counters + after_create :invalidate_hash_cache after_destroy :remove_endorsements after_destroy :decrement_cache_counters + after_destroy :invalidate_hash_cache private @@ -63,4 +65,10 @@ class Follow < ApplicationRecord account&.decrement_count!(:following_count) target_account&.decrement_count!(:followers_count) end + + def invalidate_hash_cache + return if account.local? && target_account.local? + + Rails.cache.delete("followers_hash:#{target_account_id}:#{account.synchronization_uri_prefix}") + end end diff --git a/app/services/activitypub/prepare_followers_synchronization_service.rb b/app/services/activitypub/prepare_followers_synchronization_service.rb new file mode 100644 index 000000000..2d22ed701 --- /dev/null +++ b/app/services/activitypub/prepare_followers_synchronization_service.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class ActivityPub::PrepareFollowersSynchronizationService < BaseService + include JsonLdHelper + + def call(account, params) + @account = account + + return if params['collectionId'] != @account.followers_url || invalid_origin?(params['url']) || @account.local_followers_hash == params['digest'] + + ActivityPub::FollowersSynchronizationWorker.perform_async(@account.id, params['url']) + end +end diff --git a/app/services/activitypub/synchronize_followers_service.rb b/app/services/activitypub/synchronize_followers_service.rb new file mode 100644 index 000000000..d83fcf55e --- /dev/null +++ b/app/services/activitypub/synchronize_followers_service.rb @@ -0,0 +1,74 @@ +# frozen_string_literal: true + +class ActivityPub::SynchronizeFollowersService < BaseService + include JsonLdHelper + include Payloadable + + def call(account, partial_collection_url) + @account = account + + items = collection_items(partial_collection_url) + return if items.nil? + + # There could be unresolved accounts (hence the call to .compact) but this + # should never happen in practice, since in almost all cases we keep an + # Account record, and should we not do that, we should have sent a Delete. + # In any case there is not much we can do if that occurs. + @expected_followers = items.map { |uri| ActivityPub::TagManager.instance.uri_to_resource(uri, Account) }.compact + + remove_unexpected_local_followers! + handle_unexpected_outgoing_follows! + end + + private + + def remove_unexpected_local_followers! + @account.followers.local.where.not(id: @expected_followers.map(&:id)).each do |unexpected_follower| + UnfollowService.new.call(unexpected_follower, @account) + end + end + + def handle_unexpected_outgoing_follows! + @expected_followers.each do |expected_follower| + next if expected_follower.following?(@account) + + if expected_follower.requested?(@account) + # For some reason the follow request went through but we missed it + expected_follower.follow_requests.find_by(target_account: @account)&.authorize! + else + # Since we were not aware of the follow from our side, we do not have an + # ID for it that we can include in the Undo activity. For this reason, + # the Undo may not work with software that relies exclusively on + # matching activity IDs and not the actor and target + follow = Follow.new(account: expected_follower, target_account: @account) + ActivityPub::DeliveryWorker.perform_async(build_undo_follow_json(follow), follow.account_id, follow.target_account.inbox_url) + end + end + end + + def build_undo_follow_json(follow) + Oj.dump(serialize_payload(follow, ActivityPub::UndoFollowSerializer)) + end + + def collection_items(collection_or_uri) + collection = fetch_collection(collection_or_uri) + return unless collection.is_a?(Hash) + + collection = fetch_collection(collection['first']) if collection['first'].present? + return unless collection.is_a?(Hash) + + case collection['type'] + when 'Collection', 'CollectionPage' + collection['items'] + when 'OrderedCollection', 'OrderedCollectionPage' + collection['orderedItems'] + end + end + + def fetch_collection(collection_or_uri) + return collection_or_uri if collection_or_uri.is_a?(Hash) + return if invalid_origin?(collection_or_uri) + + fetch_resource_without_id_validation(collection_or_uri, nil, true) + end +end diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb index 60775787a..6c5a576a7 100644 --- a/app/workers/activitypub/delivery_worker.rb +++ b/app/workers/activitypub/delivery_worker.rb @@ -2,6 +2,7 @@ class ActivityPub::DeliveryWorker include Sidekiq::Worker + include RoutingHelper include JsonLdHelper STOPLIGHT_FAILURE_THRESHOLD = 10 @@ -38,9 +39,18 @@ class ActivityPub::DeliveryWorker Request.new(:post, @inbox_url, body: @json, http_client: http_client).tap do |request| request.on_behalf_of(@source_account, :uri, sign_with: @options[:sign_with]) request.add_headers(HEADERS) + request.add_headers({ 'Collection-Synchronization' => synchronization_header }) if ENV['DISABLE_FOLLOWERS_SYNCHRONIZATION'] != 'true' && @options[:synchronize_followers] end end + def synchronization_header + "collectionId=\"#{account_followers_url(@source_account)}\", digest=\"#{@source_account.remote_followers_hash(inbox_url_prefix)}\", url=\"#{account_followers_synchronization_url(@source_account)}\"" + end + + def inbox_url_prefix + @inbox_url[/http(s?):\/\/[^\/]+\//] + end + def perform_request light = Stoplight(@inbox_url) do request_pool.with(@host) do |http_client| diff --git a/app/workers/activitypub/distribution_worker.rb b/app/workers/activitypub/distribution_worker.rb index e4997ba0e..9b4814644 100644 --- a/app/workers/activitypub/distribution_worker.rb +++ b/app/workers/activitypub/distribution_worker.rb @@ -13,7 +13,7 @@ class ActivityPub::DistributionWorker return if skip_distribution? ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| - [payload, @account.id, inbox_url] + [payload, @account.id, inbox_url, { synchronize_followers: !@status.distributable? }] end relay! if relayable? diff --git a/app/workers/activitypub/followers_synchronization_worker.rb b/app/workers/activitypub/followers_synchronization_worker.rb new file mode 100644 index 000000000..35a3ef0b9 --- /dev/null +++ b/app/workers/activitypub/followers_synchronization_worker.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +class ActivityPub::FollowersSynchronizationWorker + include Sidekiq::Worker + + sidekiq_options queue: 'push', lock: :until_executed + + def perform(account_id, url) + @account = Account.find_by(id: account_id) + return true if @account.nil? + + ActivityPub::SynchronizeFollowersService.new.call(@account, url) + end +end diff --git a/config/routes.rb b/config/routes.rb index a21dbd45e..e0ef23723 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -83,6 +83,7 @@ Rails.application.routes.draw do resource :inbox, only: [:create], module: :activitypub resource :claim, only: [:create], module: :activitypub resources :collections, only: [:show], module: :activitypub + resource :followers_synchronization, only: [:show], module: :activitypub end resource :inbox, only: [:create], module: :activitypub diff --git a/spec/controllers/activitypub/followers_synchronizations_controller_spec.rb b/spec/controllers/activitypub/followers_synchronizations_controller_spec.rb new file mode 100644 index 000000000..a24d3f8e0 --- /dev/null +++ b/spec/controllers/activitypub/followers_synchronizations_controller_spec.rb @@ -0,0 +1,58 @@ +require 'rails_helper' + +RSpec.describe ActivityPub::FollowersSynchronizationsController, type: :controller do + let!(:account) { Fabricate(:account) } + let!(:follower_1) { Fabricate(:account, domain: 'example.com', uri: 'https://example.com/users/a') } + let!(:follower_2) { Fabricate(:account, domain: 'example.com', uri: 'https://example.com/users/b') } + let!(:follower_3) { Fabricate(:account, domain: 'foo.com', uri: 'https://foo.com/users/a') } + + before do + follower_1.follow!(account) + follower_2.follow!(account) + follower_3.follow!(account) + end + + before do + allow(controller).to receive(:signed_request_account).and_return(remote_account) + end + + describe 'GET #show' do + context 'without signature' do + let(:remote_account) { nil } + + before do + get :show, params: { account_username: account.username } + end + + it 'returns http not authorized' do + expect(response).to have_http_status(401) + end + end + + context 'with signature from example.com' do + let(:remote_account) { Fabricate(:account, domain: 'example.com', uri: 'https://example.com/instance') } + + before do + get :show, params: { account_username: account.username } + end + + it 'returns http success' do + expect(response).to have_http_status(200) + end + + it 'returns application/activity+json' do + expect(response.content_type).to eq 'application/activity+json' + end + + it 'returns orderedItems with followers from example.com' do + json = body_as_json + expect(json[:orderedItems]).to be_an Array + expect(json[:orderedItems].sort).to eq [follower_1.uri, follower_2.uri] + end + + it 'returns private Cache-Control header' do + expect(response.headers['Cache-Control']).to eq 'max-age=0, private' + end + end + end +end diff --git a/spec/controllers/activitypub/inboxes_controller_spec.rb b/spec/controllers/activitypub/inboxes_controller_spec.rb index f3bc23953..e5c004611 100644 --- a/spec/controllers/activitypub/inboxes_controller_spec.rb +++ b/spec/controllers/activitypub/inboxes_controller_spec.rb @@ -22,6 +22,56 @@ RSpec.describe ActivityPub::InboxesController, type: :controller do end end + context 'with Collection-Synchronization header' do + let(:remote_account) { Fabricate(:account, followers_url: 'https://example.com/followers', domain: 'example.com', uri: 'https://example.com/actor', protocol: :activitypub) } + let(:synchronization_collection) { remote_account.followers_url } + let(:synchronization_url) { 'https://example.com/followers-for-domain' } + let(:synchronization_hash) { 'somehash' } + let(:synchronization_header) { "collectionId=\"#{synchronization_collection}\", digest=\"#{synchronization_hash}\", url=\"#{synchronization_url}\"" } + + before do + allow(ActivityPub::FollowersSynchronizationWorker).to receive(:perform_async).and_return(nil) + allow_any_instance_of(Account).to receive(:local_followers_hash).and_return('somehash') + + request.headers['Collection-Synchronization'] = synchronization_header + post :create, body: '{}' + end + + context 'with mismatching target collection' do + let(:synchronization_collection) { 'https://example.com/followers2' } + + it 'does not start a synchronization job' do + expect(ActivityPub::FollowersSynchronizationWorker).not_to have_received(:perform_async) + end + end + + context 'with mismatching domain in partial collection attribute' do + let(:synchronization_url) { 'https://example.org/followers' } + + it 'does not start a synchronization job' do + expect(ActivityPub::FollowersSynchronizationWorker).not_to have_received(:perform_async) + end + end + + context 'with matching digest' do + it 'does not start a synchronization job' do + expect(ActivityPub::FollowersSynchronizationWorker).not_to have_received(:perform_async) + end + end + + context 'with mismatching digest' do + let(:synchronization_hash) { 'wronghash' } + + it 'starts a synchronization job' do + expect(ActivityPub::FollowersSynchronizationWorker).to have_received(:perform_async) + end + end + + it 'returns http accepted' do + expect(response).to have_http_status(202) + end + end + context 'without signature' do before do post :create, body: '{}' diff --git a/spec/models/concerns/account_interactions_spec.rb b/spec/models/concerns/account_interactions_spec.rb index f0380179c..85fbf7e79 100644 --- a/spec/models/concerns/account_interactions_spec.rb +++ b/spec/models/concerns/account_interactions_spec.rb @@ -539,6 +539,49 @@ describe AccountInteractions do end end + describe '#followers_hash' do + let(:me) { Fabricate(:account, username: 'Me') } + let(:remote_1) { Fabricate(:account, username: 'alice', domain: 'example.org', uri: 'https://example.org/users/alice') } + let(:remote_2) { Fabricate(:account, username: 'bob', domain: 'example.org', uri: 'https://example.org/users/bob') } + let(:remote_3) { Fabricate(:account, username: 'eve', domain: 'foo.org', uri: 'https://foo.org/users/eve') } + + before do + remote_1.follow!(me) + remote_2.follow!(me) + remote_3.follow!(me) + me.follow!(remote_1) + end + + context 'on a local user' do + it 'returns correct hash for remote domains' do + expect(me.remote_followers_hash('https://example.org/')).to eq '707962e297b7bd94468a21bc8e506a1bcea607a9142cd64e27c9b106b2a5f6ec' + expect(me.remote_followers_hash('https://foo.org/')).to eq 'ccb9c18a67134cfff9d62c7f7e7eb88e6b803446c244b84265565f4eba29df0e' + end + + it 'invalidates cache as needed when removing or adding followers' do + expect(me.remote_followers_hash('https://example.org/')).to eq '707962e297b7bd94468a21bc8e506a1bcea607a9142cd64e27c9b106b2a5f6ec' + remote_1.unfollow!(me) + expect(me.remote_followers_hash('https://example.org/')).to eq '241b00794ce9b46aa864f3220afadef128318da2659782985bac5ed5bd436bff' + remote_1.follow!(me) + expect(me.remote_followers_hash('https://example.org/')).to eq '707962e297b7bd94468a21bc8e506a1bcea607a9142cd64e27c9b106b2a5f6ec' + end + end + + context 'on a remote user' do + it 'returns correct hash for remote domains' do + expect(remote_1.local_followers_hash).to eq Digest::SHA256.hexdigest(ActivityPub::TagManager.instance.uri_for(me)) + end + + it 'invalidates cache as needed when removing or adding followers' do + expect(remote_1.local_followers_hash).to eq Digest::SHA256.hexdigest(ActivityPub::TagManager.instance.uri_for(me)) + me.unfollow!(remote_1) + expect(remote_1.local_followers_hash).to eq '0000000000000000000000000000000000000000000000000000000000000000' + me.follow!(remote_1) + expect(remote_1.local_followers_hash).to eq Digest::SHA256.hexdigest(ActivityPub::TagManager.instance.uri_for(me)) + end + end + end + describe 'muting an account' do let(:me) { Fabricate(:account, username: 'Me') } let(:you) { Fabricate(:account, username: 'You') } diff --git a/spec/services/activitypub/synchronize_followers_service_spec.rb b/spec/services/activitypub/synchronize_followers_service_spec.rb new file mode 100644 index 000000000..75dcf204b --- /dev/null +++ b/spec/services/activitypub/synchronize_followers_service_spec.rb @@ -0,0 +1,105 @@ +require 'rails_helper' + +RSpec.describe ActivityPub::SynchronizeFollowersService, type: :service do + let(:actor) { Fabricate(:account, domain: 'example.com', uri: 'http://example.com/account', inbox_url: 'http://example.com/inbox') } + let(:alice) { Fabricate(:account, username: 'alice') } + let(:bob) { Fabricate(:account, username: 'bob') } + let(:eve) { Fabricate(:account, username: 'eve') } + let(:mallory) { Fabricate(:account, username: 'mallory') } + let(:collection_uri) { 'http://example.com/partial-followers' } + + let(:items) do + [ + ActivityPub::TagManager.instance.uri_for(alice), + ActivityPub::TagManager.instance.uri_for(eve), + ActivityPub::TagManager.instance.uri_for(mallory), + ] + end + + let(:payload) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + type: 'Collection', + id: collection_uri, + items: items, + }.with_indifferent_access + end + + subject { described_class.new } + + shared_examples 'synchronizes followers' do + before do + alice.follow!(actor) + bob.follow!(actor) + mallory.request_follow!(actor) + + allow(ActivityPub::DeliveryWorker).to receive(:perform_async) + + subject.call(actor, collection_uri) + end + + it 'keeps expected followers' do + expect(alice.following?(actor)).to be true + end + + it 'removes local followers not in the remote list' do + expect(bob.following?(actor)).to be false + end + + it 'converts follow requests to follow relationships when they have been accepted' do + expect(mallory.following?(actor)).to be true + end + + it 'sends an Undo Follow to the actor' do + expect(ActivityPub::DeliveryWorker).to have_received(:perform_async).with(anything, eve.id, actor.inbox_url) + end + end + + describe '#call' do + context 'when the endpoint is a Collection of actor URIs' do + before do + stub_request(:get, collection_uri).to_return(status: 200, body: Oj.dump(payload)) + end + + it_behaves_like 'synchronizes followers' + end + + context 'when the endpoint is an OrderedCollection of actor URIs' do + let(:payload) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + type: 'OrderedCollection', + id: collection_uri, + orderedItems: items, + }.with_indifferent_access + end + + before do + stub_request(:get, collection_uri).to_return(status: 200, body: Oj.dump(payload)) + end + + it_behaves_like 'synchronizes followers' + end + + context 'when the endpoint is a paginated Collection of actor URIs' do + let(:payload) do + { + '@context': 'https://www.w3.org/ns/activitystreams', + type: 'Collection', + id: collection_uri, + first: { + type: 'CollectionPage', + partOf: collection_uri, + items: items, + } + }.with_indifferent_access + end + + before do + stub_request(:get, collection_uri).to_return(status: 200, body: Oj.dump(payload)) + end + + it_behaves_like 'synchronizes followers' + end + end +end diff --git a/spec/workers/activitypub/delivery_worker_spec.rb b/spec/workers/activitypub/delivery_worker_spec.rb index 351be185c..f4633731e 100644 --- a/spec/workers/activitypub/delivery_worker_spec.rb +++ b/spec/workers/activitypub/delivery_worker_spec.rb @@ -3,16 +3,22 @@ require 'rails_helper' describe ActivityPub::DeliveryWorker do + include RoutingHelper + subject { described_class.new } let(:sender) { Fabricate(:account) } let(:payload) { 'test' } + before do + allow_any_instance_of(Account).to receive(:remote_followers_hash).with('https://example.com/').and_return('somehash') + end + describe 'perform' do it 'performs a request' do stub_request(:post, 'https://example.com/api').to_return(status: 200) - subject.perform(payload, sender.id, 'https://example.com/api') - expect(a_request(:post, 'https://example.com/api')).to have_been_made.once + subject.perform(payload, sender.id, 'https://example.com/api', { synchronize_followers: true }) + expect(a_request(:post, 'https://example.com/api').with(headers: { 'Collection-Synchronization' => "collectionId=\"#{account_followers_url(sender)}\", digest=\"somehash\", url=\"#{account_followers_synchronization_url(sender)}\"" })).to have_been_made.once end it 'raises when request fails' do