diff --git a/app/chewy/statuses_index.rb b/app/chewy/statuses_index.rb index d4b05fca9..47cb856ea 100644 --- a/app/chewy/statuses_index.rb +++ b/app/chewy/statuses_index.rb @@ -31,7 +31,7 @@ class StatusesIndex < Chewy::Index }, } - define_type ::Status.unscoped.kept.without_reblogs.includes(:media_attachments), delete_if: ->(status) { status.searchable_by.empty? } do + define_type ::Status.unscoped.kept.without_reblogs.includes(:media_attachments, :preloadable_poll) do crutch :mentions do |collection| data = ::Mention.where(status_id: collection.map(&:id)).where(account: Account.local, silent: false).pluck(:status_id, :account_id) data.each.with_object({}) { |(id, name), result| (result[id] ||= []).push(name) } diff --git a/lib/mastodon/cli_helper.rb b/lib/mastodon/cli_helper.rb index 4a20fa8d6..ed22f44b2 100644 --- a/lib/mastodon/cli_helper.rb +++ b/lib/mastodon/cli_helper.rb @@ -7,6 +7,7 @@ ActiveRecord::Base.logger = dev_null ActiveJob::Base.logger = dev_null HttpLog.configuration.logger = dev_null Paperclip.options[:log] = false +Chewy.logger = dev_null module Mastodon module CLIHelper diff --git a/lib/mastodon/search_cli.rb b/lib/mastodon/search_cli.rb index 8bd5f9543..22a0acec8 100644 --- a/lib/mastodon/search_cli.rb +++ b/lib/mastodon/search_cli.rb @@ -6,8 +6,19 @@ require_relative 'cli_helper' module Mastodon class SearchCLI < Thor - option :processes, default: 2, aliases: [:p] - desc 'deploy', 'Create or update an ElasticSearch index and populate it' + include CLIHelper + + # Indices are sorted by amount of data to be expected in each, so that + # smaller indices can go online sooner + INDICES = [ + AccountsIndex, + TagsIndex, + StatusesIndex, + ].freeze + + option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads' + option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices' + desc 'deploy', 'Create or upgrade ElasticSearch indices and populate them' long_desc <<~LONG_DESC If ElasticSearch is empty, this command will create the necessary indices and then import data from the database into those indices. @@ -15,27 +26,126 @@ module Mastodon This command will also upgrade indices if the underlying schema has been changed since the last run. - With the --processes option, parallelize execution of the command. The - default is 2. If "auto" is specified, the number is automatically - derived from available CPUs. + Even if creating or upgrading indices is not necessary, data from the + database will be imported into the indices. LONG_DESC def deploy - processed = Chewy::RakeHelper.upgrade(parallel: processes) - Chewy::RakeHelper.sync(except: processed, parallel: processes) - end - - private - - def processes - return true if options[:processes] == 'auto' - - num = options[:processes].to_i - - if num < 2 - nil - else - num + if options[:concurrency] < 1 + say('Cannot run with this concurrency setting, must be at least 1', :red) + exit(1) end + + indices = begin + if options[:only] + options[:only].map { |str| "#{str.camelize}Index".constantize } + else + INDICES + end + end + + progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false) + + # First, ensure all indices are created and have the correct + # structure, so that live data can already be written + indices.select { |index| index.specification.changed? }.each do |index| + progress.title = "Upgrading #{index} " + index.purge + index.specification.lock! + end + + ActiveRecord::Base.configurations[Rails.env]['pool'] = options[:concurrency] + 1 + + pool = Concurrent::FixedThreadPool.new(options[:concurrency]) + added = Concurrent::AtomicFixnum.new(0) + removed = Concurrent::AtomicFixnum.new(0) + + progress.title = 'Estimating workload ' + + # Estimate the amount of data that has to be imported first + indices.each do |index| + index.types.each do |type| + progress.total = (progress.total || 0) + type.adapter.default_scope.count + end + end + + # Now import all the actual data. Mind that unlike chewy:sync, we don't + # fetch and compare all record IDs from the database and the index to + # find out which to add and which to remove from the index. Because with + # potentially millions of rows, the memory footprint of such a calculation + # is uneconomical. So we only ever add. + indices.each do |index| + progress.title = "Importing #{index} " + batch_size = 1_000 + slice_size = (batch_size / options[:concurrency]).ceil + + index.types.each do |type| + type.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch| + futures = [] + + batch.each_slice(slice_size) do |records| + futures << Concurrent::Future.execute(executor: pool) do + begin + if !progress.total.nil? && progress.progress + records.size > progress.total + # The number of items has changed between start and now, + # since there is no good way to predict the final count from + # here, just change the progress bar to an indeterminate one + + progress.total = nil + end + + grouped_records = nil + bulk_body = nil + index_count = 0 + delete_count = 0 + + ActiveRecord::Base.connection_pool.with_connection do + grouped_records = type.adapter.send(:grouped_objects, records) + bulk_body = Chewy::Type::Import::BulkBuilder.new(type, grouped_records).bulk_body + end + + index_count = grouped_records[:index].size if grouped_records.key?(:index) + delete_count = grouped_records[:delete].size if grouped_records.key?(:delete) + + # The following is an optimization for statuses specifically, since + # we want to de-index statuses that cannot be searched by anybody, + # but can't use Chewy's delete_if logic because it doesn't use + # crutches and our searchable_by logic depends on them + if type == StatusesIndex::Status + bulk_body.map! do |entry| + if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank? + index_count -= 1 + delete_count += 1 + + { delete: entry[:index].except(:data) } + else + entry + end + end + end + + Chewy::Type::Import::BulkRequest.new(type).perform(bulk_body) + + progress.progress += records.size + + added.increment(index_count) + removed.increment(delete_count) + + sleep 1 + rescue => e + progress.log pastel.red("Error importing #{index}: #{e}") + end + end + end + + futures.map(&:value) + end + end + end + + progress.title = '' + progress.stop + + say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true) end end end