class Sensu::Transport::RabbitMQ

Public Instance Methods

ack(info)

A proper alias for acknowledge().

Alias for: acknowledge
acknowledge(info) click to toggle source

Acknowledge the delivery of a message from RabbitMQ.

@param info [Hash] message info containing its delivery tag.

@yield [info] passes acknowledgment info to an optional callback/block.
Calls superclass method Sensu::Transport::Base#acknowledge
# File lib/sensu/transport/rabbitmq.rb, line 123
# Acknowledge the delivery of a message, then hand over to the
# superclass implementation (called with the same arguments and block).
#
# `info.ack` runs inside `catch_errors`, so an AMQP error raised while
# acknowledging is routed to the error callback instead of propagating.
#
# @param info [Object] message info; `ack` is called on it.
def acknowledge(info)
  catch_errors { info.ack }
  super
end
Also aliased as: ack
close() click to toggle source

Close the RabbitMQ connection.

# File lib/sensu/transport/rabbitmq.rb, line 44
# Close the RabbitMQ connection.
#
# When the transport reports itself connected the connection is closed
# right away; otherwise the close is handed to `EM.next_tick`.
def close
  closer = proc { @connection.close }
  if connected?
    closer.call
  else
    EM.next_tick(closer)
  end
end
connect(options={}) click to toggle source

RabbitMQ connection setup. The deferred status is set to `:succeeded` (via `succeed()`) once the connection has been established.

@param options [Hash, String]

# File lib/sensu/transport/rabbitmq.rb, line 16
# RabbitMQ connection setup. The four steps below run in a fixed order;
# each later step relies on state prepared by the earlier ones.
#
# @param options [Hash, String] connection options (a single set, or
#   several sets wrapped in an Array).
def connect(options={})
  # Start from a clean slate before touching any connection state.
  reset
  # Remember the option set(s) to pick connection candidates from.
  set_connection_options(options)
  # Arm the timer that triggers a reconnect if the connection never opens.
  create_connection_timeout
  # Pick a candidate option set and set up the connection and channel.
  connect_with_eligible_options
end
connected?() click to toggle source

Indicates if connected to RabbitMQ.

@return [TrueClass, FalseClass]

# File lib/sensu/transport/rabbitmq.rb, line 39
# Indicates if connected to RabbitMQ.
#
# `@connection` is only assigned once a connection attempt has actually
# been set up, so it can still be nil when this is called (the `reset`
# method guards against the same situation). A missing connection is
# reported as "not connected" rather than raising NoMethodError.
#
# @return [TrueClass, FalseClass] false when no connection object exists
#   yet, otherwise whatever the connection reports.
def connected?
  @connection ? @connection.connected? : false
end
publish(type, pipe, message, options={}) { |info| ... } click to toggle source

Publish a message to RabbitMQ.

@param type [Symbol] the RabbitMQ exchange type, possible values are: :direct and :fanout.

@param pipe [String] the RabbitMQ exchange name.

@param message [String] the message to be published to RabbitMQ.

@param options [Hash] the options to publish the message with.

@yield [info] passes publish info to an optional callback/block.

@yieldparam info [Hash] contains publish information.

# File lib/sensu/transport/rabbitmq.rb, line 60
# Publish a message to RabbitMQ.
#
# @param type [Symbol] the exchange type; it is converted with `to_sym`
#   and used as the name of the channel method that returns the exchange.
# @param pipe [String] the exchange name.
# @param message [String] the message to publish.
# @param options [Hash] options passed along when obtaining the exchange.
# @yield [info] optional callback/block.
# @yieldparam info [Hash] an empty Hash once the publish callback fires,
#   or `{:error => "Transport is not connected"}` when not connected.
def publish(type, pipe, message, options={})
  if connected?
    catch_errors do
      exchange = @channel.method(type.to_sym).call(pipe, options)
      exchange.publish(message) do
        yield({}) if block_given?
      end
    end
  else
    # Nothing is sent while disconnected; the caller is only notified.
    yield({:error => "Transport is not connected"}) if block_given?
  end
