From 32a030dd74f9e0beed830daecb4dc6a4842aa14b Mon Sep 17 00:00:00 2001
From: Claire <claire.github-309c@sitedethib.com>
Date: Tue, 2 May 2023 12:08:48 +0200
Subject: [PATCH] Rewrite import feature (#21054)

---
 .rubocop.yml                                  |   2 +
 .rubocop_todo.yml                             |   2 +
 .../settings/imports_controller.rb            |  92 +++-
 app/lib/vacuum/imports_vacuum.rb              |  18 +
 app/models/bulk_import.rb                     |  53 +++
 app/models/bulk_import_row.rb                 |  15 +
 app/models/concerns/account_associations.rb   |   3 +
 app/models/form/import.rb                     | 151 +++++++
 app/models/import.rb                          |   4 +-
 app/services/bulk_import_row_service.rb       |  60 +++
 app/services/bulk_import_service.rb           | 160 +++++++
 app/services/import_service.rb                |   3 +
 app/validators/import_validator.rb            |  46 --
 app/views/settings/imports/index.html.haml    |  49 ++
 app/views/settings/imports/show.html.haml     |  20 +-
 app/workers/bulk_import_worker.rb             |  13 +
 app/workers/import/relationship_worker.rb     |   3 +
 app/workers/import/row_worker.rb              |  33 ++
 app/workers/import_worker.rb                  |   3 +
 app/workers/scheduler/vacuum_scheduler.rb     |   5 +
 config/i18n-tasks.yml                         |   2 +
 config/locales/en.yml                         |  38 ++
 config/navigation.rb                          |   2 +-
 config/routes.rb                              |   8 +-
 .../20230330135507_create_bulk_imports.rb     |  22 +
 .../20230330140036_create_bulk_import_rows.rb |  12 +
 db/schema.rb                                  |  29 +-
 .../settings/imports_controller_spec.rb       | 316 +++++++++++--
 spec/fabricators/bulk_import_fabricator.rb    |  12 +
 .../fabricators/bulk_import_row_fabricator.rb |   6 +
 spec/fixtures/files/empty.csv                 |   0
 spec/fixtures/files/following_accounts.csv    |   5 +
 spec/fixtures/files/muted_accounts.csv        |   5 +
 spec/lib/vacuum/imports_vacuum_spec.rb        |  19 +
 spec/models/form/import_spec.rb               | 281 ++++++++++++
 spec/models/import_spec.rb                    |  15 -
 spec/services/bulk_import_row_service_spec.rb |  95 ++++
 spec/services/bulk_import_service_spec.rb     | 417 ++++++++++++++++++
 spec/workers/bulk_import_worker_spec.rb       |  26 ++
 spec/workers/import/row_worker_spec.rb        | 127 ++++++
 40 files changed, 2059 insertions(+), 113 deletions(-)
 create mode 100644 app/lib/vacuum/imports_vacuum.rb
 create mode 100644 app/models/bulk_import.rb
 create mode 100644 app/models/bulk_import_row.rb
 create mode 100644 app/models/form/import.rb
 create mode 100644 app/services/bulk_import_row_service.rb
 create mode 100644 app/services/bulk_import_service.rb
 delete mode 100644 app/validators/import_validator.rb
 create mode 100644 app/views/settings/imports/index.html.haml
 create mode 100644 app/workers/bulk_import_worker.rb
 create mode 100644 app/workers/import/row_worker.rb
 create mode 100644 db/migrate/20230330135507_create_bulk_imports.rb
 create mode 100644 db/migrate/20230330140036_create_bulk_import_rows.rb
 create mode 100644 spec/fabricators/bulk_import_fabricator.rb
 create mode 100644 spec/fabricators/bulk_import_row_fabricator.rb
 create mode 100644 spec/fixtures/files/empty.csv
 create mode 100644 spec/fixtures/files/following_accounts.csv
 create mode 100644 spec/fixtures/files/muted_accounts.csv
 create mode 100644 spec/lib/vacuum/imports_vacuum_spec.rb
 create mode 100644 spec/models/form/import_spec.rb
 create mode 100644 spec/services/bulk_import_row_service_spec.rb
 create mode 100644 spec/services/bulk_import_service_spec.rb
 create mode 100644 spec/workers/bulk_import_worker_spec.rb
 create mode 100644 spec/workers/import/row_worker_spec.rb

diff --git a/.rubocop.yml b/.rubocop.yml
index b510c43031..16a04ae958 100644
--- a/.rubocop.yml
+++ b/.rubocop.yml
@@ -65,6 +65,7 @@ Metrics/AbcSize:
 Metrics/BlockLength:
   CountAsOne: ['array', 'hash', 'heredoc', 'method_call']
   Exclude:
+    - 'config/routes.rb'
     - 'lib/mastodon/*_cli.rb'
     - 'lib/tasks/*.rake'
     - 'app/models/concerns/account_associations.rb'
@@ -130,6 +131,7 @@ Metrics/ClassLength:
     - 'app/services/activitypub/process_account_service.rb'
     - 'app/services/activitypub/process_status_update_service.rb'
     - 'app/services/backup_service.rb'
+    - 'app/services/bulk_import_service.rb'
     - 'app/services/delete_account_service.rb'
     - 'app/services/fan_out_on_write_service.rb'
     - 'app/services/fetch_link_card_service.rb'
diff --git a/.rubocop_todo.yml b/.rubocop_todo.yml
index dec4f38528..446e38b0a7 100644
--- a/.rubocop_todo.yml
+++ b/.rubocop_todo.yml
@@ -741,6 +741,7 @@ RSpec/LetSetup:
     - 'spec/controllers/following_accounts_controller_spec.rb'
     - 'spec/controllers/oauth/authorized_applications_controller_spec.rb'
     - 'spec/controllers/oauth/tokens_controller_spec.rb'
+    - 'spec/controllers/settings/imports_controller_spec.rb'
     - 'spec/lib/activitypub/activity/delete_spec.rb'
     - 'spec/lib/vacuum/preview_cards_vacuum_spec.rb'
     - 'spec/models/account_spec.rb'
@@ -755,6 +756,7 @@ RSpec/LetSetup:
     - 'spec/services/activitypub/process_collection_service_spec.rb'
     - 'spec/services/batched_remove_status_service_spec.rb'
     - 'spec/services/block_domain_service_spec.rb'
+    - 'spec/services/bulk_import_service_spec.rb'
     - 'spec/services/delete_account_service_spec.rb'
     - 'spec/services/import_service_spec.rb'
     - 'spec/services/notify_service_spec.rb'
diff --git a/app/controllers/settings/imports_controller.rb b/app/controllers/settings/imports_controller.rb
index d4516526ee..bdbf8796fe 100644
--- a/app/controllers/settings/imports_controller.rb
+++ b/app/controllers/settings/imports_controller.rb
@@ -1,31 +1,97 @@
 # frozen_string_literal: true
 
-class Settings::ImportsController < Settings::BaseController
-  before_action :set_account
+require 'csv'
 
-  def show
-    @import = Import.new
+class Settings::ImportsController < Settings::BaseController
+  before_action :set_bulk_import, only: [:show, :confirm, :destroy]
+  before_action :set_recent_imports, only: [:index]
+
+  TYPE_TO_FILENAME_MAP = {
+    following: 'following_accounts_failures.csv',
+    blocking: 'blocked_accounts_failures.csv',
+    muting: 'muted_accounts_failures.csv',
+    domain_blocking: 'blocked_domains_failures.csv',
+    bookmarks: 'bookmarks_failures.csv',
+  }.freeze
+
+  TYPE_TO_HEADERS_MAP = {
+    following: ['Account address', 'Show boosts', 'Notify on new posts', 'Languages'],
+    blocking: false,
+    muting: ['Account address', 'Hide notifications'],
+    domain_blocking: false,
+    bookmarks: false,
+  }.freeze
+
+  def index
+    @import = Form::Import.new(current_account: current_account)
+  end
+
+  def show; end
+
+  def failures
+    @bulk_import = current_account.bulk_imports.where(state: :finished).find(params[:id])
+
+    respond_to do |format|
+      format.csv do
+        filename = TYPE_TO_FILENAME_MAP[@bulk_import.type.to_sym]
+        headers = TYPE_TO_HEADERS_MAP[@bulk_import.type.to_sym]
+
+        export_data = CSV.generate(headers: headers, write_headers: true) do |csv|
+          @bulk_import.rows.find_each do |row|
+            case @bulk_import.type.to_sym
+            when :following
+              csv << [row.data['acct'], row.data.fetch('show_reblogs', true), row.data.fetch('notify', false), row.data['languages']&.join(', ')]
+            when :blocking
+              csv << [row.data['acct']]
+            when :muting
+              csv << [row.data['acct'], row.data.fetch('hide_notifications', true)]
+            when :domain_blocking
+              csv << [row.data['domain']]
+            when :bookmarks
+              csv << [row.data['uri']]
+            end
+          end
+        end
+
+        send_data export_data, filename: filename
+      end
+    end
+  end
+
+  def confirm
+    @bulk_import.update!(state: :scheduled)
+    BulkImportWorker.perform_async(@bulk_import.id)
+    redirect_to settings_imports_path, notice: I18n.t('imports.success')
   end
 
   def create
-    @import = Import.new(import_params)
-    @import.account = @account
+    @import = Form::Import.new(import_params.merge(current_account: current_account))
 
     if @import.save
-      ImportWorker.perform_async(@import.id)
-      redirect_to settings_import_path, notice: I18n.t('imports.success')
+      redirect_to settings_import_path(@import.bulk_import.id)
     else
-      render :show
+      # We need to set recent imports as we are displaying the index again
+      set_recent_imports
+      render :index
     end
   end
 
+  def destroy
+    @bulk_import.destroy!
+    redirect_to settings_imports_path
+  end
+
   private
 
-  def set_account
-    @account = current_user.account
+  def import_params
+    params.require(:form_import).permit(:data, :type, :mode)
   end
 
-  def import_params
-    params.require(:import).permit(:data, :type, :mode)
+  def set_bulk_import
+    @bulk_import = current_account.bulk_imports.where(state: :unconfirmed).find(params[:id])
+  end
+
+  def set_recent_imports
+    @recent_imports = current_account.bulk_imports.reorder(id: :desc).limit(10)
   end
 end
diff --git a/app/lib/vacuum/imports_vacuum.rb b/app/lib/vacuum/imports_vacuum.rb
new file mode 100644
index 0000000000..ffb9449a42
--- /dev/null
+++ b/app/lib/vacuum/imports_vacuum.rb
@@ -0,0 +1,18 @@
+# frozen_string_literal: true
+
+class Vacuum::ImportsVacuum
+  def perform
+    clean_unconfirmed_imports!
+    clean_old_imports!
+  end
+
+  private
+
+  def clean_unconfirmed_imports!
+    BulkImport.where(state: :unconfirmed).where('created_at <= ?', 10.minutes.ago).reorder(nil).in_batches.delete_all
+  end
+
+  def clean_old_imports!
+    BulkImport.where('created_at <= ?', 1.week.ago).reorder(nil).in_batches.delete_all
+  end
+end
diff --git a/app/models/bulk_import.rb b/app/models/bulk_import.rb
new file mode 100644
index 0000000000..af9a9670bf
--- /dev/null
+++ b/app/models/bulk_import.rb
@@ -0,0 +1,53 @@
+# frozen_string_literal: true
+
+# == Schema Information
+#
+# Table name: bulk_imports
+#
+#  id                :bigint(8)        not null, primary key
+#  type              :integer          not null
+#  state             :integer          not null
+#  total_items       :integer          default(0), not null
+#  imported_items    :integer          default(0), not null
+#  processed_items   :integer          default(0), not null
+#  finished_at       :datetime
+#  overwrite         :boolean          default(FALSE), not null
+#  likely_mismatched :boolean          default(FALSE), not null
+#  original_filename :string           default(""), not null
+#  account_id        :bigint(8)        not null
+#  created_at        :datetime         not null
+#  updated_at        :datetime         not null
+#
+class BulkImport < ApplicationRecord
+  self.inheritance_column = false
+
+  belongs_to :account
+  has_many :rows, class_name: 'BulkImportRow', inverse_of: :bulk_import, dependent: :delete_all
+
+  enum type: {
+    following: 0,
+    blocking: 1,
+    muting: 2,
+    domain_blocking: 3,
+    bookmarks: 4,
+  }
+
+  enum state: {
+    unconfirmed: 0,
+    scheduled: 1,
+    in_progress: 2,
+    finished: 3,
+  }
+
+  validates :type, presence: true
+
+  def self.progress!(bulk_import_id, imported: false)
+    # Use `increment_counter` so that the incrementation is done atomically in the database
+    BulkImport.increment_counter(:processed_items, bulk_import_id) # rubocop:disable Rails/SkipsModelValidations
+    BulkImport.increment_counter(:imported_items, bulk_import_id) if imported # rubocop:disable Rails/SkipsModelValidations
+
+    # Since the incrementation has been done atomically, concurrent access to `bulk_import` is now bening
+    bulk_import = BulkImport.find(bulk_import_id)
+    bulk_import.update!(state: :finished, finished_at: Time.now.utc) if bulk_import.processed_items == bulk_import.total_items
+  end
+end
diff --git a/app/models/bulk_import_row.rb b/app/models/bulk_import_row.rb
new file mode 100644
index 0000000000..dd7190c970
--- /dev/null
+++ b/app/models/bulk_import_row.rb
@@ -0,0 +1,15 @@
+# frozen_string_literal: true
+
+# == Schema Information
+#
+# Table name: bulk_import_rows
+#
+#  id             :bigint(8)        not null, primary key
+#  bulk_import_id :bigint(8)        not null
+#  data           :jsonb
+#  created_at     :datetime         not null
+#  updated_at     :datetime         not null
+#
+class BulkImportRow < ApplicationRecord
+  belongs_to :bulk_import
+end
diff --git a/app/models/concerns/account_associations.rb b/app/models/concerns/account_associations.rb
index bbe269e8f0..592812e960 100644
--- a/app/models/concerns/account_associations.rb
+++ b/app/models/concerns/account_associations.rb
@@ -68,5 +68,8 @@ module AccountAssociations
 
     # Account statuses cleanup policy
     has_one :statuses_cleanup_policy, class_name: 'AccountStatusesCleanupPolicy', inverse_of: :account, dependent: :destroy
+
+    # Imports
+    has_many :bulk_imports, inverse_of: :account, dependent: :delete_all
   end
 end
diff --git a/app/models/form/import.rb b/app/models/form/import.rb
new file mode 100644
index 0000000000..750ef84be4
--- /dev/null
+++ b/app/models/form/import.rb
@@ -0,0 +1,151 @@
+# frozen_string_literal: true
+
+require 'csv'
+
+# A non-ActiveRecord helper class for CSV uploads.
+# Handles saving contents to database.
+class Form::Import
+  include ActiveModel::Model
+
+  MODES = %i(merge overwrite).freeze
+
+  FILE_SIZE_LIMIT       = 20.megabytes
+  ROWS_PROCESSING_LIMIT = 20_000
+
+  EXPECTED_HEADERS_BY_TYPE = {
+    following: ['Account address', 'Show boosts', 'Notify on new posts', 'Languages'],
+    blocking: ['Account address'],
+    muting: ['Account address', 'Hide notifications'],
+    domain_blocking: ['#domain'],
+    bookmarks: ['#uri'],
+  }.freeze
+
+  KNOWN_FIRST_HEADERS = EXPECTED_HEADERS_BY_TYPE.values.map(&:first).uniq.freeze
+
+  ATTRIBUTE_BY_HEADER = {
+    'Account address' => 'acct',
+    'Show boosts' => 'show_reblogs',
+    'Notify on new posts' => 'notify',
+    'Languages' => 'languages',
+    'Hide notifications' => 'hide_notifications',
+    '#domain' => 'domain',
+    '#uri' => 'uri',
+  }.freeze
+
+  class EmptyFileError < StandardError; end
+
+  attr_accessor :current_account, :data, :type, :overwrite, :bulk_import
+
+  validates :type, presence: true
+  validates :data, presence: true
+  validate :validate_data
+
+  def guessed_type
+    return :muting if csv_data.headers.include?('Hide notifications')
+    return :following if csv_data.headers.include?('Show boosts') || csv_data.headers.include?('Notify on new posts') || csv_data.headers.include?('Languages')
+    return :following if data.original_filename&.start_with?('follows') || data.original_filename&.start_with?('following_accounts')
+    return :blocking if data.original_filename&.start_with?('blocks') || data.original_filename&.start_with?('blocked_accounts')
+    return :muting if data.original_filename&.start_with?('mutes') || data.original_filename&.start_with?('muted_accounts')
+    return :domain_blocking if data.original_filename&.start_with?('domain_blocks') || data.original_filename&.start_with?('blocked_domains')
+    return :bookmarks if data.original_filename&.start_with?('bookmarks')
+  end
+
+  # Whether the uploaded CSV file seems to correspond to a different import type than the one selected
+  def likely_mismatched?
+    guessed_type.present? && guessed_type != type.to_sym
+  end
+
+  def save
+    return false unless valid?
+
+    ApplicationRecord.transaction do
+      now = Time.now.utc
+      @bulk_import = current_account.bulk_imports.create(type: type, overwrite: overwrite || false, state: :unconfirmed, original_filename: data.original_filename, likely_mismatched: likely_mismatched?)
+      nb_items = BulkImportRow.insert_all(parsed_rows.map { |row| { bulk_import_id: bulk_import.id, data: row, created_at: now, updated_at: now } }).length # rubocop:disable Rails/SkipsModelValidations
+      @bulk_import.update(total_items: nb_items)
+    end
+  end
+
+  def mode
+    overwrite ? :overwrite : :merge
+  end
+
+  def mode=(str)
+    self.overwrite = str.to_sym == :overwrite
+  end
+
+  private
+
+  def default_csv_header
+    case type.to_sym
+    when :following, :blocking, :muting
+      'Account address'
+    when :domain_blocking
+      '#domain'
+    when :bookmarks
+      '#uri'
+    end
+  end
+
+  def csv_data
+    return @csv_data if defined?(@csv_data)
+
+    csv_converter = lambda do |field, field_info|
+      case field_info.header
+      when 'Show boosts', 'Notify on new posts', 'Hide notifications'
+        ActiveModel::Type::Boolean.new.cast(field)
+      when 'Languages'
+        field&.split(',')&.map(&:strip)&.presence
+      when 'Account address'
+        field.strip.gsub(/\A@/, '')
+      when '#domain', '#uri'
+        field.strip
+      else
+        field
+      end
+    end
+
+    @csv_data = CSV.open(data.path, encoding: 'UTF-8', skip_blanks: true, headers: true, converters: csv_converter)
+    @csv_data.take(1) # Ensure the headers are read
+    raise EmptyFileError if @csv_data.headers == true
+
+    @csv_data = CSV.open(data.path, encoding: 'UTF-8', skip_blanks: true, headers: [default_csv_header], converters: csv_converter) unless KNOWN_FIRST_HEADERS.include?(@csv_data.headers&.first)
+    @csv_data
+  end
+
+  def csv_row_count
+    return @csv_row_count if defined?(@csv_row_count)
+
+    csv_data.rewind
+    @csv_row_count = csv_data.take(ROWS_PROCESSING_LIMIT + 2).count
+  end
+
+  def parsed_rows
+    csv_data.rewind
+
+    expected_headers = EXPECTED_HEADERS_BY_TYPE[type.to_sym]
+
+    csv_data.take(ROWS_PROCESSING_LIMIT + 1).map do |row|
+      row.to_h.slice(*expected_headers).transform_keys { |key| ATTRIBUTE_BY_HEADER[key] }
+    end
+  end
+
+  def validate_data
+    return if data.nil?
+    return errors.add(:data, I18n.t('imports.errors.too_large')) if data.size > FILE_SIZE_LIMIT
+    return errors.add(:data, I18n.t('imports.errors.incompatible_type')) unless csv_data.headers.include?(default_csv_header)
+
+    errors.add(:data, I18n.t('imports.errors.over_rows_processing_limit', count: ROWS_PROCESSING_LIMIT)) if csv_row_count > ROWS_PROCESSING_LIMIT
+
+    if type.to_sym == :following
+      base_limit = FollowLimitValidator.limit_for_account(current_account)
+      limit = base_limit
+      limit -= current_account.following_count unless overwrite
+      errors.add(:data, I18n.t('users.follow_limit_reached', limit: base_limit)) if csv_row_count > limit
+    end
+  rescue CSV::MalformedCSVError => e
+    errors.add(:data, I18n.t('imports.errors.invalid_csv_file', error: e.message))
+  rescue EmptyFileError
+    errors.add(:data, I18n.t('imports.errors.empty'))
+  end
+end
diff --git a/app/models/import.rb b/app/models/import.rb
index 21634005ed..7cd6cccf7c 100644
--- a/app/models/import.rb
+++ b/app/models/import.rb
@@ -17,6 +17,9 @@
 #  overwrite         :boolean          default(FALSE), not null
 #
 
+# NOTE: This is a deprecated model, only kept to not break ongoing imports
+# on upgrade. See `BulkImport` and `Form::Import` for its replacements.
+
 class Import < ApplicationRecord
   FILE_TYPES = %w(text/plain text/csv application/csv).freeze
   MODES = %i(merge overwrite).freeze
@@ -28,7 +31,6 @@ class Import < ApplicationRecord
   enum type: { following: 0, blocking: 1, muting: 2, domain_blocking: 3, bookmarks: 4 }
 
   validates :type, presence: true
-  validates_with ImportValidator, on: :create
 
   has_attached_file :data
   validates_attachment_content_type :data, content_type: FILE_TYPES
diff --git a/app/services/bulk_import_row_service.rb b/app/services/bulk_import_row_service.rb
new file mode 100644
index 0000000000..4046ef4eed
--- /dev/null
+++ b/app/services/bulk_import_row_service.rb
@@ -0,0 +1,60 @@
+# frozen_string_literal: true
+
+class BulkImportRowService
+  def call(row)
+    @account = row.bulk_import.account
+    @data    = row.data
+    @type    = row.bulk_import.type.to_sym
+
+    case @type
+    when :following, :blocking, :muting
+      target_acct     = @data['acct']
+      target_domain   = domain(target_acct)
+      @target_account = stoplight_wrap_request(target_domain) { ResolveAccountService.new.call(target_acct, { check_delivery_availability: true }) }
+      return false if @target_account.nil?
+    when :bookmarks
+      target_uri      = @data['uri']
+      target_domain   = Addressable::URI.parse(target_uri).normalized_host
+      @target_status = ActivityPub::TagManager.instance.uri_to_resource(target_uri, Status)
+      return false if @target_status.nil? && ActivityPub::TagManager.instance.local_uri?(target_uri)
+
+      @target_status ||= stoplight_wrap_request(target_domain) { ActivityPub::FetchRemoteStatusService.new.call(target_uri) }
+      return false if @target_status.nil?
+    end
+
+    case @type
+    when :following
+      FollowService.new.call(@account, @target_account, reblogs: @data['show_reblogs'], notify: @data['notify'], languages: @data['languages'])
+    when :blocking
+      BlockService.new.call(@account, @target_account)
+    when :muting
+      MuteService.new.call(@account, @target_account, notifications: @data['hide_notifications'])
+    when :bookmarks
+      return false unless StatusPolicy.new(@account, @target_status).show?
+
+      @account.bookmarks.find_or_create_by!(status: @target_status)
+    end
+
+    true
+  rescue ActiveRecord::RecordNotFound
+    false
+  end
+
+  def domain(uri)
+    domain = uri.is_a?(Account) ? uri.domain : uri.split('@')[1]
+    TagManager.instance.local_domain?(domain) ? nil : TagManager.instance.normalize_domain(domain)
+  end
+
+  def stoplight_wrap_request(domain, &block)
+    if domain.present?
+      Stoplight("source:#{domain}", &block)
+        .with_fallback { nil }
+        .with_threshold(1)
+        .with_cool_off_time(5.minutes.seconds)
+        .with_error_handler { |error, handle| error.is_a?(HTTP::Error) || error.is_a?(OpenSSL::SSL::SSLError) ? handle.call(error) : raise(error) }
+        .run
+    else
+      yield
+    end
+  end
+end
diff --git a/app/services/bulk_import_service.rb b/app/services/bulk_import_service.rb
new file mode 100644
index 0000000000..2701b0c7e0
--- /dev/null
+++ b/app/services/bulk_import_service.rb
@@ -0,0 +1,160 @@
+# frozen_string_literal: true
+
+class BulkImportService < BaseService
+  def call(import)
+    @import  = import
+    @account = @import.account
+
+    case @import.type.to_sym
+    when :following
+      import_follows!
+    when :blocking
+      import_blocks!
+    when :muting
+      import_mutes!
+    when :domain_blocking
+      import_domain_blocks!
+    when :bookmarks
+      import_bookmarks!
+    end
+
+    @import.update!(state: :finished, finished_at: Time.now.utc) if @import.processed_items == @import.total_items
+  rescue
+    @import.update!(state: :finished, finished_at: Time.now.utc)
+
+    raise
+  end
+
+  private
+
+  def extract_rows_by_acct
+    local_domain_suffix = "@#{Rails.configuration.x.local_domain}"
+    @import.rows.to_a.index_by { |row| row.data['acct'].delete_suffix(local_domain_suffix) }
+  end
+
+  def import_follows!
+    rows_by_acct = extract_rows_by_acct
+
+    if @import.overwrite?
+      @account.following.find_each do |followee|
+        row = rows_by_acct.delete(followee.acct)
+
+        if row.nil?
+          UnfollowService.new.call(@account, followee)
+        else
+          row.destroy
+          @import.processed_items += 1
+          @import.imported_items += 1
+
+          # Since we're updating the settings of an existing relationship, we can safely call
+          # FollowService directly
+          FollowService.new.call(@account, followee, reblogs: row.data['show_reblogs'], notify: row.data['notify'], languages: row.data['languages'])
+        end
+      end
+
+      # Save pending infos due to `overwrite?` handling
+      @import.save!
+    end
+
+    Import::RowWorker.push_bulk(rows_by_acct.values) do |row|
+      [row.id]
+    end
+  end
+
+  def import_blocks!
+    rows_by_acct = extract_rows_by_acct
+
+    if @import.overwrite?
+      @account.blocking.find_each do |blocked_account|
+        row = rows_by_acct.delete(blocked_account.acct)
+
+        if row.nil?
+          UnblockService.new.call(@account, blocked_account)
+        else
+          row.destroy
+          @import.processed_items += 1
+          @import.imported_items += 1
+          BlockService.new.call(@account, blocked_account)
+        end
+      end
+
+      # Save pending infos due to `overwrite?` handling
+      @import.save!
+    end
+
+    Import::RowWorker.push_bulk(rows_by_acct.values) do |row|
+      [row.id]
+    end
+  end
+
+  def import_mutes!
+    rows_by_acct = extract_rows_by_acct
+
+    if @import.overwrite?
+      @account.muting.find_each do |muted_account|
+        row = rows_by_acct.delete(muted_account.acct)
+
+        if row.nil?
+          UnmuteService.new.call(@account, muted_account)
+        else
+          row.destroy
+          @import.processed_items += 1
+          @import.imported_items += 1
+          MuteService.new.call(@account, muted_account, notifications: row.data['hide_notifications'])
+        end
+      end
+
+      # Save pending infos due to `overwrite?` handling
+      @import.save!
+    end
+
+    Import::RowWorker.push_bulk(rows_by_acct.values) do |row|
+      [row.id]
+    end
+  end
+
+  def import_domain_blocks!
+    domains = @import.rows.map { |row| row.data['domain'] }
+
+    if @import.overwrite?
+      @account.domain_blocks.find_each do |domain_block|
+        domain = domains.delete(domain_block)
+
+        @account.unblock_domain!(domain_block.domain) if domain.nil?
+      end
+    end
+
+    @import.rows.delete_all
+    domains.each { |domain| @account.block_domain!(domain) }
+    @import.update!(processed_items: @import.total_items, imported_items: @import.total_items)
+
+    AfterAccountDomainBlockWorker.push_bulk(domains) do |domain|
+      [@account.id, domain]
+    end
+  end
+
+  def import_bookmarks!
+    rows_by_uri = @import.rows.index_by { |row| row.data['uri'] }
+
+    if @import.overwrite?
+      @account.bookmarks.includes(:status).find_each do |bookmark|
+        row = rows_by_uri.delete(ActivityPub::TagManager.instance.uri_for(bookmark.status))
+
+        if row.nil?
+          bookmark.destroy!
+        else
+          row.destroy
+          @import.processed_items += 1
+          @import.imported_items += 1
+        end
+      end
+
+      # Save pending infos due to `overwrite?` handling
+      @import.save!
+    end
+
+    Import::RowWorker.push_bulk(rows_by_uri.values) do |row|
+      [row.id]
+    end
+  end
+end
diff --git a/app/services/import_service.rb b/app/services/import_service.rb
index f6c94cbb6f..133c081be5 100644
--- a/app/services/import_service.rb
+++ b/app/services/import_service.rb
@@ -2,6 +2,9 @@
 
 require 'csv'
 
+# NOTE: This is a deprecated service, only kept to not break ongoing imports
+# on upgrade. See `BulkImportService` for its replacement.
+
 class ImportService < BaseService
   ROWS_PROCESSING_LIMIT = 20_000
 
diff --git a/app/validators/import_validator.rb b/app/validators/import_validator.rb
deleted file mode 100644
index 782baf5d6b..0000000000
--- a/app/validators/import_validator.rb
+++ /dev/null
@@ -1,46 +0,0 @@
-# frozen_string_literal: true
-
-require 'csv'
-
-class ImportValidator < ActiveModel::Validator
-  KNOWN_HEADERS = [
-    'Account address',
-    '#domain',
-    '#uri',
-  ].freeze
-
-  def validate(import)
-    return if import.type.blank? || import.data.blank?
-
-    # We parse because newlines could be part of individual rows. This
-    # runs on create so we should be reading the local file here before
-    # it is uploaded to object storage or moved anywhere...
-    csv_data = CSV.parse(import.data.queued_for_write[:original].read)
-
-    row_count  = csv_data.size
-    row_count -= 1 if KNOWN_HEADERS.include?(csv_data.first&.first)
-
-    import.errors.add(:data, I18n.t('imports.errors.over_rows_processing_limit', count: ImportService::ROWS_PROCESSING_LIMIT)) if row_count > ImportService::ROWS_PROCESSING_LIMIT
-
-    case import.type
-    when 'following'
-      validate_following_import(import, row_count)
-    end
-  rescue CSV::MalformedCSVError
-    import.errors.add(:data, :malformed)
-  end
-
-  private
-
-  def validate_following_import(import, row_count)
-    base_limit = FollowLimitValidator.limit_for_account(import.account)
-
-    limit = if import.overwrite?
-              base_limit
-            else
-              base_limit - import.account.following_count
-            end
-
-    import.errors.add(:data, I18n.t('users.follow_limit_reached', limit: base_limit)) if row_count > limit
-  end
-end
diff --git a/app/views/settings/imports/index.html.haml b/app/views/settings/imports/index.html.haml
new file mode 100644
index 0000000000..f55d850791
--- /dev/null
+++ b/app/views/settings/imports/index.html.haml
@@ -0,0 +1,49 @@
+- content_for :page_title do
+  = t('settings.import')
+
+= simple_form_for @import, url: settings_imports_path do |f|
+  .field-group
+    = f.input :type, as: :grouped_select, collection: { constructive: %i(following bookmarks), destructive: %i(muting blocking domain_blocking) }, wrapper: :with_block_label, include_blank: false, label_method: ->(type) { I18n.t("imports.types.#{type}") }, group_label_method: ->(group) { I18n.t("imports.type_groups.#{group.first}") }, group_method: :last, hint: t('imports.preface')
+
+  .fields-row
+    .fields-group.fields-row__column.fields-row__column-6
+      = f.input :data, wrapper: :with_block_label, hint: t('simple_form.hints.imports.data')
+    .fields-group.fields-row__column.fields-row__column-6
+      = f.input :mode, as: :radio_buttons, collection: Import::MODES, label_method: ->(mode) { safe_join([I18n.t("imports.modes.#{mode}"), content_tag(:span, I18n.t("imports.modes.#{mode}_long"), class: 'hint')]) }, collection_wrapper_tag: 'ul', item_wrapper_tag: 'li'
+
+  .actions
+    = f.button :button, t('imports.upload'), type: :submit
+
+- unless @recent_imports.empty?
+  %hr.spacer/
+
+  %h3= t('imports.recent_imports')
+
+  .table-wrapper
+    %table.table
+      %thead
+        %tr
+          %th= t('imports.type')
+          %th= t('imports.status')
+          %th= t('imports.imported')
+          %th= t('imports.time_started')
+          %th= t('imports.failures')
+      %tbody
+        - @recent_imports.each do |import|
+          %tr
+            %td= t("imports.types.#{import.type}")
+            %td
+              - if import.unconfirmed?
+                = link_to t("imports.states.#{import.state}"), settings_import_path(import)
+              - else
+                = t("imports.states.#{import.state}")
+            %td
+              #{import.imported_items} / #{import.total_items}
+            %td= l(import.created_at)
+            %td
+              - num_failed = import.processed_items - import.imported_items
+              - if num_failed.positive?
+                - if import.finished?
+                  = link_to num_failed, failures_settings_import_path(import, format: 'csv')
+                - else
+                  = num_failed
diff --git a/app/views/settings/imports/show.html.haml b/app/views/settings/imports/show.html.haml
index 7bb4beb01c..65954e3e1e 100644
--- a/app/views/settings/imports/show.html.haml
+++ b/app/views/settings/imports/show.html.haml
@@ -1,15 +1,15 @@
 - content_for :page_title do
-  = t('settings.import')
+  = t("imports.titles.#{@bulk_import.type.to_s}")
 
-= simple_form_for @import, url: settings_import_path do |f|
-  .field-group
-    = f.input :type, collection: Import.types.keys, wrapper: :with_block_label, include_blank: false, label_method: lambda { |type| I18n.t("imports.types.#{type}") }, hint: t('imports.preface')
+- if @bulk_import.likely_mismatched?
+  .flash-message.warning= t("imports.mismatched_types_warning")
 
-  .fields-row
-    .fields-group.fields-row__column.fields-row__column-6
-      = f.input :data, wrapper: :with_block_label, hint: t('simple_form.hints.imports.data')
-    .fields-group.fields-row__column.fields-row__column-6
-      = f.input :mode, as: :radio_buttons, collection: Import::MODES, label_method: lambda { |mode| safe_join([I18n.t("imports.modes.#{mode}"), content_tag(:span, I18n.t("imports.modes.#{mode}_long"), class: 'hint')]) }, collection_wrapper_tag: 'ul', item_wrapper_tag: 'li'
+- if @bulk_import.overwrite?
+  %p.hint= t("imports.overwrite_preambles.#{@bulk_import.type.to_s}_html", filename: @bulk_import.original_filename, total_items: @bulk_import.total_items)
+- else
+  %p.hint= t("imports.preambles.#{@bulk_import.type.to_s}_html", filename: @bulk_import.original_filename, total_items: @bulk_import.total_items)
 
+.simple_form
   .actions
-    = f.button :button, t('imports.upload'), type: :submit
+    = link_to t('generic.cancel'), settings_import_path(@bulk_import), method: :delete, class: 'button button-tertiary'
+    = link_to t('generic.confirm'), confirm_settings_import_path(@bulk_import), method: :post, class: 'button'
diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb
new file mode 100644
index 0000000000..54571a95c0
--- /dev/null
+++ b/app/workers/bulk_import_worker.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+class BulkImportWorker
+  include Sidekiq::Worker
+
+  sidekiq_options queue: 'pull', retry: false
+
+  def perform(import_id)
+    import = BulkImport.find(import_id)
+    import.update!(state: :in_progress)
+    BulkImportService.new.call(import)
+  end
+end
diff --git a/app/workers/import/relationship_worker.rb b/app/workers/import/relationship_worker.rb
index c2728c1961..a411ab5876 100644
--- a/app/workers/import/relationship_worker.rb
+++ b/app/workers/import/relationship_worker.rb
@@ -1,5 +1,8 @@
 # frozen_string_literal: true
 
+# NOTE: This is a deprecated worker, only kept to not break ongoing imports
+# on upgrade. See `Import::RowWorker` for its replacement.
+
 class Import::RelationshipWorker
   include Sidekiq::Worker
 
diff --git a/app/workers/import/row_worker.rb b/app/workers/import/row_worker.rb
new file mode 100644
index 0000000000..09dd6ce736
--- /dev/null
+++ b/app/workers/import/row_worker.rb
@@ -0,0 +1,33 @@
+# frozen_string_literal: true
+
+class Import::RowWorker
+  include Sidekiq::Worker
+
+  sidekiq_options queue: 'pull', retry: 6, dead: false
+
+  sidekiq_retries_exhausted do |msg, _exception|
+    ActiveRecord::Base.connection_pool.with_connection do
+      # Increment the total number of processed items, and bump the state of the import if needed
+      bulk_import_id = BulkImportRow.where(id: msg['args'][0]).pick(:id)
+      BulkImport.progress!(bulk_import_id) unless bulk_import_id.nil?
+    end
+  end
+
+  def perform(row_id)
+    row = BulkImportRow.eager_load(bulk_import: :account).find_by(id: row_id)
+    return true if row.nil?
+
+    imported = BulkImportRowService.new.call(row)
+
+    mark_as_processed!(row, imported)
+  end
+
+  private
+
+  def mark_as_processed!(row, imported)
+    bulk_import_id = row.bulk_import_id
+    row.destroy! if imported
+
+    BulkImport.progress!(bulk_import_id, imported: imported)
+  end
+end
diff --git a/app/workers/import_worker.rb b/app/workers/import_worker.rb
index dfa71b29ec..b6afb972a9 100644
--- a/app/workers/import_worker.rb
+++ b/app/workers/import_worker.rb
@@ -1,5 +1,8 @@
 # frozen_string_literal: true
 
+# NOTE: This is a deprecated worker, only kept to not break ongoing imports
+# on upgrade. See `ImportWorker` for its replacement.
+
 class ImportWorker
   include Sidekiq::Worker
 
diff --git a/app/workers/scheduler/vacuum_scheduler.rb b/app/workers/scheduler/vacuum_scheduler.rb
index 9544f808bf..9e884caefd 100644
--- a/app/workers/scheduler/vacuum_scheduler.rb
+++ b/app/workers/scheduler/vacuum_scheduler.rb
@@ -23,6 +23,7 @@ class Scheduler::VacuumScheduler
       backups_vacuum,
       access_tokens_vacuum,
       feeds_vacuum,
+      imports_vacuum,
     ]
   end
 
