class Fluent::Agent

Agent is a resource unit who manages emittable plugins

Next step: `fluentd/root_agent.rb` Next step: `fluentd/label.rb`

Attributes

context[R]
error_collector[R]
event_router[R]
filters[R]
log[R]
outputs[R]

Public Class Methods

new(opts = {}) click to toggle source
Calls superclass method Fluent::Configurable.new
# File lib/fluent/agent.rb, line 32
def initialize(opts = {})
  super()

  @context = nil
  @outputs = []
  @filters = []
  @started_outputs = []
  @started_filters = []

  @log = Engine.log
  @event_router = EventRouter.new(NoMatchMatch.new(log), self)
  @error_collector = nil
end

Public Instance Methods

add_filter(type, pattern, conf) click to toggle source
# File lib/fluent/agent.rb, line 140
def add_filter(type, pattern, conf)
  log.info "adding filter#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

  filter = Plugin.new_filter(type)
  filter.router = @event_router
  filter.configure(conf)
  @filters << filter
  @event_router.add_rule(pattern, filter)

  filter
end
add_match(type, pattern, conf) click to toggle source
# File lib/fluent/agent.rb, line 128
def add_match(type, pattern, conf)
  log.info "adding match#{@context.nil? ? '' : " in #{@context}"}", pattern: pattern, type: type

  output = Plugin.new_output(type)
  output.router = @event_router
  output.configure(conf)
  @outputs << output
  @event_router.add_rule(pattern, output)

  output
end
configure(conf) click to toggle source
Calls superclass method Fluent::Configurable#configure
# File lib/fluent/agent.rb, line 53
def configure(conf)
  super

  # initialize <match> and <filter> elements
  conf.elements.select { |e| e.name == 'filter' || e.name == 'match' }.each { |e|
    pattern = e.arg.empty? ? '**' : e.arg
    type = e['@type'] || e['type']
    raise ConfigError, "Missing '@type' parameter on <#{e.name}> directive" unless type
    if e.name == 'filter'
      add_filter(type, pattern, e)
    else
      add_match(type, pattern, e)
    end
  }
end
emit_error_event(tag, time, record, error) click to toggle source

For handling invalid record

# File lib/fluent/agent.rb, line 153
def emit_error_event(tag, time, record, error)
end
flush!() click to toggle source
# File lib/fluent/agent.rb, line 109
def flush!
  flush_recursive(@outputs)
end
flush_recursive(array) click to toggle source
# File lib/fluent/agent.rb, line 113
def flush_recursive(array)
  array.each { |o|
    begin
      if o.is_a?(BufferedOutput)
        o.force_flush
      elsif o.is_a?(MultiOutput)
        flush_recursive(o.outputs)
      end
    rescue => e
      log.debug "error while force flushing", error_class: e.class, error: e
      log.debug_backtrace
    end
  }
end
handle_emits_error(tag, es, error) click to toggle source
# File lib/fluent/agent.rb, line 156
def handle_emits_error(tag, es, error)
end
shutdown() click to toggle source
# File lib/fluent/agent.rb, line 81
def shutdown
  @started_filters.map { |f|
    Thread.new do
      begin
        log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id
        f.shutdown
      rescue => e
        log.warn "unexpected error while shutting down filter plugins", plugin: f.class, plugin_id: f.plugin_id, error_class: e.class, error: e
        log.warn_backtrace
      end
    end
  }.each { |t| t.join }

  # Output plugin as filter emits records at shutdown so emit problem still exist.
  # This problem will be resolved after actual filter mechanizm.
  @started_outputs.map { |o|
    Thread.new do
      begin
        log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id
        o.shutdown
      rescue => e
        log.warn "unexpected error while shutting down output plugins", plugin: o.class, plugin_id: o.plugin_id, error_class: e.class, error: e
        log.warn_backtrace
      end
    end
  }.each { |t| t.join }
end
start() click to toggle source
# File lib/fluent/agent.rb, line 69
def start
  @outputs.each { |o|
    o.start
    @started_outputs << o
  }

  @filters.each { |f|
    f.start
    @started_filters << f
  }
end