class Fluent::NewTailInput::TailWatcher

Attributes

line_buffer[RW]
line_buffer_timer_flusher[RW]
path[R]
unwatched[RW]

Public Class Methods

new(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, &receive_lines) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 398
# Sets up a watcher for a single tailed path.
#
# path                      - path of the file being watched
# rotate_wait               - value interpolated into the rotation log message as a number of seconds
# pe                        - position entry; a MemoryPositionEntry is created when none is given
# log                       - logger handed to the collaborating watchers and used by this object
# read_from_head            - when truthy, a file seen for the first time is read from offset 0
# enable_watch_timer        - when truthy, a TimerWatcher is created in addition to the StatWatcher
# read_lines_limit          - forwarded to every IOHandler this watcher creates
# update_watcher            - callable invoked as call(path, position_entry) on rotation
# line_buffer_timer_flusher - optional object notified on every on_notify
# receive_lines             - block invoked with (lines, watcher) through wrap_receive_lines
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, &receive_lines)
  # Plain settings captured from the caller.
  @path = path
  @rotate_wait = rotate_wait
  @read_from_head = read_from_head
  @enable_watch_timer = enable_watch_timer
  @read_lines_limit = read_lines_limit
  @update_watcher = update_watcher
  @receive_lines = receive_lines

  # Fall back to an in-memory position entry when the caller supplies none.
  @pe = pe || MemoryPositionEntry.new

  # Both triggers funnel into on_notify.
  notifier = method(:on_notify)
  if @enable_watch_timer
    # NOTE(review): the meaning of the `1, true` arguments is defined by TimerWatcher — confirm there.
    @timer_trigger = TimerWatcher.new(1, true, log, &notifier)
  end
  @stat_trigger = StatWatcher.new(path, log, &notifier)

  # The rotate handler reports back through on_rotate.
  @rotate_handler = RotateHandler.new(path, log, &method(:on_rotate))

  # No IO handler exists until on_rotate runs for the first time.
  @io_handler = nil
  @log = log
  @line_buffer_timer_flusher = line_buffer_timer_flusher
end

Public Instance Methods

attach(loop) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 431
# Attaches this watcher's triggers to the given event loop and then runs
# on_notify once.
#
# loop - the object each trigger's #attach is called with
def attach(loop)
  triggers = []
  # The timer trigger only exists when the watch timer is enabled.
  triggers << @timer_trigger if @enable_watch_timer
  triggers << @stat_trigger
  triggers.each { |trigger| trigger.attach(loop) }

  on_notify
end
close(close_io = true) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 442
# Shuts the watcher down.
#
# close_io - when truthy (the default) and an IO handler exists, the handler
#            receives one last on_notify and is then closed.
#
# detach is always called afterwards, whatever close_io is.
def close(close_io = true)
  handler = close_io ? @io_handler : nil
  if handler
    handler.on_notify
    handler.close
  end

  detach
end
detach() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 437
# Detaches every trigger that is currently attached.
#
# The timer trigger is only consulted when the watch timer is enabled.
# Returns whatever the stat trigger's #detach returns, or nil when the stat
# trigger was not attached.
def detach
  timer = @timer_trigger
  if @enable_watch_timer && timer.attached?
    timer.detach
  end

  stat = @stat_trigger
  stat.attached? ? stat.detach : nil
end
on_notify() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 450
# Fans a notification out to the collaborators, in this order: the rotate
# handler, the line-buffer timer flusher (which receives this watcher), and
# finally the IO handler. Any collaborator that is not set is skipped.
#
# Returns the IO handler's on_notify result, or nil when there is no IO handler.
def on_notify
  if @rotate_handler
    @rotate_handler.on_notify
  end

  if @line_buffer_timer_flusher
    @line_buffer_timer_flusher.on_notify(self)
  end

  # @io_handler is read only after the rotate handler has run.
  @io_handler.on_notify if @io_handler
