class Fluent::FileBuffer

Constants

PATH_MATCH

Dots are used as separators in many contexts, so dots in keys should be escaped...

Attributes

Public Class Methods

clear_buffer_paths() click to toggle source
# File lib/fluent/test/input_test.rb, line 25
# Discards every registered buffer path by replacing the class-level registry
# (the one `configure` consults to reject duplicate buffer paths) with a new
# empty Hash.
#
# Returns the new empty Hash.
def self.clear_buffer_paths
  @@buffer_paths = Hash.new
end
new() click to toggle source
Calls superclass method Fluent::BasicBuffer.new
# File lib/fluent/plugin/buf_file.rb, line 85
# Sets up the buffer: runs the superclass initializer and then creates the
# URI parser that encode_key / decode_key use to escape and unescape chunk
# keys.
def initialize
  # 'uri' is required here, at construction time, rather than when the file
  # is loaded.
  require 'uri'
  super

  # Stored in @uri_parser for encode_key / decode_key.
  @uri_parser = URI::Parser.new
end

Public Instance Methods

before_shutdown(out) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 195
# Flushes everything that is still buffered when shutdown begins.
#
# Does nothing unless @flush_at_shutdown is set. Otherwise, inside a single
# `synchronize` section, every key currently in @map is handed to `push`, and
# `pop(out)` is then called repeatedly until it returns a falsy value.
#
# out - passed through unchanged to each `pop` call
def before_shutdown(out)
  return unless @flush_at_shutdown

  synchronize do
    @map.each_key { |chunk_key| push(chunk_key) }
    # Keep popping for as long as pop reports success.
    nil while pop(out)
  end
end
chunk_identifier_in_path(path) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 176
# Extracts the part of a chunk file path that lies between the configured
# prefix and suffix.
#
# The cut is purely positional: the first @buffer_path_prefix.length
# characters and the last @buffer_path_suffix.length characters are dropped.
# The method does not verify that `path` really starts/ends with them.
#
# path - full chunk file path (String)
#
# Returns the middle portion of `path`.
def chunk_identifier_in_path(path)
  first = @buffer_path_prefix.length
  # Negative index counting from the tail of the path.
  last = -(@buffer_path_suffix.length + 1)
  path[first..last]
end
configure(conf) click to toggle source
Calls superclass method Fluent::BasicBuffer#configure
# File lib/fluent/plugin/buf_file.rb, line 102
# Applies the configuration and derives the chunk file name prefix/suffix.
#
# After the superclass has processed `conf`:
# * @buffer_path is registered in the class-level @@buffer_paths table
#   together with the plugin type (conf['@type'], falling back to
#   conf['type']). A path that is already registered is rejected.
# * @buffer_path is split at its first '*' into @buffer_path_prefix and
#   @buffer_path_suffix. Without a '*', the prefix is the path plus "." and
#   the suffix is ".log".
#
# conf - configuration object, read here via conf['@type'] / conf['type']
#
# Raises ConfigError when another plugin already registered the same path.
def configure(conf)
  super

  if @@buffer_paths.key?(@buffer_path)
    raise ConfigError, "Other '#{@@buffer_paths[@buffer_path]}' plugin already use same buffer_path: type = #{conf['@type'] || conf['type']}, buffer_path = #{@buffer_path}"
  end
  @@buffer_paths[@buffer_path] = conf['@type'] || conf['type']

  # partition splits at the first '*'; the separator slot is empty when the
  # path contains no '*' at all.
  head, star, tail = @buffer_path.partition('*')
  if star.empty?
    @buffer_path_prefix = @buffer_path + "."
    @buffer_path_suffix = ".log"
  else
    @buffer_path_prefix = head
    @buffer_path_suffix = tail
  end
end
enqueue(chunk) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 183
# Renames a chunk's file to its queued form.
#
# The identifier part of the chunk's current path is matched against
# PATH_MATCH; capture 1 (the encoded key) and capture 3 (the hex suffix) are
# carried over unchanged, and the state marker in the new name is "q".
#
# chunk - object responding to `path` and `mv(new_path)`
#
# Returns whatever `chunk.mv` returns.
# Raises ArgumentError if the chunk's path does not match PATH_MATCH. The
# file is left untouched in that case.
def enqueue(chunk)
  path = chunk.path
  m = PATH_MATCH.match(chunk_identifier_in_path(path))
  # Without a match there is neither a key nor a suffix to carry over, so
  # fail with a message naming the path instead of dereferencing nil.
  raise ArgumentError, "unexpected buffer chunk path: #{path}" unless m

  encoded_key = m[1]
  tsuffix = m[3]
  chunk.mv("#{@buffer_path_prefix}#{encoded_key}.q#{tsuffix}#{@buffer_path_suffix}")
end
new_chunk(key) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 130
# Creates a new buffer chunk for `key`.
#
# The key is escaped with encode_key, a fresh "b"-state path is obtained from
# make_path, and the unique id is derived from that path's hex suffix.
#
# key - chunk key (passed to the chunk unescaped)
#
# Returns a FileBufferChunk constructed with mode "a+" and @symlink_path.
def new_chunk(key)
  path, tsuffix = make_path(encode_key(key), "b")
  FileBufferChunk.new(key, path, tsuffix_to_unique_id(tsuffix), "a+", @symlink_path)
