# File lib/fluent/test/filter_test.rb, line 24
#
# Sets up the filter test driver.
#
# klass - forwarded unchanged to the parent initializer.
# tag   - tag used by #emit and #filter_stream when none is given
#         explicitly (defaults to 'filter.test').
# block - optional block, forwarded to the parent initializer.
def initialize(klass, tag = 'filter.test', &block)
  super(klass, &block)
  # Collects every record produced by the filter during #run.
  @filtered = MultiEventStream.new
  # Pending input, keyed by tag; each value is an event stream.
  @events = {}
  @tag = tag
end
# File lib/fluent/test/filter_test.rb, line 34
#
# Queues one record under the driver's default tag (@tag).
#
# record - the record to queue.
# time   - event time; defaults to Engine.now.
#
# Returns whatever #emit_with_tag returns.
def emit(record, time = Engine.now)
  emit_with_tag(@tag, record, time)
end
# File lib/fluent/test/filter_test.rb, line 39
#
# Queues one record under an explicit tag.
#
# tag    - key under which the record is queued in @events.
# record - the record to queue.
# time   - event time; defaults to Engine.now.
#
# Returns the result of the stream's #add call.
def emit_with_tag(tag, record, time = Engine.now)
  # Lazily create the per-tag stream the first time this tag is seen.
  stream = (@events[tag] ||= MultiEventStream.new)
  stream.add(time, record)
end
# File lib/fluent/test/filter_test.rb, line 45
#
# Registers a whole event stream as the input for the driver's
# default tag (@tag).
#
# es - the event stream to register.
#
# Returns whatever #filter_stream_with_tag returns.
def filter_stream(es)
  filter_stream_with_tag(@tag, es)
end
# File lib/fluent/test/filter_test.rb, line 49
#
# Registers a whole event stream as the input for the given tag.
# Any stream already stored under that tag is replaced.
#
# tag - key under which the stream is stored in @events.
# es  - the event stream to register.
#
# Returns es.
def filter_stream_with_tag(tag, es)
  @events.store(tag, es)
end
# File lib/fluent/test/filter_test.rb, line 53
#
# Returns the filtered events as a plain array of
# [tag, time, record] triples, in the order @filtered yields them.
# Every triple carries the driver's default tag (@tag).
def filtered_as_array
  [].tap do |entries|
    @filtered.each { |time, record| entries.push([@tag, time, record]) }
  end
end
Most filters don't use threads, so the default is 0. This reduces test time.
# File lib/fluent/test/filter_test.rb, line 63
#
# Runs the driver. Inside the parent's #run block it first calls the
# optional caller block, then passes every queued stream to the filter
# instance and appends the resulting events to @filtered.
#
# num_waits - forwarded to the parent's #run. Defaults to 0 because
#             most filters don't use threads, which keeps tests fast.
# block     - optional setup block, called before the queued events
#             are filtered.
#
# Returns self.
def run(num_waits = 0, &block)
  super(num_waits) do
    block.call unless block.nil?
    @events.each do |tag, stream|
      @instance.filter_stream(tag, stream).each do |time, record|
        @filtered.add(time, record)
      end
    end
  end
  self
end