end
reconnect(force=false) click to toggle source

Reconnect to RabbitMQ.

@param force [Boolean] whether to force the reconnect. The implementation shown below does not consult this argument.

# File lib/sensu/transport/rabbitmq.rb, line 26
# Reconnect to RabbitMQ.
#
# Does nothing while a reconnect is already in progress. Otherwise it
# marks the transport as reconnecting, logs, runs the before-reconnect
# callback, resets the transport state and starts the periodic
# reconnect attempts.
#
# @param force [Boolean] accepted but never read in this method.
def reconnect(force=false)
  return if @reconnecting
  @reconnecting = true
  @logger.debug("transport reconnecting...")
  @before_reconnect.call
  reset
  periodically_reconnect
end
stats(funnel, options={}) { |info| ... } click to toggle source

RabbitMQ queue stats, including message and consumer counts.

@param funnel [String] the RabbitMQ queue to get stats for.

@param options [Hash] the options to get queue stats with.

@yield [info] passes queue stats to the callback/block.

@yieldparam info [Hash] contains queue stats.

# File lib/sensu/transport/rabbitmq.rb, line 139
# RabbitMQ queue stats: message and consumer counts.
#
# The queue is looked up with the caller's options plus
# `:auto_delete => true`; the caller's Hash itself is not modified.
#
# @param funnel [String] the queue to get stats for.
# @param options [Hash] the options to get queue stats with.
# @yield [info] receives the queue stats; a block is required because
#   the result is yielded unconditionally.
# @yieldparam info [Hash] `:messages` and `:consumers` counts.
def stats(funnel, options={})
  catch_errors do
    queue_options = options.merge(:auto_delete => true)
    @channel.queue(funnel, queue_options).status do |messages, consumers|
      yield({:messages => messages, :consumers => consumers})
    end
  end
end
subscribe(type, pipe, funnel="", options={}, &callback) click to toggle source

Subscribe to a RabbitMQ queue.

@param type [Symbol] the RabbitMQ exchange type, possible values are: :direct and :fanout.

@param pipe [String] the RabbitMQ exchange name.

@param funnel [String] the RabbitMQ queue.

@param options [Hash] the options to consume messages with.

@yield [info, message] passes message info and content to the consumer callback/block.

@yieldparam info [Hash] contains message information.

@yieldparam message [String] message.

# File lib/sensu/transport/rabbitmq.rb, line 85
# Subscribe to a RabbitMQ queue.
#
# The queue for `funnel` is declared once and cached in `@queues`.
# Every call binds the queue to the requested exchange, but the consumer
# is only attached the first time a given funnel is seen.
#
# @param type [Symbol] the exchange type; converted with `to_sym` and
#   used as the name of the channel method that returns the exchange.
# @param pipe [String] the exchange name.
# @param funnel [String] the queue name.
# @param options [Hash] the options to consume messages with.
# @yield [info, message] consumer callback/block, handed to the queue's
#   `subscribe`.
def subscribe(type, pipe, funnel="", options={}, &callback)
  catch_errors do
    first_subscription = !@queues.has_key?(funnel)
    queue = (@queues[funnel] ||= @channel.queue!(funnel, :auto_delete => true))
    exchange = @channel.method(type.to_sym).call(pipe)
    queue.bind(exchange)
    queue.subscribe(options, &callback) if first_subscription
  end
end
unsubscribe() click to toggle source

Unsubscribe from all RabbitMQ queues.

@yield [info] passes info to an optional callback/block.

@yieldparam info [Hash] contains unsubscribe information.

