class Fluent::BufferedOutput

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Output.new
# File lib/fluent/output.rb, line 180
# Sets up the bookkeeping state of a buffered output after running the
# superclass initializer: flush/retry timestamps and counters start at zero,
# the error counter gets its own Mutex, and the secondary retry limit
# defaults to 8.
def initialize
  super
  @next_flush_time = @last_retry_time = @next_retry_time = 0
  @num_errors_lock = Mutex.new
  @secondary_limit = 8
  @num_errors = @emit_count = 0
end

Public Instance Methods

before_shutdown() click to toggle source
# File lib/fluent/output.rb, line 421
# Forwards the before_shutdown notification to the buffer, passing this
# output along. Any StandardError raised by the buffer is logged as a
# warning (message plus backtrace) instead of being propagated.
def before_shutdown
  @buffer.before_shutdown(self)
rescue => e
  $log.warn "before_shutdown failed", error: e.to_s
  $log.warn_backtrace
end
calc_retry_wait() click to toggle source
# File lib/fluent/output.rb, line 430
# Computes how long to wait before the next retry.
#
# The base wait is @retry_wait doubled per error. Once @num_errors exceeds
# @retry_limit (and the limit is not disabled) the exponent is counted from
# the point where the limit was exceeded. A finite wait is randomized by up
# to +/- 1/8 of itself, and the result is capped at @max_retry_wait when
# that is set.
def calc_retry_wait
  # TODO retry pattern
  exponent =
    if @disable_retry_limit || @num_errors <= @retry_limit
      @num_errors - 1
    else
      # secondary retry
      @num_errors - 2 - @retry_limit
    end
  base_wait = @retry_wait * (2 ** exponent)

  # jitter is only meaningful for finite waits
  jittered = base_wait
  jittered = base_wait + (rand * (base_wait / 4.0) - (base_wait / 8.0)) if base_wait.finite?

  return jittered unless @max_retry_wait

  [jittered, @max_retry_wait].min
end
configure(conf) click to toggle source
Calls superclass method Fluent::Output#configure
# File lib/fluent/output.rb, line 211
# Applies the configuration to this output after the superclass has
# processed it:
# - converts @retry_wait to Float,
# - creates and configures the buffer named by @buffer_type,
# - toggles the buffer's parallel mode (when supported) from @num_threads,
# - creates and configures one OutputThread per thread,
# - builds the optional <secondary> output and validates secondary_limit,
# - registers the "queue_size" and "emit_count" status callbacks.
#
# Raises ConfigError when secondary_limit is negative.
def configure(conf)
  super

  # calc_retry_wait does its arithmetic on a Float
  @retry_wait = @retry_wait.to_f
  @buffer = Plugin.new_buffer(@buffer_type)
  @buffer.configure(conf)

  # parallel mode is on for anything other than exactly one thread
  @buffer.enable_parallel(@num_threads != 1) if @buffer.respond_to?(:enable_parallel)

  @writers = (1..@num_threads).map do
    OutputThread.new(self).tap { |thread| thread.configure(conf) }
  end

  sconf = conf.elements.select { |element| element.name == 'secondary' }.first
  if sconf
    # the secondary section's own type wins, falling back to the primary's
    type = sconf['@type'] || conf['@type'] || sconf['type'] || conf['type']
    @secondary = Plugin.new_output(type)
    @secondary.router = router
    @secondary.configure(sconf)

    secondary_limit = conf['secondary_limit']
    if secondary_limit
      @secondary_limit = secondary_limit.to_i
      raise ConfigError, "invalid parameter 'secondary_limit #{secondary_limit}'" if @secondary_limit < 0
    end

    @secondary.secondary_init(self)
  end

  Status.register(self, "queue_size") { @buffer.queue_size }
  Status.register(self, "emit_count") { @emit_count }
end
emit(tag, es, chain, key="") click to toggle source
# File lib/fluent/output.rb, line 267
# Counts the call, formats the event stream and hands the formatted data to
# the buffer under the given key. A flush is submitted when the buffer's
# emit returns a truthy value.
def emit(tag, es, chain, key="")
  @emit_count += 1
  formatted = format_stream(tag, es)
  submit_flush if @buffer.emit(key, formatted, chain)