@@ -50,6 +51,10 @@ class Scheduler::VacuumScheduler
     Vacuum::FeedsVacuum.new
   end
 
+  def imports_vacuum
+    Vacuum::ImportsVacuum.new
+  end
+
   def content_retention_policy
     ContentRetentionPolicy.current
   end
diff --git a/config/i18n-tasks.yml b/config/i18n-tasks.yml
index 46dd3124bf..ddf503197c 100644
--- a/config/i18n-tasks.yml
+++ b/config/i18n-tasks.yml
@@ -64,6 +64,8 @@ ignore_unused:
   - 'statuses.attached.*'
   - 'move_handler.carry_{mutes,blocks}_over_text'
   - 'notification_mailer.*'
+  - 'imports.overwrite_preambles.{following,blocking,muting,domain_blocking,bookmarks}_html'
+  - 'imports.preambles.{following,blocking,muting,domain_blocking,bookmarks}_html'
 
 ignore_inconsistent_interpolations:
   - '*.one'
diff --git a/config/locales/en.yml b/config/locales/en.yml
index ddefbc49b1..0188519c26 100644
--- a/config/locales/en.yml
+++ b/config/locales/en.yml
@@ -1218,7 +1218,9 @@ en:
     all_matching_items_selected_html:
       one: "<strong>%{count}</strong> item matching your search is selected."
       other: All <strong>%{count}</strong> items matching your search are selected.