Calls superclass method Sensu::Transport::Base#unsubscribe
# File lib/sensu/transport/rabbitmq.rb, line 101
# Unsubscribe from all RabbitMQ queues, then hand over to the superclass
# implementation (which receives the caller's block).
#
# While connected each cached queue is unsubscribed immediately; while
# disconnected the unsubscribe is registered as a before-recovery hook on
# the queue instead. The queue cache is emptied afterwards and, if
# connected, `recover` is called on the channel.
def unsubscribe
  catch_errors do
    @queues.each_value do |queue|
      if connected?
        queue.unsubscribe
      else
        queue.before_recovery { queue.unsubscribe }
      end
    end
    @queues = {}
    @channel.recover if connected?
  end
  super
end

Private Instance Methods

catch_errors() { || ... } click to toggle source

Catch RabbitMQ errors and call the on_error callback, providing it with the error object as an argument. This method is intended to be applied where necessary, not to be confused with a catch-all.

@yield [] callback/block to execute within a rescue block to catch RabbitMQ errors.
# File lib/sensu/transport/rabbitmq.rb, line 161
# Run the given block and route any `AMQP::Error` it raises to the
# `@on_error` callback, passing the error object. Other exception
# classes are not rescued here.
#
# @yield [] the code to run under the rescue.
# @return the block's value, or the error callback's value when an
#   `AMQP::Error` was rescued.
def catch_errors
  yield
rescue AMQP::Error => error
  @on_error.call(error)
end
connect_with_eligible_options(&callback) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 252
# Obtain the next candidate option set and use it to set up first the
# connection, then the channel.
#
# @yield [] optional block, forwarded to `setup_connection`.
def connect_with_eligible_options(&callback)
  next_connection_options do |eligible|
    setup_connection(eligible, &callback)
    setup_channel(eligible)
  end
end
create_connection_timeout() click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 179
# Arm a 20 second EventMachine timer that calls `reconnect` when it
# fires, and keep it in `@connection_timeout` so it can be cancelled.
def create_connection_timeout
  @connection_timeout = EM::Timer.new(20) { reconnect }
end
next_connection_options() { |merge(:host => ip_address)| ... } click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 185
# Yield the next candidate connection option set.
#
# Candidates are taken one at a time from `@eligible_options`; when that
# list is missing or used up it is refilled with a shuffled copy of
# `@connection_options`. A Hash candidate carrying `:host` has its host
# resolved first and is yielded with `:host` replaced by the resolved
# address; if resolution yields nil, the whole selection is retried
# after 3 seconds. Any other candidate is yielded unchanged.
#
# @yield [options] the candidate to connect with.
def next_connection_options(&callback)
  exhausted = @eligible_options.nil? || @eligible_options.empty?
  @eligible_options = @connection_options.shuffle if exhausted
  candidate = @eligible_options.shift
  # Nothing to resolve: pass the candidate straight through.
  return yield(candidate) unless candidate.is_a?(Hash) && candidate[:host]
  resolve_host(candidate[:host]) do |ip_address|
    if ip_address.nil?
      EM::Timer.new(3) { next_connection_options(&callback) }
    else
      yield candidate.merge(:host => ip_address)
    end
  end
end
periodically_reconnect(delay=2) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 259
# Keep trying to reconnect until the transport reports itself connected.
#
# A timer is armed for `delay` seconds (never more than 20). When it
# fires while still disconnected, the transport state is reset, the next
# attempt is scheduled with the delay grown by 2 seconds, and a
# connection attempt is made. Once that attempt's callback runs, the
# reconnecting flag is cleared and the after-reconnect callback is called.
#
# @param delay [Numeric] seconds to wait before this attempt.
def periodically_reconnect(delay=2)
  capped_delay = (delay >= 20 ? 20 : delay)
  EM::Timer.new(capped_delay) do
    next if connected?
    reset
    # The follow-up attempt is scheduled before connecting.
    periodically_reconnect(capped_delay + 2)
    begin
      connect_with_eligible_options do
        @reconnecting = false
        @after_reconnect.call
      end
    # Connection failures are deliberately ignored here; the follow-up
    # attempt scheduled above will try again.
    # NOTE(review): the Java:: constants presumably only exist on JRuby;
    # confirm what happens on other interpreters when an exception falls
    # through the first two clauses.
    rescue EventMachine::ConnectionError
    rescue Errno::ECONNREFUSED
    rescue Java::JavaLang::RuntimeException
    rescue Java::JavaNioChannels::UnresolvedAddressException
    end
  end
