Coverage report: /development/source/library/org/datagraph/spocq-shard/src/spocq-server/streams/mqtt.lisp

KindCoveredAll%
expression0965 0.0
branch050 0.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.server.implementation; -*-
2
 ;;; (load #p"patches/model.lisp")
3
 
4
 (in-package :org.datagraph.spocq.server.implementation)
5
 
6
 #|
7
 Implement an MQTT service based on the execution control structor of the HTTP server.
8
 - the acceptor thread management
9
 - the response function dispatch mechanism
10
 
11
 the protocol state model is different, which leads to a pull control flow which listens to messages from
12
 a single broker on a single stream, rather than the http push flow which accepts connections from arbitrary
13
 clients on a socket.
14
 while http processing handles each single-socket connection by creating a stream and handling the request
15
 as a synchronous read/compute/write process, each mqtt request takes the form of an asynchronous PUBLISH
16
 message.
17
 the 'acceptor' process receives these from the broker connection stream in response to its subscription to
18
 a topic, such as '+/+/sparql'. (note, there is no provision for url query arguments.)
19
 the acceptor creates a new thread for each received message, delegates the processing to that thread by
20
 establishing the dynamic http request context and invoking http:respond-to-request.
21
 the response is captured in a vector stream which is wrapped in a response message and published back
22
 to another topic, such as '/sparql/response'.
23
 the response topic is taken from the request's 'Asynchronous-Location' header.
24
 
25
 the control-flow involves one per-acceptor thread which sends both response messages and acknowledgements
26
 back to the broker and multiplexes that task with interpreting acknowledgements from
27
 the broker for its messages and with resending messages due to time-outs.
28
 its work is scheduled by queueing both incoming and outgoing messages for it to process and by waking it
29
 up periodically, should there be no messages to process.
30
 
31
 note that, there can be just a single acceptor thread to read subscriptions, as the mqtt routing pattern is
32
 to pass a given message to _all_ subscribers.
33
 
34
 
35
 each request specifies a view as its topic and passes any parameters in the message - as for a http post.
36
 - the topic becomes the resource url as for http
37
 - the url has just 'sparql' as its third element - that is there is no support for views, which means the
38
  query text must be present in-line with content type 'application/sparql-query'
39
 - the media type is taken from the 'Content-Type' header 
40
 
41
 in order to get a broker:
42
 
43
   http://www.monsuitto.org
44
 
45
 |#
46
 
47
 ;;; extend the existing mqtt package
48
 (modpackage :mqtt
49
   (:import-from :mqtt-implementation
50
                 :mqtt-message
51
                 :mqtt-message-dup
52
                 :mqtt-message-payload
53
                 :mqtt-message-topic
54
                 :mqtt-message-type
55
                 :parse-packet
56
                 )
57
   (:export :*acceptor*
58
            :*broker-url*
59
            :*class.request*
60
            :*class.response*
61
            :*packet-identifier*
62
            :*port*
63
            :*ssl-port*
64
            :*topics*
65
            :accept-message
66
            :accept-messages
67
            :acceptor
68
            :acceptor-broker-connection
69
            :acceptor-broker-url
70
            :acceptor-client-id
71
            :acceptor-protocol
72
            :acceptor-requests
73
            :acceptor-topics
74
            :connect
75
            :main
76
            :make-connect-message
77
            :make-error-message
78
            :make-puback-message
79
            :make-publish-message
80
            :make-subscribe-message
81
            :make-request
82
            :make-response
83
            :message-dup
84
            :message-headers
85
            :message-mid
86
            :message-payload
87
            :message-qos
88
            :message-topic
89
            :message-type
90
            :message-type-p
91
            :message-url
92
            :mqtt-message
93
            :parse-packet
94
            :process-request
95
            :process-typed-message
96
            :query
97
            :receive
98
            :request
99
            :requests
100
            :require-connack
101
            :require-suback
102
            :require-puback
103
            :response
104
            :response-uri
105
            :run
106
            :send
107
            :send-connect-message
108
            :send-messages
109
            :send-response-message
110
            :subscribe-failed
111
            ;; :taskmaster
112
           )
113
   (:intern :*packet-identifier-lock*
114
            :*qos*
115
            :log-error
116
            :log-warn))
117
 
118
 
119
 (defparameter *mqtt-request-limit* 16)
120
 (defparameter *mqtt-thread-limit* 8)
121
 (defparameter mqtt:*packet-identifier* 0)
122
 (defparameter mqtt::*packet-identifier-lock* nil)
123
 (defparameter mqtt:*acceptor* nil)
124
 (defparameter mqtt:*port* 1883)
125
 (defparameter mqtt::*qos* 1)
126
 (defparameter mqtt:*ssl-port* 8883)
127
 (defparameter mqtt:*class.request* 'mqtt:request)
128
 (defparameter mqtt:*class.response* 'mqtt:response)
129
 (defparameter mqtt:*broker-url* "mqtt://test.dydra.com")
130
 (defparameter mqtt:*topics* '("/+/+/sparql" "/+/+/tpf"))
131
 
132
 (defmethod (setf spocq.i:configuration-parameter) ((value integer) (parameter (eql :mqtt-port)))
133
   (setq mqtt:*port* value))
134
 (defmethod (setf spocq.i:configuration-parameter) ((value string) (parameter (eql :mqtt-broker-url)))
135
   (setq mqtt:*broker-url* value))
136
 (defmethod (setf spocq.i:configuration-parameter) ((value list) (parameter (eql :mqtt-topics)))
137
   (setq mqtt:*topics* value))
138
 (defmethod (setf spocq.i:configuration-parameter) ((value integer) (parameter (eql :mqtt-request-limit)))
139
   (setq *mqtt-request-limit* value))
140
 (defmethod (setf spocq.i:configuration-parameter) ((value integer) (parameter (eql :mqtt-thread-limit)))
141
   (setq *mqtt-thread-limit* value))
142
 
143
 (declaim (type fixnum mqtt:*packet-identifier*))
144
 
145
 
146
 (defclass mqtt:acceptor (spocq-acceptor) ; (tbnl:acceptor)
147
   ((client-id
148
     :initform "spocq-rsp"
149
     :reader mqtt:acceptor-client-id)
150
    (broker-url
151
     :initform (error "acceptor: broker-url is required.")
152
     :initarg :broker-url
153
     :reader mqtt:acceptor-broker-url
154
     :documentation "bind the broker spocq:mqtt-url instance. it follows the url scheme
155
 
156
       mqtt[s]://[username][:password]@host.domain[:port]")
157
    (broker-connection
158
     :accessor mqtt:acceptor-broker-connection
159
     :documentation "The stream connection to the broker")
160
    (protocol
161
     :initform "HTTP 1.0"
162
     :reader mqtt:acceptor-protocol
163
     :documentation "The effective http protocol disables chunking as the message is a single unit.")
164
    (topics
165
     :initform (error "acceptor: topics is required.")
166
     :initarg :topics
167
     :reader mqtt:acceptor-topics)
168
    (qos
169
     :initform 0 :initarg :qos
170
     :accessor acceptor-qos
171
     :documentation "The QOS for subscriptions and published messages")
172
    (request-limit
173
     :initform *mqtt-request-limit*
174
     :initarg :request-limit
175
     :reader acceptor-request-limit)
176
    (thread-limit
177
     :initform *mqtt-thread-limit*
178
     :initarg :thread-limit
179
     :reader acceptor-thread-limit)
180
    (threads-in-progress
181
     :initform 0
182
     :accessor acceptor-threads-in-progress)
183
    (wait-queue
184
     :initform (tbnl::make-condition-variable)
185
     :reader acceptor-wait-queue
186
     :documentation
187
     "A queue that we use to wait for a free connection.")
188
    (wait-lock
189
     :initform (bt:make-lock "acceptor-wait-lock")
190
     :reader acceptor-wait-lock
191
     :documentation
192
     "The lock for the connection wait queue.")
193
    (message-queue
194
     :initform (spocq.i::make-pool :name "mqtt message pool")
195
     :reader acceptor-message-queue
196
     :documentation
197
     "Passes message to the thread which emits and confirms receipts and responses.
198
      These will be the publish and confirmation messages read and passed through from the main acceptor thread
199
      and the response messages from the respective request processing thread.")
200
    (message-thread
201
     :accessor acceptor-message-thread
202
     :documentation
203
     "A dependent thread to multiplex response messages over the acceptor connection, manage their timeouts,
204
      and correlate confirmation messages.")
205
    (send-limit
206
     :initform 3 :initarg :send-limit
207
     :reader acceptor-send-limit
208
     :documentation "Limit the result message publication retry count")
209
    (send-timeout
210
     :initform 5 :initarg :send-timeut
211
     :reader acceptor-send-timeout
212
     :documentation "Limit the result message publication PUBACK response time interval"))
213
   (:documentation
214
    "Combine mqtt-specific attributes with standard http acceptor behaviour and connection limits from
215
     task-manager"))
216
 
217
 (defmethod initialize-instance :after ((instance mqtt:acceptor) &key)
218
   (setf (tbnl::acceptor-output-chunking-p instance) nil))
219
 
220
 (defclass mqtt:request (spocq-request)
221
   ())
222
 
223
 (defun mqtt:make-request (&rest initargs)
224
   (apply #'make-instance mqtt:*class.request* :acceptor initargs))
225
 
226
 (defclass mqtt:response (spocq-response)
227
   ())
228
 
229
 (defun mqtt:make-response (&rest initargs)
230
   (apply #'make-instance mqtt:*class.response* :acceptor initargs))
231
 
232
 
233
 #+(or) ; not used
234
 (defclass mqtt:taskmaster (spocq-lock-taskmaster)
235
   ((requests :initform (spocq.i:make-pool :name "mqtt requests")
236
              :reader mqtt:requests)
237
    (threads :initform nil
238
             :accessor taskmaster-threads)))
239
 
240
 (defclass mqtt:query (dydra:query)
241
   ()
242
   (:metaclass org.datagraph.spocq.implementation::applicable-query-class)
243
   (:documentation "Distinguish MQTT queries in order to specialize spocq.i::initiate-task to iterate
244
  over revisions"))
245
 
246
 (define-condition mqtt::revision-processed ()
247
   ())
248
 
249
 
250
 (defgeneric mqtt:message-headers (message stream)
251
   (:documentation "Consume the http header from the input stream, decode them and return the parsed p-list.
252
  they comprise:
253
  Content-Type
254
  Authorization
255
  Accept
256
  ")
257
   (:method ((message mqtt:mqtt-message) (stream stream))
258
     (let ((chunga:*accept-bogus-eols* t))
259
       (tbnl::read-http-headers stream tbnl:*header-stream*))))
260
 
261
 (defun mqtt:message-payload (message)
262
   (mqtt-implementation::mqtt-message-payload message))
263
 (defun mqtt:message-dup (message)
264
   (mqtt-implementation::mqtt-message-dup message))
265
 (defun (setf mqtt:message-dup) (value message)
266
   (setf (mqtt-implementation::mqtt-message-dup message) value))
267
 (defun mqtt:message-mid (message)
268
   (mqtt-implementation::mqtt-message-mid message))
269
 (defun mqtt:message-qos (message)
270
   (mqtt-implementation::mqtt-message-qos message))
271
 (defun mqtt:message-topic (message)
272
   (mqtt-implementation::mqtt-message-topic message))
273
 (defun mqtt:message-type (message)
274
   (mqtt-implementation::mqtt-message-type message))
275
 (defun mqtt:message-url (message)
276
   (format nil "http:/~A/~A"
277
           (dydra:server-host-name)
278
           (mqtt:message-topic message)))
279
 (defgeneric mqtt:message-type-p (message type)
280
   (:method ((message mqtt:mqtt-message) (type t))
281
     (equal (mqtt:message-type message) type)))
282
 
283
 (defun mqtt::next-packet-identifier ()
284
   (bt:with-lock-held (mqtt::*packet-identifier-lock*)
285
     (incf mqtt:*packet-identifier*)))
286
 
287
 
288
 ;;; mqtt constructors
289
 ;;; each is a different argument + defaults combination.
290
 ;;; eg. for packet identifier
291
 
292
 (defun mqtt:make-connect-message (&rest initargs
293
                                   &key 
294
                                   (dup 0) (qos 0) (retain nil) username password
295
                                   &allow-other-keys)
296
   ;; construct flags from provided args
297
   (unless (and (stringp username(plusp (length username))) (remf initargs :username))
298
   (unless (and (stringp password(plusp (length password))) (remf initargs :password))
299
   (apply #'mqtt-implementation::make-mqtt-message
300
          :type :connect
301
          :dup dup
302
          :qos qos
303
          :retain retain
304
          :protocol-name "MQTT"
305
          :protocol-level 4
306
          :connect-username-flag (if username 1 0)
307
          :connect-password-flag (if password 1 0)
308
          :connect-will-qos 0
309
          :connect-will-flag 0
310
          :connect-clean-session-flag 1
311
          initargs))
312
 
313
 (defun mqtt:make-subscribe-message (topic &key (qos 1))
314
   (mqtt-implementation::make-mqtt-message
315
    :type :subscribe
316
    :mid (mqtt::next-packet-identifier)
317
    :qos 1
318
    :protocol-name "MQTT"
319
    :protocol-level 4
320
    :topic topic
321
    :subscription-qos qos))
322
 
323
 (defun mqtt:make-publish-message (topic payload &key (qos 1))
324
   (mqtt-implementation::make-mqtt-message
325
    :type :publish
326
    :topic topic
327
    :mid (mqtt::next-packet-identifier)
328
    :payload payload
329
    :qos qos))
330
 
331
 (defun mqtt:make-puback-message (mid)
332
   (mqtt-implementation::make-mqtt-message
333
    :type :puback
334
    :mid mid))
335
 
336
 (defun mqtt:make-error-message (payload &key (qos 0))
337
   (mqtt-implementation::make-mqtt-message
338
    :type :publish
339
    :topic "error"
340
    :mid (mqtt::next-packet-identifier)
341
    :payload payload
342
    :qos qos))
343
 
344
 
345
 ;;; mqtt stream operators
346
 
347
 (defun mqtt::make-buffer (length)
348
   (make-array length :element-type '(unsigned-byte 8)))
349
 
350
 (defun mqtt::send-buffer (stream buffer)
351
   (write-sequence buffer stream)
352
   (force-output stream))
353
 
354
 (defun mqtt::receive-buffer (stream buffer &rest args)
355
   (values (apply #'read-sequence buffer stream args)
356
           buffer))
357
 
358
 (defun mqtt::read-length (source next-byte)
359
   (let* ((byte (funcall next-byte source))
360
          (length (logand byte #x7f))
361
          (byte-count 1)
362
          (cache (list byte)))
363
     (when (logbitp 7 byte)
364
       (setf byte (funcall next-byte source))
365
       (push byte cache)
366
       (incf length (ash (logand byte #x7f) 7))
367
       (setf byte-count 2)
368
       (when (logbitp 7 byte)
369
         (setf byte (funcall next-byte source))
370
         (push byte cache)
371
         (incf length (ash (logand byte #x7f) 14))
372
         (setf byte-count 3)
373
         (when (logbitp 7 byte)
374
           (setf byte (funcall next-byte source))
375
           (push byte cache)
376
           (incf length (ash (logand byte #x7f) 21))
377
           (setf byte-count 4)
378
           (assert (not (logbitp 7 byte)) ()
379
                   "read-length: invalid length: ~s" (apply #'vector (reverse cache))))))
380
     (values length byte-count)))
381
 ; (let ((bytes '(#x00))) (mqtt::read-length nil #'(lambda (source) (declare (ignore source)) (pop bytes))))
382
 ; (let ((bytes '(#x7f))) (mqtt::read-length nil #'(lambda (source) (declare (ignore source)) (pop bytes))))
383
 ; (let ((bytes '(#x80 #x01))) (mqtt::read-length nil #'(lambda (source) (declare (ignore source)) (pop bytes))))
384
 ; (let ((bytes '(#xff #x7f))) (mqtt::read-length nil #'(lambda (source) (declare (ignore source)) (pop bytes))))
385
 ; (let ((bytes '(#x80 #x80 #x01))) (mqtt::read-length nil #'(lambda (source) (declare (ignore source)) (pop bytes))))
386
 ; (let ((bytes '(#xff #xff #x7f))) (mqtt::read-length nil #'(lambda (source) (declare (ignore source)) (pop bytes))))
387
 ; (let ((bytes '(#x80 #x80 #x80 #x01))) (mqtt::read-length nil #'(lambda (source) (declare (ignore source)) (pop bytes))))
388
 ; (let ((bytes '(#xff #xff #xff #x7f))) (mqtt::read-length nil #'(lambda (source) (declare (ignore source)) (pop bytes))))
389
 
390
 (defgeneric mqtt:receive (stream &key timeout)
391
   (:documentation "Read and parse a message from the given stream.")
392
   (:method ((socket usocket:usocket) &rest args)
393
     (apply #'mqtt:receive (usocket:socket-stream socket) args))
394
   (:method ((stream stream) &key timeout)
395
     "read immediately the universal header portion in order to determine the length.
396
     create a buffer to include the header and the successive data
397
     read and buffer the remaining message content.
398
     parse the buffer using the offset of the first byte after the universal header to decode the message."
399
     timeout
400
     (let ((type-and-flags (read-byte stream)))
401
       (multiple-value-bind (length length-byte-count cache)
402
                            (mqtt::read-length stream #'read-byte)
403
         (let ((buffer (mqtt::make-buffer (+ 1 length-byte-count length))))
404
           (setf (aref buffer 0) type-and-flags)
405
           (loop for i from 1 for byte in cache
406
             do (setf (aref buffer i) (pop cache)))
407
           (mqtt::receive-buffer stream buffer :start (+ 1 length-byte-count))
408
           (mqtt:parse-packet buffer (+ 1 length-byte-count)))))))
409
 
410
 (defgeneric mqtt:send (stream message &key timeout)
411
   (:documentation "Encode the message and write it to the stream.")
412
   (:method ((socket usocket:usocket) message &rest args)
413
     (apply #'mqtt:send (usocket:socket-stream socket) message args))
414
   (:method ((stream stream) (message t) &key timeout)
415
     timeout
416
     ;; pack message and send
417
     (let ((buffer (make-array 1024 :element-type '(unsigned-byte 8) :fill-pointer 0)))
418
       (mqtt-implementation::build-packet buffer message)
419
       ;;(terpri *trace-output*)
420
       ;;(write buffer :stream *trace-output* :length nil)
421
       (mqtt::send-buffer stream buffer))))
422
 
423
 
424
 (defun mqtt::require-connack (stream &key timeout)
425
   (let ((message (mqtt:receive stream :timeout timeout)))
426
     (unless (mqtt:message-type-p message :connack)
427
       (error "mqtt:require-connack: message type mismatch: ~s" message))))
428
 
429
 (defun mqtt::require-suback (stream &key timeout)
430
   (let ((message (mqtt:receive stream :timeout timeout)))
431
     (unless (mqtt:message-type-p message :suback)
432
       (error "mqtt:require-suback: message type mismatch: ~s" message))))
433
 
434
 (defun mqtt::require-puback (stream &key timeout)
435
   (let ((message (mqtt:receive stream :timeout timeout)))
436
     (unless (mqtt:message-type-p message :puback)
437
       (error "mqtt:require-puback: message type mismatch: ~s" message))))
438
 
439
 (defun mqtt:request (stream message &key (timeout nil) response error)
440
   (handler-case
441
       (progn
442
         (mqtt:send stream message :timeout timeout)
443
         (when response (funcall response stream :timeout timeout)))
444
     (error (c)
445
            (when error (funcall error stream c)))))
446
 
447
 
448
 (defun mqtt:send-connect-message (stream &rest args)
449
   (mqtt:request stream (apply #'mqtt:make-connect-message args) :response #'mqtt:require-connack))
450
 
451
 (defgeneric mqtt:connect (location &key client-id port)
452
   (:method ((url string) &rest args)
453
     (apply #'mqtt:connect (spocq:mqtt-url url) args))
454
   (:method ((url spocq:mqtt-url) &key client-id
455
             certificate key certificate-password verify max-depth ca-file ca-directory
456
             (port (spocq:mqtt-url-port url))
457
             (user (spocq:mqtt-url-user url))
458
             (password (spocq:mqtt-url-password url)))
459
     (let* ((host (spocq:mqtt-url-authority url))
460
            (stream
461
             #+Allegro (socket::make-socket :remote-host host :remote-port port :format :binary)
462
             #+LispWorks (comm:open-tcp-stream host port :element-type '(unsigned-byte 8))
463
             #+MCL (make-instance 'ccl::binary-tcp-stream :host host :port port :element-type 'unsigned-byte)
464
             #+(or CMU scl) (sys:make-fd-stream (ext:connect-to-inet-socket host port) :buffering :full :element-type '(unsigned-byte 8))
465
             #+sbcl (usocket:socket-stream (usocket:socket-connect host port :element-type '(unsigned-byte 8)))
466
             #+clozure-common-lisp (make-ip-socket :remote-host host :remote-port port)))
467
       (case port
468
         ;; !!!NYI:
469
         ;; the service is intended for intra-host operation.
470
         ;; at best, on a secure local network
471
         (8883 (setf stream (drakma::make-ssl-stream stream
472
                                                    :hostname host
473
                                                    :certificate certificate
474
                                                    :key key
475
                                                    :certificate-password certificate-password
476
                                                    :verify verify
477
                                                    :max-depth max-depth
478
                                                    :ca-file ca-file
479
                                                    :ca-directory ca-directory))))
480
       (let ((connection-id (format nil "~a-~s" client-id (spocq.i::getpid))))
481
         (mqtt:send-connect-message stream :client-id connection-id
482
                                    :username user :password password)
483
         stream))))
484
 
485
 (defun mqtt:subscribe-failed (connection c)
486
   (error "subscribe failed: ~s: ~a" connection c))
487
 
488
 (defgeneric mqtt::send-error-message (broker-url string)
489
   (:method ((broker-url string) (message string))
490
     (mqtt::send-error-message (spocq:mqtt-url broker-url) message))
491
   (:method ((broker-url spocq:mqtt-url) (message string))
492
     (http:log-error *trace-output* message)
493
     (let ((stream (mqtt:connect broker-url)))
494
       (unwind-protect
495
           (mqtt:send-response-message stream :topic-name "/errors" :payload message)
496
         (close stream)))))
497
 
498
 (defgeneric mqtt:send-response-message (stream &key topic-name payload)
499
   ;;!!! requires new connection per message
500
   (:method ((location string) &rest args)
501
     (apply #'mqtt:send-response-message (spocq:mqtt-url location) args))
502
   (:method ((url spocq:mqtt-url) &rest args)
503
     (let ((stream (mqtt:connect url)))
504
       (unwind-protect (apply #'mqtt:send-response-message stream args)
505
         (close stream))))
506
   (:method ((stream stream) &key topic-name payload)
507
     (mqtt:request stream (mqtt:make-publish-message topic-name payload) :response #'mqtt:require-puback)))
508
 
509
 (defun mqtt::log-warn (acceptor message &rest args)
510
   (let ((message (format nil "~?" message args)))
511
     (http:log-error *trace-output* "~a" message)
512
     (log-error *trace-output* "~a" message)
513
     (channel-put (acceptor-message-queue acceptor)
514
                  `(:direction :output :message ,(mqtt::make-error-message message)))))
515
 
516
 (defun mqtt::log-error (acceptor message &rest args)
517
   (let ((message (format nil "~?" message args)))
518
     (http:log-warn *trace-output* "~a" message)
519
     (log-warn *trace-output* "~a" message)
520
     (channel-put (acceptor-message-queue acceptor)
521
                  `(:direction :output :message ,(mqtt::make-error-message message)))))
522
 
523
 (defmethod initialize-instance ((instance mqtt:acceptor) &rest initargs &key broker-url port)
524
   "unless authentication information is provided, check for registered values for the authority"
525
   (typecase broker-url
526
     (string (setf broker-url (spocq:mqtt-url broker-url)))
527
     (spocq:mqtt-url )
528
     (t (assert-argument-type mqtt:acceptor broker-url (or string spocq:mqtt-url))))
529
   (if (spocq:mqtt-url-port broker-url)
530
       (setf port (spocq:mqtt-url-port broker-url))
531
       (setf (spocq:mqtt-url-port broker-url) port
532
             (spocq:iri-lexical-form broker-url) ""))
533
   (apply #'call-next-method instance
534
          :broker-url broker-url
535
          :port port
536
          initargs)
537
   (let ((url (mqtt:acceptor-broker-url instance)))
538
     (unless (or (spocq:mqtt-url-user url) (spocq:mqtt-url-password url))
539
       (destructuring-bind (&key name password token)
540
                           (spocq.i::retrieve-authority-properties (concatenate 'string "mqtt@" (spocq:mqtt-url-authority url)))
541
         (declare (ignore token))
542
         (setf (spocq:mqtt-url-user url) name)
543
         (setf (spocq:mqtt-url-password url) password)
544
         (setf (spocq:iri-lexical-form url) ""))))
545
   )
546
 
547
 
548
 ;;; http integration via hunchentoot initiation crontrol-flow
549
 
550
 (defmethod hunchentoot:start ((acceptor mqtt:acceptor))
551
   (setq mqtt::*packet-identifier-lock* (bt:make-lock "packet identifier lock"))
552
   (setf (acceptor-message-thread acceptor)
553
         (bt:make-thread #'(lambda () (mqtt:send-messages acceptor))
554
                         :name "mqtt send-message thread"))
555
   (call-next-method))
556
 
557
 (defmethod hunchentoot:stop ((acceptor mqtt:acceptor) &key soft)
558
   (declare (ignore soft))
559
   (call-next-method)
560
   (bt:destroy-thread (acceptor-message-thread acceptor)))
561
 
562
 (defmethod hunchentoot:start-listening ((acceptor mqtt:acceptor))
563
   "The mqtt implementation actively connects to the broker and subscribes to the acceptor topics.
564
    It does not listen on a socket for connections.
565
    The single socket connection receives and delivers messages for subscribed topics.
566
    Each message is parsed and then processed by a new thread.
567
    That thread publishes any results and/or errors to the respective topic declared in the request."
568
   ;; connect to the broker
569
   (loop
570
     for retry-count from 1 to 10
571
     until (tbnl::acceptor-shutdown-p acceptor)
572
     do (progn
573
          (handler-case
574
              (block :connected
575
                (setq mqtt::*packet-identifier* 0)
576
                (let* ((connection (handler-case (mqtt:connect (mqtt:acceptor-broker-url acceptor)
577
                                                               :client-id (mqtt:acceptor-client-id acceptor))
578
                                     (error (condition)
579
                                            (http:log-warn *trace-output* "connection error, exit: ~s" condition)
580
                                            (return-from :connected nil)))))
581
                  (setf (mqtt:acceptor-broker-connection acceptor) connection)
582
                  ;; subscribe
583
                  (loop for topic in (mqtt:acceptor-topics acceptor)
584
                    do (destructuring-bind (&key name (qos (acceptor-qos acceptor)))
585
                                           (typecase topic (string `(:name ,topic)) (list topic))
586
                         (mqtt:request connection
587
                                       (mqtt:make-subscribe-message name :qos qos)
588
                                       :timeout 10
589
                                       :response #'mqtt:require-suback
590
                                       :error #'mqtt:subscribe-failed)))
591
                  (mqtt:accept-messages acceptor)))
592
            (error (condition)
593
                   (http:log-warn *trace-output* "accept error, retry: ~s" condition)))
594
          (sleep 5))))
595
 
596
 (defmethod SB-GRAY:STREAM-LISTEN ((stream de.setf.utility.implementation::vector-input-stream))
597
   (< (de.setf.utility.implementation::get-stream-position stream)
598
      (1- (length (de.setf.utility.implementation::get-vector-stream-vector stream)))))
599
                
600
 (defmethod mqtt:accept-messages ((acceptor mqtt:acceptor))
601
   (loop until (or (tbnl::acceptor-shutdown-p acceptor)
602
                   (null (mqtt:accept-message acceptor)))))
603
 
604
 (defun call-with-acceptor-thread-count-incremented (acceptor operator)
605
   (declare (dynamic-extent operator))
606
   (when (and (acceptor-thread-limit acceptor)
607
              (> (acceptor-threads-in-progress acceptor)
608
                 (acceptor-thread-limit acceptor))
609
              (bt:with-lock-held ((acceptor-wait-lock acceptor))
610
                (loop until (<= (acceptor-threads-in-progress acceptor) (acceptor-thread-limit acceptor))
611
                  do (tbnl::condition-variable-wait (acceptor-wait-queue acceptor) (acceptor-wait-lock acceptor))))))
612
   (unwind-protect (progn
613
                     (bt:with-lock-held ((tbnl::acceptor-shutdown-lock acceptor))
614
                       (incf (acceptor-threads-in-progress acceptor)))
615
                     (funcall operator))
616
     (bt:with-lock-held ((tbnl::acceptor-shutdown-lock acceptor))
617
       (decf (acceptor-threads-in-progress acceptor))
618
       (when (tbnl::acceptor-shutdown-p acceptor)
619
         (tbnl::condition-variable-signal (tbnl::acceptor-shutdown-queue acceptor))))))
620
 
621
 (defmacro with-acceptor-thread-count-incremented ((acceptor) &body body)
622
   (let ((op (gensym (string :watci-))))
623
     `(flet ((,op () ,@body))
624
        (declare (dynamic-extent #',op))
625
        (call-with-acceptor-thread-count-incremented ,acceptor #',op))))
626
 
627
 (defgeneric mqtt:accept-message (acceptor)
628
   (:documentation   "for each message, in turn
629
    - transform the headers in to http equivalents
630
    - capture and wrap the content as the input stream
631
    - fabricate an output stream to capture the response
632
    - construct the equivalent http request/response pair
633
    - process as for http.")
634
   (:method ((acceptor mqtt:acceptor))
635
     ;; the with- operator does not apply as the decrement is in another thread
636
     (let ((request-message (mqtt:receive (mqtt:acceptor-broker-connection acceptor))))
637
       ;; handle message by type: publish, puback, pingresp
638
       (case (mqtt:message-type request-message)
639
         (:publish ;; decode, dispatch, process and queue a reply for a request
640
          (flet ((run-message-thread ()
641
                   (flet ((process-message ()
642
                            (channel-put (acceptor-message-queue acceptor)
643
                                         `(:direction :input :message ,request-message))
644
                            (let* ((tbnl:*acceptor* acceptor)
645
                                   (output-stream (make-instance 'de.setf.utility.implementation::vector-output-stream))
646
                                   (hunchentoot:*reply* (mqtt:make-response acceptor
647
                                                                            :server-protocol (mqtt:acceptor-protocol acceptor)
648
                                                                            ;; this may need to be a bivalent stream
649
                                                                            :content-stream (make-instance 'http:output-stream :real-stream output-stream)))
650
                                   (input-stream (make-instance 'de.setf.utility.implementation::vector-input-stream
651
                                                   ;; the message should use the original buffer with
652
                                                   ;; the respective payload position
653
                                                   :vector (mqtt:message-payload request-message)))
654
                                   (tbnl::*hunchentoot-stream* input-stream)
655
                                   (hunchentoot:*request* (mqtt:make-request acceptor
656
                                                                             :socket nil
657
                                                                             :headers-in (mqtt:message-headers request-message input-stream)
658
                                                                             :content-stream (make-instance 'http:input-stream :real-stream input-stream)
659
                                                                             :method :post
660
                                                                             :uri (mqtt:message-topic request-message)
661
                                                                             ;; :uri (mqtt:message-url request-message)
662
                                                                             :server-protocol (mqtt:acceptor-protocol acceptor)))
663
                                   (http:*request* hunchentoot:*request*)
664
                                   (http:*response* hunchentoot:*reply*)
665
                                   (tbnl::*tmp-files* nil)
666
                                   (tbnl::*session* nil)
667
                                   (response-location (http:request-header hunchentoot:*request* "Asynchronous-Location"))
668
                                   (dydra:*class.query* 'mqtt:query))
669
                              ;; iff the request includes a revision designator and it describes an interval,
670
                              ;; then iterate over thos revisions
671
                              (setf (http:response-request http:*response*) http:*request*)
672
                              (setf (http:request-response http:*request*) http:*response*)
673
                              ;; iff the response uri is set, construct a message and post it to the broker
674
                              ;; nb. no output happens here. it is in spocq.i::initiate-task, wherethe result from each
675
                              ;; revision step is captuered and sent as a message
676
                              (if response-location
677
                                  ;; already wrapped (tbnl::with-acceptor-request-count-incremented (acceptor)
678
                                  (http:respond-to-request acceptor hunchentoot:*request* hunchentoot:*reply*) ;)
679
                                  (mqtt::send-error-message (mqtt:acceptor-broker-url acceptor) "mqtt:accept-message: no asynchronous-location for response")))))
680
                     (tbnl::with-acceptor-request-count-incremented (acceptor)
681
                       (when (and (acceptor-request-limit acceptor)
682
                                  (> (tbnl:acceptor-requests-in-progress acceptor)
683
                                     (acceptor-request-limit acceptor)))
684
                         (mqtt::log-warn acceptor "mqtt:accept-message: too many requests pending: ~s" (tbnl:acceptor-requests-in-progress acceptor))
685
                         (return-from run-message-thread nil))
686
                       (with-acceptor-thread-count-incremented (acceptor)
687
                         (process-message))))))
688
            (bt:make-thread #'run-message-thread)))
689
         (:puback ;; for qos1, handle the acknowledgements
690
          (channel-put (acceptor-message-queue acceptor)
691
                       `(:direction :input :message ,request-message)))
692
         (:pingresp ;; ignore
693
          )
694
         (t
695
          (http:log-warn *trace-output* "unsupported message type: ~s" (mqtt:message-type request-message)))))))
696
 
697
 (defparameter dydra:*revision-windows* nil)
698
 
699
 #+(or)
700
 (progn
701
 (defmethod dydra:pipe-query ((query mqtt:query) (output-destination stream) &key &allow-other-keys)
702
   "when an mqtt query request includes a repetitive revision specification, iterate over the revisions,
703
  reiterate the query for each, and coordinate the response with the acceptor to emit successive mqtt
704
  responses:"
705
   (let* ((repository (spocq.i::task-repository query))
706
          (dydra:*revision-windows*
707
           (or dydra:*revision-windows*
708
               (http:request-header http:*request* "Revision-Windows")
709
               (http:request-header http:*request* "Windows")))
710
          (windowed-revision (when dydra:*revision-windows*
711
                               (spocq.i::compute-repository-revision repository dydra:*revision-windows*))))
712
     (cond (windowed-revision
713
            (let ((result nil))
714
              (flet ((for-each-revision (min-record max-record)
715
                       (unless max-record (setf max-record min-record))
716
                       (let* ((min-id (ORG.DATAGRAPH.RDF.LMDB.IMPLEMENTATION::REVISION-RECORD-UUID min-record))
717
                              (max-id (ORG.DATAGRAPH.RDF.LMDB.IMPLEMENTATION::REVISION-RECORD-UUID max-record))
718
                              (id (if (equalp min-id max-id) min-id (concatenate 'string min-id "-" max-id)))
719
                              (effective-revision (spocq.i::compute-repository-revision repository id)))
720
                         (setf (spocq.i::task-revision query) effective-revision)
721
                         (setf (spocq.i::task-revision-id query) id)
722
                         (setf result (call-next-method))
723
                         (restart-bind ((mqtt::next-revision #'(lambda ()
724
                                                                 (return-from for-each-revision))))
725
                           (signal 'mqtt::revision-processed)))))
726
                (spocq.i::map-repository-revision-intervals #'for-each-revision windowed-revision))))
727
           (t
728
            (call-next-method)))))
729
 
730
 (defmethod dydra:pipe-query ((query mqtt:query) (output-destination stream) &key &allow-other-keys)
731
   (call-next-method)))
732
 
733
 (defmethod spocq.i::reinitialize-task ((task t))
734
   task)
735
 
736
 (defparameter mqtt::*close-tasks-p* t)
737
 
738
 (defmethod spocq.i::reinitialize-task ((task mqtt:query))
739
   ;;;!!! need to check whether the transaction needs to be aborted
740
   (if mqtt::*close-tasks-p*
741
       (spocq.i::close-task task)
742
       (spocq.i::setf-task-transaction nil task))
743
   (setf (spocq.i::task-result-generator task) nil)
744
   (spocq.i::initialize-task task)
745
   task)
746
 
747
 (defmethod CHUNGA:CHUNKED-STREAM-STREAM ((stream broadcast-stream))
748
   ;; arises when transcribing, in which case the first of the targets is the http stream
749
   (CHUNGA:CHUNKED-STREAM-STREAM (first (broadcast-stream-streams stream))))
750
 
751
 (defmethod spocq.i::initiate-task ((task mqtt:query) (encoding mime:mime-type) &key (stream *standard-output*)  &allow-other-keys)
752
   "Given a query from an MQTT subscription, expect the request to include a revision window and arrange
753
  to iterate over the designated revision to repeat the query for each revision.
754
  wrap the results captured as a binary vector output stream to publish as a response message.
755
  clear the output stream between passes in order to refresh the response headers - in particular the ETag. 
756
  finalize the query instance only once at the conclusion."
757
   (with-task-environment (:task task)
758
     (spocq.i::register-query task)
759
     (unwind-protect
760
         (progn
761
           (spocq.i::initialize-task task)
762
           (let* ((spocq.i::*thread-operations* (cons (list 'spocq.i::initiate-task (spocq.i::task-id task))
763
                                                     spocq.i::*thread-operations*))
764
                 (repository (spocq.i::task-repository task))
765
                 (dydra:*revision-windows*
766
                  (or dydra:*revision-windows*
767
                      (http:request-header http:*request* "Revision-Windows")
768
                      (http:request-header http:*request* "Windows")))
769
                 (windowed-revision (when dydra:*revision-windows*
770
                                      (spocq.i::compute-repository-revision repository dydra:*revision-windows*)))
771
                 (acceptor tbnl:*acceptor*)
772
                 (response-location (http:request-header hunchentoot:*request* "Asynchronous-Location"))
773
                 (output-stream (chunga:chunked-stream-stream stream)))
774
             (labels ((for-each-revision (min-record max-record)
775
                       (log-debug "initiate-task.mqtt: for-each-revision: ~a -- ~a" min-record max-record)
776
                       (unless max-record (setf max-record min-record))
777
                       (let* ((min-id (ORG.DATAGRAPH.RDF.LMDB.IMPLEMENTATION::REVISION-RECORD-UUID min-record))
778
                              (max-id (ORG.DATAGRAPH.RDF.LMDB.IMPLEMENTATION::REVISION-RECORD-UUID max-record))
779
                              (id (if (equalp min-id max-id) min-id (concatenate 'string min-id "-" max-id)))
780
                              (effective-revision (spocq.i::compute-repository-revision repository id)))
781
                         (run-task-for-revision effective-revision)
782
                         (spocq.i::reinitialize-task task)
783
                         ;; clear the output stream to elminate any cached headers
784
                         (clear-output stream)))
785
                      (run-task-for-revision (effective-revision)
786
                        (let ((effective-revision-id (spocq.i::repository-revision-id effective-revision)))
787
                          (setf (spocq.i::task-revision task) effective-revision)
788
                          (setf (spocq.i::task-revision-id task) effective-revision-id)
789
                          (setf (http.i::response-state http:*response*) nil)
790
                          (setf (http:response-etag http:*response*) effective-revision-id)
791
                          (log-debug "initiate-task.mqtt: run-task-for-revision: ~a" effective-revision-id))
792
                        (multiple-value-prog1
793
                            (progn
794
                              (spocq.i::query-run-in-thread task (spocq.i::task-result-generator task))
795
                              (spocq.i::send-response-message (spocq.i::task-operation task) task
796
                                                              (http:response-content-stream http:*response*)
797
                                                              encoding))
798
                          (generate-response)))
799
                      (generate-response ()
800
                        (channel-put (acceptor-message-queue acceptor)
801
                                     `(:direction :output
802
                                                  :message ,(mqtt:make-publish-message response-location
803
                                                                                       ;; get the sized vector and reset
804
                                                                                       (de.setf.utility.implementation::vector-stream-vector output-stream)
805
                                                                                       :qos (acceptor-qos acceptor))))))
806
               (declare (dynamic-extent #'for-each-revision))
807
               (if windowed-revision
808
                   (spocq.i::map-repository-revision-intervals #'for-each-revision windowed-revision)
809
                   (run-task-for-revision (spocq.i::task-revision task))))))
810
       (spocq.i::finalize-task task))))
811
 
812
 #+(or)
813
 (flet ()
814
   (handler-bind ((mqtt::revision-processed
815
                   (lambda (condition)
816
                     ;;(declare (ignore condition))
817
                     (print (list 'mqtt::revision-processed condition))
818
                     (generate-response)
819
                     (setf output-stream (make-instance 'de.setf.utility.implementation::vector-output-stream))
820
                     (setf hunchentoot:*reply* (mqtt:make-response acceptor
821
                                                                   :server-protocol (mqtt:acceptor-protocol acceptor)
822
                                                                   ;; this may need to be a bivalent stream
823
                                                                   :content-stream (make-instance 'http:output-stream :real-stream output-stream)))
824
                     (setf http:*response* hunchentoot:*reply*)
825
                     (setf (http:response-request http:*response*) http:*request*)
826
                     (setf (http:request-response http:*request*) http:*response*)
827
                     (print (list 'mqtt::invoke-restart condition))
828
                     (invoke-restart 'mqtt::next-revision))))
829
     (catch 'hunchentoot::request-processed
830
       (print (list :respond hunchentoot:*request*))
831
       
832
       (print (list :responded)))
833
     (generate-response)))
834
 
835
 
836
 (defgeneric mqtt:send-messages (acceptor)
837
   (:documentation
838
    "Work through the entries in the acceptor's message queue.
839
  These are either publish messages, which are sent as responses to their respective location, or
840
  puback messages, which serve to release some already sent message.
841
  Should a published message receive no acknowledgement, it is resent")
842
   (:method ((acceptor mqtt:acceptor))
843
     (let ((sent-messages (make-hash-table :test 'eql)))
844
       (labels ((manage-timeouts ()
845
                  (let ((now (get-universal-time)))
846
                    ;; qos2 will require tracing send/receive publishes separately as they can share message ids
847
                    (maphash #'(lambda (mid entry)
848
                                 (destructuring-bind (&key count message (timeout 0)) entry
849
                                   (when (> now timeout)
850
                                     (cond ((>= count (acceptor-send-limit acceptor))
851
                                            (http:log-error *trace-output* "mqtt:send-messages: send limit exceeded: ~a: ~a" count message)
852
                                            (remhash mid sent-messages))
853
                                           (t
854
                                            (handler-case (progn
855
                                                            (mqtt:send (mqtt:acceptor-broker-connection acceptor) message :timeout (acceptor-send-timeout acceptor))
856
                                                            (incf (getf entry :count))
857
                                                            (setf (getf entry :timeout) (+ now (acceptor-send-timeout acceptor))))
858
                                              (error (c)
859
                                                     (http:log-error *trace-output* "mqtt:send-messages: send failed: ~a: ~a" c message)
860
                                                     (remhash mid sent-messages))))))))
861
                             sent-messages)))
862
                (send-publish (message)
863
                  (http:log-debug *trace-output* "mqtt:send-messages: send publish: ~a" message)
864
                  (handler-case (progn
865
                                  (mqtt:send (mqtt:acceptor-broker-connection acceptor) message :timeout (acceptor-send-timeout acceptor))
866
                                  (when (typep (mqtt:message-qos message) '(integer 1))
867
                                    ;; retain qos1 messages for confirmation
868
                                    (setf (mqtt:message-dup message) 1)
869
                                    (setf (gethash (mqtt:message-mid message) sent-messages)
870
                                          `(:count 1 :message ,message :timeout ,(+ (get-universal-time) (acceptor-send-timeout acceptor))))))
871
                    (error (c)
872
                           (http:log-error *trace-output* "mqtt:send-messages: send failed: ~a: ~a" c message))))
873
                (receive-publish (message)
874
                  (http:log-debug *trace-output* "mqtt:send-messages: receive publish: ~a" message)
875
                  (case (mqtt:message-qos message)
876
                    ((0 2) ) ; qos0: no action, qos2: nyi
877
                    (1
878
                     (handler-case (mqtt:send (mqtt:acceptor-broker-connection acceptor)
879
                                              (mqtt:make-puback-message (mqtt:message-mid message))
880
                                              :timeout (acceptor-send-timeout acceptor))
881
                       (error (c)
882
                              (http:log-error *trace-output* "mqtt:send-messages: send failed: ~a: ~a" c message))))))
883
                (receive-puback (message)
884
                  (http:log-debug *trace-output* "mqtt:send-messages: receive puback: ~a" message)
885
                  (let* ((mid (mqtt:message-mid message)))
886
                    (destructuring-bind (&key message &allow-other-keys) (gethash mid sent-messages)
887
                      (http:log-debug *trace-output* "mqtt:send-messages: acknowledged: ~a" message)
888
                      (remhash mid sent-messages)))))
889
         (loop for entry = (spocq.i::channel-get (acceptor-message-queue acceptor) :timeout (acceptor-send-timeout acceptor))
890
           do (manage-timeouts)
891
           when entry
892
           do (destructuring-bind (&key direction message) entry
893
                (ecase direction
894
                  (:output
895
                   (case (mqtt:message-type message)
896
                     (:publish (send-publish message))))
897
                  (:input
898
                   (case (mqtt:message-type message)
899
                     (:publish (receive-publish message))
900
                     (:puback (receive-puback message)))))))))))
901