+    cancel: Cancel
     changes_saved_msg: Changes successfully saved!
+    confirm: Confirm
     copy: Copy
     delete: Delete
     deselect: Deselect all
@@ -1234,15 +1236,51 @@ en:
       other: Something isn't quite right yet! Please review %{count} errors below
   imports:
     errors:
+      empty: Empty CSV file
+      incompatible_type: Incompatible with the selected import type
       invalid_csv_file: 'Invalid CSV file. Error: %{error}'
       over_rows_processing_limit: contains more than %{count} rows
+      too_large: File is too large
+    failures: Failures
+    imported: Imported
+    mismatched_types_warning: It appears you may have selected the wrong type for this import, please double-check.
     modes:
       merge: Merge
       merge_long: Keep existing records and add new ones
       overwrite: Overwrite
       overwrite_long: Replace current records with the new ones
+    overwrite_preambles:
+      blocking_html: You are about to <strong>replace your block list</strong> with up to <strong>%{total_items} accounts</strong> from <strong>%{filename}</strong>.
+      bookmarks_html: You are about to <strong>replace your bookmarks</strong> with up to <strong>%{total_items} posts</strong> from <strong>%{filename}</strong>.
+      domain_blocking_html: You are about to <strong>replace your domain block list</strong> with up to <strong>%{total_items} domains</strong> from <strong>%{filename}</strong>.
+      following_html: You are about to <strong>follow</strong> up to <strong>%{total_items} accounts</strong> from <strong>%{filename}</strong> and <strong>stop following anyone else</strong>.
+      muting_html: You are about to <strong>replace your list of muted accounts</strong> with up to <strong>%{total_items} accounts</strong> from <strong>%{filename}</strong>.
+    preambles:
+      blocking_html: You are about to <strong>block</strong> up to <strong>%{total_items} accounts</strong> from <strong>%{filename}</strong>.
+      bookmarks_html: You are about to add up to <strong>%{total_items} posts</strong> from <strong>%{filename}</strong> to your <strong>bookmarks</strong>.
+      domain_blocking_html: You are about to <strong>block</strong> up to <strong>%{total_items} domains</strong> from <strong>%{filename}</strong>.
+      following_html: You are about to <strong>follow</strong> up to <strong>%{total_items} accounts</strong> from <strong>%{filename}</strong>.
+      muting_html: You are about to <strong>mute</strong> up to <strong>%{total_items} accounts</strong> from <strong>%{filename}</strong>.
     preface: You can import data that you have exported from another server, such as a list of the people you are following or blocking.
