mt/broker.cpp

A multithreaded broker that will work on any multi-threaded container. See mt/epoll_container.cpp for an example of a multi-threaded container.

__Requires C++11__

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "../options.hpp"
#include "mt_container.hpp"

#include <proton/connection.hpp>
#include <proton/default_container.hpp>
#include <proton/delivery.hpp>
#include <proton/error_condition.hpp>
#include <proton/listen_handler.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/sender_options.hpp>
#include <proton/source_options.hpp>
#include <proton/thread_safe.hpp>

#include <atomic>
#include <cstdint>
#include <deque>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "../fake_cpp11.hpp"

// Thread safe queue.
//
// Stores messages in FIFO order and notifies subscribed connections when the
// queue goes from empty to non-empty. push() and pop() both take lock_, so a
// single queue can be shared by several connection threads.
class queue {
  public:
    // Create an empty queue called `name`.
    queue(const std::string& name) : name_(name) {}

    // Return a copy of the queue's name.
    std::string name() const { return name_; }

    // Push a copy of m onto the back of the queue.
    // If the queue was previously empty, invoke every pending callback once
    // (passing this queue) and then discard them all.
    // Called from receiver's connection.
    //
    // The callbacks run while lock_ is held, so a callback must not call
    // push() or pop() on this same queue.
    void push(const proton::message &m) {
        std::lock_guard<std::mutex> g(lock_);
        messages_.push_back(m);
        if (messages_.size() == 1) { // Was empty, now non-empty: notify subscribers
            // Iterate by reference; copying each std::function just to call it is wasted work.
            for (const auto& cb : callbacks_)
                cb(this);
            callbacks_.clear();
        }
    }

    // If the queue is not empty, move the front message into m, remove it from
    // the queue and return true.
    // Otherwise store callback, to be invoked by the next push() that makes the
    // queue non-empty, and return false (m is left untouched).
    // Called from sender's connection.
    bool pop(proton::message& m, std::function<void(queue*)> callback) {
        std::lock_guard<std::mutex> g(lock_);
        if (messages_.empty()) {
            // callback is already our own by-value copy: hand it over rather than copy again.
            callbacks_.push_back(std::move(callback));
            return false;
        }
        m = std::move(messages_.front());
        messages_.pop_front();
        return true;
    }

  private:
    const std::string name_;
    std::mutex lock_;           // Guards messages_ and callbacks_.
    std::deque<proton::message> messages_;
    std::vector<std::function<void(queue*)> > callbacks_; // One-shot "has messages" subscribers.
};

// Registry of queues keyed by name. The map is guarded by lock_.
class queues {
  public:
    queues() : next_id_(0) {}

    // Return the queue registered under name, creating it on first use.
    // The returned queue stays owned by this registry.
    queue* get(const std::string& name) {
        std::lock_guard<std::mutex> guard(lock_);
        // operator[] leaves an empty unique_ptr behind for a name not seen before.
        std::unique_ptr<queue>& slot = queues_[name];
        if (!slot)
            slot.reset(new queue(name));
        return slot.get();
    }

    // Create a queue named "_dynamic_<N>", where N comes from an atomic counter.
    queue* dynamic() {
        const uint64_t id = next_id_++;
        std::ostringstream generated;
        generated << "_dynamic_" << id;
        return get(generated.str());
    }

  private:
    typedef std::map<std::string, std::unique_ptr<queue> > queue_map;

    std::mutex lock_;               // Guards queues_.
    queue_map queues_;
    std::atomic<uint64_t> next_id_; // Source of unique IDs for dynamic queue names.
};

class broker_connection_handler : public proton::messaging_handler {
  public:
    broker_connection_handler(queues& qs) : queues_(qs) {}

    void on_connection_open(proton::connection& c) OVERRIDE {
        // Create the has_messages callback for queue subscriptions.
        //
        // Make a std::shared_ptr to a thread_safe handle for our proton::connection.
        // The connection's proton::event_loop will remain valid as a shared_ptr exists.
        std::shared_ptr<proton::thread_safe<proton::connection> > ts_c = make_shared_thread_safe(c);

        // Make a lambda function to inject a call to this->has_messages() via the proton::event_loop.
        // The function is bound to a shared_ptr so this is safe. If the connection has already closed
        // proton::event_loop::inject() will drop the callback.
        has_messages_callback_ = [this, ts_c](queue* q) mutable {
            ts_c->event_loop()->inject(
                std::bind(&broker_connection_handler::has_messages, this, q));
        };

        c.open();            // Accept the connection
    }

    // A sender sends messages from a queue to a subscriber.
    void on_sender_open(proton::sender &sender) OVERRIDE {
        queue *q = sender.source().dynamic() ?
            queues_.dynamic() : queues_.get(sender.source().address());
        std::cout << "sending from " << q->name() << std::endl;
    }

    // We have credit to send a message.
    void on_sendable(proton::sender &s) OVERRIDE {
        queue* q = sender_queue(s);
        if (!do_send(q, s))     // Queue is empty, save ourselves in the blocked set.
            blocked_.insert(std::make_pair(q, s));
    }

