class Fluent::ForwardInput

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Input.new
# File lib/fluent/plugin/in_forward.rb, line 28
# Runs the superclass initializer, then loads the socket utility library.
def initialize
  super
  # Required here rather than at file load time, so the library is only
  # loaded once an instance is actually created.
  require 'fluent/plugin/socket_util'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Input#configure
# File lib/fluent/plugin/in_forward.rb, line 53
# Applies the configuration by delegating to the superclass implementation;
# no additional handling is done in this method.
#
# conf - the configuration object, passed through to the superclass unchanged.
def configure(conf)
  super
end
listen() click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 89
# Builds the TCP server for @bind:@port, wiring Handler to on_message.
#
# When @backlog is set (anything other than nil) it is passed to the server's
# own listen call; otherwise the server is left as constructed.
#
# Returns the Coolio::TCPServer instance.
def listen
  log.info "listening fluent socket on #{@bind}:#{@port}"
  Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message)).tap do |server|
    server.listen(@backlog) unless @backlog.nil?
  end
end
run() click to toggle source

# config_param :path, :string, :default => DEFAULT_SOCKET_PATH
# def listen
#   if File.exist?(@path)
#     File.unlink(@path)
#   end
#   FileUtils.mkdir_p File.dirname(@path)
#   log.debug "listening fluent socket on #{@path}"
#   Coolio::UNIXServer.new(@path, Handler, method(:on_message))
# end

# File lib/fluent/plugin/in_forward.rb, line 106
# Runs the event loop with @blocking_timeout.
#
# Any StandardError escaping the loop is logged together with its class and
# backtrace instead of being propagated to the caller.
def run
  begin
    @loop.run(@blocking_timeout)
  rescue StandardError => error
    log.error "unexpected error", error: error, error_class: error.class
    log.error_backtrace
  end
end
shutdown() click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 72
# Stops the event loop and releases the sockets and the worker thread.
#
# Order: detach the still-attached watchers, stop the loop, close the UDP
# socket, wait for the thread to finish, then close the TCP listener.
def shutdown
  # In test cases it occasionally happened that detaching one watcher also detached another one.
  # While iterating over the watchers, the code would then try to detach a watcher that was already
  # detached, and therefore a RuntimeError was raised saying that it is not attached to a loop.
  # It occurs only when testing the sending of responses to ForwardOutput.
  # Sending responses needs to write to the socket that was previously used only for reading,
  # so a handler has 2 watchers: one used to read and one used to write.
  # The problem possibly occurs because those watchers are considered related to each other,
  # so detaching one of them also detaches the other for some reason.
  # As a workaround, check whether each watcher is still attached before detaching it.
  @loop.watchers.each {|w| w.detach if w.attached? }
  @loop.stop
  @usock.close
  @thread.join
  @lsock.close
end
start() click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 57
# Sets up the event loop, the TCP listener and the UDP heartbeat socket,
# then starts the thread that executes run.
def start
  @loop = Coolio::Loop.new

  # TCP listener created by listen, attached to the loop.
  @lsock = listen
  @loop.attach(@lsock)

  # UDP socket bound to the same address and port, switched to non-blocking
  # mode and watched by a handler that calls on_heartbeat_request.
  @usock = SocketUtil.create_udp_socket(@bind)
  @usock.bind(@bind, @port)
  @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
  @hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request))
  @loop.attach(@hbr)

  # The loop itself runs on a separate thread; shutdown joins it.
  @thread = Thread.new(&method(:run))
end

Private Instance Methods

add_source_host(es, host) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 222
# Copies every event of es into a new MultiEventStream, storing host in each
# record under @source_hostname_key on the way.
#
# Note that the records are modified in place before being added.
#
# es   - source of (time, record) pairs, iterated with each.
# host - value written into every record.
#
# Returns the newly built MultiEventStream.
def add_source_host(es, host)
  MultiEventStream.new.tap do |stream|
    es.each do |time, record|
      record[@source_hostname_key] = host
      stream.add(time, record)
    end
  end
end
check_and_skip_invalid_event(tag, es, peeraddr) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 210
# Builds a new MultiEventStream containing only the events of es that pass
# invalid_event?; every rejected event is reported with a warning.
#
# tag      - tag the events were sent with (checked and logged).
# es       - source of (time, record) pairs, iterated with each.
# peeraddr - peer address, used only to describe the source in the warning.
#
# Returns the filtered MultiEventStream.
def check_and_skip_invalid_event(tag, es, peeraddr)
  MultiEventStream.new.tap do |accepted|
    es.each do |time, record|
      if invalid_event?(tag, time, record)
        log.warn "skip invalid event:", source: source_message(peeraddr), tag: tag, time: time, record: record
      else
        accepted.add(time, record)
      end
    end
  end
end
invalid_event?(tag, time, record) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 206
# Tells whether an event has an unexpected shape.
#
# An event is valid only when time is an Integer, record is a Hash and tag is
# a String; anything else is reported as invalid.
#
# Returns true for an invalid event, false for a valid one.
def invalid_event?(tag, time, record)
  return true unless time.is_a?(Integer)
  return true unless record.is_a?(Hash)

  !tag.is_a?(String)