end
resume() click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 137
# Rebuilds buffer state from the chunk files already present on disk.
#
# Every file matching "<prefix>*<suffix>" is inspected. Its identifier part
# is matched against PATH_MATCH, whose captures are used as (1) encoded key,
# (2) state letter and (3) hex suffix. Files that do not match, or whose
# state is neither 'b' nor 'q', are ignored. The hex suffix, parsed as an
# integer, is the ordering timestamp.
#
# Returns a two-element Array [queue, map]:
#   queue - 'q' chunks (opened with mode "r") in ascending timestamp order
#   map   - Hash of decoded key => 'b' chunk (opened with mode "a+"); when
#           several 'b' files share a key, the last one in timestamp order
#           wins
def resume
  staged = []    # [timestamp, chunk] pairs for 'b' files
  enqueued = []  # [timestamp, chunk] pairs for 'q' files

  Dir.glob("#{@buffer_path_prefix}*#{@buffer_path_suffix}") do |path|
    m = PATH_MATCH.match(chunk_identifier_in_path(path))
    next unless m

    key = decode_key(m[1])
    tsuffix = m[3]
    timestamp = tsuffix.to_i(16)
    unique_id = tsuffix_to_unique_id(tsuffix)

    case m[2]
    when 'b'
      staged << [timestamp, FileBufferChunk.new(key, path, unique_id, "a+")]
    when 'q'
      enqueued << [timestamp, FileBufferChunk.new(key, path, unique_id, "r")]
    end
  end

  map = {}
  staged.sort_by(&:first).each { |(_, chunk)| map[chunk.key] = chunk }
  queue = enqueued.sort_by(&:first).map(&:last)

  [queue, map]
end
start() click to toggle source
Calls superclass method Fluent::BasicBuffer#start
# File lib/fluent/plugin/buf_file.rb, line 121
# Ensures the directory that will hold the chunk files exists (created with
# DEFAULT_DIR_PERMISSION, including missing parents), then defers to the
# superclass `start`.
def start
  # A dummy file name is appended before taking dirname so that a prefix
  # ending in "/" resolves to that directory itself rather than its parent.
  buffer_dir = File.dirname(@buffer_path_prefix + "path")
  FileUtils.mkdir_p(buffer_dir, mode: DEFAULT_DIR_PERMISSION)
  super
end

Private Instance Methods

decode_key(encoded_key) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 215
# Reverses encode_key: turns the escaped key found in a chunk file name back
# into the original key by delegating to @uri_parser.unescape.
#
# encoded_key - escaped key String
#
# Returns the unescaped key.
def decode_key(encoded_key)
  parser = @uri_parser
  parser.unescape(encoded_key)
end
encode_key(key) click to toggle source

Dots are used as separators in many contexts, so dots in keys should ideally be escaped (the pattern currently used leaves them unescaped)...
# File lib/fluent/plugin/buf_file.rb, line 211
# Escapes a chunk key so it can be embedded in a file name.
#
# Every character other than '-', '_', '.', ASCII letters and digits is
# escaped via @uri_parser.escape. Dots are in the safe set, so they are left
# as they are.
#
# NOTE(review): an earlier comment on this line mentioned an //n
# (ASCII-8BIT) regexp switch, but the pattern carries no such flag.
#
# key - chunk key String
#
# Returns the escaped key.
def encode_key(key)
  unsafe_chars = /[^-_.a-zA-Z0-9]/
  @uri_parser.escape(key, unsafe_chars)
end
make_path(encoded_key, bq) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 219
# Builds a fresh chunk file path for the given key and state marker.
#
# encoded_key - key already escaped for use in a file name
# bq          - state marker placed in front of the hex suffix (new_chunk
#               passes "b")
#
# Returns [path, tsuffix]. tsuffix is the lowercase hex rendering of
# (microseconds since the epoch << 12) | 12 random bits, and path is
# "<prefix><encoded_key>.<bq><tsuffix><suffix>".
def make_path(encoded_key, bq)
  now = Time.now.utc
  micros = now.to_i * 1_000_000 + now.usec
  # rand(n) excludes n itself, so 0x1000 is required to cover all 12 low
  # bits; with 0xfff the value 0xfff could never be produced.
  tsuffix = ((micros << 12) | rand(0x1000)).to_s(16)
  path = "#{@buffer_path_prefix}#{encoded_key}.#{bq}#{tsuffix}#{@buffer_path_suffix}"
  [path, tsuffix]
end
tsuffix_to_unique_id(tsuffix) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 227
# Converts a hex suffix into a chunk's unique id.
#
# The suffix is read two characters at a time (a trailing odd character is
# dropped), each pair is parsed as a hexadecimal byte, and the bytes are
# packed into a binary String. That String is then repeated twice; the
# original author noted that the reason for the doubling has been forgotten.
#
# tsuffix - hex String such as the one returned by make_path
#
# Returns the binary unique id String.
def tsuffix_to_unique_id(tsuffix)
  octets = tsuffix.scan(/../).map { |hex_pair| hex_pair.to_i(16) }
  packed = octets.pack('C*')
  packed * 2
end