# Drives a single buffered write through the plugin instance under test.
#
# Delegates to the parent +run+ with +num_waits+. Inside the parent's block it:
#   1. invokes the caller's block, if one was given;
#   2. wraps @entries in an ArrayEventStream and formats it for @tag with the
#      instance's +format_stream+;
#   3. compares the formatted data with @expected_buffer when that is set;
#   4. creates a chunk from the instance's buffer, appends the formatted data
#      and passes the chunk to the instance's +write+.
# The chunk is purged whether or not +write+ raises.
#
# num_waits - forwarded unchanged to the parent +run+ (default 10).
# block     - optional; called first inside the parent's block.
#
# Returns the value returned by the instance's +write+, or nil when no write
# result was recorded (for example, the parent never invoked the block).
def run(num_waits = 10, &block)
  write_result = nil

  super(num_waits) do
    block.call if block

    stream = ArrayEventStream.new(@entries)
    formatted = @instance.format_stream(@tag, stream)
    assert_equal(@expected_buffer, formatted) if @expected_buffer

    # Chunk key selection, checked in this order: object-buffered outputs use
    # the tag, outputs exposing a time slicer use the slice of the first
    # entry's time, everything else uses an empty key.
    chunk_key =
      if @instance.is_a?(Fluent::ObjectBufferedOutput)
        @tag
      elsif @instance.respond_to?(:time_slicer)
        first_time, _first_record = @entries.first
        @instance.time_slicer.call(first_time)
      else
        ''
      end

    chunk = @instance.buffer.new_chunk(chunk_key)
    chunk << formatted
    begin
      write_result = @instance.write(chunk)
    ensure
      # Always release the chunk, even if the write blew up.
      chunk.purge
    end
  end

  write_result
end