# File lib/fluent/plugin/in_exec.rb, line 28
#
# Runs the parent initializer, then loads 'fluent/plugin/exec_util'.
# The require happens here, at instantiation time, rather than when this
# file itself is loaded.
def initialize
  super
  require 'fluent/plugin/exec_util'
end
# File lib/fluent/plugin/in_exec.rb, line 52
#
# Applies the plugin configuration once the parent class has handled +conf+.
#
# conf - configuration element, read here through the string keys
#        'localtime', 'utc' and 'timezone', and passed on to setup_parser.
#
# Effects:
# * @localtime becomes true when 'localtime' is set, otherwise false when
#   'utc' is set ('localtime' wins when both are present); it is left
#   untouched when neither key is set.
# * @timezone is stored and checked with Fluent::Timezone.validate! when
#   'timezone' is set.
# * @time_parse_proc is built when @time_key is set: it parses with
#   Time.strptime when @time_format is set, and falls back to String#to_i
#   otherwise.
# * @parser receives the object returned by setup_parser(conf).
#
# Raises ConfigError when neither @tag nor @tag_key is set.
def configure(conf)
  super

  # Only the truthiness of these keys matters, so nothing is bound to a local.
  if conf['localtime']
    @localtime = true
  elsif conf['utc']
    @localtime = false
  end

  if conf['timezone']
    @timezone = conf['timezone']
    Fluent::Timezone.validate!(@timezone)
  end

  unless @tag || @tag_key
    raise ConfigError, "'tag' or 'tag_key' option is required on exec input"
  end

  if @time_key
    if @time_format
      # Capture the format in a local so the proc keeps the value seen at
      # configure time instead of re-reading the instance variable.
      f = @time_format
      @time_parse_proc = Proc.new {|str| Time.strptime(str, f).to_i }
    else
      @time_parse_proc = Proc.new {|str| str.to_i }
    end
  end

  @parser = setup_parser(conf)
end
# File lib/fluent/plugin/in_exec.rb, line 132
#
# Feeds the pipe held in @io to the configured parser.
# Returns whatever the parser's #call returns.
def run
  parser = @parser
  parser.call(@io)
end
# File lib/fluent/plugin/in_exec.rb, line 136
#
# Runs @command every @run_interval seconds until @finished becomes true,
# handing each run's output pipe to @parser.
#
# The first run happens only after one full interval has elapsed.
#
# Errors raised while spawning the command or parsing its output are logged
# and do not stop the loop.
def run_periodic
  sleep @run_interval
  until @finished
    begin
      # The block form of IO.popen closes the pipe and waits for the child
      # when the block exits, including when the parser raises. This avoids
      # leaking one file descriptor (and possibly an unreaped child) per run.
      IO.popen(@command, "r") { |io| @parser.call(io) }
    rescue => e
      log.error "exec failed to run or shutdown child process", error: e.to_s, error_class: e.class.to_s
      log.warn_backtrace e.backtrace
    end
    # Sleep after failures too, so a command that keeps failing is retried
    # once per interval instead of in a tight loop.
    sleep @run_interval
  end
end
# File lib/fluent/plugin/in_exec.rb, line 82
#
# Builds the parser object that matches @format. Every parser is given
# the on_message method as its callback.
#
# conf - configuration element; only forwarded to the fallback
#        TextParserWrapperParser.
#
# Returns an ExecUtil parser: TSVParser for 'tsv', JSONParser for 'json',
# MessagePackParser for 'msgpack', TextParserWrapperParser for anything else.
#
# Raises ConfigError when the format is 'tsv' and @keys is empty.
def setup_parser(conf)
  emitter = method(:on_message)

  if @format == 'tsv'
    if @keys.empty?
      raise ConfigError, "keys option is required on exec input for tsv format"
    end
    return ExecUtil::TSVParser.new(@keys, emitter)
  end

  return ExecUtil::JSONParser.new(emitter) if @format == 'json'
  return ExecUtil::MessagePackParser.new(emitter) if @format == 'msgpack'

  ExecUtil::TextParserWrapperParser.new(conf, emitter)
end
# File lib/fluent/plugin/in_exec.rb, line 109
#
# Stops the worker thread created by start.
#
# Periodic mode (@run_interval set): marks the loop as finished, wakes the
# thread and waits for it to end.
#
# Streaming mode: sends TERM to the child process and waits up to 60 seconds
# for the reader thread. If the thread is still running after that, sends
# KILL and waits without a time limit.
def shutdown
  if @run_interval
    @finished = true
    # Thread#run interrupts the sleep inside run_periodic so the thread can
    # notice @finished immediately.
    @thread.run
    return @thread.join
  end

  # Signal delivery is best-effort: any error from Process.kill is ignored
  # (the original notes Errno::ECHILD, Errno::ESRCH and Errno::EPERM).
  deliver = lambda do |signal|
    begin
      Process.kill(signal, @pid)
    rescue
      # ignored on purpose
    end
  end

  deliver.call(:TERM)
  return if @thread.join(60) # TODO wait time

  deliver.call(:KILL)
  @thread.join
end
# File lib/fluent/plugin/in_exec.rb, line 98
#
# Launches the worker thread and stores it in @thread.
#
# Periodic mode (@run_interval set): clears @finished and runs run_periodic
# in the thread.
#
# Streaming mode: spawns @command once, keeps its pipe in @io and its
# process id in @pid, and runs run in the thread.
def start
  worker =
    if @run_interval
      @finished = false
      :run_periodic
    else
      @io = IO.popen(@command, "r")
      @pid = @io.pid
      :run
    end

  @thread = Thread.new(&method(worker))
end
# File lib/fluent/plugin/in_exec.rb, line 153
#
# Parser callback: emits one parsed record through the router.
#
# record      - Hash-like record. The @tag_key entry is always removed; the
#               @time_key entry is removed only when parsed_time is not given.
# parsed_time - optional event time supplied by the parser; when given it is
#               used as is.
#
# Tag: the value removed under @tag_key, or @tag when there is none.
# Time: parsed_time, else the @time_key value converted by @time_parse_proc,
# else Engine.now.
#
# Any StandardError raised along the way is logged instead of propagated.
def on_message(record, parsed_time = nil)
  tag = record.delete(@tag_key) || @tag

  time =
    if parsed_time
      parsed_time
    elsif (raw_time = record.delete(@time_key))
      @time_parse_proc.call(raw_time)
    else
      Engine.now
    end

  router.emit(tag, time, record)
rescue => e
  log.error "exec failed to emit", error: e.to_s, error_class: e.class.to_s, tag: tag, record: Yajl.dump(record)
end