class Fluent::NewTailInput

Attributes

paths[R]

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Input.new
# File lib/fluent/plugin/in_tail.rb, line 27
def initialize
  super
  @paths = []
  @tails = {}
  @ignore_list = []
end

Public Instance Methods

close_watcher(tw, close_io = true) click to toggle source

Fluent::NewTailInput::TailWatcher#close is called by another thread at shutdown phase. It causes 'can't modify string; temporarily locked' error in IOHandler so adding close_io argument to avoid this problem. At shutdown, IOHandler's io will be released automatically after detached the event loop

# File lib/fluent/plugin/in_tail.rb, line 254
def close_watcher(tw, close_io = true)
  tw.close(close_io)
  flush_buffer(tw)
  if tw.unwatched && @pf
    @pf[tw.path].update_pos(PositionFile::UNWATCHED_POSITION)
  end
end
close_watcher_after_rotate_wait(tw) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 262
def close_watcher_after_rotate_wait(tw)
  closer = TailWatcher::Closer.new(@rotate_wait, tw, log, &method(:close_watcher))
  closer.attach(@loop)
end
configure(conf) click to toggle source
Calls superclass method Fluent::Input#configure
# File lib/fluent/plugin/in_tail.rb, line 71
def configure(conf)
  super

  @paths = @path.split(',').map {|path| path.strip }
  if @paths.empty?
    raise ConfigError, "tail: 'path' parameter is required on tail input"
  end

  unless @pos_file
    $log.warn "'pos_file PATH' parameter is not set to a 'tail' source."
    $log.warn "this parameter is highly recommended to save the position to resume tailing."
  end

  configure_parser(conf)
  configure_tag
  configure_encoding

  @multiline_mode = conf['format'] =~ /multiline/
  @receive_handler = if @multiline_mode
                       method(:parse_multilines)
                     else
                       method(:parse_singleline)
                     end
end
configure_encoding() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 111
def configure_encoding
  unless @encoding
    if @from_encoding
      raise ConfigError, "tail: 'from_encoding' parameter must be specified with 'encoding' parameter."
    end
  end

  @encoding = parse_encoding_param(@encoding) if @encoding
  @from_encoding = parse_encoding_param(@from_encoding) if @from_encoding
end
configure_parser(conf) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 96
def configure_parser(conf)
  @parser = Plugin.new_parser(conf['format'])
  @parser.configure(conf)
end
configure_tag() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 101
def configure_tag
  if @tag.index('*')
    @tag_prefix, @tag_suffix = @tag.split('*')
    @tag_suffix ||= ''
  else
    @tag_prefix = nil
    @tag_suffix = nil
  end
end
convert_line_to_event(line, es, tail_watcher) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 322
def convert_line_to_event(line, es, tail_watcher)
  begin
    line.chomp!  # remove \n
    if @encoding
      if @from_encoding
        line.encode!(@encoding, @from_encoding)
      else
        line.force_encoding(@encoding)
      end
    end
    @parser.parse(line) { |time, record|
      if time && record
        record[@path_key] ||= tail_watcher.path unless @path_key.nil?
        es.add(time, record)
      else
        if @emit_unmatched_lines
          record = {'unmatched_line' => line}
          record[@path_key] ||= tail_watcher.path unless @path_key.nil?
          es.add(::Fluent::Engine.now, record)
        end
        log.warn "pattern not match: #{line.inspect}"
      end
    }
  rescue => e
    log.warn line.dump, error: e.to_s
    log.debug_backtrace(e.backtrace)
  end
end
expand_paths() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 154
def expand_paths
  date = Time.now
  paths = []

  @paths.each { |path|
    path = date.strftime(path)
    if path.include?('*')
      paths += Dir.glob(path).select { |p|
        is_file = !File.directory?(p)
        if File.readable?(p) && is_file
          if @limit_recently_modified && File.mtime(p) < (date - @limit_recently_modified)
            false
          else
            true
          end
        else
          if is_file
            unless @ignore_list.include?(path)
              log.warn "#{p} unreadable. It is excluded and would be examined next time."
              @ignore_list << path if @ignore_repeated_permission_error
            end
          end
          false
        end
      }
    else
      # When file is not created yet, Dir.glob returns an empty array. So just add when path is static.
      paths << path
    end
  }
  excluded = @exclude_path.map { |path| path = date.strftime(path); path.include?('*') ? Dir.glob(path) : path }.flatten.uniq
  paths - excluded
end
flush_buffer(tw) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 267
def flush_buffer(tw)
  if lb = tw.line_buffer
    lb.chomp!
    if @encoding
      if @from_encoding
        lb.encode!(@encoding, @from_encoding)
      else
        lb.force_encoding(@encoding)
      end
    end
    @parser.parse(lb) { |time, record|
      if time && record
        tag = if @tag_prefix || @tag_suffix
                @tag_prefix + tw.tag + @tag_suffix
              else
                @tag
              end
        record[@path_key] ||= tw.path unless @path_key.nil?
        router.emit(tag, time, record)
      else
        log.warn "got incomplete line at shutdown from #{tw.path}: #{lb.inspect}"
      end
    }
  end
