# File lib/fluent/plugin/in_forward.rb, line 28 def initialize super require 'fluent/plugin/socket_util' end
# File lib/fluent/plugin/in_forward.rb, line 53 def configure(conf) super end
# File lib/fluent/plugin/in_forward.rb, line 89 def listen log.info "listening fluent socket on #{@bind}:#{@port}" s = Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message)) s.listen(@backlog) unless @backlog.nil? s end
config_param :path, :string, :default => DEFAULT_SOCKET_PATH def listen
if File.exist?(@path) File.unlink(@path) end FileUtils.mkdir_p File.dirname(@path) log.debug "listening fluent socket on #{@path}" Coolio::UNIXServer.new(@path, Handler, method(:on_message))
end
# File lib/fluent/plugin/in_forward.rb, line 106 def run @loop.run(@blocking_timeout) rescue => e log.error "unexpected error", error: e, error_class: e.class log.error_backtrace end
# File lib/fluent/plugin/in_forward.rb, line 72 def shutdown # In test cases it occasionally appeared that when detaching a watcher, another watcher is also detached. # In the case in the iteration of watchers, a watcher that has been already detached is intended to be detached # and therfore RuntimeError occurs saying that it is not attached to a loop. # It occures only when testing for sending responses to ForwardOutput. # Sending responses needs to write the socket that is previously used only to read # and a handler has 2 watchers that is used to read and to write. # This problem occurs possibly because those watchers are thought to be related to each other # and when detaching one of them the other is also detached for some reasons. # As a workaround, check if watchers are attached before detaching them. @loop.watchers.each {|w| w.detach if w.attached? } @loop.stop @usock.close @thread.join @lsock.close end
# File lib/fluent/plugin/in_forward.rb, line 57 def start @loop = Coolio::Loop.new @lsock = listen @loop.attach(@lsock) @usock = SocketUtil.create_udp_socket(@bind) @usock.bind(@bind, @port) @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) @hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request)) @loop.attach(@hbr) @thread = Thread.new(&method(:run)) end
# File lib/fluent/plugin/in_forward.rb, line 222 def add_source_host(es, host) new_es = MultiEventStream.new es.each { |time, record| record[@source_hostname_key] = host new_es.add(time, record) } new_es end
# File lib/fluent/plugin/in_forward.rb, line 210 def check_and_skip_invalid_event(tag, es, peeraddr) new_es = MultiEventStream.new es.each { |time, record| if invalid_event?(tag, time, record) log.warn "skip invalid event:", source: source_message(peeraddr), tag: tag, time: time, record: record next end new_es.add(time, record) } new_es end
# File lib/fluent/plugin/in_forward.rb, line 206 def invalid_event?(tag, time, record) !(time.is_a?(Integer) && record.is_a?(Hash) && tag.is_a?(String)) end
# File lib/fluent/plugin/in_forward.rb, line 345 def on_heartbeat_request(host, port, msg) #log.trace "heartbeat request from #{host}:#{port}" begin @usock.send "\00"", 0, host, port rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR end end
message Entry {
1: long time 2: object record
}
message Forward {
1: string tag 2: list<Entry> entries 3: object option (optional)
}
message PackedForward {
1: string tag 2: raw entries # msgpack stream of Entry 3: object option (optional)
}
message Message {
1: string tag 2: long? time 3: object record 4: object option (optional)
}
# File lib/fluent/plugin/in_forward.rb, line 138 def on_message(msg, chunk_size, peeraddr) if msg.nil? # for future TCP heartbeat_request return end # TODO: raise an exception if broken chunk is generated by recoverable situation unless msg.is_a?(Array) log.warn "incoming chunk is broken:", source: source_message(peeraddr), msg: msg return end tag = msg[0] entries = msg[1] if @chunk_size_limit && (chunk_size > @chunk_size_limit) log.warn "Input chunk size is larger than 'chunk_size_limit', dropped:", tag: tag, source: source_message(peeraddr), limit: @chunk_size_limit, size: chunk_size return elsif @chunk_size_warn_limit && (chunk_size > @chunk_size_warn_limit) log.warn "Input chunk size is larger than 'chunk_size_warn_limit':", tag: tag, source: source_message(peeraddr), limit: @chunk_size_warn_limit, size: chunk_size end if entries.class == String # PackedForward es = MessagePackEventStream.new(entries) es = check_and_skip_invalid_event(tag, es, peeraddr) if @skip_invalid_event es = add_source_host(es, peeraddr[2]) if @source_hostname_key router.emit_stream(tag, es) option = msg[2] elsif entries.class == Array # Forward es = if @skip_invalid_event check_and_skip_invalid_event(tag, entries, peeraddr) else es = MultiEventStream.new entries.each { |e| record = e[1] next if record.nil? time = e[0] time = (now ||= Engine.now) if time.to_i == 0 es.add(time, record) } es end es = add_source_host(es, peeraddr[2]) if @source_hostname_key router.emit_stream(tag, es) option = msg[2] else # Message time = msg[1] record = msg[2] if @skip_invalid_event && invalid_event?(tag, time, record) log.warn "got invalid event and drop it:", source: source_message(peeraddr), tag: tag, time: time, record: record return msg[3] # retry never succeeded so return ack and drop incoming event. end return if record.nil? time = Engine.now if time == 0 record[@source_hostname_key] = peeraddr[2] if @source_hostname_key router.emit(tag, time, record) option = msg[3] end # return option for response option end
# File lib/fluent/plugin/in_forward.rb, line 231 def source_message(peeraddr) _, port, host, addr = peeraddr "host: #{host}, addr: #{addr}, port: #{port}" end