Package proton ::
Module utils
|
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import collections, socket, time, threading
20
21 from proton import ConnectionException, Delivery, Endpoint, Handler, Link, LinkException, Message
22 from proton import ProtonException, Timeout, Url
23 from proton.reactor import Container
24 from proton.handlers import MessagingHandler, IncomingMessageHandler
29 self.connection = connection
30 self.link = link
31 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT),
32 msg="Opening link %s" % link.name)
33 self._checkClosed()
34
36 try:
37 self.connection.wait(lambda: self.link.state & Endpoint.REMOTE_CLOSED,
38 timeout=timeout,
39 msg="Opening link %s" % self.link.name)
40 except Timeout as e: pass
41 self._checkClosed()
42
44 if self.link.state & Endpoint.REMOTE_CLOSED:
45 self.link.close()
46 raise LinkDetached(self.link)
47
49 self.link.close()
50 self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_ACTIVE),
51 msg="Closing link %s" % self.link.name)
52
53
54 - def __getattr__(self, name): return getattr(self.link, name)
55
57 """
58 Exception used to indicate an exceptional state/condition on a send request
59 """
62
64 return delivery.settled or delivery.link.snd_settle_mode == Link.SND_SETTLED
65
68 super(BlockingSender, self).__init__(connection, sender)
69 if self.link.target and self.link.target.address and self.link.target.address != self.link.remote_target.address:
70
71 self._waitForClose()
72
73 self.link.close()
74 raise LinkException("Failed to open sender %s, target does not match" % self.link.name)
75
76 - def send(self, msg, timeout=False, error_states=None):
77 delivery = self.link.send(msg)
78 self.connection.wait(lambda: _is_settled(delivery), msg="Sending on sender %s" % self.link.name, timeout=timeout)
79 if delivery.link.snd_settle_mode != Link.SND_SETTLED:
80 delivery.settle()
81 bad = error_states
82 if bad is None:
83 bad = [Delivery.REJECTED, Delivery.RELEASED]
84 if delivery.remote_state in bad:
85 raise SendException(delivery.remote_state)
86 return delivery
87
89 - def __init__(self, connection, prefetch):
90 super(Fetcher, self).__init__(prefetch=prefetch, auto_accept=False)
91 self.connection = connection
92 self.incoming = collections.deque([])
93 self.unsettled = collections.deque([])
94
96 self.incoming.append((event.message, event.delivery))
97 self.connection.container.yield_()
98
100 if event.link.state & Endpoint.LOCAL_ACTIVE:
101 event.link.close()
102 raise LinkDetached(event.link)
103
106
107 @property
109 return len(self.incoming)
110
112 message, delivery = self.incoming.popleft()
113 if not delivery.settled:
114 self.unsettled.append(delivery)
115 return message
116
117 - def settle(self, state=None):
118 delivery = self.unsettled.popleft()
119 if state:
120 delivery.update(state)
121 delivery.settle()
122
125 - def __init__(self, connection, receiver, fetcher, credit=1):
126 super(BlockingReceiver, self).__init__(connection, receiver)
127 if self.link.source and self.link.source.address and self.link.source.address != self.link.remote_source.address:
128
129 self._waitForClose()
130
131 self.link.close()
132 raise LinkException("Failed to open receiver %s, source does not match" % self.link.name)
133 if credit: receiver.flow(credit)
134 self.fetcher = fetcher
135
137 self.fetcher = None
138 self.link.handler = None
139
141 if not self.fetcher:
142 raise Exception("Can't call receive on this receiver as a handler was provided")
143 if not self.link.credit:
144 self.link.flow(1)
145 self.connection.wait(lambda: self.fetcher.has_message, msg="Receiving on receiver %s" % self.link.name, timeout=timeout)
146 return self.fetcher.pop()
147
149 self.settle(Delivery.ACCEPTED)
150
152 self.settle(Delivery.REJECTED)
153
154 - def release(self, delivered=True):
155 if delivered:
156 self.settle(Delivery.MODIFIED)
157 else:
158 self.settle(Delivery.RELEASED)
159
160 - def settle(self, state=None):
161 if not self.fetcher:
162 raise Exception("Can't call accept/reject etc on this receiver as a handler was provided")
163 self.fetcher.settle(state)
164
168 self.link = link
169 if link.is_sender:
170 txt = "sender %s to %s closed" % (link.name, link.target.address)
171 else:
172 txt = "receiver %s from %s closed" % (link.name, link.source.address)
173 if link.remote_condition:
174 txt += " due to: %s" % link.remote_condition
175 self.condition = link.remote_condition.name
176 else:
177 txt += " by peer"
178 self.condition = None
179 super(LinkDetached, self).__init__(txt)
180
193
196 """
197 A synchronous style connection wrapper.
198 """
199 - def __init__(self, url, timeout=None, container=None, ssl_domain=None, heartbeat=None):
200 self.disconnected = False
201 self.timeout = timeout or 60
202 self.container = container or Container()
203 self.container.timeout = self.timeout
204 self.container.start()
205 self.url = Url(url).defaults()
206 self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat)
207 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
208 msg="Opening connection")
209
210 - def create_sender(self, address, handler=None, name=None, options=None):
212
213 - def create_receiver(self, address, credit=None, dynamic=False, handler=None, name=None, options=None):
214 prefetch = credit
215 if handler:
216 fetcher = None
217 if prefetch is None:
218 prefetch = 1
219 else:
220 fetcher = Fetcher(self, credit)
221 return BlockingReceiver(
222 self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)
223
225 self.conn.close()
226 try:
227 self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
228 msg="Closing connection")
229 finally:
230 self.conn = None
231 self.container = None
232
234 return self.conn.state & (Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED)
235
237 """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """
238 while self.container.process(): pass
239
240 - def wait(self, condition, timeout=False, msg=None):
241 """Call process until condition() is true"""
242 if timeout is False:
243 timeout = self.timeout
244 if timeout is None:
245 while not condition() and not self.disconnected:
246 self.container.process()
247 else:
248 container_timeout = self.container.timeout
249 self.container.timeout = timeout
250 try:
251 deadline = time.time() + timeout
252 while not condition() and not self.disconnected:
253 self.container.process()
254 if deadline < time.time():
255 txt = "Connection %s timed out" % self.url
256 if msg: txt += ": " + msg
257 raise Timeout(txt)
258 finally:
259 self.container.timeout = container_timeout
260 if self.disconnected or self._is_closed():
261 self.container.stop()
262 self.conn.handler = None
263 if self.disconnected and not self._is_closed():
264 raise ConnectionException("Connection %s disconnected" % self.url)
265
267 if event.link.state & Endpoint.LOCAL_ACTIVE:
268 event.link.close()
269 raise LinkDetached(event.link)
270
275
278
281
283 self.disconnected = True
284
287 """Thread-safe atomic counter. Start at start, increment by step."""
288 self.count, self.step = start, step
289 self.lock = threading.Lock()
290
292 """Get the next value"""
293 self.lock.acquire()
294 self.count += self.step;
295 result = self.count
296 self.lock.release()
297 return result
298
300 """
301 Implementation of the synchronous request-responce (aka RPC) pattern.
302 @ivar address: Address for all requests, may be None.
303 @ivar connection: Connection for requests and responses.
304 """
305
306 correlation_id = AtomicCount()
307
308 - def __init__(self, connection, address=None):
309 """
310 Send requests and receive responses. A single instance can send many requests
311 to the same or different addresses.
312
313 @param connection: A L{BlockingConnection}
314 @param address: Address for all requests.
315 If not specified, each request must have the address property set.
316 Sucessive messages may have different addresses.
317 """
318 super(SyncRequestResponse, self).__init__()
319 self.connection = connection
320 self.address = address
321 self.sender = self.connection.create_sender(self.address)
322
323
324 self.receiver = self.connection.create_receiver(None, dynamic=True, credit=1, handler=self)
325 self.response = None
326
327 - def call(self, request):
328 """
329 Send a request message, wait for and return the response message.
330
331 @param request: A L{proton.Message}. If L{self.address} is not set the
332 L{self.address} must be set and will be used.
333 """
334 if not self.address and not request.address:
335 raise ValueError("Request message has no address: %s" % request)
336 request.reply_to = self.reply_to
337 request.correlation_id = correlation_id = self.correlation_id.next()
338 self.sender.send(request)
339 def wakeup():
340 return self.response and (self.response.correlation_id == correlation_id)
341 self.connection.wait(wakeup, msg="Waiting for response")
342 response = self.response
343 self.response = None
344 self.receiver.flow(1)
345 return response
346
347 @property
349 """Return the dynamic address of our receiver."""
350 return self.receiver.remote_source.address
351
353 """Called when we receive a message for our receiver."""
354 self.response = event.message
355 self.connection.container.yield_()
356