end
parse_encoding_param(encoding_name) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 122
def parse_encoding_param(encoding_name)
  begin
    Encoding.find(encoding_name) if encoding_name
  rescue ArgumentError => e
    raise ConfigError, e.message
  end
end
parse_multilines(lines, tail_watcher) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 359
def parse_multilines(lines, tail_watcher)
  lb = tail_watcher.line_buffer
  es = MultiEventStream.new
  if @parser.has_firstline?
    tail_watcher.line_buffer_timer_flusher.reset_timer if tail_watcher.line_buffer_timer_flusher
    lines.each { |line|
      if @parser.firstline?(line)
        if lb
          convert_line_to_event(lb, es, tail_watcher)
        end
        lb = line
      else
        if lb.nil?
          if @emit_unmatched_lines
            convert_line_to_event(line, es, tail_watcher)
          end
          log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}"
        else
          lb << line
        end
      end
    }
  else
    lb ||= ''
    lines.each do |line|
      lb << line
      @parser.parse(lb) { |time, record|
        if time && record
          convert_line_to_event(lb, es, tail_watcher)
          lb = ''
        end
      }
    end
  end
  tail_watcher.line_buffer = lb
  es
end
parse_singleline(lines, tail_watcher) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 351
def parse_singleline(lines, tail_watcher)
  es = MultiEventStream.new
  lines.each { |line|
    convert_line_to_event(line, es, tail_watcher)
  }
  es
end
receive_lines(lines, tail_watcher) click to toggle source

@return true if no error or unrecoverable error happens in emit action. false if got BufferQueueLimitError

# File lib/fluent/plugin/in_tail.rb, line 301
def receive_lines(lines, tail_watcher)
  es = @receive_handler.call(lines, tail_watcher)
  unless es.empty?
    tag = if @tag_prefix || @tag_suffix
            @tag_prefix + tail_watcher.tag + @tag_suffix
          else
            @tag
          end
    begin
      router.emit_stream(tag, es)
    rescue BufferQueueLimitError
      return false
    rescue
      # ignore non BufferQueueLimitError errors because in_tail can't recover. Engine shows logs and backtraces.
      return true
    end
  end

  return true
end
refresh_watchers() click to toggle source

in_tail with '*' path doesn't check rotation file equality at refresh phase. So you should not use '*' path when your logs will be rotated by another tool. It will cause log duplication after updated watch files. In such case, you should separate log directory and specify two paths in path parameter. e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file

# File lib/fluent/plugin/in_tail.rb, line 193
def refresh_watchers
  target_paths = expand_paths
  existence_paths = @tails.keys

  unwatched = existence_paths - target_paths
  added = target_paths - existence_paths

  stop_watchers(unwatched, false, true) unless unwatched.empty?
  start_watchers(added) unless added.empty?
end
run() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 293
def run
  @loop.run
rescue
  log.error "unexpected error", error: $!.to_s
  log.error_backtrace
end
setup_watcher(path, pe) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 204
def setup_watcher(path, pe)
  line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
  tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher,  &method(:receive_lines))
  tw.attach(@loop)
  tw
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 145
def shutdown
  @refresh_trigger.detach if @refresh_trigger && @refresh_trigger.attached?

  stop_watchers(@tails.keys, true)
  @loop.stop rescue nil # when all watchers are detached, `stop` raises RuntimeError. We can ignore this exception.
  @thread.join
  @pf_file.close if @pf_file
end
start() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 130
def start
  if @pos_file
    @pf_file = File.open(@pos_file, File::RDWR|File::CREAT, DEFAULT_FILE_PERMISSION)
    @pf_file.sync = true
    @pf = PositionFile.parse(@pf_file)
  end

  @loop = Coolio::Loop.new
  refresh_watchers unless @skip_refresh_on_startup

  @refresh_trigger = TailWatcher::TimerWatcher.new(@refresh_interval, true, log, &method(:refresh_watchers))
  @refresh_trigger.attach(@loop)
  @thread = Thread.new(&method(:run))
end
start_watchers(paths) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 211
def start_watchers(paths)
  paths.each { |path|
    pe = nil
    if @pf
      pe = @pf[path]
      if @read_from_head && pe.read_inode.zero?
        begin
          pe.update(File::Stat.new(path).ino, 0)
        rescue Errno::ENOENT
          $log.warn "#{path} not found. Continuing without tailing it."
        end
      end
    end

    @tails[path] = setup_watcher(path, pe)
  }
end
stop_watchers(paths, immediate = false, unwatched = false) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 229
def stop_watchers(paths, immediate = false, unwatched = false)
  paths.each { |path|
    tw = @tails.delete(path)
    if tw
      tw.unwatched = unwatched
      if immediate
        close_watcher(tw, false)
      else
        close_watcher_after_rotate_wait(tw)
      end
    end
  }
end
update_watcher(path, pe) click to toggle source

#refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.

# File lib/fluent/plugin/in_tail.rb, line 244
def update_watcher(path, pe)
  rotated_tw = @tails[path]
  @tails[path] = setup_watcher(path, pe)
  close_watcher_after_rotate_wait(rotated_tw) if rotated_tw
end