Module proton
[frames] | no frames]

Source Code for Module proton

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  The proton module defines a suite of APIs that implement the AMQP 1.0 
  22  protocol. 
  23   
  24  The proton APIs consist of the following classes: 
  25   
  26   - L{Messenger} -- A messaging endpoint. 
  27   - L{Message}   -- A class for creating and/or accessing AMQP message content. 
  28   - L{Data}      -- A class for creating and/or accessing arbitrary AMQP encoded 
  29                    data. 
  30   
  31  """ 
  32   
  33  from cproton import * 
  34   
  35  import weakref, re, socket 
  36  try: 
  37    import uuid 
  38  except ImportError: 
  39    """ 
  40    No 'native' UUID support.  Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases. 
  41    """ 
  42    import struct 
43 - class uuid:
44 - class UUID:
45 - def __init__(self, hex=None, bytes=None):
46 if [hex, bytes].count(None) != 1: 47 raise TypeError("need one of hex or bytes") 48 if bytes is not None: 49 self.bytes = bytes 50 elif hex is not None: 51 fields=hex.split("-") 52 fields[4:5] = [fields[4][:4], fields[4][4:]] 53 self.bytes = struct.pack("!LHHHHL", *[int(x,16) for x in fields])
54
55 - def __cmp__(self, other):
56 if isinstance(other, uuid.UUID): 57 return cmp(self.bytes, other.bytes) 58 else: 59 return -1
60
61 - def __str__(self):
62 return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)
63
64 - def __repr__(self):
65 return "UUID(%r)" % str(self)
66
67 - def __hash__(self):
68 return self.bytes.__hash__()
69 70 import os, random, socket, time 71 rand = random.Random() 72 rand.seed((os.getpid(), time.time(), socket.gethostname()))
73 - def random_uuid():
74 bytes = [rand.randint(0, 255) for i in xrange(16)] 75 76 # From RFC4122, the version bits are set to 0100 77 bytes[7] &= 0x0F 78 bytes[7] |= 0x40 79 80 # From RFC4122, the top two bits of byte 8 get set to 01 81 bytes[8] &= 0x3F 82 bytes[8] |= 0x80 83 return "".join(map(chr, bytes))
84
85 - def uuid4():
86 return uuid.UUID(bytes=random_uuid())
87 88 try: 89 bytes() 90 except NameError: 91 bytes = str 92 93 VERSION_MAJOR = PN_VERSION_MAJOR 94 VERSION_MINOR = PN_VERSION_MINOR 95 API_LANGUAGE = "C" 96 IMPLEMENTATION_LANGUAGE = "C"
97 98 -class Constant(object):
99
100 - def __init__(self, name):
101 self.name = name
102
103 - def __repr__(self):
104 return self.name
105
106 -class ProtonException(Exception):
107 """ 108 The root of the proton exception hierarchy. All proton exception 109 classes derive from this exception. 110 """ 111 pass
112
113 -class Timeout(ProtonException):
114 """ 115 A timeout exception indicates that a blocking operation has timed 116 out. 117 """ 118 pass
119
120 -class Interrupt(ProtonException):
121 """ 122 An interrupt exception indicaes that a blocking operation was interrupted. 123 """ 124 pass
125
126 -class MessengerException(ProtonException):
127 """ 128 The root of the messenger exception hierarchy. All exceptions 129 generated by the messenger class derive from this exception. 130 """ 131 pass
132
133 -class MessageException(ProtonException):
134 """ 135 The MessageException class is the root of the message exception 136 hierarhcy. All exceptions generated by the Message class derive from 137 this exception. 138 """ 139 pass
140 141 EXCEPTIONS = { 142 PN_TIMEOUT: Timeout, 143 PN_INTR: Interrupt 144 } 145 146 PENDING = Constant("PENDING") 147 ACCEPTED = Constant("ACCEPTED") 148 REJECTED = Constant("REJECTED") 149 RELEASED = Constant("RELEASED") 150 ABORTED = Constant("ABORTED") 151 SETTLED = Constant("SETTLED") 152 153 STATUSES = { 154 PN_STATUS_ABORTED: ABORTED, 155 PN_STATUS_ACCEPTED: ACCEPTED, 156 PN_STATUS_REJECTED: REJECTED, 157 PN_STATUS_RELEASED: RELEASED, 158 PN_STATUS_PENDING: PENDING, 159 PN_STATUS_SETTLED: SETTLED, 160 PN_STATUS_UNKNOWN: None 161 } 162 163 AUTOMATIC = Constant("AUTOMATIC") 164 MANUAL = Constant("MANUAL")
165 166 -class Messenger(object):
167 """ 168 The L{Messenger} class defines a high level interface for sending 169 and receiving L{Messages<Message>}. Every L{Messenger} contains a 170 single logical queue of incoming messages and a single logical queue 171 of outgoing messages. These messages in these queues may be destined 172 for, or originate from, a variety of addresses. 173 174 The messenger interface is single-threaded. All methods 175 except one (L{interrupt}) are intended to be used from within 176 the messenger thread. 177 178 179 Address Syntax 180 ============== 181 182 An address has the following form:: 183 184 [ amqp[s]:// ] [user[:password]@] domain [/[name]] 185 186 Where domain can be one of:: 187 188 host | host:port | ip | ip:port | name 189 190 The following are valid examples of addresses: 191 192 - example.org 193 - example.org:1234 194 - amqp://example.org 195 - amqps://example.org 196 - example.org/incoming 197 - amqps://example.org/outgoing 198 - amqps://fred:trustno1@example.org 199 - 127.0.0.1:1234 200 - amqps://127.0.0.1:1234 201 202 Sending & Receiving Messages 203 ============================ 204 205 The L{Messenger} class works in conjuction with the L{Message} class. The 206 L{Message} class is a mutable holder of message content. 207 208 The L{put} method copies its L{Message} to the outgoing queue, and may 209 send queued messages if it can do so without blocking. The L{send} 210 method blocks until it has sent the requested number of messages, 211 or until a timeout interrupts the attempt. 212 213 214 >>> message = Message() 215 >>> for i in range(3): 216 ... message.address = "amqp://host/queue" 217 ... message.subject = "Hello World %i" % i 218 ... messenger.put(message) 219 >>> messenger.send() 220 221 Similarly, the L{recv} method receives messages into the incoming 222 queue, and may block as it attempts to receive the requested number 223 of messages, or until timeout is reached. It may receive fewer 224 than the requested number. The L{get} method pops the 225 eldest L{Message} off the incoming queue and copies it into the L{Message} 226 object that you supply. It will not block. 227 228 229 >>> message = Message() 230 >>> messenger.recv(10): 231 >>> while messenger.incoming > 0: 232 ... messenger.get(message) 233 ... print message.subject 234 Hello World 0 235 Hello World 1 236 Hello World 2 237 238 The blocking flag allows you to turn off blocking behavior entirely, 239 in which case L{send} and L{recv} will do whatever they can without 240 blocking, and then return. You can then look at the number 241 of incoming and outgoing messages to see how much outstanding work 242 still remains. 243 """ 244
245 - def __init__(self, name=None):
246 """ 247 Construct a new L{Messenger} with the given name. The name has 248 global scope. If a NULL name is supplied, a UUID based name will 249 be chosen. 250 251 @type name: string 252 @param name: the name of the messenger or None 253 254 """ 255 self._mng = pn_messenger(name) 256 self._selectables = {}
257
258 - def __del__(self):
259 """ 260 Destroy the L{Messenger}. This will close all connections that 261 are managed by the L{Messenger}. Call the L{stop} method before 262 destroying the L{Messenger}. 263 """ 264 if hasattr(self, "_mng"): 265 pn_messenger_free(self._mng) 266 del self._mng
267
268 - def _check(self, err):
269 if err < 0: 270 if (err == PN_INPROGRESS): 271 return 272 exc = EXCEPTIONS.get(err, MessengerException) 273 raise exc("[%s]: %s" % (err, pn_error_text(pn_messenger_error(self._mng)))) 274 else: 275 return err
276 277 @property
278 - def name(self):
279 """ 280 The name of the L{Messenger}. 281 """ 282 return pn_messenger_name(self._mng)
283
284 - def _get_certificate(self):
285 return pn_messenger_get_certificate(self._mng)
286
287 - def _set_certificate(self, value):
288 self._check(pn_messenger_set_certificate(self._mng, value))
289 290 certificate = property(_get_certificate, _set_certificate, 291 doc=""" 292 Path to a certificate file for the L{Messenger}. This certificate is 293 used when the L{Messenger} accepts or establishes SSL/TLS connections. 294 This property must be specified for the L{Messenger} to accept 295 incoming SSL/TLS connections and to establish client authenticated 296 outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS 297 connections do not require this property. 298 """) 299
300 - def _get_private_key(self):
301 return pn_messenger_get_private_key(self._mng)
302
303 - def _set_private_key(self, value):
304 self._check(pn_messenger_set_private_key(self._mng, value))
305 306 private_key = property(_get_private_key, _set_private_key, 307 doc=""" 308 Path to a private key file for the L{Messenger's<Messenger>} 309 certificate. This property must be specified for the L{Messenger} to 310 accept incoming SSL/TLS connections and to establish client 311 authenticated outgoing SSL/TLS connection. Non client authenticated 312 SSL/TLS connections do not require this property. 313 """) 314
315 - def _get_password(self):
316 return pn_messenger_get_password(self._mng)
317
318 - def _set_password(self, value):
319 self._check(pn_messenger_set_password(self._mng, value))
320 321 password = property(_get_password, _set_password, 322 doc=""" 323 This property contains the password for the L{Messenger.private_key} 324 file, or None if the file is not encrypted. 325 """) 326
327 - def _get_trusted_certificates(self):
328 return pn_messenger_get_trusted_certificates(self._mng)
329
330 - def _set_trusted_certificates(self, value):
331 self._check(pn_messenger_set_trusted_certificates(self._mng, value))
332 333 trusted_certificates = property(_get_trusted_certificates, 334 _set_trusted_certificates, 335 doc=""" 336 A path to a database of trusted certificates for use in verifying the 337 peer on an SSL/TLS connection. If this property is None, then the peer 338 will not be verified. 339 """) 340
341 - def _get_timeout(self):
342 t = pn_messenger_get_timeout(self._mng) 343 if t == -1: 344 return None 345 else: 346 return float(t)/1000
347
348 - def _set_timeout(self, value):
349 if value is None: 350 t = -1 351 else: 352 t = long(1000*value) 353 self._check(pn_messenger_set_timeout(self._mng, t))
354 355 timeout = property(_get_timeout, _set_timeout, 356 doc=""" 357 The timeout property contains the default timeout for blocking 358 operations performed by the L{Messenger}. 359 """) 360
361 - def _is_blocking(self):
362 return pn_messenger_is_blocking(self._mng)
363
364 - def _set_blocking(self, b):
365 self._check(pn_messenger_set_blocking(self._mng, b))
366 367 blocking = property(_is_blocking, _set_blocking, 368 doc=""" 369 Enable or disable blocking behavior during L{Message} sending 370 and receiving. This affects every blocking call, with the 371 exception of L{work}. Currently, the affected calls are 372 L{send}, L{recv}, and L{stop}. 373 """) 374
375 - def _is_passive(self):
376 return pn_messenger_is_passive(self._mng)
377
378 - def _set_passive(self, b):
379 self._check(pn_messenger_set_passive(self._mng, b))
380 381 passive = property(_is_passive, _set_passive, 382 doc=""" 383 When passive is set to true, Messenger will not attempt to perform I/O 384 internally. In this mode it is necessary to use the selectables API to 385 drive any I/O needed to perform requested actions. In this mode 386 Messenger will never block. 387 """) 388
389 - def _get_incoming_window(self):
390 return pn_messenger_get_incoming_window(self._mng)
391
392 - def _set_incoming_window(self, window):
393 self._check(pn_messenger_set_incoming_window(self._mng, window))
394 395 incoming_window = property(_get_incoming_window, _set_incoming_window, 396 doc=""" 397 The incoming tracking window for the messenger. The messenger will 398 track the remote status of this many incoming deliveries after they 399 have been accepted or rejected. Defaults to zero. 400 401 L{Messages<Message>} enter this window only when you take them into your application 402 using L{get}. If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>} 403 without explicitly accepting or rejecting the oldest message, then the 404 message that passes beyond the edge of the incoming window will be assigned 405 the default disposition of its link. 406 """) 407
408 - def _get_outgoing_window(self):
409 return pn_messenger_get_outgoing_window(self._mng)
410
411 - def _set_outgoing_window(self, window):
412 self._check(pn_messenger_set_outgoing_window(self._mng, window))
413 414 outgoing_window = property(_get_outgoing_window, _set_outgoing_window, 415 doc=""" 416 The outgoing tracking window for the messenger. The messenger will 417 track the remote status of this many outgoing deliveries after calling 418 send. Defaults to zero. 419 420 A L{Message} enters this window when you call the put() method with the 421 message. If your outgoing window size is I{n}, and you call L{put} I{n}+1 422 times, status information will no longer be available for the 423 first message. 424 """) 425
426 - def start(self):
427 """ 428 Currently a no-op placeholder. 429 For future compatibility, do not L{send} or L{recv} messages 430 before starting the L{Messenger}. 431 """ 432 self._check(pn_messenger_start(self._mng))
433
434 - def stop(self):
435 """ 436 Transitions the L{Messenger} to an inactive state. An inactive 437 L{Messenger} will not send or receive messages from its internal 438 queues. A L{Messenger} should be stopped before being discarded to 439 ensure a clean shutdown handshake occurs on any internally managed 440 connections. 441 """ 442 self._check(pn_messenger_stop(self._mng))
443 444 @property
445 - def stopped(self):
446 """ 447 Returns true iff a L{Messenger} is in the stopped state. 448 This function does not block. 449 """ 450 return pn_messenger_stopped(self._mng)
451
452 - def subscribe(self, source):
453 """ 454 Subscribes the L{Messenger} to messages originating from the 455 specified source. The source is an address as specified in the 456 L{Messenger} introduction with the following addition. If the 457 domain portion of the address begins with the '~' character, the 458 L{Messenger} will interpret the domain as host/port, bind to it, 459 and listen for incoming messages. For example "~0.0.0.0", 460 "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any 461 local interface and listen for incoming messages with the last 462 variant only permitting incoming SSL connections. 463 464 @type source: string 465 @param source: the source of messages to subscribe to 466 """ 467 sub_impl = pn_messenger_subscribe(self._mng, source) 468 if not sub_impl: 469 self._check(pn_error_code(pn_messenger_error(self._mng))) 470 raise MessengerException("Cannot subscribe to %s"%source) 471 return Subscription(sub_impl)
472
473 - def put(self, message):
474 """ 475 Places the content contained in the message onto the outgoing 476 queue of the L{Messenger}. This method will never block, however 477 it will send any unblocked L{Messages<Message>} in the outgoing 478 queue immediately and leave any blocked L{Messages<Message>} 479 remaining in the outgoing queue. The L{send} call may be used to 480 block until the outgoing queue is empty. The L{outgoing} property 481 may be used to check the depth of the outgoing queue. 482 483 When the content in a given L{Message} object is copied to the outgoing 484 message queue, you may then modify or discard the L{Message} object 485 without having any impact on the content in the outgoing queue. 486 487 This method returns an outgoing tracker for the L{Message}. The tracker 488 can be used to determine the delivery status of the L{Message}. 489 490 @type message: Message 491 @param message: the message to place in the outgoing queue 492 @return: a tracker 493 """ 494 message._pre_encode() 495 self._check(pn_messenger_put(self._mng, message._msg)) 496 return pn_messenger_outgoing_tracker(self._mng)
497
498 - def status(self, tracker):
499 """ 500 Gets the last known remote state of the delivery associated with 501 the given tracker. 502 503 @type tracker: tracker 504 @param tracker: the tracker whose status is to be retrieved 505 506 @return: one of None, PENDING, REJECTED, or ACCEPTED 507 """ 508 disp = pn_messenger_status(self._mng, tracker); 509 return STATUSES.get(disp, disp)
510
511 - def buffered(self, tracker):
512 """ 513 Checks if the delivery associated with the given tracker is still 514 waiting to be sent. 515 516 @type tracker: tracker 517 @param tracker: the tracker whose status is to be retrieved 518 519 @return: true if delivery is still buffered 520 """ 521 return pn_messenger_buffered(self._mng, tracker);
522
523 - def settle(self, tracker=None):
524 """ 525 Frees a L{Messenger} from tracking the status associated with a given 526 tracker. If you don't supply a tracker, all outgoing L{messages<Message>} up 527 to the most recent will be settled. 528 """ 529 if tracker is None: 530 tracker = pn_messenger_outgoing_tracker(self._mng) 531 flags = PN_CUMULATIVE 532 else: 533 flags = 0 534 self._check(pn_messenger_settle(self._mng, tracker, flags))
535
536 - def send(self, n=-1):
537 """ 538 This call will block until the indicated number of L{messages<Message>} 539 have been sent, or until the operation times out. If n is -1 this call will 540 block until all outgoing L{messages<Message>} have been sent. If n is 0 then 541 this call will send whatever it can without blocking. 542 """ 543 self._check(pn_messenger_send(self._mng, n))
544
545 - def recv(self, n=None):
546 """ 547 Receives up to I{n} L{messages<Message>} into the incoming queue. If no value 548 for I{n} is supplied, this call will receive as many L{messages<Message>} as it 549 can buffer internally. If the L{Messenger} is in blocking mode, this 550 call will block until at least one L{Message} is available in the 551 incoming queue. 552 """ 553 if n is None: 554 n = -1 555 self._check(pn_messenger_recv(self._mng, n))
556
557 - def work(self, timeout=None):
558 """ 559 Sends or receives any outstanding L{messages<Message>} queued for a L{Messenger}. 560 This will block for the indicated timeout. 561 This method may also do I/O work other than sending and receiving 562 L{messages<Message>}. For example, closing connections after messenger.L{stop}() 563 has been called. 564 """ 565 if timeout is None: 566 t = -1 567 else: 568 t = long(1000*timeout) 569 err = pn_messenger_work(self._mng, t) 570 if (err == PN_TIMEOUT): 571 return False 572 else: 573 self._check(err) 574 return True
575 576 @property
577 - def receiving(self):
578 return pn_messenger_receiving(self._mng)
579
580 - def interrupt(self):
581 """ 582 The L{Messenger} interface is single-threaded. 583 This is the only L{Messenger} function intended to be called 584 from outside of the L{Messenger} thread. 585 Call this from a non-messenger thread to interrupt 586 a L{Messenger} that is blocking. 587 This will cause any in-progress blocking call to throw 588 the L{Interrupt} exception. If there is no currently blocking 589 call, then the next blocking call will be affected, even if it 590 is within the same thread that interrupt was called from. 591 """ 592 self._check(pn_messenger_interrupt(self._mng))
593
594 - def get(self, message=None):
595 """ 596 Moves the message from the head of the incoming message queue into 597 the supplied message object. Any content in the message will be 598 overwritten. 599 600 A tracker for the incoming L{Message} is returned. The tracker can 601 later be used to communicate your acceptance or rejection of the 602 L{Message}. 603 604 If None is passed in for the L{Message} object, the L{Message} 605 popped from the head of the queue is discarded. 606 607 @type message: Message 608 @param message: the destination message object 609 @return: a tracker 610 """ 611 if message is None: 612 impl = None 613 else: 614 impl = message._msg 615 self._check(pn_messenger_get(self._mng, impl)) 616 if message is not None: 617 message._post_decode() 618 return pn_messenger_incoming_tracker(self._mng)
619
620 - def accept(self, tracker=None):
621 """ 622 Signal the sender that you have acted on the L{Message} 623 pointed to by the tracker. If no tracker is supplied, 624 then all messages that have been returned by the L{get} 625 method are accepted, except those that have already been 626 auto-settled by passing beyond your incoming window size. 627 628 @type tracker: tracker 629 @param tracker: a tracker as returned by get 630 """ 631 if tracker is None: 632 tracker = pn_messenger_incoming_tracker(self._mng) 633 flags = PN_CUMULATIVE 634 else: 635 flags = 0 636 self._check(pn_messenger_accept(self._mng, tracker, flags))
637
638 - def reject(self, tracker=None):
639 """ 640 Rejects the L{Message} indicated by the tracker. If no tracker 641 is supplied, all messages that have been returned by the L{get} 642 method are rejected, except those that have already been auto-settled 643 by passing beyond your outgoing window size. 644 645 @type tracker: tracker 646 @param tracker: a tracker as returned by get 647 """ 648 if tracker is None: 649 tracker = pn_messenger_incoming_tracker(self._mng) 650 flags = PN_CUMULATIVE 651 else: 652 flags = 0 653 self._check(pn_messenger_reject(self._mng, tracker, flags))
654 655 @property
656 - def outgoing(self):
657 """ 658 The outgoing queue depth. 659 """ 660 return pn_messenger_outgoing(self._mng)
661 662 @property
663 - def incoming(self):
664 """ 665 The incoming queue depth. 666 """ 667 return pn_messenger_incoming(self._mng)
668
669 - def route(self, pattern, address):
670 """ 671 Adds a routing rule to a L{Messenger's<Messenger>} internal routing table. 672 673 The route procedure may be used to influence how a L{Messenger} will 674 internally treat a given address or class of addresses. Every call 675 to the route procedure will result in L{Messenger} appending a routing 676 rule to its internal routing table. 677 678 Whenever a L{Message} is presented to a L{Messenger} for delivery, it 679 will match the address of this message against the set of routing 680 rules in order. The first rule to match will be triggered, and 681 instead of routing based on the address presented in the message, 682 the L{Messenger} will route based on the address supplied in the rule. 683 684 The pattern matching syntax supports two types of matches, a '%' 685 will match any character except a '/', and a '*' will match any 686 character including a '/'. 687 688 A routing address is specified as a normal AMQP address, however it 689 may additionally use substitution variables from the pattern match 690 that triggered the rule. 691 692 Any message sent to "foo" will be routed to "amqp://foo.com": 693 694 >>> messenger.route("foo", "amqp://foo.com"); 695 696 Any message sent to "foobar" will be routed to 697 "amqp://foo.com/bar": 698 699 >>> messenger.route("foobar", "amqp://foo.com/bar"); 700 701 Any message sent to bar/<path> will be routed to the corresponding 702 path within the amqp://bar.com domain: 703 704 >>> messenger.route("bar/*", "amqp://bar.com/$1"); 705 706 Route all L{messages<Message>} over TLS: 707 708 >>> messenger.route("amqp:*", "amqps:$1") 709 710 Supply credentials for foo.com: 711 712 >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1"); 713 714 Supply credentials for all domains: 715 716 >>> messenger.route("amqp://*", "amqp://user:password@$1"); 717 718 Route all addresses through a single proxy while preserving the 719 original destination: 720 721 >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2"); 722 723 Route any address through a single broker: 724 725 >>> messenger.route("*", "amqp://user:password@broker/$1"); 726 """ 727 self._check(pn_messenger_route(self._mng, pattern, address))
728
729 - def rewrite(self, pattern, address):
730 """ 731 Similar to route(), except that the destination of 732 the L{Message} is determined before the message address is rewritten. 733 734 The outgoing address is only rewritten after routing has been 735 finalized. If a message has an outgoing address of 736 "amqp://0.0.0.0:5678", and a rewriting rule that changes its 737 outgoing address to "foo", it will still arrive at the peer that 738 is listening on "amqp://0.0.0.0:5678", but when it arrives there, 739 the receiver will see its outgoing address as "foo". 740 741 The default rewrite rule removes username and password from addresses 742 before they are transmitted. 743 """ 744 self._check(pn_messenger_rewrite(self._mng, pattern, address))
745
746 - def selectable(self):
747 impl = pn_messenger_selectable(self._mng) 748 if impl: 749 fd = pn_selectable_fd(impl) 750 sel = self._selectables.get(fd, None) 751 if sel is None: 752 sel = Selectable(self, impl) 753 self._selectables[fd] = sel 754 return sel 755 else: 756 return None
757 758 @property
759 - def deadline(self):
760 tstamp = pn_messenger_deadline(self._mng) 761 if tstamp: 762 return float(tstamp)/1000 763 else: 764 return None
765
766 -class Message(object):
767 """The L{Message} class is a mutable holder of message content. 768 769 @ivar instructions: delivery instructions for the message 770 @type instructions: dict 771 @ivar annotations: infrastructure defined message annotations 772 @type annotations: dict 773 @ivar properties: application defined message properties 774 @type properties: dict 775 @ivar body: message body 776 @type body: bytes | unicode | dict | list | int | long | float | UUID 777 """ 778 779 DATA = PN_DATA 780 TEXT = PN_TEXT 781 AMQP = PN_AMQP 782 JSON = PN_JSON 783 784 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY 785
786 - def __init__(self, **kwargs):
787 """ 788 @param kwargs: Message property name/value pairs to initialise the Message 789 """ 790 self._msg = pn_message() 791 self._id = Data(pn_message_id(self._msg)) 792 self._correlation_id = Data(pn_message_correlation_id(self._msg)) 793 self.instructions = None 794 self.annotations = None 795 self.properties = None 796 self.body = None 797 for k,v in kwargs.iteritems(): 798 getattr(self, k) # Raise exception if it's not a valid attribute. 799 setattr(self, k, v)
800
801 - def __del__(self):
802 if hasattr(self, "_msg"): 803 pn_message_free(self._msg) 804 del self._msg
805
806 - def _check(self, err):
807 if err < 0: 808 exc = EXCEPTIONS.get(err, MessageException) 809 raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg)))) 810 else: 811 return err
812
813 - def _pre_encode(self):
814 inst = Data(pn_message_instructions(self._msg)) 815 ann = Data(pn_message_annotations(self._msg)) 816 props = Data(pn_message_properties(self._msg)) 817 body = Data(pn_message_body(self._msg)) 818 819 inst.clear() 820 if self.instructions is not None: 821 inst.put_object(self.instructions) 822 ann.clear() 823 if self.annotations is not None: 824 ann.put_object(self.annotations) 825 props.clear() 826 if self.properties is not None: 827 props.put_object(self.properties) 828 body.clear() 829 if self.body is not None: 830 body.put_object(self.body)
831
832 - def _post_decode(self):
833 inst = Data(pn_message_instructions(self._msg)) 834 ann = Data(pn_message_annotations(self._msg)) 835 props = Data(pn_message_properties(self._msg)) 836 body = Data(pn_message_body(self._msg)) 837 838 if inst.next(): 839 self.instructions = inst.get_object() 840 else: 841 self.instructions = None 842 if ann.next(): 843 self.annotations = ann.get_object() 844 else: 845 self.annotations = None 846 if props.next(): 847 self.properties = props.get_object() 848 else: 849 self.properties = None 850 if body.next(): 851 self.body = body.get_object() 852 else: 853 self.body = None
854
855 - def clear(self):
856 """ 857 Clears the contents of the L{Message}. All fields will be reset to 858 their default values. 859 """ 860 pn_message_clear(self._msg) 861 self.instructions = None 862 self.annotations = None 863 self.properties = None 864 self.body = None
865
866 - def _is_inferred(self):
867 return pn_message_is_inferred(self._msg)
868
869 - def _set_inferred(self, value):
870 self._check(pn_message_set_inferred(self._msg, bool(value)))
871 872 inferred = property(_is_inferred, _set_inferred, doc=""" 873 The inferred flag for a message indicates how the message content 874 is encoded into AMQP sections. If inferred is true then binary and 875 list values in the body of the message will be encoded as AMQP DATA 876 and AMQP SEQUENCE sections, respectively. If inferred is false, 877 then all values in the body of the message will be encoded as AMQP 878 VALUE sections regardless of their type. 879 """) 880
881 - def _is_durable(self):
882 return pn_message_is_durable(self._msg)
883
884 - def _set_durable(self, value):
885 self._check(pn_message_set_durable(self._msg, bool(value)))
886 887 durable = property(_is_durable, _set_durable, 888 doc=""" 889 The durable property indicates that the message should be held durably 890 by any intermediaries taking responsibility for the message. 891 """) 892
893 - def _get_priority(self):
894 return pn_message_get_priority(self._msg)
895
896 - def _set_priority(self, value):
897 self._check(pn_message_set_priority(self._msg, value))
898 899 priority = property(_get_priority, _set_priority, 900 doc=""" 901 The priority of the message. 902 """) 903
904 - def _get_ttl(self):
905 return pn_message_get_ttl(self._msg)
906
907 - def _set_ttl(self, value):
908 self._check(pn_message_set_ttl(self._msg, value))
909 910 ttl = property(_get_ttl, _set_ttl, 911 doc=""" 912 The time to live of the message measured in milliseconds. Expired 913 messages may be dropped. 914 """) 915
916 - def _is_first_acquirer(self):
917 return pn_message_is_first_acquirer(self._msg)
918
919 - def _set_first_acquirer(self, value):
920 self._check(pn_message_set_first_acquirer(self._msg, bool(value)))
921 922 first_acquirer = property(_is_first_acquirer, _set_first_acquirer, 923 doc=""" 924 True iff the recipient is the first to acquire the message. 925 """) 926
927 - def _get_delivery_count(self):
928 return pn_message_get_delivery_count(self._msg)
929
930 - def _set_delivery_count(self, value):
931 self._check(pn_message_set_delivery_count(self._msg, value))
932 933 delivery_count = property(_get_delivery_count, _set_delivery_count, 934 doc=""" 935 The number of delivery attempts made for this message. 936 """) 937 938
939 - def _get_id(self):
940 return self._id.get_object()
941 - def _set_id(self, value):
942 if type(value) in (int, long): 943 value = ulong(value) 944 self._id.rewind() 945 self._id.put_object(value)
946 id = property(_get_id, _set_id, 947 doc=""" 948 The id of the message. 949 """) 950
951 - def _get_user_id(self):
952 return pn_message_get_user_id(self._msg)
953
954 - def _set_user_id(self, value):
955 self._check(pn_message_set_user_id(self._msg, value))
956 957 user_id = property(_get_user_id, _set_user_id, 958 doc=""" 959 The user id of the message creator. 960 """) 961
962 - def _get_address(self):
963 return pn_message_get_address(self._msg)
964
965 - def _set_address(self, value):
966 self._check(pn_message_set_address(self._msg, value))
967 968 address = property(_get_address, _set_address, 969 doc=""" 970 The address of the message. 971 """) 972
973 - def _get_subject(self):
974 return pn_message_get_subject(self._msg)
975
976 - def _set_subject(self, value):
977 self._check(pn_message_set_subject(self._msg, value))
978 979 subject = property(_get_subject, _set_subject, 980 doc=""" 981 The subject of the message. 982 """) 983
984 - def _get_reply_to(self):
985 return pn_message_get_reply_to(self._msg)
986
987 - def _set_reply_to(self, value):
988 self._check(pn_message_set_reply_to(self._msg, value))
989 990 reply_to = property(_get_reply_to, _set_reply_to, 991 doc=""" 992 The reply-to address for the message. 993 """) 994
995 - def _get_correlation_id(self):
996 return self._correlation_id.get_object()
997 - def _set_correlation_id(self, value):
998 if type(value) in (int, long): 999 value = ulong(value) 1000 self._correlation_id.rewind() 1001 self._correlation_id.put_object(value)
1002 1003 correlation_id = property(_get_correlation_id, _set_correlation_id, 1004 doc=""" 1005 The correlation-id for the message. 1006 """) 1007
1008 - def _get_content_type(self):
1009 return pn_message_get_content_type(self._msg)
1010
1011 - def _set_content_type(self, value):
1012 self._check(pn_message_set_content_type(self._msg, value))
1013 1014 content_type = property(_get_content_type, _set_content_type, 1015 doc=""" 1016 The content-type of the message. 1017 """) 1018
1019 - def _get_content_encoding(self):
1020 return pn_message_get_content_encoding(self._msg)
1021
1022 - def _set_content_encoding(self, value):
1023 self._check(pn_message_set_content_encoding(self._msg, value))
1024 1025 content_encoding = property(_get_content_encoding, _set_content_encoding, 1026 doc=""" 1027 The content-encoding of the message. 1028 """) 1029
1030 - def _get_expiry_time(self):
1031 return pn_message_get_expiry_time(self._msg)
1032
1033 - def _set_expiry_time(self, value):
1034 self._check(pn_message_set_expiry_time(self._msg, value))
1035 1036 expiry_time = property(_get_expiry_time, _set_expiry_time, 1037 doc=""" 1038 The expiry time of the message. 1039 """) 1040
1041 - def _get_creation_time(self):
1042 return pn_message_get_creation_time(self._msg)
1043
1044 - def _set_creation_time(self, value):
1045 self._check(pn_message_set_creation_time(self._msg, value))
1046 1047 creation_time = property(_get_creation_time, _set_creation_time, 1048 doc=""" 1049 The creation time of the message. 1050 """) 1051
1052 - def _get_group_id(self):
1053 return pn_message_get_group_id(self._msg)
1054
1055 - def _set_group_id(self, value):
1056 self._check(pn_message_set_group_id(self._msg, value))
1057 1058 group_id = property(_get_group_id, _set_group_id, 1059 doc=""" 1060 The group id of the message. 1061 """) 1062
1063 - def _get_group_sequence(self):
1064 return pn_message_get_group_sequence(self._msg)
1065
1066 - def _set_group_sequence(self, value):
1067 self._check(pn_message_set_group_sequence(self._msg, value))
1068 1069 group_sequence = property(_get_group_sequence, _set_group_sequence, 1070 doc=""" 1071 The sequence of the message within its group. 1072 """) 1073
1074 - def _get_reply_to_group_id(self):
1075 return pn_message_get_reply_to_group_id(self._msg)
1076
1077 - def _set_reply_to_group_id(self, value):
1078 self._check(pn_message_set_reply_to_group_id(self._msg, value))
1079 1080 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id, 1081 doc=""" 1082 The group-id for any replies. 1083 """) 1084 1085 # XXX
1086 - def _get_format(self):
1087 return pn_message_get_format(self._msg)
1088
1089 - def _set_format(self, value):
1090 self._check(pn_message_set_format(self._msg, value))
1091 1092 format = property(_get_format, _set_format, 1093 doc=""" 1094 The format of the message. 1095 """) 1096
1097 - def encode(self):
1098 self._pre_encode() 1099 sz = 16 1100 while True: 1101 err, data = pn_message_encode(self._msg, sz) 1102 if err == PN_OVERFLOW: 1103 sz *= 2 1104 continue 1105 else: 1106 self._check(err) 1107 return data
1108
1109 - def decode(self, data):
1110 self._check(pn_message_decode(self._msg, data, len(data))) 1111 self._post_decode()
1112
1113 - def load(self, data):
1114 self._check(pn_message_load(self._msg, data))
1115
1116 - def save(self):
1117 sz = 16 1118 while True: 1119 err, data = pn_message_save(self._msg, sz) 1120 if err == PN_OVERFLOW: 1121 sz *= 2 1122 continue 1123 else: 1124 self._check(err) 1125 return data
1126
1127 - def __repr2__(self):
1128 props = [] 1129 for attr in ("inferred", "address", "reply_to", "durable", "ttl", 1130 "priority", "first_acquirer", "delivery_count", "id", 1131 "correlation_id", "user_id", "group_id", "group_sequence", 1132 "reply_to_group_id", "instructions", "annotations", 1133 "properties", "body"): 1134 value = getattr(self, attr) 1135 if value: props.append("%s=%r" % (attr, value)) 1136 return "Message(%s)" % ", ".join(props)
1137
1138 - def __repr__(self):
1139 tmp = pn_string(None) 1140 err = pn_inspect(self._msg, tmp) 1141 result = pn_string_get(tmp) 1142 pn_free(tmp) 1143 self._check(err) 1144 return result
1145
1146 -class Subscription(object):
1147
1148 - def __init__(self, impl):
1149 self._impl = impl
1150 1151 @property
1152 - def address(self):
1153 return pn_subscription_address(self._impl)
1154
1155 -class Selectable(object):
1156
1157 - def __init__(self, messenger, impl):
1158 self.messenger = messenger 1159 self._impl = impl
1160
1161 - def fileno(self):
1162 if not self._impl: raise ValueError("selectable freed") 1163 return pn_selectable_fd(self._impl)
1164 1165 @property
1166 - def capacity(self):
1167 if not self._impl: raise ValueError("selectable freed") 1168 return pn_selectable_capacity(self._impl)
1169 1170 @property
1171 - def pending(self):
1172 if not self._impl: raise ValueError("selectable freed") 1173 return pn_selectable_pending(self._impl)
1174 1175 @property
1176 - def deadline(self):
1177 if not self._impl: raise ValueError("selectable freed") 1178 tstamp = pn_selectable_deadline(self._impl) 1179 if tstamp: 1180 return float(tstamp)/1000 1181 else: 1182 return None
1183
1184 - def readable(self):
1185 if not self._impl: raise ValueError("selectable freed") 1186 pn_selectable_readable(self._impl)
1187
1188 - def writable(self):
1189 if not self._impl: raise ValueError("selectable freed") 1190 pn_selectable_writable(self._impl)
1191
1192 - def expired(self):
1193 if not self._impl: raise ValueError("selectable freed") 1194 pn_selectable_expired(self._impl)
1195
1196 - def _is_registered(self):
1197 if not self._impl: raise ValueError("selectable freed") 1198 return pn_selectable_is_registered(self._impl)
1199
1200 - def _set_registered(self, registered):
1201 if not self._impl: raise ValueError("selectable freed") 1202 pn_selectable_set_registered(self._impl, registered)
1203 1204 registered = property(_is_registered, _set_registered, 1205 doc=""" 1206 The registered property may be get/set by an I/O polling system to 1207 indicate whether the fd has been registered or not. 1208 """) 1209 1210 @property
1211 - def is_terminal(self):
1212 if not self._impl: return True 1213 return pn_selectable_is_terminal(self._impl)
1214
1215 - def free(self):
1216 if self._impl: 1217 del self.messenger._selectables[self.fileno()] 1218 pn_selectable_free(self._impl) 1219 self._impl = None
1220
1221 - def __del__(self):
1222 self.free()
1223
1224 -class DataException(ProtonException):
1225 """ 1226 The DataException class is the root of the Data exception hierarchy. 1227 All exceptions raised by the Data class extend this exception. 1228 """ 1229 pass
1230
1231 -class UnmappedType:
1232
1233 - def __init__(self, msg):
1234 self.msg = msg
1235
1236 - def __repr__(self):
1237 return "UnmappedType(%s)" % self.msg
1238
1239 -class ulong(long):
1240
1241 - def __repr__(self):
1242 return "ulong(%s)" % long.__repr__(self)
1243
1244 -class timestamp(long):
1245
1246 - def __repr__(self):
1247 return "timestamp(%s)" % long.__repr__(self)
1248
1249 -class symbol(unicode):
1250
1251 - def __repr__(self):
1252 return "symbol(%s)" % unicode.__repr__(self)
1253
1254 -class char(unicode):
1255
1256 - def __repr__(self):
1257 return "char(%s)" % unicode.__repr__(self)
1258
1259 -class Described(object):
1260
1261 - def __init__(self, descriptor, value):
1262 self.descriptor = descriptor 1263 self.value = value
1264
1265 - def __repr__(self):
1266 return "Described(%r, %r)" % (self.descriptor, self.value)
1267
1268 - def __eq__(self, o):
1269 if isinstance(o, Described): 1270 return self.descriptor == o.descriptor and self.value == o.value 1271 else: 1272 return False
1273 1274 UNDESCRIBED = Constant("UNDESCRIBED")
1275 1276 -class Array(object):
1277
1278 - def __init__(self, descriptor, type, *elements):
1279 self.descriptor = descriptor 1280 self.type = type 1281 self.elements = elements
1282
1283 - def __repr__(self):
1284 if self.elements: 1285 els = ", %s" % (", ".join(map(repr, self.elements))) 1286 else: 1287 els = "" 1288 return "Array(%r, %r%s)" % (self.descriptor, self.type, els)
1289
1290 - def __eq__(self, o):
1291 if isinstance(o, Array): 1292 return self.descriptor == o.descriptor and \ 1293 self.type == o.type and self.elements == o.elements 1294 else: 1295 return False
1296
1297 -class Data:
1298 """ 1299 The L{Data} class provides an interface for decoding, extracting, 1300 creating, and encoding arbitrary AMQP data. A L{Data} object 1301 contains a tree of AMQP values. Leaf nodes in this tree correspond 1302 to scalars in the AMQP type system such as L{ints<INT>} or 1303 L{strings<STRING>}. Non-leaf nodes in this tree correspond to 1304 compound values in the AMQP type system such as L{lists<LIST>}, 1305 L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}. 1306 The root node of the tree is the L{Data} object itself and can have 1307 an arbitrary number of children. 1308 1309 A L{Data} object maintains the notion of the current sibling node 1310 and a current parent node. Siblings are ordered within their parent. 1311 Values are accessed and/or added by using the L{next}, L{prev}, 1312 L{enter}, and L{exit} methods to navigate to the desired location in 1313 the tree and using the supplied variety of put_*/get_* methods to 1314 access or add a value of the desired type. 1315 1316 The put_* methods will always add a value I{after} the current node 1317 in the tree. If the current node has a next sibling the put_* method 1318 will overwrite the value on this node. If there is no current node 1319 or the current node has no next sibling then one will be added. The 1320 put_* methods always set the added/modified node to the current 1321 node. The get_* methods read the value of the current node and do 1322 not change which node is current. 1323 1324 The following types of scalar values are supported: 1325 1326 - L{NULL} 1327 - L{BOOL} 1328 - L{UBYTE} 1329 - L{USHORT} 1330 - L{SHORT} 1331 - L{UINT} 1332 - L{INT} 1333 - L{ULONG} 1334 - L{LONG} 1335 - L{FLOAT} 1336 - L{DOUBLE} 1337 - L{BINARY} 1338 - L{STRING} 1339 - L{SYMBOL} 1340 1341 The following types of compound values are supported: 1342 1343 - L{DESCRIBED} 1344 - L{ARRAY} 1345 - L{LIST} 1346 - L{MAP} 1347 """ 1348 1349 NULL = PN_NULL; "A null value." 1350 BOOL = PN_BOOL; "A boolean value." 1351 UBYTE = PN_UBYTE; "An unsigned byte value." 1352 BYTE = PN_BYTE; "A signed byte value." 1353 USHORT = PN_USHORT; "An unsigned short value." 1354 SHORT = PN_SHORT; "A short value." 1355 UINT = PN_UINT; "An unsigned int value." 1356 INT = PN_INT; "A signed int value." 1357 CHAR = PN_CHAR; "A character value." 1358 ULONG = PN_ULONG; "An unsigned long value." 1359 LONG = PN_LONG; "A signed long value." 1360 TIMESTAMP = PN_TIMESTAMP; "A timestamp value." 1361 FLOAT = PN_FLOAT; "A float value." 1362 DOUBLE = PN_DOUBLE; "A double value." 1363 DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value." 1364 DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value." 1365 DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value." 1366 UUID = PN_UUID; "A UUID value." 1367 BINARY = PN_BINARY; "A binary string." 1368 STRING = PN_STRING; "A unicode string." 1369 SYMBOL = PN_SYMBOL; "A symbolic string." 1370 DESCRIBED = PN_DESCRIBED; "A described value." 1371 ARRAY = PN_ARRAY; "An array value." 1372 LIST = PN_LIST; "A list value." 1373 MAP = PN_MAP; "A map value." 1374 1375 type_names = { 1376 NULL: "null", 1377 BOOL: "bool", 1378 BYTE: "byte", 1379 UBYTE: "ubyte", 1380 SHORT: "short", 1381 USHORT: "ushort", 1382 INT: "int", 1383 UINT: "uint", 1384 CHAR: "char", 1385 LONG: "long", 1386 ULONG: "ulong", 1387 TIMESTAMP: "timestamp", 1388 FLOAT: "float", 1389 DOUBLE: "double", 1390 DECIMAL32: "decimal32", 1391 DECIMAL64: "decimal64", 1392 DECIMAL128: "decimal128", 1393 UUID: "uuid", 1394 BINARY: "binary", 1395 STRING: "string", 1396 SYMBOL: "symbol", 1397 DESCRIBED: "described", 1398 ARRAY: "array", 1399 LIST: "list", 1400 MAP: "map" 1401 } 1402 1403 @classmethod
1404 - def type_name(type): return Data.type_names[type]
1405
1406 - def __init__(self, capacity=16):
1407 if type(capacity) in (int, long): 1408 self._data = pn_data(capacity) 1409 self._free = True 1410 else: 1411 self._data = capacity 1412 self._free = False
1413
1414 - def __del__(self):
1415 if self._free and hasattr(self, "_data"): 1416 pn_data_free(self._data) 1417 del self._data
1418
1419 - def _check(self, err):
1420 if err < 0: 1421 exc = EXCEPTIONS.get(err, DataException) 1422 raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data)))) 1423 else: 1424 return err
1425
1426 - def clear(self):
1427 """ 1428 Clears the data object. 1429 """ 1430 pn_data_clear(self._data)
1431
1432 - def rewind(self):
1433 """ 1434 Clears current node and sets the parent to the root node. Clearing the 1435 current node sets it _before_ the first node, calling next() will advance to 1436 the first node. 1437 """ 1438 pn_data_rewind(self._data)
1439
1440 - def next(self):
1441 """ 1442 Advances the current node to its next sibling and returns its 1443 type. If there is no next sibling the current node remains 1444 unchanged and None is returned. 1445 """ 1446 found = pn_data_next(self._data) 1447 if found: 1448 return self.type() 1449 else: 1450 return None
1451
1452 - def prev(self):
1453 """ 1454 Advances the current node to its previous sibling and returns its 1455 type. If there is no previous sibling the current node remains 1456 unchanged and None is returned. 1457 """ 1458 found = pn_data_prev(self._data) 1459 if found: 1460 return self.type() 1461 else: 1462 return None
1463
1464 - def enter(self):
1465 """ 1466 Sets the parent node to the current node and clears the current node. 1467 Clearing the current node sets it _before_ the first child, 1468 call next() advances to the first child. 1469 """ 1470 return pn_data_enter(self._data)
1471
1472 - def exit(self):
1473 """ 1474 Sets the current node to the parent node and the parent node to 1475 its own parent. 1476 """ 1477 return pn_data_exit(self._data)
1478
1479 - def lookup(self, name):
1480 return pn_data_lookup(self._data, name)
1481
1482 - def narrow(self):
1483 pn_data_narrow(self._data)
1484
1485 - def widen(self):
1486 pn_data_widen(self._data)
1487
1488 - def type(self):
1489 """ 1490 Returns the type of the current node. 1491 """ 1492 dtype = pn_data_type(self._data) 1493 if dtype == -1: 1494 return None 1495 else: 1496 return dtype
1497
1498 - def encode(self):
1499 """ 1500 Returns a representation of the data encoded in AMQP format. 1501 """ 1502 size = 1024 1503 while True: 1504 cd, enc = pn_data_encode(self._data, size) 1505 if cd == PN_OVERFLOW: 1506 size *= 2 1507 elif cd >= 0: 1508 return enc 1509 else: 1510 self._check(cd)
1511
1512 - def decode(self, encoded):
1513 """ 1514 Decodes the first value from supplied AMQP data and returns the 1515 number of bytes consumed. 1516 1517 @type encoded: binary 1518 @param encoded: AMQP encoded binary data 1519 """ 1520 return self._check(pn_data_decode(self._data, encoded))
1521
1522 - def put_list(self):
1523 """ 1524 Puts a list value. Elements may be filled by entering the list 1525 node and putting element values. 1526 1527 >>> data = Data() 1528 >>> data.put_list() 1529 >>> data.enter() 1530 >>> data.put_int(1) 1531 >>> data.put_int(2) 1532 >>> data.put_int(3) 1533 >>> data.exit() 1534 """ 1535 self._check(pn_data_put_list(self._data))
1536
1537 - def put_map(self):
1538 """ 1539 Puts a map value. Elements may be filled by entering the map node 1540 and putting alternating key value pairs. 1541 1542 >>> data = Data() 1543 >>> data.put_map() 1544 >>> data.enter() 1545 >>> data.put_string("key") 1546 >>> data.put_string("value") 1547 >>> data.exit() 1548 """ 1549 self._check(pn_data_put_map(self._data))
1550
1551 - def put_array(self, described, element_type):
1552 """ 1553 Puts an array value. Elements may be filled by entering the array 1554 node and putting the element values. The values must all be of the 1555 specified array element type. If an array is described then the 1556 first child value of the array is the descriptor and may be of any 1557 type. 1558 1559 >>> data = Data() 1560 >>> 1561 >>> data.put_array(False, Data.INT) 1562 >>> data.enter() 1563 >>> data.put_int(1) 1564 >>> data.put_int(2) 1565 >>> data.put_int(3) 1566 >>> data.exit() 1567 >>> 1568 >>> data.put_array(True, Data.DOUBLE) 1569 >>> data.enter() 1570 >>> data.put_symbol("array-descriptor") 1571 >>> data.put_double(1.1) 1572 >>> data.put_double(1.2) 1573 >>> data.put_double(1.3) 1574 >>> data.exit() 1575 1576 @type described: bool 1577 @param described: specifies whether the array is described 1578 @type element_type: int 1579 @param element_type: the type of the array elements 1580 """ 1581 self._check(pn_data_put_array(self._data, described, element_type))
1582
1583 - def put_described(self):
1584 """ 1585 Puts a described value. A described node has two children, the 1586 descriptor and the value. These are specified by entering the node 1587 and putting the desired values. 1588 1589 >>> data = Data() 1590 >>> data.put_described() 1591 >>> data.enter() 1592 >>> data.put_symbol("value-descriptor") 1593 >>> data.put_string("the value") 1594 >>> data.exit() 1595 """ 1596 self._check(pn_data_put_described(self._data))
1597
1598 - def put_null(self):
1599 """ 1600 Puts a null value. 1601 """ 1602 self._check(pn_data_put_null(self._data))
1603
1604 - def put_bool(self, b):
1605 """ 1606 Puts a boolean value. 1607 1608 @param b: a boolean value 1609 """ 1610 self._check(pn_data_put_bool(self._data, b))
1611
1612 - def put_ubyte(self, ub):
1613 """ 1614 Puts an unsigned byte value. 1615 1616 @param ub: an integral value 1617 """ 1618 self._check(pn_data_put_ubyte(self._data, ub))
1619
1620 - def put_byte(self, b):
1621 """ 1622 Puts a signed byte value. 1623 1624 @param b: an integral value 1625 """ 1626 self._check(pn_data_put_byte(self._data, b))
1627
1628 - def put_ushort(self, us):
1629 """ 1630 Puts an unsigned short value. 1631 1632 @param us: an integral value. 1633 """ 1634 self._check(pn_data_put_ushort(self._data, us))
1635
1636 - def put_short(self, s):
1637 """ 1638 Puts a signed short value. 1639 1640 @param s: an integral value 1641 """ 1642 self._check(pn_data_put_short(self._data, s))
1643
1644 - def put_uint(self, ui):
1645 """ 1646 Puts an unsigned int value. 1647 1648 @param ui: an integral value 1649 """ 1650 self._check(pn_data_put_uint(self._data, ui))
1651
1652 - def put_int(self, i):
1653 """ 1654 Puts a signed int value. 1655 1656 @param i: an integral value 1657 """ 1658 self._check(pn_data_put_int(self._data, i))
1659
1660 - def put_char(self, c):
1661 """ 1662 Puts a char value. 1663 1664 @param c: a single character 1665 """ 1666 self._check(pn_data_put_char(self._data, ord(c)))
1667
1668 - def put_ulong(self, ul):
1669 """ 1670 Puts an unsigned long value. 1671 1672 @param ul: an integral value 1673 """ 1674 self._check(pn_data_put_ulong(self._data, ul))
1675
1676 - def put_long(self, l):
1677 """ 1678 Puts a signed long value. 1679 1680 @param l: an integral value 1681 """ 1682 self._check(pn_data_put_long(self._data, l))
1683
1684 - def put_timestamp(self, t):
1685 """ 1686 Puts a timestamp value. 1687 1688 @param t: an integral value 1689 """ 1690 self._check(pn_data_put_timestamp(self._data, t))
1691
1692 - def put_float(self, f):
1693 """ 1694 Puts a float value. 1695 1696 @param f: a floating point value 1697 """ 1698 self._check(pn_data_put_float(self._data, f))
1699
1700 - def put_double(self, d):
1701 """ 1702 Puts a double value. 1703 1704 @param d: a floating point value. 1705 """ 1706 self._check(pn_data_put_double(self._data, d))
1707
1708 - def put_decimal32(self, d):
1709 """ 1710 Puts a decimal32 value. 1711 1712 @param d: a decimal32 value 1713 """ 1714 self._check(pn_data_put_decimal32(self._data, d))
1715
1716 - def put_decimal64(self, d):
1717 """ 1718 Puts a decimal64 value. 1719 1720 @param d: a decimal64 value 1721 """ 1722 self._check(pn_data_put_decimal64(self._data, d))
1723
1724 - def put_decimal128(self, d):
1725 """ 1726 Puts a decimal128 value. 1727 1728 @param d: a decimal128 value 1729 """ 1730 self._check(pn_data_put_decimal128(self._data, d))
1731
1732 - def put_uuid(self, u):
1733 """ 1734 Puts a UUID value. 1735 1736 @param u: a uuid value 1737 """ 1738 self._check(pn_data_put_uuid(self._data, u.bytes))
1739
1740 - def put_binary(self, b):
1741 """ 1742 Puts a binary value. 1743 1744 @type b: binary 1745 @param b: a binary value 1746 """ 1747 self._check(pn_data_put_binary(self._data, b))
1748
1749 - def put_string(self, s):
1750 """ 1751 Puts a unicode value. 1752 1753 @type s: unicode 1754 @param s: a unicode value 1755 """ 1756 self._check(pn_data_put_string(self._data, s.encode("utf8")))
1757
1758 - def put_symbol(self, s):
1759 """ 1760 Puts a symbolic value. 1761 1762 @type s: string 1763 @param s: the symbol name 1764 """ 1765 self._check(pn_data_put_symbol(self._data, s))
1766
1767 - def get_list(self):
1768 """ 1769 If the current node is a list, return the number of elements, 1770 otherwise return zero. List elements can be accessed by entering 1771 the list. 1772 1773 >>> count = data.get_list() 1774 >>> data.enter() 1775 >>> for i in range(count): 1776 ... type = data.next() 1777 ... if type == Data.STRING: 1778 ... print data.get_string() 1779 ... elif type == ...: 1780 ... ... 1781 >>> data.exit() 1782 """ 1783 return pn_data_get_list(self._data)
1784
1785 - def get_map(self):
1786 """ 1787 If the current node is a map, return the number of child elements, 1788 otherwise return zero. Key value pairs can be accessed by entering 1789 the map. 1790 1791 >>> count = data.get_map() 1792 >>> data.enter() 1793 >>> for i in range(count/2): 1794 ... type = data.next() 1795 ... if type == Data.STRING: 1796 ... print data.get_string() 1797 ... elif type == ...: 1798 ... ... 1799 >>> data.exit() 1800 """ 1801 return pn_data_get_map(self._data)
1802
1803 - def get_array(self):
1804 """ 1805 If the current node is an array, return a tuple of the element 1806 count, a boolean indicating whether the array is described, and 1807 the type of each element, otherwise return (0, False, None). Array 1808 data can be accessed by entering the array. 1809 1810 >>> # read an array of strings with a symbolic descriptor 1811 >>> count, described, type = data.get_array() 1812 >>> data.enter() 1813 >>> data.next() 1814 >>> print "Descriptor:", data.get_symbol() 1815 >>> for i in range(count): 1816 ... data.next() 1817 ... print "Element:", data.get_string() 1818 >>> data.exit() 1819 """ 1820 count = pn_data_get_array(self._data) 1821 described = pn_data_is_array_described(self._data) 1822 type = pn_data_get_array_type(self._data) 1823 if type == -1: 1824 type = None 1825 return count, described, type
1826
1827 - def is_described(self):
1828 """ 1829 Checks if the current node is a described value. The descriptor 1830 and value may be accessed by entering the described value. 1831 1832 >>> # read a symbolically described string 1833 >>> assert data.is_described() # will error if the current node is not described 1834 >>> data.enter() 1835 >>> print data.get_symbol() 1836 >>> print data.get_string() 1837 >>> data.exit() 1838 """ 1839 return pn_data_is_described(self._data)
1840
1841 - def is_null(self):
1842 """ 1843 Checks if the current node is a null. 1844 """ 1845 return pn_data_is_null(self._data)
1846
1847 - def get_bool(self):
1848 """ 1849 If the current node is a boolean, returns its value, returns False 1850 otherwise. 1851 """ 1852 return pn_data_get_bool(self._data)
1853
1854 - def get_ubyte(self):
1855 """ 1856 If the current node is an unsigned byte, returns its value, 1857 returns 0 otherwise. 1858 """ 1859 return pn_data_get_ubyte(self._data)
1860
1861 - def get_byte(self):
1862 """ 1863 If the current node is a signed byte, returns its value, returns 0 1864 otherwise. 1865 """ 1866 return pn_data_get_byte(self._data)
1867
1868 - def get_ushort(self):
1869 """ 1870 If the current node is an unsigned short, returns its value, 1871 returns 0 otherwise. 1872 """ 1873 return pn_data_get_ushort(self._data)
1874
1875 - def get_short(self):
1876 """ 1877 If the current node is a signed short, returns its value, returns 1878 0 otherwise. 1879 """ 1880 return pn_data_get_short(self._data)
1881
1882 - def get_uint(self):
1883 """ 1884 If the current node is an unsigned int, returns its value, returns 1885 0 otherwise. 1886 """ 1887 return pn_data_get_uint(self._data)
1888
1889 - def get_int(self):
1890 """ 1891 If the current node is a signed int, returns its value, returns 0 1892 otherwise. 1893 """ 1894 return pn_data_get_int(self._data)
1895
1896 - def get_char(self):
1897 """ 1898 If the current node is a char, returns its value, returns 0 1899 otherwise. 1900 """ 1901 return char(unichr(pn_data_get_char(self._data)))
1902
1903 - def get_ulong(self):
1904 """ 1905 If the current node is an unsigned long, returns its value, 1906 returns 0 otherwise. 1907 """ 1908 return ulong(pn_data_get_ulong(self._data))
1909
1910 - def get_long(self):
1911 """ 1912 If the current node is an signed long, returns its value, returns 1913 0 otherwise. 1914 """ 1915 return pn_data_get_long(self._data)
1916
1917 - def get_timestamp(self):
1918 """ 1919 If the current node is a timestamp, returns its value, returns 0 1920 otherwise. 1921 """ 1922 return timestamp(pn_data_get_timestamp(self._data))
1923
1924 - def get_float(self):
1925 """ 1926 If the current node is a float, returns its value, raises 0 1927 otherwise. 1928 """ 1929 return pn_data_get_float(self._data)
1930
1931 - def get_double(self):
1932 """ 1933 If the current node is a double, returns its value, returns 0 1934 otherwise. 1935 """ 1936 return pn_data_get_double(self._data)
1937 1938 # XXX: need to convert
1939 - def get_decimal32(self):
1940 """ 1941 If the current node is a decimal32, returns its value, returns 0 1942 otherwise. 1943 """ 1944 return pn_data_get_decimal32(self._data)
1945 1946 # XXX: need to convert
1947 - def get_decimal64(self):
1948 """ 1949 If the current node is a decimal64, returns its value, returns 0 1950 otherwise. 1951 """ 1952 return pn_data_get_decimal64(self._data)
1953 1954 # XXX: need to convert
1955 - def get_decimal128(self):
1956 """ 1957 If the current node is a decimal128, returns its value, returns 0 1958 otherwise. 1959 """ 1960 return pn_data_get_decimal128(self._data)
1961
1962 - def get_uuid(self):
1963 """ 1964 If the current node is a UUID, returns its value, returns None 1965 otherwise. 1966 """ 1967 if pn_data_type(self._data) == Data.UUID: 1968 return uuid.UUID(bytes=pn_data_get_uuid(self._data)) 1969 else: 1970 return None
1971
1972 - def get_binary(self):
1973 """ 1974 If the current node is binary, returns its value, returns "" 1975 otherwise. 1976 """ 1977 return pn_data_get_binary(self._data)
1978
1979 - def get_string(self):
1980 """ 1981 If the current node is a string, returns its value, returns "" 1982 otherwise. 1983 """ 1984 return pn_data_get_string(self._data).decode("utf8")
1985
1986 - def get_symbol(self):
1987 """ 1988 If the current node is a symbol, returns its value, returns "" 1989 otherwise. 1990 """ 1991 return symbol(pn_data_get_symbol(self._data))
1992
1993 - def copy(self, src):
1994 self._check(pn_data_copy(self._data, src._data))
1995
1996 - def format(self):
1997 sz = 16 1998 while True: 1999 err, result = pn_data_format(self._data, sz) 2000 if err == PN_OVERFLOW: 2001 sz *= 2 2002 continue 2003 else: 2004 self._check(err) 2005 return result
2006
2007 - def dump(self):
2008 pn_data_dump(self._data)
2009
2010 - def put_dict(self, d):
2011 self.put_map() 2012 self.enter() 2013 try: 2014 for k, v in d.items(): 2015 self.put_object(k) 2016 self.put_object(v) 2017 finally: 2018 self.exit()
2019
2020 - def get_dict(self):
2021 if self.enter(): 2022 try: 2023 result = {} 2024 while self.next(): 2025 k = self.get_object() 2026 if self.next(): 2027 v = self.get_object() 2028 else: 2029 v = None 2030 result[k] = v 2031 finally: 2032 self.exit() 2033 return result
2034
2035 - def put_sequence(self, s):
2036 self.put_list() 2037 self.enter() 2038 try: 2039 for o in s: 2040 self.put_object(o) 2041 finally: 2042 self.exit()
2043
2044 - def get_sequence(self):
2045 if self.enter(): 2046 try: 2047 result = [] 2048 while self.next(): 2049 result.append(self.get_object()) 2050 finally: 2051 self.exit() 2052 return result
2053
2054 - def get_py_described(self):
2055 if self.enter(): 2056 try: 2057 self.next() 2058 descriptor = self.get_object() 2059 self.next() 2060 value = self.get_object() 2061 finally: 2062 self.exit() 2063 return Described(descriptor, value)
2064
2065 - def put_py_described(self, d):
2066 self.put_described() 2067 self.enter() 2068 try: 2069 self.put_object(d.descriptor) 2070 self.put_object(d.value) 2071 finally: 2072 self.exit()
2073
2074 - def get_py_array(self):
2075 """ 2076 If the current node is an array, return an Array object 2077 representing the array and its contents. Otherwise return None. 2078 This is a convenience wrapper around get_array, enter, etc. 2079 """ 2080 2081 count, described, type = self.get_array() 2082 if type is None: return None 2083 if self.enter(): 2084 try: 2085 if described: 2086 self.next() 2087 descriptor = self.get_object() 2088 else: 2089 descriptor = UNDESCRIBED 2090 elements = [] 2091 while self.next(): 2092 elements.append(self.get_object()) 2093 finally: 2094 self.exit() 2095 return Array(descriptor, type, *elements)
2096
2097 - def put_py_array(self, a):
2098 described = a.descriptor != UNDESCRIBED 2099 self.put_array(described, a.type) 2100 self.enter() 2101 try: 2102 if described: 2103 self.put_object(a.descriptor) 2104 for e in a.elements: 2105 self.put_object(e) 2106 finally: 2107 self.exit()
2108 2109 put_mappings = { 2110 None.__class__: lambda s, _: s.put_null(), 2111 bool: put_bool, 2112 dict: put_dict, 2113 list: put_sequence, 2114 tuple: put_sequence, 2115 unicode: put_string, 2116 bytes: put_binary, 2117 symbol: put_symbol, 2118 int: put_long, 2119 char: put_char, 2120 long: put_long, 2121 ulong: put_ulong, 2122 timestamp: put_timestamp, 2123 float: put_double, 2124 uuid.UUID: put_uuid, 2125 Described: put_py_described, 2126 Array: put_py_array 2127 } 2128 get_mappings = { 2129 NULL: lambda s: None, 2130 BOOL: get_bool, 2131 BYTE: get_byte, 2132 UBYTE: get_ubyte, 2133 SHORT: get_short, 2134 USHORT: get_ushort, 2135 INT: get_int, 2136 UINT: get_uint, 2137 CHAR: get_char, 2138 LONG: get_long, 2139 ULONG: get_ulong, 2140 TIMESTAMP: get_timestamp, 2141 FLOAT: get_float, 2142 DOUBLE: get_double, 2143 DECIMAL32: get_decimal32, 2144 DECIMAL64: get_decimal64, 2145 DECIMAL128: get_decimal128, 2146 UUID: get_uuid, 2147 BINARY: get_binary, 2148 STRING: get_string, 2149 SYMBOL: get_symbol, 2150 DESCRIBED: get_py_described, 2151 ARRAY: get_py_array, 2152 LIST: get_sequence, 2153 MAP: get_dict 2154 } 2155 2156
2157 - def put_object(self, obj):
2158 putter = self.put_mappings[obj.__class__] 2159 putter(self, obj)
2160
2161 - def get_object(self):
2162 type = self.type() 2163 if type is None: return None 2164 getter = self.get_mappings.get(type) 2165 if getter: 2166 return getter(self) 2167 else: 2168 return UnmappedType(str(type))
2169
2170 -class ConnectionException(ProtonException):
2171 pass
2172
2173 -class Endpoint(object):
2174 2175 LOCAL_UNINIT = PN_LOCAL_UNINIT 2176 REMOTE_UNINIT = PN_REMOTE_UNINIT 2177 LOCAL_ACTIVE = PN_LOCAL_ACTIVE 2178 REMOTE_ACTIVE = PN_REMOTE_ACTIVE 2179 LOCAL_CLOSED = PN_LOCAL_CLOSED 2180 REMOTE_CLOSED = PN_REMOTE_CLOSED 2181
2182 - def __init__(self):
2183 self.condition = None 2184 self._release_invoked = False
2185
2186 - def _release(self):
2187 """Release the underlying C Engine resource.""" 2188 if not self._release_invoked: 2189 for c in self._children: 2190 c._release() 2191 self._free_resource() 2192 self.connection._releasing(self) 2193 self._release_invoked = True
2194
2195 - def _update_cond(self):
2196 obj2cond(self.condition, self._get_cond_impl())
2197 2198 @property
2199 - def remote_condition(self):
2200 return cond2obj(self._get_remote_cond_impl())
2201 2202 # the following must be provided by subclasses
2203 - def _get_cond_impl(self):
2204 assert False, "Subclass must override this!"
2205
2206 - def _get_remote_cond_impl(self):
2207 assert False, "Subclass must override this!"
2208
2209 -class Condition:
2210
2211 - def __init__(self, name, description=None, info=None):
2212 self.name = name 2213 self.description = description 2214 self.info = info
2215
2216 - def __repr__(self):
2217 return "Condition(%s)" % ", ".join([repr(x) for x in 2218 (self.name, self.description, self.info) 2219 if x])
2220
2221 - def __eq__(self, o):
2222 if not isinstance(o, Condition): return False 2223 return self.name == o.name and \ 2224 self.description == o.description and \ 2225 self.info == o.info
2226
2227 -def obj2cond(obj, cond):
2228 pn_condition_clear(cond) 2229 if obj: 2230 pn_condition_set_name(cond, str(obj.name)) 2231 pn_condition_set_description(cond, obj.description) 2232 info = Data(pn_condition_info(cond)) 2233 if obj.info: 2234 info.put_object(obj.info)
2235
2236 -def cond2obj(cond):
2237 if pn_condition_is_set(cond): 2238 return Condition(pn_condition_get_name(cond), 2239 pn_condition_get_description(cond), 2240 dat2obj(pn_condition_info(cond))) 2241 else: 2242 return None
2243
2244 -def dat2obj(dimpl):
2245 d = Data(dimpl) 2246 d.rewind() 2247 d.next() 2248 obj = d.get_object() 2249 d.rewind() 2250 return obj
2251
2252 -def obj2dat(obj, dimpl):
2253 if obj is not None: 2254 d = Data(dimpl) 2255 d.put_object(obj)
2256
2257 -class Connection(Endpoint):
2258 2259 @staticmethod
2260 - def _wrap_connection(c_conn):
2261 """Maintain only a single instance of this class for each Connection 2262 object that exists in the the C Engine. This is done by storing a (weak) 2263 reference to the python instance in the context field of the C object. 2264 """ 2265 if not c_conn: return None 2266 py_conn = pn_void2py(pn_connection_get_context(c_conn)) 2267 if py_conn: return py_conn 2268 wrapper = Connection(_conn=c_conn) 2269 return wrapper
2270
2271 - def __init__(self, _conn=None):
2272 Endpoint.__init__(self) 2273 if _conn: 2274 self._conn = _conn 2275 else: 2276 self._conn = pn_connection() 2277 pn_connection_set_context(self._conn, pn_py2void(self)) 2278 self.offered_capabilities = None 2279 self.desired_capabilities = None 2280 self.properties = None 2281 self._sessions = set()
2282
2283 - def __del__(self):
2284 if hasattr(self, "_conn") and self._conn: 2285 self._release()
2286
2287 - def free(self):
2288 self._release()
2289 2290 @property
2291 - def _children(self):
2292 return self._sessions
2293 2294 @property
2295 - def connection(self):
2296 return self
2297
2298 - def _free_resource(self):
2299 pn_connection_free(self._conn)
2300
2301 - def _released(self):
2302 self._conn = None
2303
2304 - def _releasing(self, child):
2305 coll = getattr(self, "_collector", None) 2306 if coll: coll = coll() 2307 if coll: 2308 coll._contexts.add(child) 2309 else: 2310 child._released()
2311
2312 - def _check(self, err):
2313 if err < 0: 2314 exc = EXCEPTIONS.get(err, ConnectionException) 2315 raise exc("[%s]: %s" % (err, pn_connection_error(self._conn))) 2316 else: 2317 return err
2318
2319 - def _get_cond_impl(self):
2320 return pn_connection_condition(self._conn)
2321
2322 - def _get_remote_cond_impl(self):
2323 return pn_connection_remote_condition(self._conn)
2324
2325 - def collect(self, collector):
2326 if collector is None: 2327 pn_connection_collect(self._conn, None) 2328 else: 2329 pn_connection_collect(self._conn, collector._impl) 2330 self._collector = weakref.ref(collector)
2331
2332 - def _get_container(self):
2333 return pn_connection_get_container(self._conn)
2334 - def _set_container(self, name):
2335 return pn_connection_set_container(self._conn, name)
2336 2337 container = property(_get_container, _set_container) 2338
2339 - def _get_hostname(self):
2340 return pn_connection_get_hostname(self._conn)
2341 - def _set_hostname(self, name):
2342 return pn_connection_set_hostname(self._conn, name)
2343 2344 hostname = property(_get_hostname, _set_hostname) 2345 2346 @property
2347 - def remote_container(self):
2348 return pn_connection_remote_container(self._conn)
2349 2350 @property
2351 - def remote_hostname(self):
2352 return pn_connection_remote_hostname(self._conn)
2353 2354 @property
2356 return dat2obj(pn_connection_remote_offered_capabilities(self._conn))
2357 2358 @property
2360 return dat2obj(pn_connection_remote_desired_capabilities(self._conn))
2361 2362 @property
2363 - def remote_properties(self):
2364 return dat2obj(pn_connection_remote_properties(self._conn))
2365
2366 - def open(self):
2367 obj2dat(self.offered_capabilities, 2368 pn_connection_offered_capabilities(self._conn)) 2369 obj2dat(self.desired_capabilities, 2370 pn_connection_desired_capabilities(self._conn)) 2371 obj2dat(self.properties, pn_connection_properties(self._conn)) 2372 pn_connection_open(self._conn)
2373
2374 - def close(self):
2375 self._update_cond() 2376 pn_connection_close(self._conn)
2377 2378 @property
2379 - def state(self):
2380 return pn_connection_state(self._conn)
2381
2382 - def session(self):
2383 return Session._wrap_session(pn_session(self._conn))
2384
2385 - def session_head(self, mask):
2386 return Session._wrap_session(pn_session_head(self._conn, mask))
2387 2390 2391 @property
2392 - def work_head(self):
2393 return Delivery._wrap_delivery(pn_work_head(self._conn))
2394 2395 @property
2396 - def error(self):
2397 return pn_error_code(pn_connection_error(self._conn))
2398
2399 -class SessionException(ProtonException):
2400 pass
2401
2402 -class Session(Endpoint):
2403 2404 @staticmethod
2405 - def _wrap_session(c_ssn):
2406 """Maintain only a single instance of this class for each Session object that 2407 exists in the C Engine. 2408 """ 2409 if c_ssn is None: return None 2410 py_ssn = pn_void2py(pn_session_get_context(c_ssn)) 2411 if py_ssn: return py_ssn 2412 wrapper = Session(c_ssn) 2413 return wrapper
2414
2415 - def __init__(self, ssn):
2416 Endpoint.__init__(self) 2417 self._ssn = ssn 2418 pn_session_set_context(self._ssn, pn_py2void(self)) 2419 self._links = set() 2420 self.connection._sessions.add(self)
2421 2422 @property
2423 - def _children(self):
2424 return self._links
2425
2426 - def _free_resource(self):
2427 pn_session_free(self._ssn)
2428
2429 - def _released(self):
2430 self._ssn = None
2431
2432 - def free(self):
2433 """Release the Session, freeing its resources. 2434 2435 Call this when you no longer need the session. This will allow the 2436 session's resources to be reclaimed. Once called, you should no longer 2437 reference the session. 2438 2439 """ 2440 self.connection._sessions.remove(self) 2441 self._release()
2442
2443 - def _get_cond_impl(self):
2444 return pn_session_condition(self._ssn)
2445
2446 - def _get_remote_cond_impl(self):
2447 return pn_session_remote_condition(self._ssn)
2448
2449 - def _get_incoming_capacity(self):
2450 return pn_session_get_incoming_capacity(self._ssn)
2451
2452 - def _set_incoming_capacity(self, capacity):
2453 pn_session_set_incoming_capacity(self._ssn, capacity)
2454 2455 incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity) 2456 2457 @property
2458 - def outgoing_bytes(self):
2459 return pn_session_outgoing_bytes(self._ssn)
2460 2461 @property
2462 - def incoming_bytes(self):
2463 return pn_session_incoming_bytes(self._ssn)
2464
2465 - def open(self):
2466 pn_session_open(self._ssn)
2467
2468 - def close(self):
2469 self._update_cond() 2470 pn_session_close(self._ssn)
2471
2472 - def next(self, mask):
2473 return Session._wrap_session(pn_session_next(self._ssn, mask))
2474 2475 @property
2476 - def state(self):
2477 return pn_session_state(self._ssn)
2478 2479 @property
2480 - def connection(self):
2481 return Connection._wrap_connection(pn_session_connection(self._ssn))
2482
2483 - def sender(self, name):
2484 return Link._wrap_link(pn_sender(self._ssn, name))
2485
2486 - def receiver(self, name):
2487 return Link._wrap_link(pn_receiver(self._ssn, name))
2488
2489 -class LinkException(ProtonException):
2490 pass
2491 2650
2651 -class Terminus(object):
2652 2653 UNSPECIFIED = PN_UNSPECIFIED 2654 SOURCE = PN_SOURCE 2655 TARGET = PN_TARGET 2656 COORDINATOR = PN_COORDINATOR 2657 2658 NONDURABLE = PN_NONDURABLE 2659 CONFIGURATION = PN_CONFIGURATION 2660 DELIVERIES = PN_DELIVERIES 2661 2662 DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED 2663 DIST_MODE_COPY = PN_DIST_MODE_COPY 2664 DIST_MODE_MOVE = PN_DIST_MODE_MOVE 2665
2666 - def __init__(self, impl):
2667 self._impl = impl
2668
2669 - def _check(self, err):
2670 if err < 0: 2671 exc = EXCEPTIONS.get(err, LinkException) 2672 raise exc("[%s]" % err) 2673 else: 2674 return err
2675
2676 - def _get_type(self):
2677 return pn_terminus_get_type(self._impl)
2678 - def _set_type(self, type):
2679 self._check(pn_terminus_set_type(self._impl, type))
2680 type = property(_get_type, _set_type) 2681
2682 - def _get_address(self):
2683 return pn_terminus_get_address(self._impl)
2684 - def _set_address(self, address):
2685 self._check(pn_terminus_set_address(self._impl, address))
2686 address = property(_get_address, _set_address) 2687
2688 - def _get_durability(self):
2689 return pn_terminus_get_durability(self._impl)
2690 - def _set_durability(self, seconds):
2691 self._check(pn_terminus_set_durability(self._impl, seconds))
2692 durability = property(_get_durability, _set_durability) 2693
2694 - def _get_expiry_policy(self):
2695 return pn_terminus_get_expiry_policy(self._impl)
2696 - def _set_expiry_policy(self, seconds):
2697 self._check(pn_terminus_set_expiry_policy(self._impl, seconds))
2698 expiry_policy = property(_get_expiry_policy, _set_expiry_policy) 2699
2700 - def _get_timeout(self):
2701 return pn_terminus_get_timeout(self._impl)
2702 - def _set_timeout(self, seconds):
2703 self._check(pn_terminus_set_timeout(self._impl, seconds))
2704 timeout = property(_get_timeout, _set_timeout) 2705
2706 - def _is_dynamic(self):
2707 return pn_terminus_is_dynamic(self._impl)
2708 - def _set_dynamic(self, dynamic):
2709 self._check(pn_terminus_set_dynamic(self._impl, dynamic))
2710 dynamic = property(_is_dynamic, _set_dynamic) 2711
2712 - def _get_distribution_mode(self):
2713 return pn_terminus_get_distribution_mode(self._impl)
2714 - def _set_distribution_mode(self, mode):
2715 self._check(pn_terminus_set_distribution_mode(self._impl, mode))
2716 distribution_mode = property(_get_distribution_mode, _set_distribution_mode) 2717 2718 @property
2719 - def properties(self):
2720 return Data(pn_terminus_properties(self._impl))
2721 2722 @property
2723 - def capabilities(self):
2724 return Data(pn_terminus_capabilities(self._impl))
2725 2726 @property
2727 - def outcomes(self):
2728 return Data(pn_terminus_outcomes(self._impl))
2729 2730 @property
2731 - def filter(self):
2732 return Data(pn_terminus_filter(self._impl))
2733
2734 - def copy(self, src):
2735 self._check(pn_terminus_copy(self._impl, src._impl))
2736
2737 -class Sender(Link):
2738
2739 - def __init__(self, c_link):
2740 super(Sender, self).__init__(c_link)
2741
2742 - def offered(self, n):
2743 pn_link_offered(self._link, n)
2744
2745 - def send(self, bytes):
2746 return self._check(pn_link_send(self._link, bytes))
2747
2748 -class Receiver(Link):
2749
2750 - def __init__(self, c_link):
2751 super(Receiver, self).__init__(c_link)
2752
2753 - def flow(self, n):
2754 pn_link_flow(self._link, n)
2755
2756 - def recv(self, limit):
2757 n, bytes = pn_link_recv(self._link, limit) 2758 if n == PN_EOS: 2759 return None 2760 else: 2761 self._check(n) 2762 return bytes
2763
2764 - def drain(self, n):
2765 pn_link_drain(self._link, n)
2766
2767 - def draining(self):
2768 return pn_link_draining(self._link)
2769
2770 -class NamedInt(int):
2771 2772 values = {} 2773
2774 - def __new__(cls, i, name):
2775 ni = super(NamedInt, cls).__new__(cls, i) 2776 cls.values[i] = ni 2777 return ni
2778
2779 - def __init__(self, i, name):
2780 self.name = name
2781
2782 - def __repr__(self):
2783 return self.name
2784
2785 - def __str__(self):
2786 return self.name
2787 2788 @classmethod
2789 - def get(cls, i):
2790 return cls.values.get(i, i)
2791
2792 -class DispositionType(NamedInt):
2793 values = {}
2794
2795 -class Disposition(object):
2796 2797 RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED") 2798 ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED") 2799 REJECTED = DispositionType(PN_REJECTED, "REJECTED") 2800 RELEASED = DispositionType(PN_RELEASED, "RELEASED") 2801 MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED") 2802
2803 - def __init__(self, impl, local):
2804 self._impl = impl 2805 self.local = local 2806 self._data = None 2807 self._condition = None 2808 self._annotations = None
2809 2810 @property
2811 - def type(self):
2812 return DispositionType.get(pn_disposition_type(self._impl))
2813
2814 - def _get_section_number(self):
2815 return pn_disposition_get_section_number(self._impl)
2816 - def _set_section_number(self, n):
2817 pn_disposition_set_section_number(self._impl, n)
2818 section_number = property(_get_section_number, _set_section_number) 2819
2820 - def _get_section_offset(self):
2821 return pn_disposition_get_section_offset(self._impl)
2822 - def _set_section_offset(self, n):
2823 pn_disposition_set_section_offset(self._impl, n)
2824 section_offset = property(_get_section_offset, _set_section_offset) 2825
2826 - def _get_failed(self):
2827 return pn_disposition_is_failed(self._impl)
2828 - def _set_failed(self, b):
2829 pn_disposition_set_failed(self._impl, b)
2830 failed = property(_get_failed, _set_failed) 2831
2832 - def _get_undeliverable(self):
2833 return pn_disposition_is_undeliverable(self._impl)
2834 - def _set_undeliverable(self, b):
2835 pn_disposition_set_undeliverable(self._impl, b)
2836 undeliverable = property(_get_undeliverable, _set_undeliverable) 2837
2838 - def _get_data(self):
2839 if self.local: 2840 return self._data 2841 else: 2842 return dat2obj(pn_disposition_data(self._impl))
2843 - def _set_data(self, obj):
2844 if self.local: 2845 self._data = obj 2846 else: 2847 raise AttributeError("data attribute is read-only")
2848 data = property(_get_data, _set_data) 2849
2850 - def _get_annotations(self):
2851 if self.local: 2852 return self._annotations 2853 else: 2854 return dat2obj(pn_disposition_annotations(self._impl))
2855 - def _set_annotations(self, obj):
2856 if self.local: 2857 self._annotations = obj 2858 else: 2859 raise AttributeError("annotations attribute is read-only")
2860 annotations = property(_get_annotations, _set_annotations) 2861
2862 - def _get_condition(self):
2863 if self.local: 2864 return self._condition 2865 else: 2866 return cond2obj(pn_disposition_condition(self._impl))
2867 - def _set_condition(self, obj):
2868 if self.local: 2869 self._condition = obj 2870 else: 2871 raise AttributeError("condition attribute is read-only")
2872 condition = property(_get_condition, _set_condition)
2873
2874 -class Delivery(object):
2875 2876 RECEIVED = Disposition.RECEIVED 2877 ACCEPTED = Disposition.ACCEPTED 2878 REJECTED = Disposition.REJECTED 2879 RELEASED = Disposition.RELEASED 2880 MODIFIED = Disposition.MODIFIED 2881 2882 @staticmethod
2883 - def _wrap_delivery(c_dlv):
2884 """Maintain only a single instance of this class for each Delivery object that 2885 exists in the C Engine. 2886 """ 2887 if not c_dlv: return None 2888 py_dlv = pn_void2py(pn_delivery_get_context(c_dlv)) 2889 if py_dlv: return py_dlv 2890 wrapper = Delivery(c_dlv) 2891 return wrapper
2892
2893 - def __init__(self, dlv):
2894 self._dlv = dlv 2895 pn_delivery_set_context(self._dlv, pn_py2void(self)) 2896 self.local = Disposition(pn_delivery_local(self._dlv), True) 2897 self.remote = Disposition(pn_delivery_remote(self._dlv), False) 2898 self.link._deliveries.add(self)
2899
2900 - def __del__(self):
2901 self._release()
2902
2903 - def _release(self):
2904 """Release the underlying C Engine resource.""" 2905 if self._dlv: 2906 pn_delivery_set_context(self._dlv, pn_py2void(None)) 2907 pn_delivery_settle(self._dlv) 2908 self._dlv = None
2909 2910 @property
2911 - def released(self):
2912 return self._dlv is None
2913 2914 @property
2915 - def tag(self):
2916 return pn_delivery_tag(self._dlv)
2917 2918 @property
2919 - def writable(self):
2920 return pn_delivery_writable(self._dlv)
2921 2922 @property
2923 - def readable(self):
2924 return pn_delivery_readable(self._dlv)
2925 2926 @property
2927 - def updated(self):
2928 return pn_delivery_updated(self._dlv)
2929
2930 - def update(self, state):
2931 obj2dat(self.local._data, pn_disposition_data(self.local._impl)) 2932 obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl)) 2933 obj2cond(self.local._condition, pn_disposition_condition(self.local._impl)) 2934 pn_delivery_update(self._dlv, state)
2935 2936 @property
2937 - def pending(self):
2938 return pn_delivery_pending(self._dlv)
2939 2940 @property
2941 - def partial(self):
2942 return pn_delivery_partial(self._dlv)
2943 2944 @property
2945 - def local_state(self):
2946 return DispositionType.get(pn_delivery_local_state(self._dlv))
2947 2948 @property
2949 - def remote_state(self):
2950 return DispositionType.get(pn_delivery_remote_state(self._dlv))
2951 2952 @property
2953 - def settled(self):
2954 return pn_delivery_settled(self._dlv)
2955
2956 - def settle(self):
2957 """Release the delivery""" 2958 self.link._deliveries.remove(self) 2959 self._release()
2960 2961 @property
2962 - def work_next(self):
2963 return Delivery._wrap_delivery(pn_work_next(self._dlv))
2964 2965 @property
2968
2969 -class TransportException(ProtonException):
2970 pass
2971
2972 -class Transport(object):
2973 2974 TRACE_OFF = PN_TRACE_OFF 2975 TRACE_DRV = PN_TRACE_DRV 2976 TRACE_FRM = PN_TRACE_FRM 2977 TRACE_RAW = PN_TRACE_RAW 2978
2979 - def __init__(self, _trans=None):
2980 if not _trans: 2981 self._trans = pn_transport() 2982 else: 2983 self._shared_trans = True 2984 self._trans = _trans 2985 self._sasl = None 2986 self._ssl = None
2987
2988 - def __del__(self):
2989 if hasattr(self, "_trans"): 2990 if not hasattr(self, "_shared_trans"): 2991 pn_transport_free(self._trans) 2992 if hasattr(self, "_sasl") and self._sasl: 2993 # pn_transport_free deallocs the C sasl associated with the 2994 # transport, so erase the reference if a SASL object was used. 2995 self._sasl._sasl = None 2996 self._sasl = None 2997 if hasattr(self, "_ssl") and self._ssl: 2998 # ditto the owned c SSL object 2999 self._ssl._ssl = None 3000 self._ssl = None 3001 del self._trans
3002
3003 - def _check(self, err):
3004 if err < 0: 3005 exc = EXCEPTIONS.get(err, TransportException) 3006 raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._trans)))) 3007 else: 3008 return err
3009
3010 - def bind(self, connection):
3011 """Assign a connection to the transport""" 3012 self._check(pn_transport_bind(self._trans, connection._conn)) 3013 # keep python connection from being garbage collected: 3014 self._connection = connection
3015
3016 - def unbind(self):
3017 """Release the connection""" 3018 self._check(pn_transport_unbind(self._trans)) 3019 self._connection = None
3020
3021 - def trace(self, n):
3022 pn_transport_trace(self._trans, n)
3023
3024 - def tick(self, now):
3025 """Process any timed events (like heartbeat generation). 3026 now = seconds since epoch (float). 3027 """ 3028 next = pn_transport_tick(self._trans, long(now * 1000)) 3029 return float(next) / 1000.0
3030
3031 - def capacity(self):
3032 c = pn_transport_capacity(self._trans) 3033 if c >= PN_EOS: 3034 return c 3035 else: 3036 return self._check(c)
3037
3038 - def push(self, bytes):
3039 n = self._check(pn_transport_push(self._trans, bytes)) 3040 if n != len(bytes): 3041 raise OverflowError("unable to process all bytes")
3042
3043 - def close_tail(self):
3044 self._check(pn_transport_close_tail(self._trans))
3045
3046 - def pending(self):
3047 p = pn_transport_pending(self._trans) 3048 if p >= PN_EOS: 3049 return p 3050 else: 3051 return self._check(p)
3052
3053 - def peek(self, size):
3054 cd, out = pn_transport_peek(self._trans, size) 3055 if cd == PN_EOS: 3056 return None 3057 else: 3058 self._check(cd) 3059 return out
3060
3061 - def pop(self, size):
3062 pn_transport_pop(self._trans, size)
3063
3064 - def close_head(self):
3065 self._check(pn_transport_close_head(self._trans))
3066 3067 @property
3068 - def closed(self):
3069 return pn_transport_closed(self._trans)
3070 3071 # AMQP 1.0 max-frame-size
3072 - def _get_max_frame_size(self):
3073 return pn_transport_get_max_frame(self._trans)
3074
3075 - def _set_max_frame_size(self, value):
3076 pn_transport_set_max_frame(self._trans, value)
3077 3078 max_frame_size = property(_get_max_frame_size, _set_max_frame_size, 3079 doc=""" 3080 Sets the maximum size for received frames (in bytes). 3081 """) 3082 3083 @property
3084 - def remote_max_frame_size(self):
3085 return pn_transport_get_remote_max_frame(self._trans)
3086
3087 - def _get_channel_max(self):
3088 return pn_transport_get_channel_max(self._trans)
3089
3090 - def _set_channel_max(self, value):
3091 pn_transport_set_channel_max(self._trans, value)
3092 3093 channel_max = property(_get_channel_max, _set_channel_max, 3094 doc=""" 3095 Sets the maximum channel that may be used on the transport. 3096 """) 3097 3098 @property
3099 - def remote_channel_max(self):
3100 return pn_transport_remote_channel_max(self._trans)
3101 3102 # AMQP 1.0 idle-time-out
3103 - def _get_idle_timeout(self):
3104 msec = pn_transport_get_idle_timeout(self._trans) 3105 return float(msec)/1000.0
3106
3107 - def _set_idle_timeout(self, sec):
3108 pn_transport_set_idle_timeout(self._trans, long(sec * 1000))
3109 3110 idle_timeout = property(_get_idle_timeout, _set_idle_timeout, 3111 doc=""" 3112 The idle timeout of the connection (float, in seconds). 3113 """) 3114 3115 @property
3116 - def remote_idle_timeout(self):
3117 msec = pn_transport_get_remote_idle_timeout(self._trans) 3118 return float(msec)/1000.0
3119 3120 @property
3121 - def frames_output(self):
3122 return pn_transport_get_frames_output(self._trans)
3123 3124 @property
3125 - def frames_input(self):
3126 return pn_transport_get_frames_input(self._trans)
3127
3128 - def sasl(self):
3129 # SASL factory (singleton for this transport) 3130 if not self._sasl: 3131 self._sasl = SASL(self) 3132 return self._sasl
3133
3134 - def ssl(self, domain=None, session_details=None):
3135 # SSL factory (singleton for this transport) 3136 if not self._ssl: 3137 self._ssl = SSL(self, domain, session_details) 3138 return self._ssl
3139 3140 @property
3141 - def condition(self):
3142 return cond2obj(pn_transport_condition(self._trans))
3143 3144 @property
3145 - def connection(self):
3146 return Connection._wrap_connection(pn_transport_connection(self._trans))
3147
3148 -class SASLException(TransportException):
3149 pass
3150
3151 -class SASL(object):
3152 3153 OK = PN_SASL_OK 3154 AUTH = PN_SASL_AUTH 3155 SKIPPED = PN_SASL_SKIPPED 3156
3157 - def __new__(cls, transport):
3158 """Enforce a singleton SASL object per Transport""" 3159 if not transport._sasl: 3160 obj = super(SASL, cls).__new__(cls) 3161 obj._sasl = pn_sasl(transport._trans) 3162 transport._sasl = obj 3163 return transport._sasl
3164
3165 - def _check(self, err):
3166 if err < 0: 3167 exc = EXCEPTIONS.get(err, SASLException) 3168 raise exc("[%s]" % (err)) 3169 else: 3170 return err
3171
3172 - def mechanisms(self, mechs):
3173 pn_sasl_mechanisms(self._sasl, mechs)
3174
3175 - def client(self):
3176 pn_sasl_client(self._sasl)
3177
3178 - def server(self):
3179 pn_sasl_server(self._sasl)
3180
3181 - def allow_skip(self, allow):
3182 pn_sasl_allow_skip(self._sasl, allow)
3183
3184 - def plain(self, user, password):
3185 pn_sasl_plain(self._sasl, user, password)
3186
3187 - def send(self, data):
3188 self._check(pn_sasl_send(self._sasl, data, len(data)))
3189
3190 - def recv(self):
3191 sz = 16 3192 while True: 3193 n, data = pn_sasl_recv(self._sasl, sz) 3194 if n == PN_OVERFLOW: 3195 sz *= 2 3196 continue 3197 elif n == PN_EOS: 3198 return None 3199 else: 3200 self._check(n) 3201 return data
3202 3203 @property
3204 - def outcome(self):
3205 outcome = pn_sasl_outcome(self._sasl) 3206 if outcome == PN_SASL_NONE: 3207 return None 3208 else: 3209 return outcome
3210
3211 - def done(self, outcome):
3212 pn_sasl_done(self._sasl, outcome)
3213 3214 STATE_CONF = PN_SASL_CONF 3215 STATE_IDLE = PN_SASL_IDLE 3216 STATE_STEP = PN_SASL_STEP 3217 STATE_PASS = PN_SASL_PASS 3218 STATE_FAIL = PN_SASL_FAIL 3219 3220 @property
3221 - def state(self):
3222 return pn_sasl_state(self._sasl)
3223
3224 3225 -class SSLException(TransportException):
3226 pass
3227
3228 -class SSLUnavailable(SSLException):
3229 pass
3230
3231 -class SSLDomain(object):
3232 3233 MODE_CLIENT = PN_SSL_MODE_CLIENT 3234 MODE_SERVER = PN_SSL_MODE_SERVER 3235 VERIFY_PEER = PN_SSL_VERIFY_PEER 3236 VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME 3237 ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER 3238
3239 - def __init__(self, mode):
3240 self._domain = pn_ssl_domain(mode) 3241 if self._domain is None: 3242 raise SSLUnavailable()
3243
3244 - def _check(self, err):
3245 if err < 0: 3246 exc = EXCEPTIONS.get(err, SSLException) 3247 raise exc("SSL failure.") 3248 else: 3249 return err
3250
3251 - def set_credentials(self, cert_file, key_file, password):
3252 return self._check( pn_ssl_domain_set_credentials(self._domain, 3253 cert_file, key_file, 3254 password) )
3255 - def set_trusted_ca_db(self, certificate_db):
3256 return self._check( pn_ssl_domain_set_trusted_ca_db(self._domain, 3257 certificate_db) )
3258 - def set_peer_authentication(self, verify_mode, trusted_CAs=None):
3259 return self._check( pn_ssl_domain_set_peer_authentication(self._domain, 3260 verify_mode, 3261 trusted_CAs) )
3262
3263 - def allow_unsecured_client(self):
3264 return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) )
3265
3266 -class SSL(object):
3267
3268 - def _check(self, err):
3269 if err < 0: 3270 exc = EXCEPTIONS.get(err, SSLException) 3271 raise exc("SSL failure.") 3272 else: 3273 return err
3274
3275 - def __new__(cls, transport, domain, session_details=None):
3276 """Enforce a singleton SSL object per Transport""" 3277 if transport._ssl: 3278 # unfortunately, we've combined the allocation and the configuration in a 3279 # single step. So catch any attempt by the application to provide what 3280 # may be a different configuration than the original (hack) 3281 ssl = transport._ssl 3282 if (domain and (ssl._domain is not domain) or 3283 session_details and (ssl._session_details is not session_details)): 3284 raise SSLException("Cannot re-configure existing SSL object!") 3285 else: 3286 obj = super(SSL, cls).__new__(cls) 3287 obj._domain = domain 3288 obj._session_details = session_details 3289 session_id = None 3290 if session_details: 3291 session_id = session_details.get_session_id() 3292 obj._ssl = pn_ssl( transport._trans ) 3293 if obj._ssl is None: 3294 raise SSLUnavailable() 3295 pn_ssl_init( obj._ssl, domain._domain, session_id ) 3296 transport._ssl = obj 3297 return transport._ssl
3298
3299 - def cipher_name(self):
3300 rc, name = pn_ssl_get_cipher_name( self._ssl, 128 ) 3301 if rc: 3302 return name 3303 return None
3304
3305 - def protocol_name(self):
3306 rc, name = pn_ssl_get_protocol_name( self._ssl, 128 ) 3307 if rc: 3308 return name 3309 return None
3310 3311 RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN 3312 RESUME_NEW = PN_SSL_RESUME_NEW 3313 RESUME_REUSED = PN_SSL_RESUME_REUSED 3314
3315 - def resume_status(self):
3316 return pn_ssl_resume_status( self._ssl )
3317
3318 - def _set_peer_hostname(self, hostname):
3319 self._check(pn_ssl_set_peer_hostname( self._ssl, hostname ))
3320 - def _get_peer_hostname(self):
3321 err, name = pn_ssl_get_peer_hostname( self._ssl, 1024 ) 3322 self._check(err) 3323 return name
3324 peer_hostname = property(_get_peer_hostname, _set_peer_hostname, 3325 doc=""" 3326 Manage the expected name of the remote peer. Used to authenticate the remote. 3327 """)
3328
3329 3330 -class SSLSessionDetails(object):
3331 """ Unique identifier for the SSL session. Used to resume previous session on a new 3332 SSL connection. 3333 """ 3334
3335 - def __init__(self, session_id):
3336 self._session_id = session_id
3337
3338 - def get_session_id(self):
3339 return self._session_id
3340 3341 3342 wrappers = { 3343 "pn_void": lambda x: pn_void2py(x), 3344 "pn_pyref": lambda x: pn_void2py(x), 3345 "pn_connection": lambda x: Connection._wrap_connection(pn_cast_pn_connection(x)), 3346 "pn_session": lambda x: Session._wrap_session(pn_cast_pn_session(x)), 3347 "pn_link": lambda x: Link._wrap_link(pn_cast_pn_link(x)), 3348 "pn_delivery": lambda x: Delivery._wrap_delivery(pn_cast_pn_delivery(x)), 3349 "pn_transport": lambda x: Transport(pn_cast_pn_transport(x)) 3350 }
3351 3352 -class Collector:
3353
3354 - def __init__(self):
3355 self._impl = pn_collector() 3356 self._contexts = set()
3357
3358 - def put(self, obj, etype):
3359 pn_collector_put(self._impl, PN_PYREF, pn_py2void(obj), etype.number)
3360
3361 - def peek(self):
3362 event = pn_collector_peek(self._impl) 3363 if event is None: 3364 return None 3365 3366 clazz = pn_class_name(pn_event_class(event)) 3367 context = wrappers[clazz](pn_event_context(event)) 3368 return Event(clazz, context, EventType.TYPES[pn_event_type(event)])
3369
3370 - def pop(self):
3371 ev = self.peek() 3372 if ev is not None: 3373 ev._popped(self) 3374 pn_collector_pop(self._impl)
3375
3376 - def __del__(self):
3377 pn_collector_free(self._impl)
3378
3379 -class EventType:
3380 3381 TYPES = {} 3382
3383 - def __init__(self, number, method):
3384 self.number = number 3385 self.name = pn_event_type_name(self.number) 3386 self.method = method 3387 self.TYPES[number] = self
3388
3389 - def __repr__(self):
3390 return self.name
3391
3392 -def dispatch(handler, method, *args):
3393 m = getattr(handler, method, None) 3394 if m: 3395 return m(*args) 3396 elif hasattr(handler, "on_unhandled"): 3397 return handler.on_unhandled(method, args)
3398
3399 -class Event:
3400 3401 CONNECTION_INIT = EventType(PN_CONNECTION_INIT, "on_connection_init") 3402 CONNECTION_BOUND = EventType(PN_CONNECTION_BOUND, "on_connection_bound") 3403 CONNECTION_UNBOUND = EventType(PN_CONNECTION_UNBOUND, "on_connection_unbound") 3404 CONNECTION_LOCAL_OPEN = EventType(PN_CONNECTION_LOCAL_OPEN, "on_connection_local_open") 3405 CONNECTION_LOCAL_CLOSE = EventType(PN_CONNECTION_LOCAL_CLOSE, "on_connection_local_close") 3406 CONNECTION_REMOTE_OPEN = EventType(PN_CONNECTION_REMOTE_OPEN, "on_connection_remote_open") 3407 CONNECTION_REMOTE_CLOSE = EventType(PN_CONNECTION_REMOTE_CLOSE, "on_connection_remote_close") 3408 CONNECTION_FINAL = EventType(PN_CONNECTION_FINAL, "on_connection_final") 3409 3410 SESSION_INIT = EventType(PN_SESSION_INIT, "on_session_init") 3411 SESSION_LOCAL_OPEN = EventType(PN_SESSION_LOCAL_OPEN, "on_session_local_open") 3412 SESSION_LOCAL_CLOSE = EventType(PN_SESSION_LOCAL_CLOSE, "on_session_local_close") 3413 SESSION_REMOTE_OPEN = EventType(PN_SESSION_REMOTE_OPEN, "on_session_remote_open") 3414 SESSION_REMOTE_CLOSE = EventType(PN_SESSION_REMOTE_CLOSE, "on_session_remote_close") 3415 SESSION_FINAL = EventType(PN_SESSION_FINAL, "on_session_final") 3416 3417 LINK_INIT = EventType(PN_LINK_INIT, "on_link_init") 3418 LINK_LOCAL_OPEN = EventType(PN_LINK_LOCAL_OPEN, "on_link_local_open") 3419 LINK_LOCAL_CLOSE = EventType(PN_LINK_LOCAL_CLOSE, "on_link_local_close") 3420 LINK_LOCAL_DETACH = EventType(PN_LINK_LOCAL_DETACH, "on_link_local_detach") 3421 LINK_REMOTE_OPEN = EventType(PN_LINK_REMOTE_OPEN, "on_link_remote_open") 3422 LINK_REMOTE_CLOSE = EventType(PN_LINK_REMOTE_CLOSE, "on_link_remote_close") 3423 LINK_REMOTE_DETACH = EventType(PN_LINK_REMOTE_DETACH, "on_link_remote_detach") 3424 LINK_FLOW = EventType(PN_LINK_FLOW, "on_link_flow") 3425 LINK_FINAL = EventType(PN_LINK_FINAL, "on_link_final") 3426 3427 DELIVERY = EventType(PN_DELIVERY, "on_delivery") 3428 3429 TRANSPORT = EventType(PN_TRANSPORT, "on_transport") 3430 TRANSPORT_ERROR = EventType(PN_TRANSPORT_ERROR, "on_transport_error") 3431 TRANSPORT_HEAD_CLOSED = EventType(PN_TRANSPORT_HEAD_CLOSED, "on_transport_head_closed") 3432 TRANSPORT_TAIL_CLOSED = EventType(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed") 3433 TRANSPORT_CLOSED = EventType(PN_TRANSPORT_CLOSED, "on_transport_closed") 3434
3435 - def __init__(self, clazz, context, type):
3436 self.clazz = clazz 3437 self.context = context 3438 self.type = type
3439
3440 - def _popped(self, collector):
3441 if self.type in (Event.LINK_FINAL, Event.SESSION_FINAL, 3442 Event.CONNECTION_FINAL): 3443 collector._contexts.remove(self.context) 3444 self.context._released()
3445
3446 - def dispatch(self, handler):
3447 return dispatch(handler, self.type.method, self)
3448 3449 @property
3450 - def connection(self):
3451 if self.clazz == "pn_connection": 3452 return self.context 3453 elif self.clazz == "pn_session": 3454 return self.context.connection 3455 elif self.clazz == "pn_link": 3456 return self.context.connection 3457 elif self.clazz == "pn_delivery" and not self.context.released: 3458 return self.context.link.connection 3459 else: 3460 return None
3461 3462 @property
3463 - def session(self):
3464 if self.clazz == "pn_session": 3465 return self.context 3466 elif self.clazz == "pn_link": 3467 return self.context.session 3468 elif self.clazz == "pn_delivery" and not self.context.released: 3469 return self.context.link.session 3470 else: 3471 return None
3472 3473 @property 3481 3482 @property
3483 - def sender(self):
3484 l = self.link 3485 if l and l.is_sender: 3486 return l 3487 else: 3488 return None
3489 3490 @property
3491 - def receiver(self):
3492 l = self.link 3493 if l and l.is_receiver: 3494 return l 3495 else: 3496 return None
3497 3498 @property
3499 - def delivery(self):
3500 if self.clazz == "pn_delivery": 3501 return self.context 3502 else: 3503 return None
3504
3505 - def __repr__(self):
3506 return "%s(%s)" % (self.type, self.context)
3507
3508 -class Handler(object):
3509
3510 - def on_unhandled(self, method, args):
3511 pass
3512
3513 3514 ### 3515 # Driver 3516 ### 3517 3518 -class DriverException(ProtonException):
3519 """ 3520 The DriverException class is the root of the driver exception hierarchy. 3521 """ 3522 pass
3523
3524 -class Connector(object):
3525 3526 @staticmethod
3527 - def _wrap_connector(c_cxtr, py_driver=None):
3528 """Maintain only a single instance of this class for each Connector object that 3529 exists in the C Driver. 3530 """ 3531 if not c_cxtr: return None 3532 py_cxtr = pn_void2py(pn_connector_context(c_cxtr)) 3533 if py_cxtr: return py_cxtr 3534 wrapper = Connector(_cxtr=c_cxtr, _py_driver=py_driver) 3535 return wrapper
3536
3537 - def __init__(self, _cxtr, _py_driver):
3538 self._cxtr = _cxtr 3539 assert(_py_driver) 3540 self._driver = weakref.ref(_py_driver) 3541 pn_connector_set_context(self._cxtr, pn_py2void(self)) 3542 self._connection = None 3543 self._driver()._connectors.add(self)
3544
3545 - def _release(self):
3546 """Release the underlying C Engine resource.""" 3547 if self._cxtr: 3548 pn_connector_set_context(self._cxtr, pn_py2void(None)) 3549 pn_connector_free(self._cxtr) 3550 self._cxtr = None
3551
3552 - def free(self):
3553 """Release the Connector, freeing its resources. 3554 3555 Call this when you no longer need the Connector. This will allow the 3556 connector's resources to be reclaimed. Once called, you should no longer 3557 reference this connector. 3558 3559 """ 3560 self.connection = None 3561 d = self._driver() 3562 if d: d._connectors.remove(self) 3563 self._release()
3564
3565 - def next(self):
3566 return Connector._wrap_connector(pn_connector_next(self._cxtr))
3567
3568 - def process(self):
3569 pn_connector_process(self._cxtr)
3570
3571 - def listener(self):
3572 return Listener._wrap_listener(pn_connector_listener(self._cxtr))
3573
3574 - def sasl(self):
3575 ## seems easier just to grab the SASL associated with the transport: 3576 trans = self.transport 3577 if trans: 3578 return SASL(self.transport) 3579 return None
3580 3581 @property
3582 - def transport(self):
3583 trans = pn_connector_transport(self._cxtr) 3584 if trans: 3585 return Transport(trans) 3586 return None
3587
3588 - def close(self):
3589 return pn_connector_close(self._cxtr)
3590 3591 @property
3592 - def closed(self):
3593 return pn_connector_closed(self._cxtr)
3594
3595 - def _get_connection(self):
3596 return self._connection
3597
3598 - def _set_connection(self, conn):
3599 if conn: 3600 pn_connector_set_connection(self._cxtr, conn._conn) 3601 else: 3602 pn_connector_set_connection(self._cxtr, None) 3603 self._connection = conn
3604 3605 3606 connection = property(_get_connection, _set_connection, 3607 doc=""" 3608 Associate a Connection with this Connector. 3609 """)
3610
3611 -class Listener(object):
3612 3613 @staticmethod
3614 - def _wrap_listener(c_lsnr, py_driver=None):
3615 """Maintain only a single instance of this class for each Listener object that 3616 exists in the C Driver. 3617 """ 3618 if not c_lsnr: return None 3619 py_lsnr = pn_void2py(pn_listener_context(c_lsnr)) 3620 if py_lsnr: return py_lsnr 3621 wrapper = Listener(_lsnr=c_lsnr, _py_driver=py_driver) 3622 return wrapper
3623
3624 - def __init__(self, _lsnr, _py_driver):
3625 self._lsnr = _lsnr 3626 assert(_py_driver) 3627 self._driver = weakref.ref(_py_driver) 3628 pn_listener_set_context(self._lsnr, pn_py2void(self)) 3629 self._driver()._listeners.add(self)
3630
3631 - def _release(self):
3632 """Release the underlying C Engine resource.""" 3633 if self._lsnr: 3634 pn_listener_set_context(self._lsnr, pn_py2void(None)); 3635 pn_listener_free(self._lsnr) 3636 self._lsnr = None
3637
3638 - def free(self):
3639 """Release the Listener, freeing its resources""" 3640 d = self._driver() 3641 if d: d._listeners.remove(self) 3642 self._release()
3643
3644 - def next(self):
3645 return Listener._wrap_listener(pn_listener_next(self._lsnr))
3646
3647 - def accept(self):
3648 d = self._driver() 3649 if d: 3650 cxtr = pn_listener_accept(self._lsnr) 3651 c = Connector._wrap_connector(cxtr, d) 3652 return c 3653 return None
3654
3655 - def close(self):
3656 pn_listener_close(self._lsnr)
3657
3658 -class Driver(object):
3659 - def __init__(self):
3660 self._driver = pn_driver() 3661 self._listeners = set() 3662 self._connectors = set()
3663
3664 - def __del__(self):
3665 # freeing the driver will release all child objects in the C Engine, so 3666 # clean up their references in the corresponding Python objects 3667 for c in self._connectors: 3668 c._release() 3669 for l in self._listeners: 3670 l._release() 3671 if hasattr(self, "_driver") and self._driver: 3672 pn_driver_free(self._driver) 3673 del self._driver
3674
3675 - def wait(self, timeout_sec):
3676 if timeout_sec is None or timeout_sec < 0.0: 3677 t = -1 3678 else: 3679 t = long(1000*timeout_sec) 3680 return pn_driver_wait(self._driver, t)
3681
3682 - def wakeup(self):
3683 return pn_driver_wakeup(self._driver)
3684
3685 - def listener(self, host, port):
3686 """Construct a listener""" 3687 return Listener._wrap_listener(pn_listener(self._driver, host, port, None), 3688 self)
3689
3690 - def pending_listener(self):
3691 return Listener._wrap_listener(pn_driver_listener(self._driver))
3692
3693 - def head_listener(self):
3694 return Listener._wrap_listener(pn_listener_head(self._driver))
3695
3696 - def connector(self, host, port):
3697 return Connector._wrap_connector(pn_connector(self._driver, host, port, None), 3698 self)
3699
3700 - def head_connector(self):
3701 return Connector._wrap_connector(pn_connector_head(self._driver))
3702
3703 - def pending_connector(self):
3704 return Connector._wrap_connector(pn_driver_connector(self._driver))
3705
3706 -class Url(object):
3707 """ 3708 Simple URL parser/constructor, handles URLs of the form: 3709 3710 <scheme>://<user>:<password>@<host>:<port>/<path> 3711 3712 All components can be None if not specifeid in the URL string. 3713 3714 The port can be specified as a service name, e.g. 'amqp' in the 3715 URL string but Url.port always gives the integer value. 3716 3717 @ivar scheme: Url scheme e.g. 'amqp' or 'amqps' 3718 @ivar user: Username 3719 @ivar password: Password 3720 @ivar host: Host name, ipv6 literal or ipv4 dotted quad. 3721 @ivar port: Integer port. 3722 @ivar host_port: Returns host:port 3723 """ 3724 3725 AMQPS = "amqps" 3726 AMQP = "amqp" 3727
3728 - class Port(int):
3729 """An integer port number that can be constructed from a service name string""" 3730
3731 - def __new__(cls, value):
3732 """@param value: integer port number or string service name.""" 3733 port = super(Url.Port, cls).__new__(cls, cls._port_int(value)) 3734 setattr(port, 'name', str(value)) 3735 return port
3736
3737 - def __eq__(self, x): return str(self) == x or int(self) == x
3738 - def __ne__(self, x): return not self == x
3739 - def __str__(self): return str(self.name)
3740 3741 @staticmethod
3742 - def _port_int(value):
3743 """Convert service, an integer or a service name, into an integer port number.""" 3744 try: 3745 return int(value) 3746 except ValueError: 3747 try: 3748 return socket.getservbyname(value) 3749 except socket.error: 3750 # Not every system has amqp/amqps defined as a service 3751 if value == Url.AMQPS: return 5671 3752 elif value == Url.AMQP: return 5672 3753 else: 3754 raise ValueError("Not a valid port number or service name: '%s'" % value)
3755
3756 - def __init__(self, url=None, **kwargs):
3757 """ 3758 @param url: URL string to parse. 3759 @param kwargs: scheme, user, password, host, port, path. 3760 If specified, replaces corresponding part in url string. 3761 """ 3762 if url: 3763 self._url = pn_url_parse(str(url)) 3764 if not self._url: raise ValueError("Invalid URL '%s'" % url) 3765 else: 3766 self._url = pn_url() 3767 for k in kwargs: # Let kwargs override values parsed from url 3768 getattr(self, k) # Check for invalid kwargs 3769 setattr(self, k, kwargs[k])
3770
3771 - class PartDescriptor(object):
3772 - def __init__(self, part):
3773 self.getter = globals()["pn_url_get_%s" % part] 3774 self.setter = globals()["pn_url_set_%s" % part]
3775 - def __get__(self, obj, type=None): return self.getter(obj._url)
3776 - def __set__(self, obj, value): return self.setter(obj._url, str(value))
3777 3778 scheme = PartDescriptor('scheme') 3779 username = PartDescriptor('username') 3780 password = PartDescriptor('password') 3781 host = PartDescriptor('host') 3782 path = PartDescriptor('path') 3783
3784 - def _get_port(self):
3785 portstr = pn_url_get_port(self._url) 3786 return portstr and Url.Port(portstr)
3787
3788 - def _set_port(self, value):
3789 if value is None: pn_url_set_port(self._url, None) 3790 else: pn_url_set_port(self._url, str(Url.Port(value)))
3791 3792 port = property(_get_port, _set_port) 3793
3794 - def __str__(self): return pn_url_str(self._url)
3795
3796 - def __repr__(self): return "Url(%r)" % str(self)
3797
3798 - def __del__(self):
3799 pn_url_free(self._url); 3800 self._url = None
3801
3802 - def defaults(self):
3803 """ 3804 Fill in missing values (scheme, host or port) with defaults 3805 @return: self 3806 """ 3807 self.scheme = self.scheme or self.AMQP 3808 self.host = self.host or '0.0.0.0' 3809 self.port = self.port or self.Port(self.scheme) 3810 return self
3811 3812 __all__ = [ 3813 "API_LANGUAGE", 3814 "IMPLEMENTATION_LANGUAGE", 3815 "ABORTED", 3816 "ACCEPTED", 3817 "AUTOMATIC", 3818 "PENDING", 3819 "MANUAL", 3820 "REJECTED", 3821 "RELEASED", 3822 "SETTLED", 3823 "UNDESCRIBED", 3824 "Array", 3825 "Collector", 3826 "Condition", 3827 "Connection", 3828 "Connector", 3829 "Data", 3830 "Delivery", 3831 "Disposition", 3832 "Described", 3833 "Driver", 3834 "DriverException", 3835 "Endpoint", 3836 "Event", 3837 "Handler", 3838 "Link", 3839 "Listener", 3840 "Message", 3841 "MessageException", 3842 "Messenger", 3843 "MessengerException", 3844 "ProtonException", 3845 "VERSION_MAJOR", 3846 "VERSION_MINOR", 3847 "Receiver", 3848 "SASL", 3849 "Sender", 3850 "Session", 3851 "SSL", 3852 "SSLDomain", 3853 "SSLSessionDetails", 3854 "SSLUnavailable", 3855 "SSLException", 3856 "Terminus", 3857 "Timeout", 3858 "Interrupt", 3859 "Transport", 3860 "TransportException", 3861 "Url", 3862 "char", 3863 "dispatch", 3864 "symbol", 3865 "timestamp", 3866 "ulong" 3867 ] 3868