# File lib/fluent/plugin/socket_util.rb, line 100 def initialize super require 'fluent/parser' end
# File lib/fluent/plugin/socket_util.rb, line 119 def configure(conf) super @source_hostname_key = @source_host_key if @source_host_key @parser = Plugin.new_parser(@format) @parser.configure(conf) end
# File lib/fluent/plugin/socket_util.rb, line 141 def run @loop.run(@blocking_timeout) rescue => e log.error "unexpected error", error: e, error_class: e.class log.error_backtrace end
# File lib/fluent/plugin/socket_util.rb, line 134 def shutdown @loop.watchers.each { |w| w.detach } @loop.stop @handler.close @thread.join end
# File lib/fluent/plugin/socket_util.rb, line 127 def start @loop = Coolio::Loop.new @handler = listen(method(:on_message)) @loop.attach(@handler) @thread = Thread.new(&method(:run)) end
# File lib/fluent/plugin/socket_util.rb, line 150 def on_message(msg, addr) @parser.parse(msg) { |time, record| unless time && record log.warn "pattern not match: #{msg.inspect}" return end record[@source_hostname_key] = addr[2] if @source_hostname_key router.emit(@tag, time, record) } rescue => e log.error msg.dump, error: e, error_class: e.class, host: addr[3] log.error_backtrace end