end
enqueue_buffer(force = false) click to toggle source

# Empty placeholder: accepts a chunk and does nothing (returns nil).
# NOTE(review): presumably meant to be overridden by concrete outputs - confirm.
def write(chunk)
end

# File lib/fluent/output.rb, line 295
# Pushes every key currently held by the buffer.
# The +force+ argument is accepted but not consulted by this implementation.
# Returns whatever @buffer.keys.each returns.
def enqueue_buffer(force = false)
  @buffer.keys.each do |buffer_key|
    @buffer.push(buffer_key)
  end
end
flush_secondary(secondary) click to toggle source
# File lib/fluent/output.rb, line 452
# Pops from the buffer into the given secondary output and returns the
# buffer's result unchanged.
def flush_secondary(secondary)
  popped = @buffer.pop(secondary)
  popped
end
force_flush() click to toggle source
# File lib/fluent/output.rb, line 413
# Forces a flush: moves @next_retry_time one second into the past (under the
# error-counter lock), enqueues the buffer with force=true and submits a
# flush.
def force_flush
  @num_errors_lock.synchronize { @next_retry_time = Time.now.to_f - 1 }
  enqueue_buffer(true)
  submit_flush
end
format_stream(tag, es) click to toggle source
# File lib/fluent/output.rb, line 281
# Formats every (time, record) pair yielded by the event stream with
# +format+ and returns the results concatenated into one String.
def format_stream(tag, es)
  formatted = ''
  es.each do |time, record|
    formatted << format(tag, time, record)
  end
  formatted
end
shutdown() click to toggle source
# File lib/fluent/output.rb, line 261
# Shuts down, in order: every writer, the secondary output (when one is
# configured), and finally the buffer.
def shutdown
  @writers.each(&:shutdown)
  if @secondary
    @secondary.shutdown
  end
  @buffer.shutdown
end
start() click to toggle source
# File lib/fluent/output.rb, line 252
# Schedules the first flush @flush_interval seconds from now, then starts the
# buffer, the secondary output (when configured) and every writer, and
# resets the writer rotation state used by submit_flush.
def start
  @next_flush_time = Time.now.to_f + @flush_interval
  @buffer.start
  if @secondary
    @secondary.start
  end
  @writers.each(&:start)
  @writer_current_position = 0
  @writers_size = @writers.size
end
submit_flush() click to toggle source
# File lib/fluent/output.rb, line 275
# Advances the writer rotation by one (wrapping at @writers_size) and asks
# the writer at the new position to flush.
def submit_flush
  # Without locks: it is rough but enough to select "next" writer selection
  @writer_current_position = @writer_current_position.succ % @writers_size
  @writers[@writer_current_position].submit_flush
