emit(tag, time, record)
click to toggle source
def emit(tag, time, record)
unless record.nil?
emit_stream tag, OneEventStream.new(time, record)
end
end
emit_array(tag, array)
click to toggle source
def emit_array(tag, array)
emit_stream tag, ArrayEventStream.new(array)
end
emit_stream(tag, es)
click to toggle source
def emit_stream(tag, es)
@event_router.emit_stream(tag, es)
end
flush!()
click to toggle source
def flush!
@root_agent.flush!
end
init(system_config)
click to toggle source
def init(system_config)
@system_config = system_config
BasicSocket.do_not_reverse_lookup = true
Plugin.load_plugins
if defined?(Encoding)
Encoding.default_internal = ENV['DEFAULT_ENCODING'] || 'UTF-8' if Encoding.respond_to?(:default_internal)
Encoding.default_external = ENV['DEFAULT_ENCODING'] || 'UTF-8' if Encoding.respond_to?(:default_external)
end
suppress_interval(system_config.emit_error_log_interval) unless system_config.emit_error_log_interval.nil?
@suppress_config_dump = system_config.suppress_config_dump unless system_config.suppress_config_dump.nil?
@without_source = system_config.without_source unless system_config.without_source.nil?
@root_agent = RootAgent.new(@system_config)
self
end
load_plugin_dir(dir)
click to toggle source
def load_plugin_dir(dir)
Plugin.load_plugin_dir(dir)
end
log()
click to toggle source
log_event_loop()
click to toggle source
def log_event_loop
$log.disable_events(Thread.current)
while sleep(LOG_EMIT_INTERVAL)
break if @log_event_loop_stop
next if @log_event_queue.empty?
events = @log_event_queue.slice!(0..-1)
next if events.empty?
events.each {|tag,time,record|
begin
@event_router.emit(tag, time, record)
rescue => e
$log.error "failed to emit fluentd's log event", tag: tag, event: record, error_class: e.class, error: e
end
}
end
end
now()
click to toggle source
def now
Time.now.to_i
end
parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
click to toggle source
def parse_config(io, fname, basepath = Dir.pwd, v1_config = false)
if fname =~ /\.rb$/
require 'fluent/config/dsl'
Config::DSL::Parser.parse(io, File.join(basepath, fname))
else
Config.parse(io, fname, basepath, v1_config)
end
end
push_log_event(tag, time, record)
click to toggle source
def push_log_event(tag, time, record)
return if @log_emit_thread.nil?
@log_event_queue.push([tag, time, record])
end
run()
click to toggle source
def run
begin
start
if @event_router.match?($log.tag)
$log.enable_event
@log_emit_thread = Thread.new(&method(:log_event_loop))
end
unless @engine_stopped
@default_loop = Coolio::Loop.default
@default_loop.attach Coolio::TimerWatcher.new(1, true)
@default_loop.run
end
if @engine_stopped and @default_loop
@default_loop.stop
@default_loop = nil
end
rescue => e
$log.error "unexpected error", error_class: e.class, error: e
$log.error_backtrace
ensure
$log.info "shutting down fluentd"
shutdown
if @log_emit_thread
@log_event_loop_stop = true
@log_emit_thread.join
end
end
end
stop()
click to toggle source
def stop
@engine_stopped = true
if @default_loop
@default_loop.stop
@default_loop = nil
end
nil
end
suppress_interval(interval_time)
click to toggle source
def suppress_interval(interval_time)
@suppress_emit_error_log_interval = interval_time
@next_emit_error_log_time = Time.now.to_i
end