+    recent_imports: Recent imports
+    states:
+      finished: Finished
+      in_progress: In progress
+      scheduled: Scheduled
+      unconfirmed: Unconfirmed
+    status: Status
     success: Your data was successfully uploaded and will be processed in due time
+    time_started: Started at
+    titles:
+      blocking: Importing blocked accounts
+      bookmarks: Importing bookmarks
+      domain_blocking: Importing blocked domains
+      following: Importing followed accounts
+      muting: Importing muted accounts
+    type: Import type
+    type_groups:
+      constructive: Follows & Bookmarks
+      destructive: Blocks & mutes
     types:
       blocking: Blocking list
       bookmarks: Bookmarks
diff --git a/config/navigation.rb b/config/navigation.rb
index 30817d0252..4b829c151a 100644
--- a/config/navigation.rb
+++ b/config/navigation.rb
@@ -26,7 +26,7 @@ SimpleNavigation::Configuration.run do |navigation|
     end
 
     n.item :data, safe_join([fa_icon('cloud-download fw'), t('settings.import_and_export')]), settings_export_path do |s|
-      s.item :import, safe_join([fa_icon('cloud-upload fw'), t('settings.import')]), settings_import_path, if: -> { current_user.functional? }
+      s.item :import, safe_join([fa_icon('cloud-upload fw'), t('settings.import')]), settings_imports_path, if: -> { current_user.functional? }
       s.item :export, safe_join([fa_icon('cloud-download fw'), t('settings.export')]), settings_export_path
     end
 
diff --git a/config/routes.rb b/config/routes.rb
index c09c5175a2..f177a1fdb9 100644
--- a/config/routes.rb
+++ b/config/routes.rb
@@ -144,7 +144,13 @@ Rails.application.routes.draw do
       resource :other, only: [:show, :update], controller: :other
     end
 
-    resource :import, only: [:show, :create]
+    resources :imports, only: [:index, :show, :destroy, :create] do
+      member do
+        post :confirm
+        get :failures
+      end
+    end
+
     resource :export, only: [:show, :create]
 
     namespace :exports, constraints: { format: :csv } do
diff --git a/db/migrate/20230330135507_create_bulk_imports.rb b/db/migrate/20230330135507_create_bulk_imports.rb
new file mode 100644
index 0000000000..117a135086
--- /dev/null
+++ b/db/migrate/20230330135507_create_bulk_imports.rb
@@ -0,0 +1,22 @@
+# frozen_string_literal: true
+
+class CreateBulkImports < ActiveRecord::Migration[6.1]
+  def change
+    create_table :bulk_imports do |t|
+      t.integer :type, null: false
+      t.integer :state, null: false
+      t.integer :total_items, null: false, default: 0
+      t.integer :imported_items, null: false, default: 0
+      t.integer :processed_items, null: false, default: 0
+      t.datetime :finished_at
+      t.boolean :overwrite, null: false, default: false
+      t.boolean :likely_mismatched, null: false, default: false
+      t.string :original_filename, null: false, default: ''
+      t.references :account, null: false, foreign_key: { on_delete: :cascade }
+
+      t.timestamps
+    end
+
+    add_index :bulk_imports, [:id], name: :index_bulk_imports_unconfirmed, where: 'state = 0'
+  end
+end
diff --git a/db/migrate/20230330140036_create_bulk_import_rows.rb b/db/migrate/20230330140036_create_bulk_import_rows.rb
new file mode 100644
index 0000000000..ef6ad7069b
--- /dev/null
+++ b/db/migrate/20230330140036_create_bulk_import_rows.rb
@@ -0,0 +1,12 @@
+# frozen_string_literal: true
+
+class CreateBulkImportRows < ActiveRecord::Migration[6.1]
+  def change
+    create_table :bulk_import_rows do |t|
+      t.references :bulk_import, null: false, foreign_key: { on_delete: :cascade }
+      t.jsonb :data
+
+      t.timestamps
+    end
+  end
+end
diff --git a/db/schema.rb b/db/schema.rb
index 620bed2bc9..7bd6f4d602 100644
--- a/db/schema.rb
+++ b/db/schema.rb
@@ -10,7 +10,7 @@
 #
 # It's strongly recommended that you check this file into your version control system.
 
-ActiveRecord::Schema.define(version: 2023_02_15_074423) do
+ActiveRecord::Schema.define(version: 2023_03_30_140036) do
 
   # These are extensions that must be enabled in order to support this database
   enable_extension "plpgsql"
@@ -294,6 +294,31 @@ ActiveRecord::Schema.define(version: 2023_02_15_074423) do
     t.index ["status_id"], name: "index_bookmarks_on_status_id"
   end
 
+  create_table "bulk_import_rows", force: :cascade do |t|
+    t.bigint "bulk_import_id", null: false
+    t.jsonb "data"
+    t.datetime "created_at", precision: 6, null: false
+    t.datetime "updated_at", precision: 6, null: false
+    t.index ["bulk_import_id"], name: "index_bulk_import_rows_on_bulk_import_id"
+  end
+
+  create_table "bulk_imports", force: :cascade do |t|
+    t.integer "type", null: false
+    t.integer "state", null: false
+    t.integer "total_items", default: 0, null: false
+    t.integer "imported_items", default: 0, null: false
+    t.integer "processed_items", default: 0, null: false
+    t.datetime "finished_at"
+    t.boolean "overwrite", default: false, null: false
+    t.boolean "likely_mismatched", default: false, null: false
+    t.string "original_filename", default: "", null: false
+    t.bigint "account_id", null: false
+    t.datetime "created_at", precision: 6, null: false
+    t.datetime "updated_at", precision: 6, null: false
+    t.index ["account_id"], name: "index_bulk_imports_on_account_id"
+    t.index ["id"], name: "index_bulk_imports_unconfirmed", where: "(state = 0)"
+  end
+
   create_table "canonical_email_blocks", force: :cascade do |t|
     t.string "canonical_email_hash", default: "", null: false
     t.bigint "reference_account_id"
@@ -1146,6 +1171,8 @@ ActiveRecord::Schema.define(version: 2023_02_15_074423) do
   add_foreign_key "blocks", "accounts", name: "fk_4269e03e65", on_delete: :cascade
   add_foreign_key "bookmarks", "accounts", on_delete: :cascade
   add_foreign_key "bookmarks", "statuses", on_delete: :cascade
+  add_foreign_key "bulk_import_rows", "bulk_imports", on_delete: :cascade
+  add_foreign_key "bulk_imports", "accounts", on_delete: :cascade
   add_foreign_key "canonical_email_blocks", "accounts", column: "reference_account_id", on_delete: :cascade
   add_foreign_key "conversation_mutes", "accounts", name: "fk_225b4212bb", on_delete: :cascade
   add_foreign_key "conversation_mutes", "conversations", on_delete: :cascade
diff --git a/spec/controllers/settings/imports_controller_spec.rb b/spec/controllers/settings/imports_controller_spec.rb
index 98ba897e41..500b442396 100644
--- a/spec/controllers/settings/imports_controller_spec.rb
+++ b/spec/controllers/settings/imports_controller_spec.rb
@@ -5,13 +5,22 @@ require 'rails_helper'
 RSpec.describe Settings::ImportsController, type: :controller do
   render_views
 
+  let(:user) { Fabricate(:user) }
+
   before do
-    sign_in Fabricate(:user), scope: :user
+    sign_in user, scope: :user
   end
 
-  describe 'GET #show' do
+  describe 'GET #index' do
+    let!(:import)       { Fabricate(:bulk_import, account: user.account) }
+    let!(:other_import) { Fabricate(:bulk_import) }
+
     before do
-      get :show
+      get :index
+    end
+
+    it 'assigns the expected imports' do
+      expect(assigns(:recent_imports)).to eq [import]
     end
 
     it 'returns http success' do
@@ -23,31 +32,288 @@ RSpec.describe Settings::ImportsController, type: :controller do
     end
   end
 
-  describe 'POST #create' do
-    it 'redirects to settings path with successful following import' do
-      service = double(call: nil)
-      allow(ResolveAccountService).to receive(:new).and_return(service)
-      post :create, params: {
-        import: {
-          type: 'following',
-          data: fixture_file_upload('imports.txt'),
-        },
-      }
-
-      expect(response).to redirect_to(settings_import_path)
+  describe 'GET #show' do
+    before do
+      get :show, params: { id: bulk_import.id }
     end
 
-    it 'redirects to settings path with successful blocking import' do
-      service = double(call: nil)
-      allow(ResolveAccountService).to receive(:new).and_return(service)
-      post :create, params: {
-        import: {
-          type: 'blocking',
-          data: fixture_file_upload('imports.txt'),
-        },
-      }
+    context 'with someone else\'s import' do
+      let(:bulk_import) { Fabricate(:bulk_import, state: :unconfirmed) }
 
-      expect(response).to redirect_to(settings_import_path)
+      it 'returns http not found' do
+        expect(response).to have_http_status(404)
+      end
+    end
+
+    context 'with an already-confirmed import' do
+      let(:bulk_import) { Fabricate(:bulk_import, account: user.account, state: :in_progress) }
+
+      it 'returns http not found' do
+        expect(response).to have_http_status(404)
+      end
+    end
+
+    context 'with an unconfirmed import' do
+      let(:bulk_import) { Fabricate(:bulk_import, account: user.account, state: :unconfirmed) }
+
+      it 'returns http success' do
+        expect(response).to have_http_status(200)
+      end
     end
   end