    // A receiver receives messages from a publisher to a queue.
    void on_receiver_open(proton::receiver &r) OVERRIDE {
        std::string qname = r.target().address();
        if (qname == "shutdown") {
            std::cout << "broker shutting down" << std::endl;
            // Sending to the special "shutdown" queue stops the broker.
            r.connection().container().stop(
                proton::error_condition("shutdown", "stop broker"));
        } else {
            std::cout << "receiving to " << qname << std::endl;
        }
    }

    // A message is received.
    void on_message(proton::delivery &d, proton::message &m) OVERRIDE {
        std::string qname = d.receiver().target().address();
        queues_.get(qname)->push(m);
    }

    void on_session_close(proton::session &session) OVERRIDE {
        // Erase all blocked senders that belong to session.
        auto predicate = [session](const proton::sender& s) {
            return s.session() == session;
        };
        erase_sender_if(blocked_.begin(), blocked_.end(), predicate);
    }

    void on_sender_close(proton::sender &sender) OVERRIDE {
        // Erase sender from the blocked set.
        auto range = blocked_.equal_range(sender_queue(sender));
        auto predicate = [sender](const proton::sender& s) { return s == sender; };
        erase_sender_if(range.first, range.second, predicate);
    }

    void on_error(const proton::error_condition& e) OVERRIDE {
        std::cerr << "error: " << e.what() << std::endl;
    }
    // The container calls on_transport_close() last.
    void on_transport_close(proton::transport&) OVERRIDE {
        delete this;            // All done.
    }

  private:
    typedef std::multimap<queue*, proton::sender> blocked_map;

    // Get the queue associated with a sender.
    queue* sender_queue(const proton::sender& s) {
        return queues_.get(s.source().address()); // Thread safe.
    }

    // Only called if we have credit. Return true if we sent a message.
    bool do_send(queue* q, proton::sender &s) {
        proton::message m;
        bool popped =  q->pop(m, has_messages_callback_);
        if (popped)
            s.send(m);
        return popped;
    }

    // Called via the connection's proton::event_loop when q has messages.
    // Try all the blocked senders.
    void has_messages(queue* q) {
        auto range = blocked_.equal_range(q);
        for (auto i = range.first; i != range.second;) {
            if (i->second.credit() <= 0 || do_send(q, i->second))
                i = blocked_.erase(i); // No credit or send was successful, stop blocked.
            else
                ++i;            // have credit, didn't send, keep blocked
        }
    }

    // Use to erase closed senders from blocked_ set.
    template <class Predicate>
    void erase_sender_if(blocked_map::iterator begin, blocked_map::iterator end, Predicate p) {
        for (auto i = begin; i != end; ) {
            if (p(i->second))
                i = blocked_.erase(i);
            else
                ++i;
        }
    }

    queues& queues_;
    blocked_map blocked_;
    std::function<void(queue*)> has_messages_callback_;
    proton::connection connection_;
};


// The broker: owns the queue registry, the container and the listener that
// creates a broker_connection_handler for every accepted connection.
class broker {
  public:
    // Create the container and start listening on addr.
    broker(const std::string& addr) :
        container_(make_mt_container("mt_broker")), listener_(queues_)
    {
        container_->listen(addr, listener_);
        std::cout << "broker listening on " << addr << std::endl;
    }

    // Run the container on the calling thread plus one extra thread for each
    // additional hardware thread; returns once every run() call has returned.
    void run() {
        // hardware_concurrency() is allowed to return 0 when the value cannot
        // be determined. Subtracting 1 from that unsigned 0 would wrap to a
        // huge thread count, so fall back to using only the calling thread.
        const unsigned hw = std::thread::hardware_concurrency();
        std::vector<std::thread> threads(hw > 1 ? hw - 1 : 0u);
        for (auto& t : threads)
            t = std::thread(&proton::container::run, container_.get());
        container_->run();      // Use this thread too.
        for (auto& t : threads)
            t.join();
    }

  private:
    struct listener : public proton::listen_handler {
        listener(queues& qs) : queues_(qs) {}

        // Give each accepted connection its own handler. The handler is not
        // leaked: it deletes itself in on_transport_close().
        proton::connection_options on_accept() OVERRIDE{
            return proton::connection_options().handler(*(new broker_connection_handler(queues_)));
        }

        // Log the listen error and rethrow it as std::runtime_error.
        void on_error(const std::string& s) OVERRIDE {
            std::cerr << "listen error: " << s << std::endl;
            throw std::runtime_error(s);
        }
        queues& queues_;
    };

    // Declaration order matters: queues_ must be constructed before
    // listener_, which keeps a reference to it.
    queues queues_;
    std::unique_ptr<proton::container> container_;
    listener listener_;
};

int main(int argc, char **argv) {
    // Command line options
    std::string address("0.0.0.0");
    example::options opts(argc, argv);
    opts.add_value(address, 'a', "address", "listen on URL", "URL");
    try {
        opts.parse();
        broker(address).run();
        return 0;
    } catch (const example::bad_option& e) {
        std::cout << opts << std::endl << e.what() << std::endl;
    } catch (const std::exception& e) {
        std::cerr << "broker shutdown: " << e.what() << std::endl;
    }
    return 1;
}

Generated on 27 Jan 2017 for Qpid Proton C++ by  doxygen 1.6.1