# Parses the value stored under @key_name in every record of +es+ and returns
# a new event stream holding the results.
#
# For each (time, record) pair:
# * If the key is missing (nil), the problem is reported (error event when
#   @emit_invalid_record_to_error is set, otherwise a warning unless
#   @ignore_key_not_exist is set). The record is only kept, via
#   handle_parsed with an empty values hash, when @reserve_data is set.
# * Otherwise the raw value is handed to @parser.parse. Every yielded
#   (t, values) pair with non-nil values is passed through handle_parsed and
#   added to the result; a nil +t+ falls back to the event's own time.
#   Yielding nil values is treated as "pattern not match" and reported.
# * Fluent::ParserError and any other StandardError raised while parsing are
#   reported (error event or warning) and the record is dropped.
#
# @param tag event tag, forwarded to handle_parsed and router.emit_error_event
# @param es  event stream; must respond to #each yielding (time, record)
# @return [Fluent::MultiEventStream] stream of successfully handled events
# @raise [ArgumentError] re-raised when @replace_invalid_sequence is not set,
#   when the message does not start with "invalid byte sequence in", or when
#   the same kind of error is raised again after the value has been scrubbed
def filter_stream(tag, es)
  new_es = Fluent::MultiEventStream.new
  es.each do |time, record|
    raw_value = record[@key_name]
    if raw_value.nil?
      if @emit_invalid_record_to_error
        router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
      else
        log.warn "#{@key_name} does not exist" unless @ignore_key_not_exist
      end
      new_es.add(time, handle_parsed(tag, record, time, {})) if @reserve_data
      next
    end

    # Tracks whether this record's value has already been passed through
    # replace_invalid_byte, so the retry below runs at most once per record.
    scrubbed = false
    begin
      @parser.parse(raw_value) do |t, values|
        if values
          t ||= time
          new_es.add(t, handle_parsed(tag, record, t, values))
        else
          if @emit_invalid_record_to_error
            router.emit_error_event(tag, time, record, ::Fluent::ParserError.new("pattern not match with data '#{raw_value}'"))
          else
            log.warn "pattern not match with data '#{raw_value.dump}'" unless @suppress_parse_error_log
          end
          new_es.add(time, handle_parsed(tag, record, time, {})) if @reserve_data
        end
      end
    rescue Fluent::ParserError => e
      if @emit_invalid_record_to_error
        router.emit_error_event(tag, time, record, e)
      else
        log.warn e.message unless @suppress_parse_error_log
      end
    rescue ArgumentError => e
      raise unless @replace_invalid_sequence
      raise unless e.message.start_with?("invalid byte sequence in")
      # If the value was already scrubbed and the parser still rejects it,
      # retrying again would loop forever on the same input; surface the error.
      raise if scrubbed

      scrubbed = true
      raw_value = replace_invalid_byte(raw_value)
      retry
    rescue => e
      if @emit_invalid_record_to_error
        router.emit_error_event(tag, time, record, Fluent::ParserError.new("parse failed #{e.message}"))
      else
        log.warn "parse failed #{e.message}" unless @suppress_parse_error_log
      end
    end
  end
  new_es
end