+
+  describe 'POST #confirm' do
+    subject { post :confirm, params: { id: bulk_import.id } }
+
+    before do
+      allow(BulkImportWorker).to receive(:perform_async)
+    end
+
+    context 'with someone else\'s import' do
+      let(:bulk_import) { Fabricate(:bulk_import, state: :unconfirmed) }
+
+      it 'does not change the import\'s state' do
+        expect { subject }.to_not(change { bulk_import.reload.state })
+      end
+
+      it 'does not fire the import worker' do
+        subject
+        expect(BulkImportWorker).to_not have_received(:perform_async)
+      end
+
+      it 'returns http not found' do
+        subject
+        expect(response).to have_http_status(404)
+      end
+    end
+
+    context 'with an already-confirmed import' do
+      let(:bulk_import) { Fabricate(:bulk_import, account: user.account, state: :in_progress) }
+
+      it 'does not change the import\'s state' do
+        expect { subject }.to_not(change { bulk_import.reload.state })
+      end
+
+      it 'does not fire the import worker' do
+        subject
+        expect(BulkImportWorker).to_not have_received(:perform_async)
+      end
+
+      it 'returns http not found' do
+        subject
+        expect(response).to have_http_status(404)
+      end
+    end
+
+    context 'with an unconfirmed import' do
+      let(:bulk_import) { Fabricate(:bulk_import, account: user.account, state: :unconfirmed) }
+
+      it 'changes the import\'s state to scheduled' do
+        expect { subject }.to change { bulk_import.reload.state.to_sym }.from(:unconfirmed).to(:scheduled)
+      end
+
+      it 'fires the import worker on the expected import' do
+        subject
+        expect(BulkImportWorker).to have_received(:perform_async).with(bulk_import.id)
+      end
+
+      it 'redirects to imports path' do
+        subject
+        expect(response).to redirect_to(settings_imports_path)
+      end
+    end
+  end
+
+  describe 'DELETE #destroy' do
+    subject { delete :destroy, params: { id: bulk_import.id } }
+
+    context 'with someone else\'s import' do
+      let(:bulk_import) { Fabricate(:bulk_import, state: :unconfirmed) }
+
+      it 'does not delete the import' do
+        expect { subject }.to_not(change { BulkImport.exists?(bulk_import.id) })
+      end
+
+      it 'returns http not found' do
+        subject
+        expect(response).to have_http_status(404)
+      end
+    end
+
+    context 'with an already-confirmed import' do
+      let(:bulk_import) { Fabricate(:bulk_import, account: user.account, state: :in_progress) }
+
+      it 'does not delete the import' do
+        expect { subject }.to_not(change { BulkImport.exists?(bulk_import.id) })
+      end
+
+      it 'returns http not found' do
+        subject
+        expect(response).to have_http_status(404)
+      end
+    end
+
+    context 'with an unconfirmed import' do
+      let(:bulk_import) { Fabricate(:bulk_import, account: user.account, state: :unconfirmed) }
+
+      it 'deletes the import' do
+        expect { subject }.to change { BulkImport.exists?(bulk_import.id) }.from(true).to(false)
+      end
+
+      it 'redirects to imports path' do
+        subject
+        expect(response).to redirect_to(settings_imports_path)
+      end
+    end
+  end
+
+  describe 'GET #failures' do
+    subject { get :failures, params: { id: bulk_import.id }, format: :csv }
+
+    shared_examples 'export failed rows' do |expected_contents|
+      let(:bulk_import) { Fabricate(:bulk_import, account: user.account, type: import_type, state: :finished) }
+
+      before do
+        bulk_import.update(total_items: bulk_import.rows.count, processed_items: bulk_import.rows.count, imported_items: 0)
+      end
+
+      it 'returns http success' do
+        subject
+        expect(response).to have_http_status(200)
+      end
+
+      it 'returns expected contents' do
+        subject
+        expect(response.body).to eq expected_contents
+      end
+    end
+
+    context 'with follows' do
+      let(:import_type) { 'following' }
+
+      let!(:rows) do
+        [
+          { 'acct' => 'foo@bar' },
+          { 'acct' => 'user@bar', 'show_reblogs' => false, 'notify' => true, 'languages' => ['fr', 'de'] },
+        ].map { |data| Fabricate(:bulk_import_row, bulk_import: bulk_import, data: data) }
+      end
+
+      include_examples 'export failed rows', "Account address,Show boosts,Notify on new posts,Languages\nfoo@bar,true,false,\nuser@bar,false,true,\"fr, de\"\n"
+    end
+
+    context 'with blocks' do
+      let(:import_type) { 'blocking' }
+
+      let!(:rows) do
+        [
+          { 'acct' => 'foo@bar' },
+          { 'acct' => 'user@bar' },
+        ].map { |data| Fabricate(:bulk_import_row, bulk_import: bulk_import, data: data) }
+      end
+
+      include_examples 'export failed rows', "foo@bar\nuser@bar\n"
+    end
+
+    context 'with mutes' do
+      let(:import_type) { 'muting' }
+
+      let!(:rows) do
+        [
+          { 'acct' => 'foo@bar' },
+          { 'acct' => 'user@bar', 'hide_notifications' => false },
+        ].map { |data| Fabricate(:bulk_import_row, bulk_import: bulk_import, data: data) }
+      end
+
+      include_examples 'export failed rows', "Account address,Hide notifications\nfoo@bar,true\nuser@bar,false\n"
+    end
+
+    context 'with domain blocks' do
+      let(:import_type) { 'domain_blocking' }
+
+      let!(:rows) do
+        [
+          { 'domain' => 'bad.domain' },
+          { 'domain' => 'evil.domain' },
+        ].map { |data| Fabricate(:bulk_import_row, bulk_import: bulk_import, data: data) }
+      end
+
+      include_examples 'export failed rows', "bad.domain\nevil.domain\n"
+    end
+
+    context 'with bookmarks' do
+      let(:import_type) { 'bookmarks' }
+
+      let!(:rows) do
+        [
+          { 'uri' => 'https://foo.com/1' },
+          { 'uri' => 'https://foo.com/2' },
+        ].map { |data| Fabricate(:bulk_import_row, bulk_import: bulk_import, data: data) }
+      end
+
+      include_examples 'export failed rows', "https://foo.com/1\nhttps://foo.com/2\n"
+    end
+  end
+
+  describe 'POST #create' do
+    subject do
+      post :create, params: {
+        form_import: {
+          type: import_type,
+          mode: import_mode,
+          data: fixture_file_upload(import_file),
+        },
+      }
+    end
+
+    shared_examples 'successful import' do |type, file, mode|
+      let(:import_type) { type }
+      let(:import_file) { file }
+      let(:import_mode) { mode }
+
+      it 'creates an unconfirmed bulk_import with expected type' do
+        expect { subject }.to change { user.account.bulk_imports.pluck(:state, :type) }.from([]).to([['unconfirmed', import_type]])
+      end
+
+      it 'redirects to confirmation page for the import' do
+        subject
+        expect(response).to redirect_to(settings_import_path(user.account.bulk_imports.first))
+      end
+    end
+
+    shared_examples 'unsuccessful import' do |type, file, mode|
+      let(:import_type) { type }
+      let(:import_file) { file }
+      let(:import_mode) { mode }
+
+      it 'does not creates an unconfirmed bulk_import' do
+        expect { subject }.to_not(change { user.account.bulk_imports.count })
+      end
+
+      it 'sets error to the import' do
+        subject
+        expect(assigns(:import).errors).to_not be_empty
+      end
+    end
+
+    it_behaves_like 'successful import', 'following', 'imports.txt', 'merge'
+    it_behaves_like 'successful import', 'following', 'imports.txt', 'overwrite'
+    it_behaves_like 'successful import', 'blocking', 'imports.txt', 'merge'
+    it_behaves_like 'successful import', 'blocking', 'imports.txt', 'overwrite'
+    it_behaves_like 'successful import', 'muting', 'imports.txt', 'merge'
+    it_behaves_like 'successful import', 'muting', 'imports.txt', 'overwrite'
+    it_behaves_like 'successful import', 'domain_blocking', 'domain_blocks.csv', 'merge'
+    it_behaves_like 'successful import', 'domain_blocking', 'domain_blocks.csv', 'overwrite'
+    it_behaves_like 'successful import', 'bookmarks', 'bookmark-imports.txt', 'merge'
+    it_behaves_like 'successful import', 'bookmarks', 'bookmark-imports.txt', 'overwrite'
+
+    it_behaves_like 'unsuccessful import', 'following', 'domain_blocks.csv', 'merge'
+    it_behaves_like 'unsuccessful import', 'following', 'domain_blocks.csv', 'overwrite'
+    it_behaves_like 'unsuccessful import', 'blocking', 'domain_blocks.csv', 'merge'
+    it_behaves_like 'unsuccessful import', 'blocking', 'domain_blocks.csv', 'overwrite'
+    it_behaves_like 'unsuccessful import', 'muting', 'domain_blocks.csv', 'merge'
+    it_behaves_like 'unsuccessful import', 'muting', 'domain_blocks.csv', 'overwrite'
+
+    it_behaves_like 'unsuccessful import', 'following', 'empty.csv', 'merge'
+    it_behaves_like 'unsuccessful import', 'following', 'empty.csv', 'overwrite'
+  end
 end