end
try_flush() click to toggle source
# File lib/fluent/output.rb, line 301
# Tries to write out one queued chunk and returns a timestamp (epoch
# seconds) - presumably the time at which the caller should try again.
#
# Steps, in order:
# 1. If the queue is empty and @next_flush_time has passed, enqueue the
#    buffered keys under @buffer.synchronize and move @next_flush_time
#    forward by @flush_interval. If the queue is still empty, return
#    time + @try_flush_interval.
# 2. While in the retrying state (@num_errors is non-zero), only proceed when
#    @next_retry_time is in the past; the error counter and next retry time
#    are bumped up front and cleared again on success.
# 3. Pop a chunk into this output, or into @secondary once @num_errors
#    exceeds @retry_limit (unless @disable_retry_limit is set). A warning is
#    logged when the pop takes longer than @slow_flush_log_threshold.
# 4. On a StandardError: log it and, depending on the error count, either
#    wait for the next retry, switch to the secondary output, or give up via
#    write_abort and reset the error counter. Returns @next_retry_time.
#
# Every failure branch logs error_class as a String (e.class.to_s).
def try_flush
  time = Time.now.to_f

  empty = @buffer.queue_size == 0
  if empty && @next_flush_time < (now = Time.now.to_f)
    @buffer.synchronize do
      # re-check under the buffer lock before enqueueing
      if @next_flush_time < now
        enqueue_buffer
        @next_flush_time = now + @flush_interval
        empty = @buffer.queue_size == 0
      end
    end
  end
  if empty
    return time + @try_flush_interval
  end

  begin
    retrying = !@num_errors.zero?

    if retrying
      @num_errors_lock.synchronize do
        if retrying = !@num_errors.zero? # re-check in synchronize
          if @next_retry_time >= time
            # allow retrying for only one thread
            return time + @try_flush_interval
          end
          # assume the next retry fails as well;
          # the counters are cleared when it succeeds
          @last_retry_time = time
          @num_errors += 1
          @next_retry_time += calc_retry_wait
        end
      end
    end

    chunk_write_start = Time.now

    if @secondary && !@disable_retry_limit && @num_errors > @retry_limit
      has_next = flush_secondary(@secondary)
    else
      has_next = @buffer.pop(self)
    end

    elapsed_time = Time.now - chunk_write_start
    if elapsed_time > @slow_flush_log_threshold
      $log.warn "buffer flush took longer time than slow_flush_log_threshold:",
                plugin_id: plugin_id, elapsed_time: elapsed_time, slow_flush_log_threshold: @slow_flush_log_threshold
    end

    # success
    if retrying
      @num_errors = 0
      # Note: don't notify to other threads to prevent
      #       burst to recovered server
      $log.warn "retry succeeded.", plugin_id: plugin_id
    end

    if has_next
      return Time.now.to_f + @queued_chunk_flush_interval
    else
      return time + @try_flush_interval
    end

  rescue => e
    if retrying
      error_count = @num_errors
    else
      # first error
      error_count = 0
      @num_errors_lock.synchronize do
        if @num_errors.zero?
          @last_retry_time = time
          @num_errors += 1
          @next_retry_time = time + calc_retry_wait
        end
      end
    end

    if @disable_retry_limit || error_count < @retry_limit
      $log.warn "temporarily failed to flush the buffer.", next_retry: Time.at(@next_retry_time), error_class: e.class.to_s, error: e.to_s, plugin_id: plugin_id
      $log.warn_backtrace e.backtrace

    elsif @secondary
      if error_count == @retry_limit
        $log.warn "failed to flush the buffer.", error_class: e.class.to_s, error: e.to_s, plugin_id: plugin_id
        $log.warn "retry count exceededs limit. falling back to secondary output."
        $log.warn_backtrace e.backtrace
        retry  # retry immediately
      elsif error_count <= @retry_limit + @secondary_limit
        $log.warn "failed to flush the buffer, next retry will be with secondary output.", next_retry: Time.at(@next_retry_time), error_class: e.class.to_s, error: e.to_s, plugin_id: plugin_id
        $log.warn_backtrace e.backtrace
      else
        # secondary retries are exhausted as well: drop the data and start over
        $log.warn "failed to flush the buffer.", error_class: e.class.to_s, error: e.to_s, plugin_id: plugin_id
        $log.warn "secondary retry count exceededs limit."
        $log.warn_backtrace e.backtrace
        write_abort
        @num_errors = 0
      end

    else
      # no secondary configured: drop the data and start over
      $log.warn "failed to flush the buffer.", error_class: e.class.to_s, error: e.to_s, plugin_id: plugin_id
      $log.warn "retry count exceededs limit."
      $log.warn_backtrace e.backtrace
      write_abort
      @num_errors = 0
    end

    return @next_retry_time
  end
end
write_abort() click to toggle source
# File lib/fluent/output.rb, line 442
# Logs that old logs are being discarded and clears the buffer. A
# StandardError raised while clearing is logged (message plus backtrace)
# rather than propagated.
def write_abort
  $log.error "throwing away old logs."
  begin
    @buffer.clear!
  rescue => e
    $log.error "unexpected error while aborting", error: e.to_s
    $log.error_backtrace
  end
end