Change Web Push API deliveries to use request pooling (#16014)
This commit is contained in:
		
							parent
							
								
									463875f645
								
							
						
					
					
						commit
						120965eb0b
					
				
					 5 changed files with 150 additions and 58 deletions
				
			
		
							
								
								
									
										2
									
								
								Gemfile
									
										
									
									
									
								
							
							
						
						
									
										2
									
								
								Gemfile
									
										
									
									
									
								
							| 
						 | 
					@ -94,7 +94,7 @@ gem 'tty-prompt', '~> 0.23', require: false
 | 
				
			||||||
gem 'twitter-text', '~> 3.1.0'
 | 
					gem 'twitter-text', '~> 3.1.0'
 | 
				
			||||||
gem 'tzinfo-data', '~> 1.2021'
 | 
					gem 'tzinfo-data', '~> 1.2021'
 | 
				
			||||||
gem 'webpacker', '~> 5.2'
 | 
					gem 'webpacker', '~> 5.2'
 | 
				
			||||||
gem 'webpush'
 | 
					gem 'webpush', '~> 0.3'
 | 
				
			||||||
gem 'webauthn', '~> 3.0.0.alpha1'
 | 
					gem 'webauthn', '~> 3.0.0.alpha1'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
gem 'json-ld'
 | 
					gem 'json-ld'
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -808,5 +808,5 @@ DEPENDENCIES
 | 
				
			||||||
  webauthn (~> 3.0.0.alpha1)
 | 
					  webauthn (~> 3.0.0.alpha1)
 | 
				
			||||||
  webmock (~> 3.12)
 | 
					  webmock (~> 3.12)
 | 
				
			||||||
  webpacker (~> 5.2)
 | 
					  webpacker (~> 5.2)
 | 
				
			||||||
  webpush
 | 
					  webpush (~> 0.3)
 | 
				
			||||||
  xorcist (~> 1.1)
 | 
					  xorcist (~> 1.1)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -24,81 +24,80 @@ class Web::PushSubscription < ApplicationRecord
 | 
				
			||||||
  validates :key_p256dh, presence: true
 | 
					  validates :key_p256dh, presence: true
 | 
				
			||||||
  validates :key_auth, presence: true
 | 
					  validates :key_auth, presence: true
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def push(notification)
 | 
					  delegate :locale, to: :associated_user
 | 
				
			||||||
    I18n.with_locale(associated_user&.locale || I18n.default_locale) do
 | 
					
 | 
				
			||||||
      push_payload(payload_for_notification(notification), 48.hours.seconds)
 | 
					  def encrypt(payload)
 | 
				
			||||||
 | 
					    Webpush::Encryption.encrypt(payload, key_p256dh, key_auth)
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def audience
 | 
				
			||||||
 | 
					    @audience ||= Addressable::URI.parse(endpoint).normalized_site
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def crypto_key_header
 | 
				
			||||||
 | 
					    p256ecdsa = vapid_key.public_key_for_push_header
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    "p256ecdsa=#{p256ecdsa}"
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def authorization_header
 | 
				
			||||||
 | 
					    jwt = JWT.encode({ aud: audience, exp: 24.hours.from_now.to_i, sub: "mailto:#{contact_email}" }, vapid_key.curve, 'ES256', typ: 'JWT')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    "WebPush #{jwt}"
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def pushable?(notification)
 | 
					  def pushable?(notification)
 | 
				
			||||||
    data&.key?('alerts') && ActiveModel::Type::Boolean.new.cast(data['alerts'][notification.type.to_s])
 | 
					    ActiveModel::Type::Boolean.new.cast(data&.dig('alerts', notification.type.to_s))
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def associated_user
 | 
					  def associated_user
 | 
				
			||||||
    return @associated_user if defined?(@associated_user)
 | 
					    return @associated_user if defined?(@associated_user)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @associated_user = if user_id.nil?
 | 
					    @associated_user = begin
 | 
				
			||||||
 | 
					      if user_id.nil?
 | 
				
			||||||
        session_activation.user
 | 
					        session_activation.user
 | 
				
			||||||
      else
 | 
					      else
 | 
				
			||||||
        user
 | 
					        user
 | 
				
			||||||
      end
 | 
					      end
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def associated_access_token
 | 
					  def associated_access_token
 | 
				
			||||||
    return @associated_access_token if defined?(@associated_access_token)
 | 
					    return @associated_access_token if defined?(@associated_access_token)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @associated_access_token = if access_token_id.nil?
 | 
					    @associated_access_token = begin
 | 
				
			||||||
 | 
					      if access_token_id.nil?
 | 
				
			||||||
        find_or_create_access_token.token
 | 
					        find_or_create_access_token.token
 | 
				
			||||||
      else
 | 
					      else
 | 
				
			||||||
        access_token.token
 | 
					        access_token.token
 | 
				
			||||||
      end
 | 
					      end
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  class << self
 | 
					  class << self
 | 
				
			||||||
    def unsubscribe_for(application_id, resource_owner)
 | 
					    def unsubscribe_for(application_id, resource_owner)
 | 
				
			||||||
      access_token_ids = Doorkeeper::AccessToken.where(application_id: application_id, resource_owner_id: resource_owner.id, revoked_at: nil)
 | 
					      access_token_ids = Doorkeeper::AccessToken.where(application_id: application_id, resource_owner_id: resource_owner.id, revoked_at: nil).pluck(:id)
 | 
				
			||||||
                                                .pluck(:id)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      where(access_token_id: access_token_ids).delete_all
 | 
					      where(access_token_id: access_token_ids).delete_all
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  private
 | 
					  private
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def push_payload(message, ttl = 5.minutes.seconds)
 | 
					 | 
				
			||||||
    Webpush.payload_send(
 | 
					 | 
				
			||||||
      message: Oj.dump(message),
 | 
					 | 
				
			||||||
      endpoint: endpoint,
 | 
					 | 
				
			||||||
      p256dh: key_p256dh,
 | 
					 | 
				
			||||||
      auth: key_auth,
 | 
					 | 
				
			||||||
      ttl: ttl,
 | 
					 | 
				
			||||||
      ssl_timeout: 10,
 | 
					 | 
				
			||||||
      open_timeout: 10,
 | 
					 | 
				
			||||||
      read_timeout: 10,
 | 
					 | 
				
			||||||
      vapid: {
 | 
					 | 
				
			||||||
        subject: "mailto:#{::Setting.site_contact_email}",
 | 
					 | 
				
			||||||
        private_key: Rails.configuration.x.vapid_private_key,
 | 
					 | 
				
			||||||
        public_key: Rails.configuration.x.vapid_public_key,
 | 
					 | 
				
			||||||
      }
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
  end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  def payload_for_notification(notification)
 | 
					 | 
				
			||||||
    ActiveModelSerializers::SerializableResource.new(
 | 
					 | 
				
			||||||
      notification,
 | 
					 | 
				
			||||||
      serializer: Web::NotificationSerializer,
 | 
					 | 
				
			||||||
      scope: self,
 | 
					 | 
				
			||||||
      scope_name: :current_push_subscription
 | 
					 | 
				
			||||||
    ).as_json
 | 
					 | 
				
			||||||
  end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
  def find_or_create_access_token
 | 
					  def find_or_create_access_token
 | 
				
			||||||
    Doorkeeper::AccessToken.find_or_create_for(
 | 
					    Doorkeeper::AccessToken.find_or_create_for(
 | 
				
			||||||
      application: Doorkeeper::Application.find_by(superapp: true),
 | 
					      application: Doorkeeper::Application.find_by(superapp: true),
 | 
				
			||||||
      resource_owner: session_activation.user_id,
 | 
					      resource_owner: user_id || session_activation.user_id,
 | 
				
			||||||
      scopes: Doorkeeper::OAuth::Scopes.from_string('read write follow push'),
 | 
					      scopes: Doorkeeper::OAuth::Scopes.from_string('read write follow push'),
 | 
				
			||||||
      expires_in: Doorkeeper.configuration.access_token_expires_in,
 | 
					      expires_in: Doorkeeper.configuration.access_token_expires_in,
 | 
				
			||||||
      use_refresh_token: Doorkeeper.configuration.refresh_token_enabled?
 | 
					      use_refresh_token: Doorkeeper.configuration.refresh_token_enabled?
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def vapid_key
 | 
				
			||||||
 | 
					    @vapid_key ||= Webpush::VapidKey.from_keys(Rails.configuration.x.vapid_public_key, Rails.configuration.x.vapid_private_key)
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def contact_email
 | 
				
			||||||
 | 
					    @contact_email ||= ::Setting.site_contact_email
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
end
 | 
					end
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -3,22 +3,67 @@
 | 
				
			||||||
class Web::PushNotificationWorker
 | 
					class Web::PushNotificationWorker
 | 
				
			||||||
  include Sidekiq::Worker
 | 
					  include Sidekiq::Worker
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  sidekiq_options backtrace: true, retry: 5
 | 
					  sidekiq_options queue: 'push', retry: 5
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  TTL     = 48.hours.to_s
 | 
				
			||||||
 | 
					  URGENCY = 'normal'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def perform(subscription_id, notification_id)
 | 
					  def perform(subscription_id, notification_id)
 | 
				
			||||||
    subscription = ::Web::PushSubscription.find(subscription_id)
 | 
					    @subscription = Web::PushSubscription.find(subscription_id)
 | 
				
			||||||
    notification = Notification.find(notification_id)
 | 
					    @notification = Notification.find(notification_id)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    subscription.push(notification) unless notification.activity.nil?
 | 
					    # Polymorphically associated activity could have been deleted
 | 
				
			||||||
  rescue Webpush::ResponseError => e
 | 
					    # in the meantime, so we have to double-check before proceeding
 | 
				
			||||||
    code = e.response.code.to_i
 | 
					    return unless @notification.activity.present? && @subscription.pushable?(@notification)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (400..499).cover?(code) && ![408, 429].include?(code)
 | 
					    payload = @subscription.encrypt(push_notification_json)
 | 
				
			||||||
      subscription.destroy!
 | 
					
 | 
				
			||||||
    else
 | 
					    request_pool.with(@subscription.audience) do |http_client|
 | 
				
			||||||
      raise e
 | 
					      request = Request.new(:post, @subscription.endpoint, body: payload.fetch(:ciphertext), http_client: http_client)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      request.add_headers(
 | 
				
			||||||
 | 
					        'Content-Type'     => 'application/octet-stream',
 | 
				
			||||||
 | 
					        'Ttl'              => TTL,
 | 
				
			||||||
 | 
					        'Urgency'          => URGENCY,
 | 
				
			||||||
 | 
					        'Content-Encoding' => 'aesgcm',
 | 
				
			||||||
 | 
					        'Encryption'       => "salt=#{Webpush.encode64(payload.fetch(:salt)).delete('=')}",
 | 
				
			||||||
 | 
					        'Crypto-Key'       => "dh=#{Webpush.encode64(payload.fetch(:server_public_key)).delete('=')};#{@subscription.crypto_key_header}",
 | 
				
			||||||
 | 
					        'Authorization'    => @subscription.authorization_header
 | 
				
			||||||
 | 
					      )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      request.perform do |response|
 | 
				
			||||||
 | 
					        # If the server responds with an error in the 4xx range
 | 
				
			||||||
 | 
					        # that isn't about rate-limiting or timeouts, we can
 | 
				
			||||||
 | 
					        # assume that the subscription is invalid or expired
 | 
				
			||||||
 | 
					        # and must be removed
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if (400..499).cover?(response.code) && ![408, 429].include?(response.code)
 | 
				
			||||||
 | 
					          @subscription.destroy!
 | 
				
			||||||
 | 
					        elsif !(200...300).cover?(response.code)
 | 
				
			||||||
 | 
					          raise Mastodon::UnexpectedResponseError, response
 | 
				
			||||||
 | 
					        end
 | 
				
			||||||
 | 
					      end
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
  rescue ActiveRecord::RecordNotFound
 | 
					  rescue ActiveRecord::RecordNotFound
 | 
				
			||||||
    true
 | 
					    true
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  private
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def push_notification_json
 | 
				
			||||||
 | 
					    json = I18n.with_locale(@subscription.locale || I18n.default_locale) do
 | 
				
			||||||
 | 
					      ActiveModelSerializers::SerializableResource.new(
 | 
				
			||||||
 | 
					        @notification,
 | 
				
			||||||
 | 
					        serializer: Web::NotificationSerializer,
 | 
				
			||||||
 | 
					        scope: @subscription,
 | 
				
			||||||
 | 
					        scope_name: :current_push_subscription
 | 
				
			||||||
 | 
					      ).as_json
 | 
				
			||||||
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Oj.dump(json)
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def request_pool
 | 
				
			||||||
 | 
					    RequestPool.current
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
end
 | 
					end
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
							
								
								
									
										48
									
								
								spec/workers/web/push_notification_worker_spec.rb
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										48
									
								
								spec/workers/web/push_notification_worker_spec.rb
									
										
									
									
									
										Normal file
									
								
							| 
						 | 
					@ -0,0 +1,48 @@
 | 
				
			||||||
 | 
					# frozen_string_literal: true
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					require 'rails_helper'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					describe Web::PushNotificationWorker do
 | 
				
			||||||
 | 
					  subject { described_class.new }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  let(:p256dh) { 'BN4GvZtEZiZuqFxSKVZfSfluwKBD7UxHNBmWkfiZfCtgDE8Bwh-_MtLXbBxTBAWH9r7IPKL0lhdcaqtL1dfxU5E=' }
 | 
				
			||||||
 | 
					  let(:auth) { 'Q2BoAjC09xH3ywDLNJr-dA==' }
 | 
				
			||||||
 | 
					  let(:endpoint) { 'https://updates.push.services.mozilla.com/push/v1/subscription-id' }
 | 
				
			||||||
 | 
					  let(:user) { Fabricate(:user) }
 | 
				
			||||||
 | 
					  let(:notification) { Fabricate(:notification) }
 | 
				
			||||||
 | 
					  let(:subscription) { Fabricate(:web_push_subscription, user_id: user.id, key_p256dh: p256dh, key_auth: auth, endpoint: endpoint, data: { alerts: { notification.type => true } }) }
 | 
				
			||||||
 | 
					  let(:vapid_public_key) { 'BB37UCyc8LLX4PNQSe-04vSFvpUWGrENubUaslVFM_l5TxcGVMY0C3RXPeUJAQHKYlcOM2P4vTYmkoo0VZGZTM4=' }
 | 
				
			||||||
 | 
					  let(:vapid_private_key) { 'OPrw1Sum3gRoL4-DXfSCC266r-qfFSRZrnj8MgIhRHg=' }
 | 
				
			||||||
 | 
					  let(:vapid_key) { Webpush::VapidKey.from_keys(vapid_public_key, vapid_private_key) }
 | 
				
			||||||
 | 
					  let(:contact_email) { 'sender@example.com' }
 | 
				
			||||||
 | 
					  let(:ciphertext) { "+\xB8\xDBT}\x13\xB6\xDD.\xF9\xB0\xA7\xC8\xD2\x80\xFD\x99#\xF7\xAC\x83\xA4\xDB,\x1F\xB5\xB9w\x85>\xF7\xADr" }
 | 
				
			||||||
 | 
					  let(:salt) { "X\x97\x953\xE4X\xF8_w\xE7T\x95\xC51q\xFE" }
 | 
				
			||||||
 | 
					  let(:server_public_key) { "\x04\b-RK9w\xDD$\x16lFz\xF9=\xB4~\xC6\x12k\xF3\xF40t\xA9\xC1\fR\xC3\x81\x80\xAC\f\x7F\xE4\xCC\x8E\xC2\x88 n\x8BB\xF1\x9C\x14\a\xFA\x8D\xC9\x80\xA1\xDDyU\\&c\x01\x88#\x118Ua" }
 | 
				
			||||||
 | 
					  let(:shared_secret) { "\t\xA7&\x85\t\xC5m\b\xA8\xA7\xF8B{1\xADk\xE1y'm\xEDE\xEC\xDD\xEDj\xB3$s\xA9\xDA\xF0" }
 | 
				
			||||||
 | 
					  let(:payload) { { ciphertext: ciphertext, salt: salt, server_public_key: server_public_key, shared_secret: shared_secret } }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  describe 'perform' do
 | 
				
			||||||
 | 
					    before do
 | 
				
			||||||
 | 
					      allow_any_instance_of(subscription.class).to receive(:contact_email).and_return(contact_email)
 | 
				
			||||||
 | 
					      allow_any_instance_of(subscription.class).to receive(:vapid_key).and_return(vapid_key)
 | 
				
			||||||
 | 
					      allow(Webpush::Encryption).to receive(:encrypt).and_return(payload)
 | 
				
			||||||
 | 
					      allow(JWT).to receive(:encode).and_return('jwt.encoded.payload')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      stub_request(:post, endpoint).to_return(status: 201, body: '')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      subject.perform(subscription.id, notification.id)
 | 
				
			||||||
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    it 'calls the relevant service with the correct headers' do
 | 
				
			||||||
 | 
					      expect(a_request(:post, endpoint).with(headers: {
 | 
				
			||||||
 | 
					        'Content-Encoding' => 'aesgcm',
 | 
				
			||||||
 | 
					        'Content-Type' => 'application/octet-stream',
 | 
				
			||||||
 | 
					        'Crypto-Key' => 'dh=BAgtUks5d90kFmxGevk9tH7GEmvz9DB0qcEMUsOBgKwMf-TMjsKIIG6LQvGcFAf6jcmAod15VVwmYwGIIxE4VWE;p256ecdsa=' + vapid_public_key.delete('='),
 | 
				
			||||||
 | 
					        'Encryption' => 'salt=WJeVM-RY-F9351SVxTFx_g',
 | 
				
			||||||
 | 
					        'Ttl' => '172800',
 | 
				
			||||||
 | 
					        'Urgency' => 'normal',
 | 
				
			||||||
 | 
					        'Authorization' => 'WebPush jwt.encoded.payload',
 | 
				
			||||||
 | 
					      }, body: "+\xB8\xDBT}\u0013\xB6\xDD.\xF9\xB0\xA7\xC8Ҁ\xFD\x99#\xF7\xAC\x83\xA4\xDB,\u001F\xB5\xB9w\x85>\xF7\xADr")).to have_been_made
 | 
				
			||||||
 | 
					    end
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					end
 | 
				
			||||||
		Loading…
	
	Add table
		
		Reference in a new issue