end
on_rotate(io) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 457
# Callback given to RotateHandler in initialize. Decides where to start reading
# the file behind +io+ and installs or replaces @io_handler accordingly.
#
# io - the file object to read from, or a falsy value when no file is available
#      (presumably the freshly opened file at @path — confirm against RotateHandler).
#
# Two top-level cases:
#   * @io_handler is nil: first invocation; choose a start position and create
#     the handler (or a NullIOHandler when there is no io).
#   * otherwise: a rotation was detected; either replace the handler in place or
#     detach and hand the path back to @update_watcher.
def on_rotate(io)
  if @io_handler == nil
    if io
      # First time this watcher sees a file.
      stat = io.stat
      fsize = stat.size
      inode = stat.ino

      last_inode = @pe.read_inode
      if inode == last_inode
        # The file has the same inode number as the one recorded last time.
        # Assumed situations:
        #   a) the file was once renamed and moved back, or
        #   b) a symlink or hardlink to the same file was recreated.
        # In either case, seek to the saved position.
        pos = @pe.read_pos
      elsif last_inode != 0
        # This is a FilePositionEntry and fluentd has run before.
        # Read from the head of the rotated file.
        # Logs are never duplicated because this is a new, rotated file.
        pos = 0
        @pe.update(inode, pos)
      else
        # This is a MemoryPositionEntry, or this is the first time fluentd started.
        # Seek to the end of the file unless read_from_head is set.
        # Without this seek logs may be duplicated, because it cannot be known
        # whether this is a pre-existing file or a new, rotated file.
        pos = @read_from_head ? 0 : fsize
        @pe.update(inode, pos)
      end
      io.seek(pos)

      @io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
    else
      # No file yet: install a placeholder handler.
      @io_handler = NullIOHandler.new
    end
  else
    log_msg = "detected rotation of #{@path}"
    log_msg << "; waiting #{@rotate_wait} seconds" if @io_handler.io  # mention the wait only when the previous handler still has an io
    @log.info log_msg

    if io
      stat = io.stat
      inode = stat.ino
      if inode == @pe.read_inode # same inode as recorded: treated as a truncated file
        @pe.update_pos(stat.size)
        # Build the replacement handler before closing the old one.
        io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
        @io_handler.close
        @io_handler = io_handler
      elsif @io_handler.io.nil? # There is no previous file. Reuse this TailWatcher.
        @pe.update(inode, io.pos)
        io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
        @io_handler = io_handler
      else # The file was rotated and a new file was found.
        detach
        # swap_state returns the original position entry, which is passed on to @update_watcher.
        @update_watcher.call(@path, swap_state(@pe))
      end
    else # The file was rotated and no new file was found.
      # Clear the RotateHandler to avoid watching the same path twice.
      @rotate_handler = nil
      detach
      @update_watcher.call(@path, swap_state(@pe))
    end
  end

  # NOTE(review): this `def` sits inside on_rotate's body, so it is evaluated
  # again every time on_rotate reaches this point. Confirm whether the nesting
  # is intentional or whether swap_state was meant to be a sibling method.
  def swap_state(pe)
    # Use a MemoryPositionEntry temporarily for the rotated file.
    mpe = MemoryPositionEntry.new
    mpe.update(pe.read_inode, pe.read_pos)
    @pe = mpe
    @io_handler.pe = mpe # Don't re-create the IOHandler, because it has an internal buffer.

    pe # This pe will be updated in on_rotate after the TailWatcher is initialized.
  end
end
swap_state(pe) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 522
# Switches this watcher (and its existing IO handler) over to a temporary
# in-memory position entry seeded with the inode and position read from +pe+.
#
# pe - the position entry currently in use
#
# Returns +pe+ itself, untouched, so the caller can pass it on.
def swap_state(pe)
  temporary_entry = MemoryPositionEntry.new
  inode, pos = pe.read_inode, pe.read_pos
  temporary_entry.update(inode, pos)

  @pe = temporary_entry
  # The existing IOHandler is kept rather than re-created because it has an
  # internal buffer; only its position entry is replaced.
  @io_handler.pe = temporary_entry

  pe
end
tag() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 423
# Derives a dotted tag from @path and memoizes it in @parsed_tag.
#
# Every '/' becomes '.', runs of dots collapse to a single dot, and a dot at
# the start of a line is dropped, e.g. "/var/log//app.log" => "var.log.app.log".
def tag
  @parsed_tag ||= begin
    dotted = @path.tr('/', '.')
    dotted.squeeze('.').gsub(/^\./, '')
  end
end
wrap_receive_lines(lines) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 427
# Forwards +lines+ to the receive_lines block captured in initialize, passing
# this watcher as the second argument.
#
# Returns whatever the block returns.
def wrap_receive_lines(lines)
  watcher = self
  @receive_lines.call(lines, watcher)
end