diff --git a/spec/fabricators/bulk_import_fabricator.rb b/spec/fabricators/bulk_import_fabricator.rb
new file mode 100644
index 0000000000..673b7960d9
--- /dev/null
+++ b/spec/fabricators/bulk_import_fabricator.rb
@@ -0,0 +1,12 @@
+# frozen_string_literal: true
+
+Fabricator(:bulk_import) do
+  type            1
+  state           1
+  total_items     1
+  processed_items 1
+  imported_items  1
+  finished_at     '2022-11-18 14:55:07'
+  overwrite       false
+  account
+end
diff --git a/spec/fabricators/bulk_import_row_fabricator.rb b/spec/fabricators/bulk_import_row_fabricator.rb
new file mode 100644
index 0000000000..f8358e734d
--- /dev/null
+++ b/spec/fabricators/bulk_import_row_fabricator.rb
@@ -0,0 +1,6 @@
+# frozen_string_literal: true
+
+Fabricator(:bulk_import_row) do
+  bulk_import
+  data ''
+end
diff --git a/spec/fixtures/files/empty.csv b/spec/fixtures/files/empty.csv
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/spec/fixtures/files/following_accounts.csv b/spec/fixtures/files/following_accounts.csv
new file mode 100644
index 0000000000..c7917443f8
--- /dev/null
+++ b/spec/fixtures/files/following_accounts.csv
@@ -0,0 +1,5 @@
+Account address,Show boosts,Notify on new posts,Languages
+
+user@example.com,true,false,
+
+user@test.com,true,true,"en,fr"
diff --git a/spec/fixtures/files/muted_accounts.csv b/spec/fixtures/files/muted_accounts.csv
new file mode 100644
index 0000000000..66f4315bce
--- /dev/null
+++ b/spec/fixtures/files/muted_accounts.csv
@@ -0,0 +1,5 @@
+Account address,Hide notifications
+
+user@example.com,true
+
+user@test.com,false
diff --git a/spec/lib/vacuum/imports_vacuum_spec.rb b/spec/lib/vacuum/imports_vacuum_spec.rb
new file mode 100644
index 0000000000..1e0abc5e01
--- /dev/null
+++ b/spec/lib/vacuum/imports_vacuum_spec.rb
@@ -0,0 +1,19 @@
+# frozen_string_literal: true
+
+require 'rails_helper'
+
+RSpec.describe Vacuum::ImportsVacuum do
+  subject { described_class.new }
+
+  let!(:old_unconfirmed) { Fabricate(:bulk_import, state: :unconfirmed, created_at: 2.days.ago) }
+  let!(:new_unconfirmed) { Fabricate(:bulk_import, state: :unconfirmed, created_at: 10.seconds.ago) }
+  let!(:recent_ongoing)  { Fabricate(:bulk_import, state: :in_progress, created_at: 20.minutes.ago) }
+  let!(:recent_finished) { Fabricate(:bulk_import, state: :finished, created_at: 1.day.ago) }
+  let!(:old_finished)    { Fabricate(:bulk_import, state: :finished, created_at: 2.months.ago) }
+
+  describe '#perform' do
+    it 'cleans up the expected imports' do
+      expect { subject.perform }.to change { BulkImport.all.pluck(:id) }.from([old_unconfirmed, new_unconfirmed, recent_ongoing, recent_finished, old_finished].map(&:id)).to([new_unconfirmed, recent_ongoing, recent_finished].map(&:id))
+    end
+  end
+end
diff --git a/spec/models/form/import_spec.rb b/spec/models/form/import_spec.rb
new file mode 100644
index 0000000000..e1fea4205c
--- /dev/null
+++ b/spec/models/form/import_spec.rb
@@ -0,0 +1,281 @@
+# frozen_string_literal: true
+
+require 'rails_helper'
+
+RSpec.describe Form::Import do
+  subject { described_class.new(current_account: account, type: import_type, mode: import_mode, data: data) }
+
+  let(:account)     { Fabricate(:account) }
+  let(:data)        { fixture_file_upload(import_file) }
+  let(:import_mode) { 'merge' }
+
+  describe 'validations' do
+    shared_examples 'incompatible import type' do |type, file|
+      let(:import_file) { file }
+      let(:import_type) { type }
+
+      it 'has errors' do
+        subject.validate
+        expect(subject.errors[:data]).to include(I18n.t('imports.errors.incompatible_type'))
+      end
+    end
+
+    shared_examples 'too many CSV rows' do |type, file, allowed_rows|
+      let(:import_file) { file }
+      let(:import_type) { type }
+
+      before do
+        stub_const 'Form::Import::ROWS_PROCESSING_LIMIT', allowed_rows
+      end
+
+      it 'has errors' do
+        subject.validate
+        expect(subject.errors[:data]).to include(I18n.t('imports.errors.over_rows_processing_limit', count: Form::Import::ROWS_PROCESSING_LIMIT))
+      end
+    end
+
+    shared_examples 'valid import' do |type, file|
+      let(:import_file) { file }
+      let(:import_type) { type }
+
+      it 'passes validation' do
+        expect(subject).to be_valid
+      end
+    end
+
+    context 'when the file too large' do
+      let(:import_type) { 'following' }
+      let(:import_file) { 'imports.txt' }
+
+      before do
+        stub_const 'Form::Import::FILE_SIZE_LIMIT', 5
+      end
+
+      it 'has errors' do
+        subject.validate
+        expect(subject.errors[:data]).to include(I18n.t('imports.errors.too_large'))
+      end
+    end
+
+    context 'when the CSV file is malformed CSV' do
+      let(:import_type) { 'following' }
+      let(:import_file) { 'boop.ogg' }
+
+      it 'has errors' do
+        # NOTE: not testing more specific error because we don't know the string to match
+        expect(subject).to model_have_error_on_field(:data)
+      end
+    end
+
+    context 'when importing more follows than allowed' do
+      let(:import_type) { 'following' }
+      let(:import_file) { 'imports.txt' }
+
+      before do
+        allow(FollowLimitValidator).to receive(:limit_for_account).with(account).and_return(1)
+      end
+
+      it 'has errors' do
+        subject.validate
+        expect(subject.errors[:data]).to include(I18n.t('users.follow_limit_reached', limit: 1))
+      end
+    end
+
+    it_behaves_like 'too many CSV rows', 'following', 'imports.txt', 1
+    it_behaves_like 'too many CSV rows', 'blocking', 'imports.txt', 1
+    it_behaves_like 'too many CSV rows', 'muting', 'imports.txt', 1
+    it_behaves_like 'too many CSV rows', 'domain_blocking', 'domain_blocks.csv', 2
+    it_behaves_like 'too many CSV rows', 'bookmarks', 'bookmark-imports.txt', 3
+
+    # Importing list of addresses with no headers into various types
+    it_behaves_like 'valid import', 'following', 'imports.txt'
+    it_behaves_like 'valid import', 'blocking', 'imports.txt'
+    it_behaves_like 'valid import', 'muting', 'imports.txt'
+
+    # Importing domain blocks with headers into expected type
+    it_behaves_like 'valid import', 'domain_blocking', 'domain_blocks.csv'
+
+    # Importing bookmarks list with no headers into expected type
+    it_behaves_like 'valid import', 'bookmarks', 'bookmark-imports.txt'
+
+    # Importing followed accounts with headers into various compatible types
+    it_behaves_like 'valid import', 'following', 'following_accounts.csv'
+    it_behaves_like 'valid import', 'blocking', 'following_accounts.csv'
+    it_behaves_like 'valid import', 'muting', 'following_accounts.csv'
+
+    # Importing domain blocks with headers into incompatible types
+    it_behaves_like 'incompatible import type', 'following', 'domain_blocks.csv'
+    it_behaves_like 'incompatible import type', 'blocking', 'domain_blocks.csv'
+    it_behaves_like 'incompatible import type', 'muting', 'domain_blocks.csv'
+    it_behaves_like 'incompatible import type', 'bookmarks', 'domain_blocks.csv'
+
+    # Importing followed accounts with headers into incompatible types
+    it_behaves_like 'incompatible import type', 'domain_blocking', 'following_accounts.csv'
+    it_behaves_like 'incompatible import type', 'bookmarks', 'following_accounts.csv'
+  end
+
+  describe '#guessed_type' do
+    shared_examples 'with enough information' do |type, file, original_filename, expected_guess|
+      let(:import_file) { file }
+      let(:import_type) { type }
+
+      before do
+        allow(data).to receive(:original_filename).and_return(original_filename)
+      end
+
+      it 'guesses the expected type' do
+        expect(subject.guessed_type).to eq expected_guess
+      end
+    end
+
+    context 'when the headers are enough to disambiguate' do
+      it_behaves_like 'with enough information', 'following', 'following_accounts.csv', 'import.csv', :following
+      it_behaves_like 'with enough information', 'blocking', 'following_accounts.csv', 'import.csv', :following
+      it_behaves_like 'with enough information', 'muting', 'following_accounts.csv', 'import.csv', :following
+
+      it_behaves_like 'with enough information', 'following', 'muted_accounts.csv', 'imports.csv', :muting
+      it_behaves_like 'with enough information', 'blocking', 'muted_accounts.csv', 'imports.csv', :muting
+      it_behaves_like 'with enough information', 'muting', 'muted_accounts.csv', 'imports.csv', :muting
+    end
+
+    context 'when the file name is enough to disambiguate' do
+      it_behaves_like 'with enough information', 'following', 'imports.txt', 'following_accounts.csv', :following
+      it_behaves_like 'with enough information', 'blocking', 'imports.txt', 'following_accounts.csv', :following
+      it_behaves_like 'with enough information', 'muting', 'imports.txt', 'following_accounts.csv', :following
+
+      it_behaves_like 'with enough information', 'following', 'imports.txt', 'follows.csv', :following
+      it_behaves_like 'with enough information', 'blocking', 'imports.txt', 'follows.csv', :following
+      it_behaves_like 'with enough information', 'muting', 'imports.txt', 'follows.csv', :following
+
+      it_behaves_like 'with enough information', 'following', 'imports.txt', 'blocked_accounts.csv', :blocking
+      it_behaves_like 'with enough information', 'blocking', 'imports.txt', 'blocked_accounts.csv', :blocking
+      it_behaves_like 'with enough information', 'muting', 'imports.txt', 'blocked_accounts.csv', :blocking
+
+      it_behaves_like 'with enough information', 'following', 'imports.txt', 'blocks.csv', :blocking
+      it_behaves_like 'with enough information', 'blocking', 'imports.txt', 'blocks.csv', :blocking
+      it_behaves_like 'with enough information', 'muting', 'imports.txt', 'blocks.csv', :blocking
+
+      it_behaves_like 'with enough information', 'following', 'imports.txt', 'muted_accounts.csv', :muting
+      it_behaves_like 'with enough information', 'blocking', 'imports.txt', 'muted_accounts.csv', :muting
+      it_behaves_like 'with enough information', 'muting', 'imports.txt', 'muted_accounts.csv', :muting
+
+      it_behaves_like 'with enough information', 'following', 'imports.txt', 'mutes.csv', :muting
+      it_behaves_like 'with enough information', 'blocking', 'imports.txt', 'mutes.csv', :muting
+      it_behaves_like 'with enough information', 'muting', 'imports.txt', 'mutes.csv', :muting
+    end
+  end
+
+  describe '#likely_mismatched?' do
+    shared_examples 'with matching types' do |type, file, original_filename = nil|
+      let(:import_file) { file }
+      let(:import_type) { type }
+
+      before do
+        allow(data).to receive(:original_filename).and_return(original_filename) if original_filename.present?
+      end
+
+      it 'returns false' do
+        expect(subject.likely_mismatched?).to be false
+      end
+    end
+
+    shared_examples 'with mismatching types' do |type, file, original_filename = nil|
+      let(:import_file) { file }
+      let(:import_type) { type }
+
+      before do
+        allow(data).to receive(:original_filename).and_return(original_filename) if original_filename.present?
+      end
+
+      it 'returns true' do
+        expect(subject.likely_mismatched?).to be true
+      end
+    end
+
+    it_behaves_like 'with matching types', 'following', 'following_accounts.csv'
+    it_behaves_like 'with matching types', 'following', 'following_accounts.csv', 'imports.txt'
+    it_behaves_like 'with matching types', 'following', 'imports.txt'
+    it_behaves_like 'with matching types', 'blocking', 'imports.txt', 'blocks.csv'
+    it_behaves_like 'with matching types', 'blocking', 'imports.txt'
+    it_behaves_like 'with matching types', 'muting', 'muted_accounts.csv'
+    it_behaves_like 'with matching types', 'muting', 'muted_accounts.csv', 'imports.txt'
+    it_behaves_like 'with matching types', 'muting', 'imports.txt'
+    it_behaves_like 'with matching types', 'domain_blocking', 'domain_blocks.csv'
+    it_behaves_like 'with matching types', 'domain_blocking', 'domain_blocks.csv', 'imports.txt'
+    it_behaves_like 'with matching types', 'bookmarks', 'bookmark-imports.txt'
+    it_behaves_like 'with matching types', 'bookmarks', 'bookmark-imports.txt', 'imports.txt'
+
+    it_behaves_like 'with mismatching types', 'following', 'imports.txt', 'blocks.csv'
+    it_behaves_like 'with mismatching types', 'following', 'imports.txt', 'blocked_accounts.csv'
+    it_behaves_like 'with mismatching types', 'following', 'imports.txt', 'mutes.csv'
+    it_behaves_like 'with mismatching types', 'following', 'imports.txt', 'muted_accounts.csv'
+    it_behaves_like 'with mismatching types', 'following', 'muted_accounts.csv'
+    it_behaves_like 'with mismatching types', 'following', 'muted_accounts.csv', 'imports.txt'
+    it_behaves_like 'with mismatching types', 'blocking', 'following_accounts.csv'
+    it_behaves_like 'with mismatching types', 'blocking', 'following_accounts.csv', 'imports.txt'
+    it_behaves_like 'with mismatching types', 'blocking', 'muted_accounts.csv'
+    it_behaves_like 'with mismatching types', 'blocking', 'muted_accounts.csv', 'imports.txt'
+    it_behaves_like 'with mismatching types', 'blocking', 'imports.txt', 'follows.csv'
+    it_behaves_like 'with mismatching types', 'blocking', 'imports.txt', 'following_accounts.csv'
+    it_behaves_like 'with mismatching types', 'blocking', 'imports.txt', 'mutes.csv'
+    it_behaves_like 'with mismatching types', 'blocking', 'imports.txt', 'muted_accounts.csv'
+    it_behaves_like 'with mismatching types', 'muting', 'following_accounts.csv'
+    it_behaves_like 'with mismatching types', 'muting', 'following_accounts.csv', 'imports.txt'
+    it_behaves_like 'with mismatching types', 'muting', 'imports.txt', 'follows.csv'
+    it_behaves_like 'with mismatching types', 'muting', 'imports.txt', 'following_accounts.csv'
+    it_behaves_like 'with mismatching types', 'muting', 'imports.txt', 'blocks.csv'
+    it_behaves_like 'with mismatching types', 'muting', 'imports.txt', 'blocked_accounts.csv'
+  end
+
+  describe 'save' do
+    shared_examples 'on successful import' do |type, mode, file, expected_rows|
+      let(:import_type) { type }
+      let(:import_file) { file }
+      let(:import_mode) { mode }
+
+      before do
+        subject.save
+      end
+
+      it 'creates the expected rows' do
+        expect(account.bulk_imports.first.rows.pluck(:data)).to match_array(expected_rows)
+      end
+
+      it 'creates a BulkImport with expected attributes' do
+        bulk_import = account.bulk_imports.first
+        expect(bulk_import).to_not be_nil
+        expect(bulk_import.type.to_sym).to eq subject.type.to_sym
+        expect(bulk_import.original_filename).to eq subject.data.original_filename
+        expect(bulk_import.likely_mismatched?).to eq subject.likely_mismatched?
+        expect(bulk_import.overwrite?).to eq !!subject.overwrite # rubocop:disable Style/DoubleNegation
+        expect(bulk_import.processed_items).to eq 0
+        expect(bulk_import.imported_items).to eq 0
+        expect(bulk_import.total_items).to eq bulk_import.rows.count
+        expect(bulk_import.unconfirmed?).to be true
+      end
+    end
+
+    it_behaves_like 'on successful import', 'following', 'merge', 'imports.txt', (%w(user@example.com user@test.com).map { |acct| { 'acct' => acct } })
+    it_behaves_like 'on successful import', 'following', 'overwrite', 'imports.txt', (%w(user@example.com user@test.com).map { |acct| { 'acct' => acct } })
+    it_behaves_like 'on successful import', 'blocking', 'merge', 'imports.txt', (%w(user@example.com user@test.com).map { |acct| { 'acct' => acct } })
+    it_behaves_like 'on successful import', 'blocking', 'overwrite', 'imports.txt', (%w(user@example.com user@test.com).map { |acct| { 'acct' => acct } })
+    it_behaves_like 'on successful import', 'muting', 'merge', 'imports.txt', (%w(user@example.com user@test.com).map { |acct| { 'acct' => acct } })
+    it_behaves_like 'on successful import', 'domain_blocking', 'merge', 'domain_blocks.csv', (%w(bad.domain worse.domain reject.media).map { |domain| { 'domain' => domain } })
+    it_behaves_like 'on successful import', 'bookmarks', 'merge', 'bookmark-imports.txt', (%w(https://example.com/statuses/1312 https://local.com/users/foo/statuses/42 https://unknown-remote.com/users/bar/statuses/1 https://example.com/statuses/direct).map { |uri| { 'uri' => uri } })
+
+    it_behaves_like 'on successful import', 'following', 'merge', 'following_accounts.csv', [
+      { 'acct' => 'user@example.com', 'show_reblogs' => true, 'notify' => false, 'languages' => nil },
+      { 'acct' => 'user@test.com', 'show_reblogs' => true, 'notify' => true, 'languages' => ['en', 'fr'] },
+    ]
+
+    it_behaves_like 'on successful import', 'muting', 'merge', 'muted_accounts.csv', [
+      { 'acct' => 'user@example.com', 'hide_notifications' => true },
+      { 'acct' => 'user@test.com', 'hide_notifications' => false },
+    ]
+
+    # Based on the bug report 20571 where UTF-8 encoded domains were rejecting import of their users
+    #
+    # https://github.com/mastodon/mastodon/issues/20571
+    it_behaves_like 'on successful import', 'following', 'merge', 'utf8-followers.txt', [{ 'acct' => 'nare@թութ.հայ' }]
+  end
+end
diff --git a/spec/models/import_spec.rb b/spec/models/import_spec.rb
index 1c84744138..106af4e0f7 100644
--- a/spec/models/import_spec.rb
+++ b/spec/models/import_spec.rb
@@ -22,20 +22,5 @@ RSpec.describe Import, type: :model do
       import = Import.create(account: account, type: type)
       expect(import).to model_have_error_on_field(:data)
     end
-
-    it 'is invalid with malformed data' do
-      import = Import.create(account: account, type: type, data: StringIO.new('\"test'))
-      expect(import).to model_have_error_on_field(:data)
-    end
-
-    it 'is invalid with too many rows in data' do
-      import = Import.create(account: account, type: type, data: StringIO.new("foo@bar.com\n" * (ImportService::ROWS_PROCESSING_LIMIT + 10)))
-      expect(import).to model_have_error_on_field(:data)
-    end
-
-    it 'is invalid when there are more rows when following limit' do
-      import = Import.create(account: account, type: type, data: StringIO.new("foo@bar.com\n" * (FollowLimitValidator.limit_for_account(account) + 10)))
-      expect(import).to model_have_error_on_field(:data)
-    end
   end
 end
diff --git a/spec/services/bulk_import_row_service_spec.rb b/spec/services/bulk_import_row_service_spec.rb
new file mode 100644
index 0000000000..5bbe6b0042
--- /dev/null
+++ b/spec/services/bulk_import_row_service_spec.rb
@@ -0,0 +1,95 @@
+# frozen_string_literal: true
+
+require 'rails_helper'
+
+RSpec.describe BulkImportRowService do
+  subject { described_class.new }
+
+  let(:account)    { Fabricate(:account) }
+  let(:import)     { Fabricate(:bulk_import, account: account, type: import_type) }
+  let(:import_row) { Fabricate(:bulk_import_row, bulk_import: import, data: data) }
+
+  describe '#call' do
+    context 'when importing a follow' do
+      let(:import_type)    { 'following' }
+      let(:target_account) { Fabricate(:account) }
+      let(:service_double) { instance_double(FollowService, call: nil) }
+      let(:data) do
+        { 'acct' => target_account.acct }
+      end
+
+      before do
+        allow(FollowService).to receive(:new).and_return(service_double)
+      end
+
+      it 'calls FollowService with the expected arguments and returns true' do
+        expect(subject.call(import_row)).to be true
+
+        expect(service_double).to have_received(:call).with(account, target_account, { reblogs: nil, notify: nil, languages: nil })
+      end
+    end
+
+    context 'when importing a block' do
+      let(:import_type)    { 'blocking' }
+      let(:target_account) { Fabricate(:account) }
+      let(:service_double) { instance_double(BlockService, call: nil) }
+      let(:data) do
+        { 'acct' => target_account.acct }
+      end
+
+      before do
+        allow(BlockService).to receive(:new).and_return(service_double)
+      end
+
+      it 'calls BlockService with the expected arguments and returns true' do
+        expect(subject.call(import_row)).to be true
+
+        expect(service_double).to have_received(:call).with(account, target_account)
+      end
+    end
+
+    context 'when importing a mute' do
+      let(:import_type)    { 'muting' }
+      let(:target_account) { Fabricate(:account) }
+      let(:service_double) { instance_double(MuteService, call: nil) }
+      let(:data) do
+        { 'acct' => target_account.acct }
+      end
+
+      before do
+        allow(MuteService).to receive(:new).and_return(service_double)
+      end
+
+      it 'calls MuteService with the expected arguments and returns true' do
+        expect(subject.call(import_row)).to be true
+
+        expect(service_double).to have_received(:call).with(account, target_account, { notifications: nil })
+      end
+    end
+
+    context 'when importing a bookmark' do
+      let(:import_type) { 'bookmarks' }
+      let(:data) do
+        { 'uri' => ActivityPub::TagManager.instance.uri_for(target_status) }
+      end
+
+      context 'when the status is public' do
+        let(:target_status) { Fabricate(:status) }
+
+        it 'bookmarks the status and returns true' do
+          expect(subject.call(import_row)).to be true
+          expect(account.bookmarked?(target_status)).to be true
+        end
+      end
+
+      context 'when the status is not accessible to the user' do
+        let(:target_status) { Fabricate(:status, visibility: :direct) }
+
+        it 'does not bookmark the status and returns false' do
+          expect(subject.call(import_row)).to be false
+          expect(account.bookmarked?(target_status)).to be false
+        end
+      end
+    end
+  end
+end
diff --git a/spec/services/bulk_import_service_spec.rb b/spec/services/bulk_import_service_spec.rb
new file mode 100644
index 0000000000..09dfb0a0b6
--- /dev/null
+++ b/spec/services/bulk_import_service_spec.rb
@@ -0,0 +1,417 @@
+# frozen_string_literal: true
+
+require 'rails_helper'
+
+RSpec.describe BulkImportService do
+  subject { described_class.new }
+
+  let(:account) { Fabricate(:account) }
+  let(:import) { Fabricate(:bulk_import, account: account, type: import_type, overwrite: overwrite, state: :in_progress, imported_items: 0, processed_items: 0) }
+
+  before do
+    import.update(total_items: import.rows.count)
+  end
+
+  describe '#call' do
+    around do |example|
+      Sidekiq::Testing.fake! do
+        example.run
+        Sidekiq::Worker.clear_all
+      end
+    end
+
+    context 'when importing follows' do
+      let(:import_type) { 'following' }
+      let(:overwrite)   { false }
+
+      let!(:rows) do
+        [
+          { 'acct' => 'user@foo.bar' },
+          { 'acct' => 'unknown@unknown.bar' },
+        ].map { |data| import.rows.create!(data: data) }
+      end
+
+      before do
+        account.follow!(Fabricate(:account))
+      end
+
+      it 'does not immediately change who the account follows' do
+        expect { subject.call(import) }.to_not(change { account.reload.active_relationships.to_a })
+      end
+
+      it 'enqueues workers for the expected rows' do
+        subject.call(import)
+        expect(Import::RowWorker.jobs.pluck('args').flatten).to match_array(rows.map(&:id))
+      end
+
+      it 'requests to follow all the listed users once the workers have run' do
+        subject.call(import)
+
+        resolve_account_service_double = double
+        allow(ResolveAccountService).to receive(:new).and_return(resolve_account_service_double)
+        allow(resolve_account_service_double).to receive(:call).with('user@foo.bar', any_args) { Fabricate(:account, username: 'user', domain: 'foo.bar', protocol: :activitypub) }
+        allow(resolve_account_service_double).to receive(:call).with('unknown@unknown.bar', any_args) { Fabricate(:account, username: 'unknown', domain: 'unknown.bar', protocol: :activitypub) }
+
+        Import::RowWorker.drain
+
+        expect(FollowRequest.includes(:target_account).where(account: account).map(&:target_account).map(&:acct)).to contain_exactly('user@foo.bar', 'unknown@unknown.bar')
+      end
+    end
+
+    context 'when importing follows with overwrite' do
+      let(:import_type) { 'following' }
+      let(:overwrite)   { true }
+
+      let!(:followed)         { Fabricate(:account, username: 'followed', domain: 'foo.bar', protocol: :activitypub) }
+      let!(:to_be_unfollowed) { Fabricate(:account, username: 'to_be_unfollowed', domain: 'foo.bar', protocol: :activitypub) }
+
+      let!(:rows) do
+        [
+          { 'acct' => 'followed@foo.bar', 'show_reblogs' => false, 'notify' => true, 'languages' => ['en'] },
+          { 'acct' => 'user@foo.bar' },
+          { 'acct' => 'unknown@unknown.bar' },
+        ].map { |data| import.rows.create!(data: data) }
+      end
+
+      before do
+        account.follow!(followed, reblogs: true, notify: false)
+        account.follow!(to_be_unfollowed)
+      end
+
+      it 'unfollows user not present on list' do
+        subject.call(import)
+        expect(account.following?(to_be_unfollowed)).to be false
+      end
+
+      it 'updates the existing follow relationship as expected' do
+        expect { subject.call(import) }.to change { Follow.where(account: account, target_account: followed).pick(:show_reblogs, :notify, :languages) }.from([true, false, nil]).to([false, true, ['en']])
+      end
+
+      it 'enqueues workers for the expected rows' do
+        subject.call(import)
+        expect(Import::RowWorker.jobs.pluck('args').flatten).to match_array(rows[1..].map(&:id))
+      end
+
+      it 'requests to follow all the expected users once the workers have run' do
+        subject.call(import)
+
+        resolve_account_service_double = double
+        allow(ResolveAccountService).to receive(:new).and_return(resolve_account_service_double)
+        allow(resolve_account_service_double).to receive(:call).with('user@foo.bar', any_args) { Fabricate(:account, username: 'user', domain: 'foo.bar', protocol: :activitypub) }
+        allow(resolve_account_service_double).to receive(:call).with('unknown@unknown.bar', any_args) { Fabricate(:account, username: 'unknown', domain: 'unknown.bar', protocol: :activitypub) }
+
+        Import::RowWorker.drain
+
+        expect(FollowRequest.includes(:target_account).where(account: account).map(&:target_account).map(&:acct)).to contain_exactly('user@foo.bar', 'unknown@unknown.bar')
+      end
+    end
+
+    context 'when importing blocks' do
+      let(:import_type) { 'blocking' }
+      let(:overwrite)   { false }
+
+      let!(:rows) do
+        [
+          { 'acct' => 'user@foo.bar' },
+          { 'acct' => 'unknown@unknown.bar' },
+        ].map { |data| import.rows.create!(data: data) }
+      end
+
+      before do
+        account.block!(Fabricate(:account, username: 'already_blocked', domain: 'remote.org'))
+      end
+
+      it 'does not immediately change who the account blocks' do
+        expect { subject.call(import) }.to_not(change { account.reload.blocking.to_a })
+      end
+
+      it 'enqueues workers for the expected rows' do
+        subject.call(import)
+        expect(Import::RowWorker.jobs.pluck('args').flatten).to match_array(rows.map(&:id))
+      end
+
+      it 'blocks all the listed users once the workers have run' do
+        subject.call(import)
+
+        resolve_account_service_double = double
+        allow(ResolveAccountService).to receive(:new).and_return(resolve_account_service_double)
+        allow(resolve_account_service_double).to receive(:call).with('user@foo.bar', any_args) { Fabricate(:account, username: 'user', domain: 'foo.bar', protocol: :activitypub) }
+        allow(resolve_account_service_double).to receive(:call).with('unknown@unknown.bar', any_args) { Fabricate(:account, username: 'unknown', domain: 'unknown.bar', protocol: :activitypub) }
+
+        Import::RowWorker.drain
+
+        expect(account.blocking.map(&:acct)).to contain_exactly('already_blocked@remote.org', 'user@foo.bar', 'unknown@unknown.bar')
+      end
+    end
+
+    context 'when importing blocks with overwrite' do
+      let(:import_type) { 'blocking' }
+      let(:overwrite)   { true }
+
+      let!(:blocked)         { Fabricate(:account, username: 'blocked', domain: 'foo.bar', protocol: :activitypub) }
+      let!(:to_be_unblocked) { Fabricate(:account, username: 'to_be_unblocked', domain: 'foo.bar', protocol: :activitypub) }
+
+      let!(:rows) do
+        [
+          { 'acct' => 'blocked@foo.bar' },
+          { 'acct' => 'user@foo.bar' },
+          { 'acct' => 'unknown@unknown.bar' },
+        ].map { |data| import.rows.create!(data: data) }
+      end
+
+      before do
+        account.block!(blocked)
+        account.block!(to_be_unblocked)
+      end
+
+      it 'unblocks user not present on list' do
+        subject.call(import)
+        expect(account.blocking?(to_be_unblocked)).to be false
+      end
+
+      it 'enqueues workers for the expected rows' do
+        subject.call(import)
+        expect(Import::RowWorker.jobs.pluck('args').flatten).to match_array(rows[1..].map(&:id))
+      end
+
+      it 'requests to follow all the expected users once the workers have run' do
+        subject.call(import)
+
+        resolve_account_service_double = double
+        allow(ResolveAccountService).to receive(:new).and_return(resolve_account_service_double)
+        allow(resolve_account_service_double).to receive(:call).with('user@foo.bar', any_args) { Fabricate(:account, username: 'user', domain: 'foo.bar', protocol: :activitypub) }
+        allow(resolve_account_service_double).to receive(:call).with('unknown@unknown.bar', any_args) { Fabricate(:account, username: 'unknown', domain: 'unknown.bar', protocol: :activitypub) }
+
+        Import::RowWorker.drain
+
+        expect(account.blocking.map(&:acct)).to contain_exactly('blocked@foo.bar', 'user@foo.bar', 'unknown@unknown.bar')
+      end
+    end
+
+    context 'when importing mutes' do
+      let(:import_type) { 'muting' }
+      let(:overwrite)   { false }
+
+      let!(:rows) do
+        [
+          { 'acct' => 'user@foo.bar' },
+          { 'acct' => 'unknown@unknown.bar' },
+        ].map { |data| import.rows.create!(data: data) }
+      end
+
+      before do
+        account.mute!(Fabricate(:account, username: 'already_muted', domain: 'remote.org'))
+      end
+
+      it 'does not immediately change who the account blocks' do
+        expect { subject.call(import) }.to_not(change { account.reload.muting.to_a })
+      end
+
+      it 'enqueues workers for the expected rows' do
+        subject.call(import)
+        expect(Import::RowWorker.jobs.pluck('args').flatten).to match_array(rows.map(&:id))
+      end
+
+      it 'mutes all the listed users once the workers have run' do
+        subject.call(import)
+
+        resolve_account_service_double = double
+        allow(ResolveAccountService).to receive(:new).and_return(resolve_account_service_double)
+        allow(resolve_account_service_double).to receive(:call).with('user@foo.bar', any_args) { Fabricate(:account, username: 'user', domain: 'foo.bar', protocol: :activitypub) }
+        allow(resolve_account_service_double).to receive(:call).with('unknown@unknown.bar', any_args) { Fabricate(:account, username: 'unknown', domain: 'unknown.bar', protocol: :activitypub) }
+
+        Import::RowWorker.drain
+
+        expect(account.muting.map(&:acct)).to contain_exactly('already_muted@remote.org', 'user@foo.bar', 'unknown@unknown.bar')
+      end
+    end
+
+    context 'when importing mutes with overwrite' do
+      let(:import_type) { 'muting' }
+      let(:overwrite)   { true }
+
+      let!(:muted)         { Fabricate(:account, username: 'muted', domain: 'foo.bar', protocol: :activitypub) }
+      let!(:to_be_unmuted) { Fabricate(:account, username: 'to_be_unmuted', domain: 'foo.bar', protocol: :activitypub) }
+
+      let!(:rows) do
+        [
+          { 'acct' => 'muted@foo.bar', 'hide_notifications' => true },
+          { 'acct' => 'user@foo.bar' },
+          { 'acct' => 'unknown@unknown.bar' },
+        ].map { |data| import.rows.create!(data: data) }
+      end
+
+      before do
+        account.mute!(muted, notifications: false)
+        account.mute!(to_be_unmuted)
+      end
+
+      it 'updates the existing mute as expected' do
+        expect { subject.call(import) }.to change { Mute.where(account: account, target_account: muted).pick(:hide_notifications) }.from(false).to(true)
+      end
+
+      it 'unblocks user not present on list' do
+        subject.call(import)
+        expect(account.muting?(to_be_unmuted)).to be false
+      end
+
+      it 'enqueues workers for the expected rows' do
+        subject.call(import)
+        expect(Import::RowWorker.jobs.pluck('args').flatten).to match_array(rows[1..].map(&:id))
+      end
+
+      it 'requests to follow all the expected users once the workers have run' do
+        subject.call(import)
+
+        resolve_account_service_double = double
+        allow(ResolveAccountService).to receive(:new).and_return(resolve_account_service_double)
+        allow(resolve_account_service_double).to receive(:call).with('user@foo.bar', any_args) { Fabricate(:account, username: 'user', domain: 'foo.bar', protocol: :activitypub) }
+        allow(resolve_account_service_double).to receive(:call).with('unknown@unknown.bar', any_args) { Fabricate(:account, username: 'unknown', domain: 'unknown.bar', protocol: :activitypub) }
+
+        Import::RowWorker.drain
+
+        expect(account.muting.map(&:acct)).to contain_exactly('muted@foo.bar', 'user@foo.bar', 'unknown@unknown.bar')
+      end
+    end
+
+    context 'when importing domain blocks' do
+      let(:import_type) { 'domain_blocking' }
+      let(:overwrite)   { false }
+
+      let!(:rows) do
+        [
+          { 'domain' => 'blocked.com' },
+          { 'domain' => 'to_block.com' },
+        ].map { |data| import.rows.create!(data: data) }
+      end
+
+      before do
+        account.block_domain!('alreadyblocked.com')
+        account.block_domain!('blocked.com')
+      end
+
+      it 'blocks all the new domains' do
+        subject.call(import)
+        expect(account.domain_blocks.pluck(:domain)).to contain_exactly('alreadyblocked.com', 'blocked.com', 'to_block.com')
+      end
+
+      it 'marks the import as finished' do
+        subject.call(import)
+        expect(import.reload.finished?).to be true
+      end
+    end
+
+    context 'when importing domain blocks with overwrite' do
+      let(:import_type) { 'domain_blocking' }
+      let(:overwrite)   { true }
+
+      let!(:rows) do
+        [
+          { 'domain' => 'blocked.com' },
+          { 'domain' => 'to_block.com' },
+        ].map { |data| import.rows.create!(data: data) }
+      end
+
+      before do
+        account.block_domain!('alreadyblocked.com')
+        account.block_domain!('blocked.com')
+      end
+
+      it 'blocks all the new domains' do
+        subject.call(import)
+        expect(account.domain_blocks.pluck(:domain)).to contain_exactly('blocked.com', 'to_block.com')
+      end
+
+      it 'marks the import as finished' do
+        subject.call(import)
+        expect(import.reload.finished?).to be true
+      end
+    end
+
+    context 'when importing bookmarks' do
+      let(:import_type) { 'bookmarks' }
+      let(:overwrite)   { false }
+
+      let!(:already_bookmarked)  { Fabricate(:status, uri: 'https://already.bookmarked/1') }
+      let!(:status)              { Fabricate(:status, uri: 'https://foo.bar/posts/1') }
+      let!(:inaccessible_status) { Fabricate(:status, uri: 'https://foo.bar/posts/inaccessible', visibility: :direct) }
+      let!(:bookmarked)          { Fabricate(:status, uri: 'https://foo.bar/posts/already-bookmarked') }
+
+      let!(:rows) do
+        [
+          { 'uri' => status.uri },
+          { 'uri' => inaccessible_status.uri },
+          { 'uri' => bookmarked.uri },
+          { 'uri' => 'https://domain.unknown/foo' },
+          { 'uri' => 'https://domain.unknown/private' },
+        ].map { |data| import.rows.create!(data: data) }
+      end
+
+      before do
+        account.bookmarks.create!(status: already_bookmarked)
+        account.bookmarks.create!(status: bookmarked)
+      end
+
+      it 'enqueues workers for the expected rows' do
+        subject.call(import)
+        expect(Import::RowWorker.jobs.pluck('args').flatten).to match_array(rows.map(&:id))
+      end
+
+      it 'updates the bookmarks as expected once the workers have run' do
+        subject.call(import)
+
+        service_double = double
+        allow(ActivityPub::FetchRemoteStatusService).to receive(:new).and_return(service_double)
+        allow(service_double).to receive(:call).with('https://domain.unknown/foo') { Fabricate(:status, uri: 'https://domain.unknown/foo') }
+        allow(service_double).to receive(:call).with('https://domain.unknown/private') { Fabricate(:status, uri: 'https://domain.unknown/private', visibility: :direct) }
+
+        Import::RowWorker.drain
+
+        expect(account.bookmarks.map(&:status).map(&:uri)).to contain_exactly(already_bookmarked.uri, status.uri, bookmarked.uri, 'https://domain.unknown/foo')
+      end
+    end
+
+    context 'when importing bookmarks with overwrite' do
+      let(:import_type) { 'bookmarks' }
+      let(:overwrite)   { true }
+
+      let!(:already_bookmarked)  { Fabricate(:status, uri: 'https://already.bookmarked/1') }
+      let!(:status)              { Fabricate(:status, uri: 'https://foo.bar/posts/1') }
+      let!(:inaccessible_status) { Fabricate(:status, uri: 'https://foo.bar/posts/inaccessible', visibility: :direct) }
+      let!(:bookmarked)          { Fabricate(:status, uri: 'https://foo.bar/posts/already-bookmarked') }
+
+      let!(:rows) do
+        [
+          { 'uri' => status.uri },
+          { 'uri' => inaccessible_status.uri },
+          { 'uri' => bookmarked.uri },
+          { 'uri' => 'https://domain.unknown/foo' },
+          { 'uri' => 'https://domain.unknown/private' },
+        ].map { |data| import.rows.create!(data: data) }
+      end
+
+      before do
+        account.bookmarks.create!(status: already_bookmarked)
+        account.bookmarks.create!(status: bookmarked)
+      end
+
+      it 'enqueues workers for the expected rows' do
+        subject.call(import)
+        expect(Import::RowWorker.jobs.pluck('args').flatten).to match_array(rows.map(&:id))
+      end
+
+      it 'updates the bookmarks as expected once the workers have run' do
+        subject.call(import)
+
+        service_double = double
+        allow(ActivityPub::FetchRemoteStatusService).to receive(:new).and_return(service_double)
+        allow(service_double).to receive(:call).with('https://domain.unknown/foo') { Fabricate(:status, uri: 'https://domain.unknown/foo') }
+        allow(service_double).to receive(:call).with('https://domain.unknown/private') { Fabricate(:status, uri: 'https://domain.unknown/private', visibility: :direct) }
+
+        Import::RowWorker.drain
+
+        expect(account.bookmarks.map(&:status).map(&:uri)).to contain_exactly(status.uri, bookmarked.uri, 'https://domain.unknown/foo')
+      end
+    end
+  end
+end
diff --git a/spec/workers/bulk_import_worker_spec.rb b/spec/workers/bulk_import_worker_spec.rb
new file mode 100644
index 0000000000..91f51fbb42
--- /dev/null
+++ b/spec/workers/bulk_import_worker_spec.rb
@@ -0,0 +1,26 @@
+# frozen_string_literal: true
+
+require 'rails_helper'
+
+describe BulkImportWorker do
+  subject { described_class.new }
+
+  let(:import) { Fabricate(:bulk_import, state: :scheduled) }
+
+  describe '#perform' do
+    let(:service_double) { instance_double(BulkImportService, call: nil) }
+
+    before do
+      allow(BulkImportService).to receive(:new).and_return(service_double)
+    end
+
+    it 'changes the import\'s state as appropriate' do
+      expect { subject.perform(import.id) }.to change { import.reload.state.to_sym }.from(:scheduled).to(:in_progress)
+    end
+
+    it 'calls BulkImportService' do
+      subject.perform(import.id)
+      expect(service_double).to have_received(:call).with(import)
+    end
+  end
+end
diff --git a/spec/workers/import/row_worker_spec.rb b/spec/workers/import/row_worker_spec.rb
new file mode 100644
index 0000000000..0a71a838fc
--- /dev/null
+++ b/spec/workers/import/row_worker_spec.rb
@@ -0,0 +1,127 @@
+# frozen_string_literal: true
+
+require 'rails_helper'
+
+describe Import::RowWorker do
+  subject { described_class.new }
+
+  let(:row) { Fabricate(:bulk_import_row, bulk_import: import) }
+
+  describe '#perform' do
+    before do
+      allow(BulkImportRowService).to receive(:new).and_return(service_double)
+    end
+
+    shared_examples 'clean failure' do
+      let(:service_double) { instance_double(BulkImportRowService, call: false) }
+
+      it 'calls BulkImportRowService' do
+        subject.perform(row.id)
+        expect(service_double).to have_received(:call).with(row)
+      end
+
+      it 'increases the number of processed items' do
+        expect { subject.perform(row.id) }.to(change { import.reload.processed_items }.by(+1))
+      end
+
+      it 'does not increase the number of imported items' do
+        expect { subject.perform(row.id) }.to_not(change { import.reload.imported_items })
+      end
+
+      it 'does not delete the row' do
+        subject.perform(row.id)
+        expect(BulkImportRow.exists?(row.id)).to be true
+      end
+    end
+
+    shared_examples 'unclean failure' do
+      let(:service_double) { instance_double(BulkImportRowService) }
+
+      before do
+        allow(service_double).to receive(:call) do
+          raise 'dummy error'
+        end
+      end
+
+      it 'raises an error and does not change processed items count' do
+        expect { subject.perform(row.id) }.to raise_error(StandardError, 'dummy error').and(not_change { import.reload.processed_items })
+      end
+
+      it 'does not delete the row' do
+        expect { subject.perform(row.id) }.to raise_error(StandardError, 'dummy error').and(not_change { BulkImportRow.exists?(row.id) })
+      end
+    end
+
+    shared_examples 'clean success' do
+      let(:service_double) { instance_double(BulkImportRowService, call: true) }
+
+      it 'calls BulkImportRowService' do
+        subject.perform(row.id)
+        expect(service_double).to have_received(:call).with(row)
+      end
+
+      it 'increases the number of processed items' do
+        expect { subject.perform(row.id) }.to(change { import.reload.processed_items }.by(+1))
+      end
+
+      it 'increases the number of imported items' do
+        expect { subject.perform(row.id) }.to(change { import.reload.imported_items }.by(+1))
+      end
+
+      it 'deletes the row' do
+        expect { subject.perform(row.id) }.to change { BulkImportRow.exists?(row.id) }.from(true).to(false)
+      end
+    end
+
+    context 'when there are multiple rows to process' do
+      let(:import) { Fabricate(:bulk_import, total_items: 2, processed_items: 0, imported_items: 0, state: :in_progress) }
+
+      context 'with a clean failure' do
+        include_examples 'clean failure'
+
+        it 'does not mark the import as finished' do
+          expect { subject.perform(row.id) }.to_not(change { import.reload.state.to_sym })
+        end
+      end
+
+      context 'with an unclean failure' do
+        include_examples 'unclean failure'
+
+        it 'does not mark the import as finished' do
+          expect { subject.perform(row.id) }.to raise_error(StandardError).and(not_change { import.reload.state.to_sym })
+        end
+      end
+
+      context 'with a clean success' do
+        include_examples 'clean success'
+
+        it 'does not mark the import as finished' do
+          expect { subject.perform(row.id) }.to_not(change { import.reload.state.to_sym })
+        end
+      end
+    end
+
+    context 'when this is the last row to process' do
+      let(:import) { Fabricate(:bulk_import, total_items: 2, processed_items: 1, imported_items: 0, state: :in_progress) }
+
+      context 'with a clean failure' do
+        include_examples 'clean failure'
+
+        it 'marks the import as finished' do
+          expect { subject.perform(row.id) }.to change { import.reload.state.to_sym }.from(:in_progress).to(:finished)
+        end
+      end
+
+      # NOTE: sidekiq retry logic may be a bit too difficult to test, so leaving this blind spot for now
+      it_behaves_like 'unclean failure'
+
+      context 'with a clean success' do
+        include_examples 'clean success'
+
+        it 'marks the import as finished' do
+          expect { subject.perform(row.id) }.to change { import.reload.state.to_sym }.from(:in_progress).to(:finished)
+        end
+      end
+    end
+  end
+end