end
reset() click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 169
# Return the transport to a clean state: forget all cached queues,
# cancel the connection timeout timer if one exists, and close the
# underlying connection if one exists.
def reset
  @queues = {}
  @connection_timeout && @connection_timeout.cancel
  @connection && @connection.close_connection
end
set_connection_options(options) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 175
# Store the connection option sets to choose candidates from.
#
# A single option set (Hash or String) and an Array of option sets are
# both accepted; the result is always a flat Array.
#
# Any partially consumed candidate rotation left over from a previous
# option list is discarded, so the next connection attempt draws from
# the options given here rather than from stale leftovers.
#
# @param options [Hash, String, Array] one or more connection option sets.
# @return [Array] the flattened option sets.
def set_connection_options(options)
  @eligible_options = nil
  @connection_options = [options].flatten
end
setup_channel(options={}) click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 238
# Create the AMQP channel on the current connection.
#
# The channel has auto recovery switched on, reports channel errors to
# the `@on_error` callback as a generic "rabbitmq channel error", and has
# its prefetch set from `options[:prefetch]` (1 when the key is absent
# or when `options` is not a Hash).
#
# @param options [Hash, Object] connection options.
def setup_channel(options={})
  @channel = AMQP::Channel.new(@connection)
  @channel.auto_recovery = true
  @channel.on_error do |_channel, _channel_close|
    @on_error.call(Error.new("rabbitmq channel error"))
  end
  prefetch = options.is_a?(Hash) ? options.fetch(:prefetch, 1) : 1
  @channel.prefetch(prefetch)
end
setup_connection(options={}) { || ... } click to toggle source
# File lib/sensu/transport/rabbitmq.rb, line 205
# Open the AMQP connection and register its lifecycle handlers.
#
# TCP connection failure, a possible authentication failure, TCP
# connection loss and skipped heartbeats all end in `reconnect`; the
# latter three also log a warning first. When the connection opens, the
# connection timeout timer (if any) is cancelled, the transport is marked
# as succeeded and the optional block is called.
#
# @param options [Hash, String] connection options handed to
#   `AMQP.connect`.
# @yield [] optional block, called once the connection is open.
def setup_connection(options={})
  reconnect_callback = Proc.new { reconnect }
  on_possible_auth_failure = Proc.new {
    # Connection options may also be given as a String, which cannot be
    # asked for a :user key; only a Hash is consulted so this handler
    # always gets as far as logging and reconnecting.
    user = options.is_a?(Hash) ? options[:user] : nil
    @logger.warn("transport connection error", {
      :reason => "possible authentication failure. wrong credentials?",
      :user => user
    })
    reconnect
  }
  @connection = AMQP.connect(options, {
    :on_tcp_connection_failure => reconnect_callback,
    :on_possible_authentication_failure => on_possible_auth_failure
  })
  @connection.logger = @logger
  @connection.on_open do
    @logger.debug("transport connection open")
    # Same nil guard as `reset`: there may be no timer to cancel.
    @connection_timeout.cancel if @connection_timeout
    succeed
    yield if block_given?
  end
  @connection.on_tcp_connection_loss do
    @logger.warn("transport connection error", :reason => "tcp connection lost")
    reconnect
  end
  @connection.on_skipped_heartbeats do
    @logger.warn("transport connection error", :reason => "skipped heartbeats")
    reconnect
  end
  @connection.on_closed do
    @logger.debug("transport connection closed")
  end
end