end
on_heartbeat_request(host, port, msg) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 345
# Answers a heartbeat request by sending a single NUL byte back to the
# requester over the UDP socket.
#
# host - address the request came from; the reply is sent there.
# port - port the request came from.
# msg  - payload of the request (not inspected).
#
# The reply is best-effort: when the socket is not ready or the call is
# interrupted, the reply is silently skipped.
def on_heartbeat_request(host, port, msg)
  #log.trace "heartbeat request from #{host}:#{port}"
  begin
    @usock.send "\0", 0, host, port
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
    # Deliberately ignored: a missed heartbeat reply is not treated as an error here.
  end
end
on_message(msg, chunk_size, peeraddr) click to toggle source

# message Entry {
#   1: long time
#   2: object record
# }
#
# message Forward {
#   1: string tag
#   2: list<Entry> entries
#   3: object option (optional)
# }
#
# message PackedForward {
#   1: string tag
#   2: raw entries  # msgpack stream of Entry
#   3: object option (optional)
# }
#
# message Message {
#   1: string tag
#   2: long? time
#   3: object record
#   4: object option (optional)
# }

# File lib/fluent/plugin/in_forward.rb, line 138
# Handles one decoded payload received from a peer and emits its events
# through router.
#
# The payload kind is detected from msg[1]:
#   String -> PackedForward: [tag, packed entries, option]
#   Array  -> Forward:       [tag, [[time, record], ...], option]
#   other  -> Message:       [tag, time, record, option]
#
# msg        - decoded payload. nil is ignored; a non-Array is logged as a
#              broken chunk and dropped.
# chunk_size - size of the incoming chunk. Above @chunk_size_limit the chunk is
#              dropped with a warning; above @chunk_size_warn_limit it is only
#              warned about. Either limit is skipped when unset.
# peeraddr   - peer address array; element 2 is used as the source host name
#              when @source_hostname_key is set.
#
# Returns the option element of the payload (used for the response). Returns
# nil when the payload is ignored or dropped before dispatch, or when a
# Message has a nil record. An invalid Message is dropped but its option is
# still returned, because retrying it would never succeed.
def on_message(msg, chunk_size, peeraddr)
  if msg.nil?
    # for future TCP heartbeat_request
    return
  end

  # TODO: raise an exception if broken chunk is generated by recoverable situation
  unless msg.is_a?(Array)
    log.warn "incoming chunk is broken:", source: source_message(peeraddr), msg: msg
    return
  end

  tag = msg[0]
  entries = msg[1]

  if @chunk_size_limit && (chunk_size > @chunk_size_limit)
    log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, source: source_message(peeraddr), limit: @chunk_size_limit, size: chunk_size
    return
  elsif @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit)
    log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, source: source_message(peeraddr), limit: @chunk_size_warn_limit, size: chunk_size
  end

  case entries
  when String
    # PackedForward
    es = MessagePackEventStream.new(entries)
    es = check_and_skip_invalid_event(tag, es, peeraddr) if @skip_invalid_event
    es = add_source_host(es, peeraddr[2]) if @source_hostname_key
    router.emit_stream(tag, es)
    option = msg[2]

  when Array
    # Forward
    es = if @skip_invalid_event
           check_and_skip_invalid_event(tag, entries, peeraddr)
         else
           # Declared outside the block so the current time is looked up at
           # most once per chunk; a block-local variable would be reset on
           # every iteration and entries could get different timestamps.
           now = nil
           stream = MultiEventStream.new
           entries.each { |e|
             record = e[1]
             next if record.nil?
             time = e[0]
             time = (now ||= Engine.now) if time.to_i == 0
             stream.add(time, record)
           }
           stream
         end
    es = add_source_host(es, peeraddr[2]) if @source_hostname_key
    router.emit_stream(tag, es)
    option = msg[2]

  else
    # Message
    time = msg[1]
    record = msg[2]
    if @skip_invalid_event && invalid_event?(tag, time, record)
      log.warn "got invalid event and drop it:", source: source_message(peeraddr), tag: tag, time: time, record: record
      return msg[3] # retry never succeeded so return ack and drop incoming event.
    end
    return if record.nil?
    # The time field is nullable in the Message format, so a missing time is
    # treated like 0 and replaced by the current time.
    time = Engine.now if time.nil? || time == 0
    record[@source_hostname_key] = peeraddr[2] if @source_hostname_key
    router.emit(tag, time, record)
    option = msg[3]
  end

  # return option for response
  option
end
source_message(peeraddr) click to toggle source
# File lib/fluent/plugin/in_forward.rb, line 231
# Formats a peer address for log output.
#
# peeraddr - address array; elements 1, 2 and 3 are used as port, host and
#            addr respectively, the first element is ignored.
#
# Returns a String of the form "host: H, addr: A, port: P".
def source_message(peeraddr)
  _family, peer_port, peer_host, peer_addr = peeraddr
  format("host: %s, addr: %s, port: %s", peer_host, peer_addr, peer_port)
end