Change algorithm of tootctl search deploy to improve performance (#18463)

parent 54bb659ad1
commit a9b64b24d6

9 changed files with 294 additions and 103 deletions
app/chewy/accounts_index.rb

@@ -23,7 +23,7 @@ class AccountsIndex < Chewy::Index
     },
   }

-  index_scope ::Account.searchable.includes(:account_stat), delete_if: ->(account) { account.destroyed? || !account.searchable? }
+  index_scope ::Account.searchable.includes(:account_stat)

   root date_detection: false do
     field :id, type: 'long'
@@ -36,8 +36,8 @@ class AccountsIndex < Chewy::Index
       field :edge_ngram, type: 'text', analyzer: 'edge_ngram', search_analyzer: 'content'
     end

-    field :following_count, type: 'long', value: ->(account) { account.following.local.count }
-    field :followers_count, type: 'long', value: ->(account) { account.followers.local.count }
+    field :following_count, type: 'long', value: ->(account) { account.following_count }
+    field :followers_count, type: 'long', value: ->(account) { account.followers_count }
     field :last_status_at, type: 'date', value: ->(account) { account.last_status_at || account.created_at }
   end
 end
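Two separate wins in this file: the delete_if proc is gone (removing stale documents is now the importers' clean_up! job, below), and the follower counts are read from the denormalized account_stat row the scope already eager-loads, instead of issuing a COUNT query per account. Note the indexed value also changes meaning, from local followers only to the full cached count. A minimal sketch of the query difference, assuming Mastodon's models:

# Before: one COUNT(*) round trip per account while rendering documents.
Account.searchable.find_each do |account|
  account.followers.local.count # SELECT COUNT(*) ... per account
end

# After: the counters live on the eager-loaded account_stats row, so an
# entire batch renders without additional queries.
Account.searchable.includes(:account_stat).find_each do |account|
  account.followers_count # read from account_stat
end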
app/chewy/statuses_index.rb

@@ -33,6 +33,8 @@ class StatusesIndex < Chewy::Index
     },
   }

+  # We do not use delete_if option here because it would call a method that we
+  # expect to be called with crutches without crutches, causing n+1 queries
   index_scope ::Status.unscoped.kept.without_reblogs.includes(:media_attachments, :preloadable_poll)

   crutch :mentions do |collection|
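The new comment refers to Chewy crutches: per-batch precomputations passed as a second argument to field value lambdas. delete_if is evaluated per record, outside that machinery, so anything that expects crutches would run one query per status instead. A condensed sketch of the pattern, modeled loosely on this index:

class ExampleStatusesIndex < Chewy::Index
  index_scope ::Status.unscoped.kept.without_reblogs

  # Computed once per import batch: a single query for the whole collection.
  crutch :favourites do |collection|
    data = ::Favourite.where(status_id: collection.map(&:id)).pluck(:status_id, :account_id)
    data.each_with_object({}) { |(id, account_id), map| (map[id] ||= []) << account_id }
  end

  root date_detection: false do
    # Every document reads from the shared crutch: queries scale with the
    # number of batches, not the number of statuses.
    field :searchable_by, type: 'long', value: ->(status, crutches) { status.searchable_by(crutches) }
  end
end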
app/chewy/tags_index.rb

@@ -23,7 +23,11 @@ class TagsIndex < Chewy::Index
     },
   }

-  index_scope ::Tag.listable, delete_if: ->(tag) { tag.destroyed? || !tag.listable? }
+  index_scope ::Tag.listable
+
+  crutch :time_period do
+    7.days.ago.to_date..0.days.ago.to_date
+  end

   root date_detection: false do
     field :name, type: 'text', analyzer: 'content' do
@@ -31,7 +35,7 @@ class TagsIndex < Chewy::Index
     end

     field :reviewed, type: 'boolean', value: ->(tag) { tag.reviewed? }
-    field :usage, type: 'long', value: ->(tag) { tag.history.reduce(0) { |total, day| total + day.accounts } }
+    field :usage, type: 'long', value: ->(tag, crutches) { tag.history.aggregate(crutches.time_period).accounts }
     field :last_status_at, type: 'date', value: ->(tag) { tag.last_status_at || tag.created_at }
   end
 end
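The usage field used to sum per-day readings, one Redis call per day per tag; it now makes a single aggregate call over a seven-day window, and the window is built once per batch by the time_period crutch instead of once per tag. Roughly, assuming the Trends::History API touched later in this diff:

# Before: one Redis read per day, per tag, summed in Ruby.
tag.history.reduce(0) { |total, day| total + day.accounts }

# After: a shared Range from the crutch and one aggregate call covering all
# seven day keys (unique accounts across the window, not a sum of dailies).
time_period = 7.days.ago.to_date..0.days.ago.to_date
tag.history.aggregate(time_period).accounts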
app/lib/importer/accounts_index_importer.rb (new file, 30 lines)

@@ -0,0 +1,30 @@
+# frozen_string_literal: true
+
+class Importer::AccountsIndexImporter < Importer::BaseImporter
+  def import!
+    scope.includes(:account_stat).find_in_batches(batch_size: @batch_size) do |tmp|
+      in_work_unit(tmp) do |accounts|
+        bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: accounts).bulk_body
+
+        indexed = bulk.select { |entry| entry[:index] }.size
+        deleted = bulk.select { |entry| entry[:delete] }.size
+
+        Chewy::Index::Import::BulkRequest.new(index).perform(bulk)
+
+        [indexed, deleted]
+      end
+    end
+
+    wait!
+  end
+
+  private
+
+  def index
+    AccountsIndex
+  end
+
+  def scope
+    Account.searchable
+  end
+end
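Each work unit returns an [indexed, deleted] pair, which BaseImporter forwards to the on_progress callback. The tallies lean on the shape of Chewy's bulk body; roughly (inferred from how it is used here, not spelled out in this diff):

bulk = [
  { index:  { _id: '1', data: { 'acct' => 'alice' } } }, # upsert operation
  { delete: { _id: '2' } },                              # removal operation
]

bulk.select { |entry| entry[:index] }.size  # => 1, reported as indexed
bulk.select { |entry| entry[:delete] }.size # => 1, reported as deleted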
app/lib/importer/base_importer.rb (new file, 87 lines)

@@ -0,0 +1,87 @@
+# frozen_string_literal: true
+
+class Importer::BaseImporter
+  # @param [Integer] batch_size
+  # @param [Concurrent::ThreadPoolExecutor] executor
+  def initialize(batch_size:, executor:)
+    @batch_size = batch_size
+    @executor   = executor
+    @wait_for   = Concurrent::Set.new
+  end
+
+  # Callback to run when a concurrent work unit completes
+  # @param [Proc]
+  def on_progress(&block)
+    @on_progress = block
+  end
+
+  # Callback to run when a concurrent work unit fails
+  # @param [Proc]
+  def on_failure(&block)
+    @on_failure = block
+  end
+
+  # Reduce resource usage during and improve speed of indexing
+  def optimize_for_import!
+    Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: -1 } }
+  end
+
+  # Restore original index settings
+  def optimize_for_search!
+    Chewy.client.indices.put_settings index: index.index_name, body: { index: { refresh_interval: index.settings_hash[:settings][:index][:refresh_interval] } }
+  end
+
+  # Estimate the amount of documents that would be indexed. Not exact!
+  # @returns [Integer]
+  def estimate!
+    ActiveRecord::Base.connection_pool.with_connection { |connection| connection.select_one("SELECT reltuples AS estimate FROM pg_class WHERE relname = '#{index.adapter.target.table_name}'")['estimate'].to_i }
+  end
+
+  # Import data from the database into the index
+  def import!
+    raise NotImplementedError
+  end
+
+  # Remove documents from the index that no longer exist in the database
+  def clean_up!
+    index.scroll_batches do |documents|
+      ids           = documents.map { |doc| doc['_id'] }
+      existence_map = index.adapter.target.where(id: ids).pluck(:id).each_with_object({}) { |id, map| map[id.to_s] = true }
+      tmp           = ids.reject { |id| existence_map[id] }
+
+      next if tmp.empty?
+
+      in_work_unit(tmp) do |deleted_ids|
+        bulk = Chewy::Index::Import::BulkBuilder.new(index, delete: deleted_ids).bulk_body
+
+        Chewy::Index::Import::BulkRequest.new(index).perform(bulk)
+
+        [0, bulk.size]
+      end
+    end
+
+    wait!
+  end
+
+  protected
+
+  def in_work_unit(*args, &block)
+    work_unit = Concurrent::Promises.future_on(@executor, *args, &block)
+
+    work_unit.on_fulfillment!(&@on_progress)
+    work_unit.on_rejection!(&@on_failure)
+    work_unit.on_resolution! { @wait_for.delete(work_unit) }
+
+    @wait_for << work_unit
+  rescue Concurrent::RejectedExecutionError
+    sleep(0.1) && retry # Backpressure
+  end
+
+  def wait!
+    Concurrent::Promises.zip(*@wait_for).wait
+  end
+
+  def index
+    raise NotImplementedError
+  end
+end
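How a concrete importer is driven, as a hypothetical stand-alone script (the real wiring is in lib/mastodon/search_cli.rb at the bottom of this diff). The executor's bounded queue plus the RejectedExecutionError rescue in in_work_unit provides the backpressure: batches are pulled from the database only as fast as Elasticsearch drains them.

pool     = Concurrent::FixedThreadPool.new(5, max_queue: 50) # bounded queue => backpressure
importer = Importer::AccountsIndexImporter.new(batch_size: 100, executor: pool)

importer.on_progress { |(indexed, deleted)| puts "+#{indexed} -#{deleted}" }
importer.on_failure  { |reason| warn "work unit failed: #{reason}" }

importer.optimize_for_import!   # refresh_interval: -1 while bulk loading
begin
  importer.import!              # enqueues work units, waits for all promises
  importer.clean_up!            # scrolls the index, deletes rows gone from the DB
ensure
  importer.optimize_for_search! # restores the configured refresh_interval
end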
app/lib/importer/statuses_index_importer.rb (new file, 89 lines)

@@ -0,0 +1,89 @@
+# frozen_string_literal: true
+
+class Importer::StatusesIndexImporter < Importer::BaseImporter
+  def import!
+    # The idea is that instead of iterating over all statuses in the database
+    # and calculating the searchable_by for each of them (majority of which
+    # would be empty), we approach the index from the other end
+
+    scopes.each do |scope|
+      # We could be tempted to keep track of status IDs we have already processed
+      # from a different scope to avoid indexing them multiple times, but that
+      # could end up being a very large array
+
+      scope.find_in_batches(batch_size: @batch_size) do |tmp|
+        in_work_unit(tmp.map(&:status_id)) do |status_ids|
+          bulk = ActiveRecord::Base.connection_pool.with_connection do
+            Chewy::Index::Import::BulkBuilder.new(index, to_index: Status.includes(:media_attachments, :preloadable_poll).where(id: status_ids)).bulk_body
+          end
+
+          indexed = 0
+          deleted = 0
+
+          # We can't use the delete_if proc to do the filtering because delete_if
+          # is called before rendering the data and we need to filter based
+          # on the results of the filter, so this filtering happens here instead
+          bulk.map! do |entry|
+            new_entry = begin
+              if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank?
+                { delete: entry[:index].except(:data) }
+              else
+                entry
+              end
+            end
+
+            if new_entry[:index]
+              indexed += 1
+            else
+              deleted += 1
+            end
+
+            new_entry
+          end
+
+          Chewy::Index::Import::BulkRequest.new(index).perform(bulk)
+
+          [indexed, deleted]
+        end
+      end
+    end
+
+    wait!
+  end
+
+  private
+
+  def index
+    StatusesIndex
+  end
+
+  def scopes
+    [
+      local_statuses_scope,
+      local_mentions_scope,
+      local_favourites_scope,
+      local_votes_scope,
+      local_bookmarks_scope,
+    ]
+  end
+
+  def local_mentions_scope
+    Mention.where(account: Account.local, silent: false).select(:id, :status_id)
+  end
+
+  def local_favourites_scope
+    Favourite.where(account: Account.local).select(:id, :status_id)
+  end
+
+  def local_bookmarks_scope
+    Bookmark.select(:id, :status_id)
+  end
+
+  def local_votes_scope
+    Poll.joins(:votes).where(votes: { account: Account.local }).select('polls.id, polls.status_id')
+  end
+
+  def local_statuses_scope
+    Status.local.select('id, coalesce(reblog_of_id, id) as status_id')
+  end
+end
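The bulk.map! pass is the replacement for delete_if: only after a document is rendered (with crutches) do we know whether searchable_by came out empty, and empty ones are rewritten into delete operations. In plain Ruby the transformation looks like this, with illustrative data (the original uses Rails' blank? rather than empty?):

bulk = [
  { index: { _id: '10', data: { 'searchable_by' => [42] } } },
  { index: { _id: '11', data: { 'searchable_by' => [] } } },
]

bulk.map! do |entry|
  if entry[:index] && entry.dig(:index, :data, 'searchable_by').empty?
    { delete: entry[:index].except(:data) } # nobody can search it: de-index
  else
    entry
  end
end
# => [{ index: { _id: '10', ... } }, { delete: { _id: '11' } }]

Overlap between the scopes is also harmless: a status reached through two scopes is simply indexed twice, and an index operation with the same _id is an idempotent overwrite, which is why no "already seen" ID set is needed.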
app/lib/importer/tags_index_importer.rb (new file, 26 lines)

@@ -0,0 +1,26 @@
+# frozen_string_literal: true
+
+class Importer::TagsIndexImporter < Importer::BaseImporter
+  def import!
+    index.adapter.default_scope.find_in_batches(batch_size: @batch_size) do |tmp|
+      in_work_unit(tmp) do |tags|
+        bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: tags).bulk_body
+
+        indexed = bulk.select { |entry| entry[:index] }.size
+        deleted = bulk.select { |entry| entry[:delete] }.size
+
+        Chewy::Index::Import::BulkRequest.new(index).perform(bulk)
+
+        [indexed, deleted]
+      end
+    end
+
+    wait!
+  end
+
+  private
+
+  def index
+    TagsIndex
+  end
+end
app/models/trends/history.rb

@@ -11,11 +11,11 @@ class Trends::History
     end

     def uses
-      redis.mget(*@days.map { |day| day.key_for(:uses) }).map(&:to_i).sum
+      with_redis { |redis| redis.mget(*@days.map { |day| day.key_for(:uses) }).map(&:to_i).sum }
     end

     def accounts
-      redis.pfcount(*@days.map { |day| day.key_for(:accounts) })
+      with_redis { |redis| redis.pfcount(*@days.map { |day| day.key_for(:accounts) }) }
     end
   end

@@ -33,19 +33,21 @@ class Trends::History
     attr_reader :day

     def accounts
-      redis.pfcount(key_for(:accounts))
+      with_redis { |redis| redis.pfcount(key_for(:accounts)) }
     end

     def uses
-      redis.get(key_for(:uses))&.to_i || 0
+      with_redis { |redis| redis.get(key_for(:uses))&.to_i || 0 }
     end

     def add(account_id)
-      redis.pipelined do
-        redis.incrby(key_for(:uses), 1)
-        redis.pfadd(key_for(:accounts), account_id)
-        redis.expire(key_for(:uses), EXPIRE_AFTER)
-        redis.expire(key_for(:accounts), EXPIRE_AFTER)
+      with_redis do |redis|
+        redis.pipelined do |pipeline|
+          pipeline.incrby(key_for(:uses), 1)
+          pipeline.pfadd(key_for(:accounts), account_id)
+          pipeline.expire(key_for(:uses), EXPIRE_AFTER)
+          pipeline.expire(key_for(:accounts), EXPIRE_AFTER)
+        end
       end
     end
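Every Redis read now borrows a connection just for the duration of the block via with_redis (a pool-backed helper defined elsewhere; it is not part of this diff), which matters once callers run on a thread pool, and the pipelined block uses redis-rb's explicit pipeline object so queued commands are flushed in one round trip. A hypothetical shape of the helper, using the connection_pool gem:

require 'connection_pool'
require 'redis'

REDIS_POOL = ConnectionPool.new(size: 5, timeout: 5) { Redis.new }

# Assumed implementation, for illustration only; Mastodon defines the real one.
def with_redis(&block)
  REDIS_POOL.with(&block) # check a connection out only for the block's duration
end

with_redis { |redis| redis.pfcount('hll:accounts:2022-05-01') } # key name illustrative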
lib/mastodon/search_cli.rb

@@ -16,19 +16,21 @@ module Mastodon
       StatusesIndex,
     ].freeze

-    option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads'
-    option :batch_size, type: :numeric, default: 1_000, aliases: [:b], desc: 'Number of records in each batch'
+    option :concurrency, type: :numeric, default: 5, aliases: [:c], desc: 'Workload will be split between this number of threads'
+    option :batch_size, type: :numeric, default: 100, aliases: [:b], desc: 'Number of records in each batch'
     option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices'
+    option :import, type: :boolean, default: true, desc: 'Import data from the database to the index'
+    option :clean, type: :boolean, default: true, desc: 'Remove outdated documents from the index'
     desc 'deploy', 'Create or upgrade Elasticsearch indices and populate them'
     long_desc <<~LONG_DESC
       If Elasticsearch is empty, this command will create the necessary indices
       and then import data from the database into those indices.

       This command will also upgrade indices if the underlying schema has been
-      changed since the last run.
+      changed since the last run. Index upgrades erase index data.

       Even if creating or upgrading indices is not necessary, data from the
-      database will be imported into the indices.
+      database will be imported into the indices, unless overriden with --no-import.
     LONG_DESC
     def deploy
       if options[:concurrency] < 1
@@ -49,6 +51,8 @@ module Mastodon
         end
       end

-      progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)
+      pool      = Concurrent::FixedThreadPool.new(options[:concurrency], max_queue: options[:concurrency] * 10)
+      importers = indices.index_with { |index| "Importer::#{index.name}Importer".constantize.new(batch_size: options[:batch_size], executor: pool) }
+      progress  = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)

       # First, ensure all indices are created and have the correct
@@ -59,99 +63,46 @@ module Mastodon
         index.specification.lock!
       end

+      progress.title = 'Estimating workload '
+      progress.total = indices.sum { |index| importers[index].estimate! }
+
       reset_connection_pools!

-      pool    = Concurrent::FixedThreadPool.new(options[:concurrency])
-      added   = Concurrent::AtomicFixnum.new(0)
-      removed = Concurrent::AtomicFixnum.new(0)
+      added   = 0
+      removed = 0

-      progress.title = 'Estimating workload '
-
-      # Estimate the amount of data that has to be imported first
-      progress.total = indices.sum { |index| index.adapter.default_scope.count }
-
-      # Now import all the actual data. Mind that unlike chewy:sync, we don't
-      # fetch and compare all record IDs from the database and the index to
-      # find out which to add and which to remove from the index. Because with
-      # potentially millions of rows, the memory footprint of such a calculation
-      # is uneconomical. So we only ever add.
       indices.each do |index|
-        progress.title = "Importing #{index} "
-        batch_size     = options[:batch_size]
-        slice_size     = (batch_size / options[:concurrency]).ceil
+        importer = importers[index]
+        importer.optimize_for_import!

-        index.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch|
-          futures = []
+        importer.on_progress do |(indexed, deleted)|
+          progress.total = nil if progress.progress + indexed + deleted > progress.total
+          progress.progress += indexed + deleted
+          added   += indexed
+          removed += deleted
+        end

-          batch.each_slice(slice_size) do |records|
-            futures << Concurrent::Future.execute(executor: pool) do
-              begin
-                if !progress.total.nil? && progress.progress + records.size > progress.total
-                  # The number of items has changed between start and now,
-                  # since there is no good way to predict the final count from
-                  # here, just change the progress bar to an indeterminate one
-
-                  progress.total = nil
-                end
+        importer.on_failure do |reason|
+          progress.log(pastel.red("Error while importing #{index}: #{reason}"))
+        end

-                grouped_records = nil
-                bulk_body       = nil
-                index_count     = 0
-                delete_count    = 0
-
-                ActiveRecord::Base.connection_pool.with_connection do
-                  grouped_records = records.to_a.group_by do |record|
-                    index.adapter.send(:delete_from_index?, record) ? :delete : :to_index
-                  end
-
-                  bulk_body = Chewy::Index::Import::BulkBuilder.new(index, **grouped_records).bulk_body
-                end
-
-                index_count  = grouped_records[:to_index].size  if grouped_records.key?(:to_index)
-                delete_count = grouped_records[:delete].size    if grouped_records.key?(:delete)
-
-                # The following is an optimization for statuses specifically, since
-                # we want to de-index statuses that cannot be searched by anybody,
-                # but can't use Chewy's delete_if logic because it doesn't use
-                # crutches and our searchable_by logic depends on them
-                if index == StatusesIndex
-                  bulk_body.map! do |entry|
-                    if entry[:to_index] && entry.dig(:to_index, :data, 'searchable_by').blank?
-                      index_count  -= 1
-                      delete_count += 1
-
-                      { delete: entry[:to_index].except(:data) }
-                    else
-                      entry
-                    end
-                  end
-                end
-
-                Chewy::Index::Import::BulkRequest.new(index).perform(bulk_body)
-
-                progress.progress += records.size
-
-                added.increment(index_count)
-                removed.increment(delete_count)
-
-                sleep 1
-              rescue => e
-                progress.log pastel.red("Error importing #{index}: #{e}")
-              ensure
-                RedisConfiguration.pool.checkin if Thread.current[:redis]
-                Thread.current[:redis] = nil
-              end
-            end
-          end
+        if options[:import]
+          progress.title = "Importing #{index} "
+          importer.import!
+        end

-          futures.map(&:value)
-        end
+        if options[:clean]
+          progress.title = "Cleaning #{index} "
+          importer.clean_up!
+        end
+      ensure
+        importer.optimize_for_search!
       end

-      progress.title = 'Done! '
-      progress.finish
+      progress.title = ''
+      progress.stop

-      say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true)
+      say("Indexed #{added} records, de-indexed #{removed}", :green, true)
     end
   end
 end
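Example invocations with the new flags; the boolean options follow Thor's --no-X negation convention, and the exact shell form assumes a standard Mastodon install:

# Rebuild only the statuses index with the new defaults (5 threads, batches of 100):
RAILS_ENV=production bin/tootctl search deploy --only statuses

# Import without the clean-up pass:
RAILS_ENV=production bin/tootctl search deploy --no-clean

# Skip importing and only remove documents whose records are gone, on 10 threads:
RAILS_ENV=production bin/tootctl search deploy --no-import --concurrency 10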