A simple, single-threaded broker using the `protoncontainer`. You can use this to run other examples that reqiure an intermediary, or you can use any AMQP 1.0 broker. This broker creates queues automatically when a client tries to send or subscribe.
/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include "../options.hpp" #include "mt_container.hpp" #include <proton/connection.hpp> #include <proton/default_container.hpp> #include <proton/delivery.hpp> #include <proton/error_condition.hpp> #include <proton/listen_handler.hpp> #include <proton/messaging_handler.hpp> #include <proton/thread_safe.hpp> #include <atomic> #include <functional> #include <iostream> #include <mutex> #include <thread> #include "../fake_cpp11.hpp" // Thread safe queue. // Stores messages, notifies subscribed connections when there is data. class queue { public: queue(const std::string& name) : name_(name) {} std::string name() const { return name_; } // Push a message onto the queue. // If the queue was previously empty, notify subscribers it has messages. // Called from receiver's connection. void push(const proton::message &m) { std::lock_guard<std::mutex> g(lock_); messages_.push_back(m); if (messages_.size() == 1) { // Non-empty, notify subscribers for (auto cb : callbacks_) cb(this); callbacks_.clear(); } } // If the queue is not empty, pop a message into m and return true. // Otherwise save callback to be called when there are messages and return false. // Called from sender's connection. bool pop(proton::message& m, std::function<void(queue*)> callback) { std::lock_guard<std::mutex> g(lock_); if (messages_.empty()) { callbacks_.push_back(callback); return false; } else { m = std::move(messages_.front()); messages_.pop_front(); return true; } } private: const std::string name_; std::mutex lock_; std::deque<proton::message> messages_; std::vector<std::function<void(queue*)> > callbacks_; }; class queues { public: queues() : next_id_(0) {} // Get or create the named queue. queue* get(const std::string& name) { std::lock_guard<std::mutex> g(lock_); auto i = queues_.insert(queue_map::value_type(name, nullptr)).first; if (!i->second) i->second.reset(new queue(name)); return i->second.get(); } // Create a dynamic queue with a unique name. queue* dynamic() { std::ostringstream os; os << "_dynamic_" << next_id_++; return get(os.str()); } private: typedef std::map<std::string, std::unique_ptr<queue> > queue_map; std::mutex lock_; queue_map queues_; std::atomic<uint64_t> next_id_; // Use to generate unique queue IDs. }; class broker_connection_handler : public proton::messaging_handler { public: broker_connection_handler(queues& qs) : queues_(qs) {} void on_connection_open(proton::connection& c) OVERRIDE { // Create the has_messages callback for queue subscriptions. // // Make a std::shared_ptr to a thread_safe handle for our proton::connection. // The connection's proton::event_loop will remain valid as a shared_ptr exists. std::shared_ptr<proton::thread_safe<proton::connection> > ts_c = make_shared_thread_safe(c); // Make a lambda function to inject a call to this->has_messages() via the proton::event_loop. // The function is bound to a shared_ptr so this is safe. If the connection has already closed // proton::event_loop::inject() will drop the callback. has_messages_callback_ = [this, ts_c](queue* q) mutable { ts_c->event_loop()->inject( std::bind(&broker_connection_handler::has_messages, this, q)); }; c.open(); // Accept the connection } // A sender sends messages from a queue to a subscriber. void on_sender_open(proton::sender &sender) OVERRIDE { queue *q = sender.source().dynamic() ? queues_.dynamic() : queues_.get(sender.source().address()); std::cout << "sending from " << q->name() << std::endl; } // We have credit to send a message. void on_sendable(proton::sender &s) OVERRIDE { queue* q = sender_queue(s); if (!do_send(q, s)) // Queue is empty, save ourselves in the blocked set. blocked_.insert(std::make_pair(q, s)); } // A receiver receives messages from a publisher to a queue. void on_receiver_open(proton::receiver &r) OVERRIDE { std::string qname = r.target().address(); if (qname == "shutdown") { std::cout << "broker shutting down" << std::endl; // Sending to the special "shutdown" queue stops the broker. r.connection().container().stop( proton::error_condition("shutdown", "stop broker")); } else { std::cout << "receiving to " << qname << std::endl; } } // A message is received. void on_message(proton::delivery &d, proton::message &m) OVERRIDE { std::string qname = d.receiver().target().address(); queues_.get(qname)->push(m); } void on_session_close(proton::session &session) OVERRIDE { // Erase all blocked senders that belong to session. auto predicate = [session](const proton::sender& s) { return s.session() == session; }; erase_sender_if(blocked_.begin(), blocked_.end(), predicate); } void on_sender_close(proton::sender &sender) OVERRIDE { // Erase sender from the blocked set. auto range = blocked_.equal_range(sender_queue(sender)); auto predicate = [sender](const proton::sender& s) { return s == sender; }; erase_sender_if(range.first, range.second, predicate); } void on_error(const proton::error_condition& e) OVERRIDE { std::cerr << "error: " << e.what() << std::endl; } // The container calls on_transport_close() last. void on_transport_close(proton::transport&) OVERRIDE { delete this; // All done. } private: typedef std::multimap<queue*, proton::sender> blocked_map; // Get the queue associated with a sender. queue* sender_queue(const proton::sender& s) { return queues_.get(s.source().address()); // Thread safe. } // Only called if we have credit. Return true if we sent a message. bool do_send(queue* q, proton::sender &s) { proton::message m; bool popped = q->pop(m, has_messages_callback_); if (popped) s.send(m); return popped; } // Called via the connection's proton::event_loop when q has messages. // Try all the blocked senders. void has_messages(queue* q) { auto range = blocked_.equal_range(q); for (auto i = range.first; i != range.second;) { if (i->second.credit() <= 0 || do_send(q, i->second)) i = blocked_.erase(i); // No credit or send was successful, stop blocked. else ++i; // have credit, didn't send, keep blocked } } // Use to erase closed senders from blocked_ set. template <class Predicate> void erase_sender_if(blocked_map::iterator begin, blocked_map::iterator end, Predicate p) { for (auto i = begin; i != end; ) { if (p(i->second)) i = blocked_.erase(i); else ++i; } } queues& queues_; blocked_map blocked_; std::function<void(queue*)> has_messages_callback_; proton::connection connection_; }; class broker { public: broker(const std::string addr) : container_(make_mt_container("mt_broker")), listener_(queues_) { container_->listen(addr, listener_); std::cout << "broker listening on " << addr << std::endl; } void run() { std::vector<std::thread> threads(std::thread::hardware_concurrency()-1); for (auto& t : threads) t = std::thread(&proton::container::run, container_.get()); container_->run(); // Use this thread too. for (auto& t : threads) t.join(); } private: struct listener : public proton::listen_handler { listener(queues& qs) : queues_(qs) {} proton::connection_options on_accept() OVERRIDE{ return proton::connection_options().handler(*(new broker_connection_handler(queues_))); } void on_error(const std::string& s) OVERRIDE { std::cerr << "listen error: " << s << std::endl; throw std::runtime_error(s); } queues& queues_; }; queues queues_; std::unique_ptr<proton::container> container_; listener listener_; }; int main(int argc, char **argv) { // Command line options std::string address("0.0.0.0"); example::options opts(argc, argv); opts.add_value(address, 'a', "address", "listen on URL", "URL"); try { opts.parse(); broker(address).run(); return 0; } catch (const example::bad_option& e) { std::cout << opts << std::endl << e.what() << std::endl; } catch (const std::exception& e) { std::cerr << "broker shutdown: " << e.what() << std::endl; } return 1; }