@api private
@return [Client]
@option options [Client] :client
# File lib/aws-sdk-resources/services/s3/multipart_file_uploader.rb, line 18
#
# Builds an uploader bound to an S3 client.
#
# @option options [Client] :client the client used for every request made
#   by this uploader. When the option is absent (or falsy) a new Client is
#   constructed with no arguments.
def initialize(options = {})
  supplied = options[:client]
  @client = supplied ? supplied : Client.new
end
@param [String,Pathname,File,Tempfile] source
@option options [required,String] :bucket
@option options [required,String] :key
@return [void]
# File lib/aws-sdk-resources/services/s3/multipart_file_uploader.rb, line 29
#
# Uploads +source+ as a multipart upload: initiates the upload, uploads
# the parts, then completes the upload.
#
# @param [String,Pathname,File,Tempfile] source
# @option options [required,String] :bucket
# @option options [required,String] :key
# @raise [ArgumentError] with FILE_TOO_SMALL when File.size(source) is
#   below MIN_PART_SIZE.
# @return [void]
def upload(source, options = {})
  # Guard first: nothing is initiated for a source below the minimum size.
  raise ArgumentError, FILE_TOO_SMALL if File.size(source) < MIN_PART_SIZE

  upload_id = initiate_upload(options)
  uploaded = upload_parts(upload_id, source, options)
  complete_upload(upload_id, uploaded, options)
end
# File lib/aws-sdk-resources/services/s3/multipart_file_uploader.rb, line 64
#
# Aborts the multipart upload identified by +upload_id+ and then raises a
# MultipartUploadError describing why the upload failed.
#
# @param upload_id id of the multipart upload to abort
# @param options hash read for :bucket and :key
# @param errors the errors collected while uploading parts; each must
#   respond to #message
# @raise [MultipartUploadError] always. If the abort request itself fails,
#   the raised error carries +errors+ plus the abort failure.
def abort_upload(upload_id, options, errors)
  @client.abort_multipart_upload(
    bucket: options[:bucket],
    key: options[:key],
    upload_id: upload_id
  )
  reasons = errors.map(&:message).join("; ")
  raise MultipartUploadError.new("multipart upload failed: #{reasons}", errors)
rescue MultipartUploadError
  # Already the error we want callers to see; pass it through untouched.
  raise
rescue => abort_failure
  # Anything else came from the abort attempt; report it with the part errors.
  raise MultipartUploadError.new(
    "failed to abort multipart upload: #{abort_failure.message}",
    errors + [abort_failure]
  )
end
# File lib/aws-sdk-resources/services/s3/multipart_file_uploader.rb, line 45
#
# Sends the complete-multipart-upload request for +upload_id+.
#
# @param upload_id id of the multipart upload being completed
# @param parts the list sent as multipart_upload[:parts]
# @param options hash read for :bucket and :key
# @return whatever @client.complete_multipart_upload returns
def complete_upload(upload_id, parts, options)
  manifest = { parts: parts }
  @client.complete_multipart_upload(
    bucket: options[:bucket],
    key: options[:key],
    upload_id: upload_id,
    multipart_upload: manifest
  )
end
# File lib/aws-sdk-resources/services/s3/multipart_file_uploader.rb, line 126
#
# Picks the part size for a source of +source_size+ bytes: the size divided
# by MAX_PARTS rounded up, but never less than MIN_PART_SIZE.
#
# @param source_size [Numeric] total size of the source
# @return [Integer]
def compute_default_part_size(source_size)
  evenly_split = (source_size.to_f / MAX_PARTS).ceil
  chosen = MIN_PART_SIZE > evenly_split ? MIN_PART_SIZE : evenly_split
  chosen.to_i
end
# File lib/aws-sdk-resources/services/s3/multipart_file_uploader.rb, line 79
#
# Builds the request hashes for every part of +source+, in order.
#
# Offsets advance by the default part size; part numbers start at 1. The
# size of each part comes from #part_size, so the last part may be shorter.
#
# @param upload_id id placed in every part hash
# @param source passed to File.size and to each FilePart
# @param options hash read for :bucket and :key
# @return [Array<Hash>] one hash per part with :bucket, :key, :upload_id,
#   :part_number and :body (a FilePart)
def compute_parts(upload_id, source, options)
  total_bytes = File.size(source)
  stride = compute_default_part_size(total_bytes)

  (0...total_bytes).step(stride).each_with_index.map do |start, index|
    {
      bucket: options[:bucket],
      key: options[:key],
      upload_id: upload_id,
      part_number: index + 1,
      body: FilePart.new(
        source: source,
        offset: start,
        size: part_size(total_bytes, stride, start)
      )
    }
  end
end
# File lib/aws-sdk-resources/services/s3/multipart_file_uploader.rb, line 41
#
# Starts a multipart upload and returns its id.
#
# @param options passed unchanged to @client.create_multipart_upload
# @return the upload_id of the response
def initiate_upload(options)
  response = @client.create_multipart_upload(options)
  response.upload_id
end
# File lib/aws-sdk-resources/services/s3/multipart_file_uploader.rb, line 130
#
# Size of the part that begins at +offset+: the regular +part_size+, or
# whatever is left of +total_size+ when less than a full part remains.
#
# @param total_size total size of the source
# @param part_size regular size of a part
# @param offset where the part begins
# @return the smaller of +part_size+ and (+total_size+ - +offset+)
def part_size(total_size, part_size, offset)
  remaining = total_size - offset
  remaining < part_size ? remaining : part_size
end
# File lib/aws-sdk-resources/services/s3/multipart_file_uploader.rb, line 103
#
# Uploads the parts in +pending+ using THREAD_COUNT threads.
#
# Each thread repeatedly shifts a part off +pending+, uploads it with
# @client.upload_part, closes the part's body and pushes the resulting
# etag and part number onto +completed+. A thread stops when +pending+
# yields nothing, or when an error is rescued.
#
# @param pending list of part hashes; must respond to #shift and #clear!
# @param completed list that receives { etag:, part_number: } hashes via #push
# @return [Array] the errors rescued by the threads (empty when every part
#   uploaded successfully)
def upload_in_threads(pending, completed)
  workers = Array.new(THREAD_COUNT) do
    worker = Thread.new do
      begin
        while (part = pending.shift)
          begin
            resp = @client.upload_part(part)
          ensure
            # Close the body whether or not the upload succeeded, so a
            # failed part does not leave its body open.
            part[:body].close
          end
          completed.push(etag: resp.etag, part_number: part[:part_number])
        end
        nil
      rescue => error
        # keep other threads from uploading other parts
        pending.clear!
        # The error becomes the thread's value so the caller can collect it.
        error
      end
    end
    # Exceptions not caught by the rescue above are not swallowed silently.
    worker.abort_on_exception = true
    worker
  end
  # Thread#value joins each thread; successful threads yield nil.
  workers.map(&:value).compact
end
# File lib/aws-sdk-resources/services/s3/multipart_file_uploader.rb, line 53
#
# Uploads every part of +source+ and returns the completed parts sorted by
# part number. If any thread reported an error, the upload is handed to
# #abort_upload instead.
#
# @param upload_id id of the multipart upload
# @param source the file to split into parts
# @param options hash forwarded to #compute_parts and #abort_upload
# @return [Array<Hash>] completed parts ordered by :part_number
def upload_parts(upload_id, source, options)
  queue = PartList.new(compute_parts(upload_id, source, options))
  finished = PartList.new
  failures = upload_in_threads(queue, finished)
  return abort_upload(upload_id, options, failures) unless failures.empty?

  finished.to_a.sort_by { |entry| entry[:part_number] }
end