Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/patches/amqp-user-id-patch.lisp
| Kind | Covered | All | % |
| expression | 0 | 46 | 0.0 |
| branch | 0 | 0 | nil |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
3
;;; Generic accessor for the user id that applies to an AMQP basic object.
;;; The amqp:basic method fetches the basic's channel with basic-channel and
;;; returns whatever channel-user-id yields for that channel.
;;; NOTE(review): the generic's parameter is named `channel`, although the
;;; only method shown here specializes on amqp:basic.
;;; NOTE(review): this listing skips source lines 9, 11 and 12, so the
;;; closing parentheses of the form are not visible here.
(defgeneric basic-user-id (channel)
4
(:documentation "Return the user id specified in the basic's channel, which
5
should then delegate to the channel's connection.")
7
(:method ((basic amqp:basic))
8
(let ((channel (basic-channel basic)))
10
(channel-user-id channel)
13
;;; Default for the :expiration keyword of the amqp:basic publish method.
;;; The value is held as a string rather than as a number.
(defparameter *publish-expiration* "100000"
14
"Value in milliseconds. Could also be a default value in the instance, but this is more direct.")
16
;;; Definition of the amqp:publish command with three methods, specialized on
;;; amqp:exchange, amqp:channel and amqp:basic. The exchange and channel
;;; methods forward to amqp::request-publish on the relevant basic object; the
;;; basic method performs the write via device-write-content.
;;; The channel and basic methods accept a :user-id keyword that defaults to
;;; the channel's / basic's user id (or "" when that is nil), and the basic
;;; method defaults :expiration to *publish-expiration*.
;;; NOTE(review): this listing skips source lines (numbering jumps 18->20,
;;; 22->26, 37->41, 44->47, 48->50, 50->52, 52->54), so the end of the lambda
;;; list, the rest of the :documentation string and parts of the method bodies
;;; are not visible here.
;;; NOTE(review): presumably def-amqp-command is what defines
;;; amqp::request-publish -- confirm against the macro definition.
(def-amqp-command amqp:publish (class &key body exchange routing-key mandatory immediate
17
content-type content-encoding headers delivery-mode
18
priority correlation-id reply-to expiration message-id timestamp
20
(:documentation "C-->S : publish a message :
21
This method publishes a message to a specific exchange. The message will be routed to queues as
22
defined by the exchange configuration and distributed to any active consumers when the transaction, if
26
;; Exchange method: passes args through unchanged to the basic object of the
;; exchange's channel.
(:method ((exchange amqp:exchange) &rest args)
27
"Given an exchange, delegate to its channel's basic instance."
28
(declare (dynamic-extent args))
29
(apply #'amqp::request-publish (amqp:channel.basic (amqp.u:exchange-channel exchange)) args))
31
;; Channel method: user-id defaults to (channel-user-id channel), or "" when
;; that returns nil; &allow-other-keys lets every other keyword pass through.
;; NOTE(review): the docstring below reads "channel is state is set", which
;; looks like a typo for "channel state is set". The visible part of this
;; method only delegates to the channel's basic object; the remaining
;; arguments of the apply call are in omitted source lines 38-40.
(:method ((channel amqp:channel) &rest args &key (user-id (or (channel-user-id channel) "")) &allow-other-keys)
32
"The class' channel is state is set to use-channel.body.output, the stream is cleared,
33
and the encoding is asserted. If a body is supplied, then, it is written. Otherwise the
34
channel is left available as a stream."
35
(declare (dynamic-extent args))
36
;; delegate to the channel's basic class
37
(apply #'amqp::request-publish (amqp:channel.basic channel)
41
;; Basic method: user-id defaults to (basic-user-id basic), or "" when that
;; returns nil; expiration defaults to *publish-expiration*. The supplied-p
;; variables body-s, e-s and rk-s record whether the caller passed the
;; corresponding keyword.
(:method ((basic amqp:basic) &rest args &key (body nil body-s)
42
(exchange nil e-s) (routing-key nil rk-s)
43
(user-id (or (basic-user-id basic) ""))
44
(expiration *publish-expiration*)
47
;; NOTE(review): the forms that guard the next three setf calls are in
;; omitted source lines (45-46, 49); presumably they test e-s / rk-s -- confirm.
(setf exchange (amqp:exchange-exchange exchange)) ; coerce to a string
48
(setf (amqp:basic-exchange basic) exchange)) ; cache for possible use in chunk headers
50
(setf (amqp:basic-routing-key basic) routing-key))
52
;; Work on a fresh copy of the &rest list from here on.
;; NOTE(review): presumably needed because args has dynamic extent, as in the
;; sibling methods; the declaration for this method is not visible -- confirm.
(setf args (copy-list args))
54
;; Hand the keyword arguments to shared-initialize as initargs for the basic
;; instance (slot-names argument is t).
(apply #'shared-initialize basic t args)
55
(let ((channel (object-channel basic)))
56
;; :exchange, :user-id and :expiration are placed ahead of args; under
;; standard keyword processing the leftmost occurrence of a key wins, so these
;; values take precedence over duplicates in args (assuming
;; device-write-content receives them as &key arguments).
(apply #'device-write-content channel body :exchange exchange :user-id user-id
57
:expiration expiration args)))))
60
;;; Test driver: creates basic, exchange and queue objects on the two given
;;; channels, declares the publish-side queue and the "direct" exchange, binds
;;; them, then publishes a datum and prints the result of amqp:request-get on
;;; the same-named queue of get-channel.
;;;   publish-channel, get-channel -- channels used for publishing / getting
;;;   data, count -- consumed by forms in omitted source lines
;;;   :queue     -- queue name, default "q1"
;;;   :exchange  -- exchange name, default "ex"
;;;   :log-level -- bound to *log-level* for the duration of the call
;;;                 (assuming *log-level* is a special variable)
;;; NOTE(review): source lines 62, 70 and 74-76 are missing from this listing.
;;; `routing-key` and `datum` are referenced below but bound in those lines;
;;; the closing parentheses suggest two enclosing forms around the
;;; publish/get pair, presumably iteration over count and data -- confirm.
(defun publish-get-loop (publish-channel get-channel data count
61
&key (queue "q1") (exchange "ex")
63
;; specify :debug to observe the protocol exchange
64
((:log-level *log-level*) *log-level*))
65
;; NOTE(review): :delivery-mode 1 is presumably the AMQP non-persistent mode -- confirm.
(let* ((publish-basic (amqp:basic publish-channel :delivery-mode 1))
66
(get-basic (amqp:basic get-channel))
67
;; From here on `exchange` names the exchange object, not the name string.
(exchange (amqp:exchange publish-channel :exchange exchange :type "direct"))
68
;; Two queue objects with the same queue name, one per channel.
(publish-queue (amqp:queue publish-channel :queue queue))
69
(get-queue (amqp:queue get-channel :queue queue)))
71
;; Only publish-queue and exchange are declared in the visible code;
;; get-queue is only used as the :queue argument of request-get.
(amqp:request-declare publish-queue)
72
(amqp:request-declare exchange)
73
(amqp:bind publish-queue :exchange exchange :queue publish-queue :routing-key routing-key)
77
;; The exchange object is converted back to its name for the publish call.
(amqp:request-publish publish-basic :exchange (amqp:exchange-exchange exchange) :body datum :routing-key routing-key)
78
(print (amqp:request-get get-basic :queue get-queue))))))
81
;;; Ad-hoc smoke test: one connection to a broker on localhost with the
;;; guest/guest credentials, two channels on that connection, and one run of
;;; publish-get-loop with a single test string and a count of 2.
;;; NOTE(review): source lines 79-80 are missing from this listing, so it
;;; cannot be told from here whether these forms are guarded (reader
;;; conditional, block comment) or are evaluated whenever the file is loaded.
;;; If they are live, loading the file requires a reachable broker -- confirm.
(defparameter *c* (make-instance 'amqp:connection :uri "amqp://guest:guest@localhost/"))
82
;; Both channels are created from the same connection *c*.
(defparameter *ch1* (amqp:channel *c* :uri (puri:uri "amqp:/")))
83
(defparameter *ch2* (amqp:channel *c* :uri (puri:uri "amqp:/")))
84
;;; (setq *log-level* :debug)
85
;; *ch1* is the publishing channel, *ch2* the getting channel.
(publish-get-loop *ch1* *ch2* '("this is a test") 2)