Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/patches/amqp-user-id-patch.lisp

KindCoveredAll%
expression046 0.0
branch00nil
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 (in-package :amqp.i)
2
 
3
 (defgeneric basic-user-id (channel)
4
   (:documentation "Return the user id specified in the basic's channel, which
5
     should then delegate to the channel's connection.")
6
 
7
   (:method ((basic amqp:basic))
8
     (let ((channel (basic-channel basic)))
9
       (if channel
10
         (channel-user-id channel)
11
         ""))))
12
 
13
 (defparameter *publish-expiration* "100000"
14
   "Value in milliseconds. COuld also be a default value in the instance, but this is more direct,")
15
 
16
 (def-amqp-command amqp:publish (class &key  body exchange routing-key mandatory immediate
17
                                       content-type content-encoding headers delivery-mode
18
                                       priority correlation-id reply-to expiration message-id timestamp
19
                                       type user-id)
20
   (:documentation "C-->S : publish a message :
21
 This method publishes a message to a specific exchange. The message will be routed to queues as 
22
 defined by the exchange configuration and distributed to any active consumers when the transaction, if 
23
 any, is committed.")
24
   
25
   (:request
26
    (:method ((exchange amqp:exchange) &rest args)
27
      "Given an exchange, delegate to its channel's basic instance."
28
      (declare (dynamic-extent args))
29
      (apply #'amqp::request-publish (amqp:channel.basic (amqp.u:exchange-channel exchange)) args))
30
    
31
    (:method ((channel amqp:channel) &rest args &key (user-id (or (channel-user-id channel) "")) &allow-other-keys)
32
      "The class' channel is state is set to use-channel.body.output, the stream is cleared,
33
  and the encoding is asserted. If a body is supplied, then, it is written. Otherwise the
34
  channel is left available as a stream."
35
      (declare (dynamic-extent args))
36
      ;; delegate to the channel's basic class
37
      (apply #'amqp::request-publish (amqp:channel.basic channel) 
38
             :user-id user-id
39
             args))
40
    
41
    (:method ((basic amqp:basic) &rest args &key (body nil body-s)
42
              (exchange nil e-s) (routing-key nil rk-s)
43
              (user-id (or (basic-user-id basic) ""))
44
              (expiration *publish-expiration*)
45
              &allow-other-keys)
46
      (when e-s
47
        (setf exchange (amqp:exchange-exchange exchange))          ; coerce to a string
48
        (setf (amqp:basic-exchange basic) exchange))     ; cache for possible use in chunk headers
49
      (when rk-s
50
        (setf (amqp:basic-routing-key basic) routing-key))
51
      (when body-s
52
        (setf args (copy-list args))
53
        (remf args :body))
54
      (apply #'shared-initialize basic t args)
55
      (let ((channel (object-channel basic)))
56
        (apply #'device-write-content channel body :exchange exchange :user-id user-id
57
               :expiration expiration args)))))
58
 
59
 #|
60
 (defun publish-get-loop (publish-channel get-channel data count
61
                                          &key (queue "q1") (exchange "ex")
62
                                          (routing-key "/")
63
                                          ;; specify :debug to observe the protocol exchange
64
                                          ((:log-level *log-level*) *log-level*))
65
   (let* ((publish-basic (amqp:basic publish-channel :delivery-mode 1))
66
          (get-basic (amqp:basic get-channel))
67
          (exchange (amqp:exchange publish-channel :exchange exchange :type "direct"))
68
          (publish-queue (amqp:queue publish-channel :queue queue))
69
          (get-queue (amqp:queue get-channel :queue queue)))
70
     
71
     (amqp:request-declare publish-queue)
72
     (amqp:request-declare exchange)
73
     (amqp:bind publish-queue :exchange exchange :queue publish-queue :routing-key routing-key)
74
 
75
     (dotimes (i count)
76
       (dolist (datum data)
77
         (amqp:request-publish publish-basic :exchange (amqp:exchange-exchange exchange) :body datum :routing-key routing-key)
78
         (print (amqp:request-get get-basic :queue get-queue))))))
79
       
80
 
81
 (defparameter *c* (make-instance 'amqp:connection :uri "amqp://guest:guest@localhost/"))
82
 (defparameter *ch1* (amqp:channel *c* :uri (puri:uri "amqp:/")))
83
 (defparameter *ch2* (amqp:channel *c* :uri (puri:uri "amqp:/")))
84
 ;;; (setq *log-level* :debug)
85
 (publish-get-loop *ch1* *ch2* '("this is a test") 2)
86
 
87
 (close *c* :abort t)
88
 
89
 |#