Coverage report: /development/source/library/org/datagraph/spocq-shard/src/spocq-server/streams/mqtt.lisp
| Kind | Covered | All | % |
| expression | 0 | 965 | 0.0 |
| branch | 0 | 50 | 0.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.server.implementation; -*-
2
;;; (load #p"patches/model.lisp")
4
(in-package :org.datagraph.spocq.server.implementation)
7
Implement an MQTT service based on the execution control structor of the HTTP server.
8
- the acceptor thread management
9
- the response function dispatch mechanism
11
the protocol state model is different, which leads to a pull control flow which listens to messages from
12
a single broker on a single stream, rather than the http push flow which accepts connections from arbitrary
14
while http processing handles each single-socket connection by creating a stream and handling the request
15
as a synchronous read/compute/write process, each mqtt request takes the form of an asynchronous PUBLISH
17
the 'acceptor' process receives these from the broker connection stream in response to its subscription to
18
a topic, such as '+/+/sparql'. (note, there is no provision for url query arguments.)
19
the acceptor creates a new thread for each received message, delegates the processing to that thread by
20
establishing the dynamic http request context and invoking http:respond-to-request.
21
the response is captured in a vector stream which is wrapped in a response message and published back
22
to another topic, such as '/sparql/response'.
23
the response topic is taken from the request's 'Asynchronous-Location' header.
25
the control-flow involves one per-acceptor thread which sends both response messages and acknowledgements
26
back to the broker and multiplexes that task with interpreting acknowledgements from
27
the broker for its messages and with resending messages due to time-outs.
28
its work is scheduled by queueing both incoming and outgoing messages for it to process and by waking it
29
up periodically, should there be no messages to process.
31
note that, there can be just a single acceptor thread to read subscriptions, as the mqtt routing pattern is
32
to pass a given message to _all_ subscribers.
35
each request specifies a view as its topic and passes any parameters in the message - as for a http post.
36
- the topic becomes the resource url as for http
37
- the url has just 'sparql' as its third element - that is there is no support for views, which means the
38
query text must be present in-line with content type 'application/sparql-query'
39
- the media type is taken from the 'Content-Type' header
41
in order to get a broker:
43
http://www.monsuitto.org
47
;;; extend the existing mqtt package
49
(:import-from :mqtt-implementation
68
:acceptor-broker-connection
80
:make-subscribe-message
95
:process-typed-message
107
:send-connect-message
109
:send-response-message
113
(:intern :*packet-identifier-lock*
119
(defparameter *mqtt-request-limit* 16)
120
(defparameter *mqtt-thread-limit* 8)
121
(defparameter mqtt:*packet-identifier* 0)
122
(defparameter mqtt::*packet-identifier-lock* nil)
123
(defparameter mqtt:*acceptor* nil)
124
(defparameter mqtt:*port* 1883)
125
(defparameter mqtt::*qos* 1)
126
(defparameter mqtt:*ssl-port* 8883)
127
(defparameter mqtt:*class.request* 'mqtt:request)
128
(defparameter mqtt:*class.response* 'mqtt:response)
129
(defparameter mqtt:*broker-url* "mqtt://test.dydra.com")
130
(defparameter mqtt:*topics* '("/+/+/sparql" "/+/+/tpf"))
132
(defmethod (setf spocq.i:configuration-parameter) ((value integer) (parameter (eql :mqtt-port)))
133
(setq mqtt:*port* value))
134
(defmethod (setf spocq.i:configuration-parameter) ((value string) (parameter (eql :mqtt-broker-url)))
135
(setq mqtt:*broker-url* value))
136
(defmethod (setf spocq.i:configuration-parameter) ((value list) (parameter (eql :mqtt-topics)))
137
(setq mqtt:*topics* value))
138
(defmethod (setf spocq.i:configuration-parameter) ((value integer) (parameter (eql :mqtt-request-limit)))
139
(setq *mqtt-request-limit* value))
140
(defmethod (setf spocq.i:configuration-parameter) ((value integer) (parameter (eql :mqtt-thread-limit)))
141
(setq *mqtt-thread-limit* value))
143
(declaim (type fixnum mqtt:*packet-identifier*))
146
(defclass mqtt:acceptor (spocq-acceptor) ; (tbnl:acceptor)
148
:initform "spocq-rsp"
149
:reader mqtt:acceptor-client-id)
151
:initform (error "acceptor: broker-url is required.")
153
:reader mqtt:acceptor-broker-url
154
:documentation "bind the broker spocq:mqtt-url instance. it follows the url scheme
156
mqtt[s]://[username][:password]@host.domain[:port]")
158
:accessor mqtt:acceptor-broker-connection
159
:documentation "The stream connection to the broker")
162
:reader mqtt:acceptor-protocol
163
:documentation "The effective http protocol disables chunking as the message is a single unit.")
165
:initform (error "acceptor: topics is required.")
167
:reader mqtt:acceptor-topics)
169
:initform 0 :initarg :qos
170
:accessor acceptor-qos
171
:documentation "The QOS for subscriptions and published messages")
173
:initform *mqtt-request-limit*
174
:initarg :request-limit
175
:reader acceptor-request-limit)
177
:initform *mqtt-thread-limit*
178
:initarg :thread-limit
179
:reader acceptor-thread-limit)
182
:accessor acceptor-threads-in-progress)
184
:initform (tbnl::make-condition-variable)
185
:reader acceptor-wait-queue
187
"A queue that we use to wait for a free connection.")
189
:initform (bt:make-lock "acceptor-wait-lock")
190
:reader acceptor-wait-lock
192
"The lock for the connection wait queue.")
194
:initform (spocq.i::make-pool :name "mqtt message pool")
195
:reader acceptor-message-queue
197
"Passes message to the thread which emits and confirms receipts and responses.
198
These will be the publish and confirmation messages read and passed through from the main acceptor thread
199
and the response messages from the respective request processing thread.")
201
:accessor acceptor-message-thread
203
"A dependent thread to multiplex response messages over the acceptor connection, manage their timeouts,
204
and correlate confirmation messages.")
206
:initform 3 :initarg :send-limit
207
:reader acceptor-send-limit
208
:documentation "Limit the result message publication retry count")
210
:initform 5 :initarg :send-timeut
211
:reader acceptor-send-timeout
212
:documentation "Limit the result message publication PUBACK response time interval"))
214
"Combine mqtt-specific attributes with standard http acceptor behaviour and connection limits from
217
(defmethod initialize-instance :after ((instance mqtt:acceptor) &key)
218
(setf (tbnl::acceptor-output-chunking-p instance) nil))
220
(defclass mqtt:request (spocq-request)
223
(defun mqtt:make-request (&rest initargs)
224
(apply #'make-instance mqtt:*class.request* :acceptor initargs))
226
(defclass mqtt:response (spocq-response)
229
(defun mqtt:make-response (&rest initargs)
230
(apply #'make-instance mqtt:*class.response* :acceptor initargs))
234
(defclass mqtt:taskmaster (spocq-lock-taskmaster)
235
((requests :initform (spocq.i:make-pool :name "mqtt requests")
236
:reader mqtt:requests)
237
(threads :initform nil
238
:accessor taskmaster-threads)))
240
(defclass mqtt:query (dydra:query)
242
(:metaclass org.datagraph.spocq.implementation::applicable-query-class)
243
(:documentation "Distinguish MQTT queries in order to specialize spocq.i::initiate-task to iterate
246
(define-condition mqtt::revision-processed ()
250
(defgeneric mqtt:message-headers (message stream)
251
(:documentation "Consume the http header from the input stream, decode them and return the parsed p-list.
257
(:method ((message mqtt:mqtt-message) (stream stream))
258
(let ((chunga:*accept-bogus-eols* t))
259
(tbnl::read-http-headers stream tbnl:*header-stream*))))
261
(defun mqtt:message-payload (message)
262
(mqtt-implementation::mqtt-message-payload message))
263
(defun mqtt:message-dup (message)
264
(mqtt-implementation::mqtt-message-dup message))
265
(defun (setf mqtt:message-dup) (value message)
266
(setf (mqtt-implementation::mqtt-message-dup message) value))
267
(defun mqtt:message-mid (message)
268
(mqtt-implementation::mqtt-message-mid message))
269
(defun mqtt:message-qos (message)
270
(mqtt-implementation::mqtt-message-qos message))
271
(defun mqtt:message-topic (message)
272
(mqtt-implementation::mqtt-message-topic message))
273
(defun mqtt:message-type (message)
274
(mqtt-implementation::mqtt-message-type message))
275
(defun mqtt:message-url (message)
276
(format nil "http:/~A/~A"
277
(dydra:server-host-name)
278
(mqtt:message-topic message)))
279
(defgeneric mqtt:message-type-p (message type)
280
(:method ((message mqtt:mqtt-message) (type t))
281
(equal (mqtt:message-type message) type)))
283
(defun mqtt::next-packet-identifier ()
284
(bt:with-lock-held (mqtt::*packet-identifier-lock*)
285
(incf mqtt:*packet-identifier*)))
288
;;; mqtt constructors
289
;;; each is a different argument + defaults combination.
290
;;; eg. for packet identifier
292
(defun mqtt:make-connect-message (&rest initargs
294
(dup 0) (qos 0) (retain nil) username password
296
;; construct flags from provided args
297
(unless (and (stringp username) (plusp (length username))) (remf initargs :username))
298
(unless (and (stringp password) (plusp (length password))) (remf initargs :password))
299
(apply #'mqtt-implementation::make-mqtt-message
304
:protocol-name "MQTT"
306
:connect-username-flag (if username 1 0)
307
:connect-password-flag (if password 1 0)
310
:connect-clean-session-flag 1
313
(defun mqtt:make-subscribe-message (topic &key (qos 1))
314
(mqtt-implementation::make-mqtt-message
316
:mid (mqtt::next-packet-identifier)
318
:protocol-name "MQTT"
321
:subscription-qos qos))
323
(defun mqtt:make-publish-message (topic payload &key (qos 1))
324
(mqtt-implementation::make-mqtt-message
327
:mid (mqtt::next-packet-identifier)
331
(defun mqtt:make-puback-message (mid)
332
(mqtt-implementation::make-mqtt-message
336
(defun mqtt:make-error-message (payload &key (qos 0))
337
(mqtt-implementation::make-mqtt-message
340
:mid (mqtt::next-packet-identifier)
345
;;; mqtt stream operators
347
(defun mqtt::make-buffer (length)
348
(make-array length :element-type '(unsigned-byte 8)))
350
(defun mqtt::send-buffer (stream buffer)
351
(write-sequence buffer stream)
352
(force-output stream))
354
(defun mqtt::receive-buffer (stream buffer &rest args)
355
(values (apply #'read-sequence buffer stream args)
358
(defun mqtt::read-length (source next-byte)
359
(let* ((byte (funcall next-byte source))
360
(length (logand byte #x7f))
363
(when (logbitp 7 byte)
364
(setf byte (funcall next-byte source))
366
(incf length (ash (logand byte #x7f) 7))
368
(when (logbitp 7 byte)
369
(setf byte (funcall next-byte source))
371
(incf length (ash (logand byte #x7f) 14))
373
(when (logbitp 7 byte)
374
(setf byte (funcall next-byte source))
376
(incf length (ash (logand byte #x7f) 21))
378
(assert (not (logbitp 7 byte)) ()
379
"read-length: invalid length: ~s" (apply #'vector (reverse cache))))))
380
(values length byte-count)))
381
; (let ((bytes '(#x00))) (mqtt::read-length nil #'(lambda (source) (declare (ignore source)) (pop bytes))))
382
; (let ((bytes '(#x7f))) (mqtt::read-length nil #'(lambda (source) (declare (ignore source)) (pop bytes))))
383
; (let ((bytes '(#x80 #x01))) (mqtt::read-length nil #'(lambda (source) (declare (ignore source)) (pop bytes))))
384
; (let ((bytes '(#xff #x7f))) (mqtt::read-length nil #'(lambda (source) (declare (ignore source)) (pop bytes))))
385
; (let ((bytes '(#x80 #x80 #x01))) (mqtt::read-length nil #'(lambda (source) (declare (ignore source)) (pop bytes))))
386
; (let ((bytes '(#xff #xff #x7f))) (mqtt::read-length nil #'(lambda (source) (declare (ignore source)) (pop bytes))))
387
; (let ((bytes '(#x80 #x80 #x80 #x01))) (mqtt::read-length nil #'(lambda (source) (declare (ignore source)) (pop bytes))))
388
; (let ((bytes '(#xff #xff #xff #x7f))) (mqtt::read-length nil #'(lambda (source) (declare (ignore source)) (pop bytes))))
390
(defgeneric mqtt:receive (stream &key timeout)
391
(:documentation "Read and parse a message from the given stream.")
392
(:method ((socket usocket:usocket) &rest args)
393
(apply #'mqtt:receive (usocket:socket-stream socket) args))
394
(:method ((stream stream) &key timeout)
395
"read immediately the universal header portion in order to determine the length.
396
create a buffer to include the header and the successive data
397
read and buffer the remaining message content.
398
parse the buffer using the offset of the first byte after the universal header to decode the message."
400
(let ((type-and-flags (read-byte stream)))
401
(multiple-value-bind (length length-byte-count cache)
402
(mqtt::read-length stream #'read-byte)
403
(let ((buffer (mqtt::make-buffer (+ 1 length-byte-count length))))
404
(setf (aref buffer 0) type-and-flags)
405
(loop for i from 1 for byte in cache
406
do (setf (aref buffer i) (pop cache)))
407
(mqtt::receive-buffer stream buffer :start (+ 1 length-byte-count))
408
(mqtt:parse-packet buffer (+ 1 length-byte-count)))))))
410
(defgeneric mqtt:send (stream message &key timeout)
411
(:documentation "Encode the message and write it to the stream.")
412
(:method ((socket usocket:usocket) message &rest args)
413
(apply #'mqtt:send (usocket:socket-stream socket) message args))
414
(:method ((stream stream) (message t) &key timeout)
416
;; pack message and send
417
(let ((buffer (make-array 1024 :element-type '(unsigned-byte 8) :fill-pointer 0)))
418
(mqtt-implementation::build-packet buffer message)
419
;;(terpri *trace-output*)
420
;;(write buffer :stream *trace-output* :length nil)
421
(mqtt::send-buffer stream buffer))))
424
(defun mqtt::require-connack (stream &key timeout)
425
(let ((message (mqtt:receive stream :timeout timeout)))
426
(unless (mqtt:message-type-p message :connack)
427
(error "mqtt:require-connack: message type mismatch: ~s" message))))
429
(defun mqtt::require-suback (stream &key timeout)
430
(let ((message (mqtt:receive stream :timeout timeout)))
431
(unless (mqtt:message-type-p message :suback)
432
(error "mqtt:require-suback: message type mismatch: ~s" message))))
434
(defun mqtt::require-puback (stream &key timeout)
435
(let ((message (mqtt:receive stream :timeout timeout)))
436
(unless (mqtt:message-type-p message :puback)
437
(error "mqtt:require-puback: message type mismatch: ~s" message))))
439
(defun mqtt:request (stream message &key (timeout nil) response error)
442
(mqtt:send stream message :timeout timeout)
443
(when response (funcall response stream :timeout timeout)))
445
(when error (funcall error stream c)))))
448
(defun mqtt:send-connect-message (stream &rest args)
449
(mqtt:request stream (apply #'mqtt:make-connect-message args) :response #'mqtt:require-connack))
451
(defgeneric mqtt:connect (location &key client-id port)
452
(:method ((url string) &rest args)
453
(apply #'mqtt:connect (spocq:mqtt-url url) args))
454
(:method ((url spocq:mqtt-url) &key client-id
455
certificate key certificate-password verify max-depth ca-file ca-directory
456
(port (spocq:mqtt-url-port url))
457
(user (spocq:mqtt-url-user url))
458
(password (spocq:mqtt-url-password url)))
459
(let* ((host (spocq:mqtt-url-authority url))
461
#+Allegro (socket::make-socket :remote-host host :remote-port port :format :binary)
462
#+LispWorks (comm:open-tcp-stream host port :element-type '(unsigned-byte 8))
463
#+MCL (make-instance 'ccl::binary-tcp-stream :host host :port port :element-type 'unsigned-byte)
464
#+(or CMU scl) (sys:make-fd-stream (ext:connect-to-inet-socket host port) :buffering :full :element-type '(unsigned-byte 8))
465
#+sbcl (usocket:socket-stream (usocket:socket-connect host port :element-type '(unsigned-byte 8)))
466
#+clozure-common-lisp (make-ip-socket :remote-host host :remote-port port)))
469
;; the service is intended for intra-host operation.
470
;; at best, on a secure local network
471
(8883 (setf stream (drakma::make-ssl-stream stream
473
:certificate certificate
475
:certificate-password certificate-password
479
:ca-directory ca-directory))))
480
(let ((connection-id (format nil "~a-~s" client-id (spocq.i::getpid))))
481
(mqtt:send-connect-message stream :client-id connection-id
482
:username user :password password)
485
(defun mqtt:subscribe-failed (connection c)
486
(error "subscribe failed: ~s: ~a" connection c))
488
(defgeneric mqtt::send-error-message (broker-url string)
489
(:method ((broker-url string) (message string))
490
(mqtt::send-error-message (spocq:mqtt-url broker-url) message))
491
(:method ((broker-url spocq:mqtt-url) (message string))
492
(http:log-error *trace-output* message)
493
(let ((stream (mqtt:connect broker-url)))
495
(mqtt:send-response-message stream :topic-name "/errors" :payload message)
498
(defgeneric mqtt:send-response-message (stream &key topic-name payload)
499
;;!!! requires new connection per message
500
(:method ((location string) &rest args)
501
(apply #'mqtt:send-response-message (spocq:mqtt-url location) args))
502
(:method ((url spocq:mqtt-url) &rest args)
503
(let ((stream (mqtt:connect url)))
504
(unwind-protect (apply #'mqtt:send-response-message stream args)
506
(:method ((stream stream) &key topic-name payload)
507
(mqtt:request stream (mqtt:make-publish-message topic-name payload) :response #'mqtt:require-puback)))
509
(defun mqtt::log-warn (acceptor message &rest args)
510
(let ((message (format nil "~?" message args)))
511
(http:log-error *trace-output* "~a" message)
512
(log-error *trace-output* "~a" message)
513
(channel-put (acceptor-message-queue acceptor)
514
`(:direction :output :message ,(mqtt::make-error-message message)))))
516
(defun mqtt::log-error (acceptor message &rest args)
517
(let ((message (format nil "~?" message args)))
518
(http:log-warn *trace-output* "~a" message)
519
(log-warn *trace-output* "~a" message)
520
(channel-put (acceptor-message-queue acceptor)
521
`(:direction :output :message ,(mqtt::make-error-message message)))))
523
(defmethod initialize-instance ((instance mqtt:acceptor) &rest initargs &key broker-url port)
524
"unless authentication information is provided, check for registered values for the authority"
526
(string (setf broker-url (spocq:mqtt-url broker-url)))
528
(t (assert-argument-type mqtt:acceptor broker-url (or string spocq:mqtt-url))))
529
(if (spocq:mqtt-url-port broker-url)
530
(setf port (spocq:mqtt-url-port broker-url))
531
(setf (spocq:mqtt-url-port broker-url) port
532
(spocq:iri-lexical-form broker-url) ""))
533
(apply #'call-next-method instance
534
:broker-url broker-url
537
(let ((url (mqtt:acceptor-broker-url instance)))
538
(unless (or (spocq:mqtt-url-user url) (spocq:mqtt-url-password url))
539
(destructuring-bind (&key name password token)
540
(spocq.i::retrieve-authority-properties (concatenate 'string "mqtt@" (spocq:mqtt-url-authority url)))
541
(declare (ignore token))
542
(setf (spocq:mqtt-url-user url) name)
543
(setf (spocq:mqtt-url-password url) password)
544
(setf (spocq:iri-lexical-form url) ""))))
548
;;; http integration via hunchentoot initiation crontrol-flow
550
(defmethod hunchentoot:start ((acceptor mqtt:acceptor))
551
(setq mqtt::*packet-identifier-lock* (bt:make-lock "packet identifier lock"))
552
(setf (acceptor-message-thread acceptor)
553
(bt:make-thread #'(lambda () (mqtt:send-messages acceptor))
554
:name "mqtt send-message thread"))
557
(defmethod hunchentoot:stop ((acceptor mqtt:acceptor) &key soft)
558
(declare (ignore soft))
560
(bt:destroy-thread (acceptor-message-thread acceptor)))
562
(defmethod hunchentoot:start-listening ((acceptor mqtt:acceptor))
563
"The mqtt implementation actively connects to the broker and subscribes to the acceptor topics.
564
It does not listen on a socket for connections.
565
The single socket connection receives and delivers messages for subscribed topics.
566
Each message is parsed and then processed by a new thread.
567
That thread publishes any results and/or errors to the respective topic declared in the request."
568
;; connect to the broker
570
for retry-count from 1 to 10
571
until (tbnl::acceptor-shutdown-p acceptor)
575
(setq mqtt::*packet-identifier* 0)
576
(let* ((connection (handler-case (mqtt:connect (mqtt:acceptor-broker-url acceptor)
577
:client-id (mqtt:acceptor-client-id acceptor))
579
(http:log-warn *trace-output* "connection error, exit: ~s" condition)
580
(return-from :connected nil)))))
581
(setf (mqtt:acceptor-broker-connection acceptor) connection)
583
(loop for topic in (mqtt:acceptor-topics acceptor)
584
do (destructuring-bind (&key name (qos (acceptor-qos acceptor)))
585
(typecase topic (string `(:name ,topic)) (list topic))
586
(mqtt:request connection
587
(mqtt:make-subscribe-message name :qos qos)
589
:response #'mqtt:require-suback
590
:error #'mqtt:subscribe-failed)))
591
(mqtt:accept-messages acceptor)))
593
(http:log-warn *trace-output* "accept error, retry: ~s" condition)))
596
(defmethod SB-GRAY:STREAM-LISTEN ((stream de.setf.utility.implementation::vector-input-stream))
597
(< (de.setf.utility.implementation::get-stream-position stream)
598
(1- (length (de.setf.utility.implementation::get-vector-stream-vector stream)))))
600
(defmethod mqtt:accept-messages ((acceptor mqtt:acceptor))
601
(loop until (or (tbnl::acceptor-shutdown-p acceptor)
602
(null (mqtt:accept-message acceptor)))))
604
(defun call-with-acceptor-thread-count-incremented (acceptor operator)
605
(declare (dynamic-extent operator))
606
(when (and (acceptor-thread-limit acceptor)
607
(> (acceptor-threads-in-progress acceptor)
608
(acceptor-thread-limit acceptor))
609
(bt:with-lock-held ((acceptor-wait-lock acceptor))
610
(loop until (<= (acceptor-threads-in-progress acceptor) (acceptor-thread-limit acceptor))
611
do (tbnl::condition-variable-wait (acceptor-wait-queue acceptor) (acceptor-wait-lock acceptor))))))
612
(unwind-protect (progn
613
(bt:with-lock-held ((tbnl::acceptor-shutdown-lock acceptor))
614
(incf (acceptor-threads-in-progress acceptor)))
616
(bt:with-lock-held ((tbnl::acceptor-shutdown-lock acceptor))
617
(decf (acceptor-threads-in-progress acceptor))
618
(when (tbnl::acceptor-shutdown-p acceptor)
619
(tbnl::condition-variable-signal (tbnl::acceptor-shutdown-queue acceptor))))))
621
(defmacro with-acceptor-thread-count-incremented ((acceptor) &body body)
622
(let ((op (gensym (string :watci-))))
623
`(flet ((,op () ,@body))
624
(declare (dynamic-extent #',op))
625
(call-with-acceptor-thread-count-incremented ,acceptor #',op))))
627
(defgeneric mqtt:accept-message (acceptor)
628
(:documentation "for each message, in turn
629
- transform the headers in to http equivalents
630
- capture and wrap the content as the input stream
631
- fabricate an output stream to capture the response
632
- construct the equivalent http request/response pair
633
- process as for http.")
634
(:method ((acceptor mqtt:acceptor))
635
;; the with- operator does not apply as the decrement is in another thread
636
(let ((request-message (mqtt:receive (mqtt:acceptor-broker-connection acceptor))))
637
;; handle message by type: publish, puback, pingresp
638
(case (mqtt:message-type request-message)
639
(:publish ;; decode, dispatch, process and queue a reply for a request
640
(flet ((run-message-thread ()
641
(flet ((process-message ()
642
(channel-put (acceptor-message-queue acceptor)
643
`(:direction :input :message ,request-message))
644
(let* ((tbnl:*acceptor* acceptor)
645
(output-stream (make-instance 'de.setf.utility.implementation::vector-output-stream))
646
(hunchentoot:*reply* (mqtt:make-response acceptor
647
:server-protocol (mqtt:acceptor-protocol acceptor)
648
;; this may need to be a bivalent stream
649
:content-stream (make-instance 'http:output-stream :real-stream output-stream)))
650
(input-stream (make-instance 'de.setf.utility.implementation::vector-input-stream
651
;; the message should use the original buffer with
652
;; the respective payload position
653
:vector (mqtt:message-payload request-message)))
654
(tbnl::*hunchentoot-stream* input-stream)
655
(hunchentoot:*request* (mqtt:make-request acceptor
657
:headers-in (mqtt:message-headers request-message input-stream)
658
:content-stream (make-instance 'http:input-stream :real-stream input-stream)
660
:uri (mqtt:message-topic request-message)
661
;; :uri (mqtt:message-url request-message)
662
:server-protocol (mqtt:acceptor-protocol acceptor)))
663
(http:*request* hunchentoot:*request*)
664
(http:*response* hunchentoot:*reply*)
665
(tbnl::*tmp-files* nil)
666
(tbnl::*session* nil)
667
(response-location (http:request-header hunchentoot:*request* "Asynchronous-Location"))
668
(dydra:*class.query* 'mqtt:query))
669
;; iff the request includes a revision designator and it describes an interval,
670
;; then iterate over thos revisions
671
(setf (http:response-request http:*response*) http:*request*)
672
(setf (http:request-response http:*request*) http:*response*)
673
;; iff the response uri is set, construct a message and post it to the broker
674
;; nb. no output happens here. it is in spocq.i::initiate-task, wherethe result from each
675
;; revision step is captuered and sent as a message
676
(if response-location
677
;; already wrapped (tbnl::with-acceptor-request-count-incremented (acceptor)
678
(http:respond-to-request acceptor hunchentoot:*request* hunchentoot:*reply*) ;)
679
(mqtt::send-error-message (mqtt:acceptor-broker-url acceptor) "mqtt:accept-message: no asynchronous-location for response")))))
680
(tbnl::with-acceptor-request-count-incremented (acceptor)
681
(when (and (acceptor-request-limit acceptor)
682
(> (tbnl:acceptor-requests-in-progress acceptor)
683
(acceptor-request-limit acceptor)))
684
(mqtt::log-warn acceptor "mqtt:accept-message: too many requests pending: ~s" (tbnl:acceptor-requests-in-progress acceptor))
685
(return-from run-message-thread nil))
686
(with-acceptor-thread-count-incremented (acceptor)
687
(process-message))))))
688
(bt:make-thread #'run-message-thread)))
689
(:puback ;; for qos1, handle the acknowledgements
690
(channel-put (acceptor-message-queue acceptor)
691
`(:direction :input :message ,request-message)))
695
(http:log-warn *trace-output* "unsupported message type: ~s" (mqtt:message-type request-message)))))))
697
(defparameter dydra:*revision-windows* nil)
701
(defmethod dydra:pipe-query ((query mqtt:query) (output-destination stream) &key &allow-other-keys)
702
"when an mqtt query request includes a repetitive revision specification, iterate over the revisions,
703
reiterate the query for each, and coordinate the response with the acceptor to emit successive mqtt
705
(let* ((repository (spocq.i::task-repository query))
706
(dydra:*revision-windows*
707
(or dydra:*revision-windows*
708
(http:request-header http:*request* "Revision-Windows")
709
(http:request-header http:*request* "Windows")))
710
(windowed-revision (when dydra:*revision-windows*
711
(spocq.i::compute-repository-revision repository dydra:*revision-windows*))))
712
(cond (windowed-revision
714
(flet ((for-each-revision (min-record max-record)
715
(unless max-record (setf max-record min-record))
716
(let* ((min-id (ORG.DATAGRAPH.RDF.LMDB.IMPLEMENTATION::REVISION-RECORD-UUID min-record))
717
(max-id (ORG.DATAGRAPH.RDF.LMDB.IMPLEMENTATION::REVISION-RECORD-UUID max-record))
718
(id (if (equalp min-id max-id) min-id (concatenate 'string min-id "-" max-id)))
719
(effective-revision (spocq.i::compute-repository-revision repository id)))
720
(setf (spocq.i::task-revision query) effective-revision)
721
(setf (spocq.i::task-revision-id query) id)
722
(setf result (call-next-method))
723
(restart-bind ((mqtt::next-revision #'(lambda ()
724
(return-from for-each-revision))))
725
(signal 'mqtt::revision-processed)))))
726
(spocq.i::map-repository-revision-intervals #'for-each-revision windowed-revision))))
728
(call-next-method)))))
730
(defmethod dydra:pipe-query ((query mqtt:query) (output-destination stream) &key &allow-other-keys)
733
(defmethod spocq.i::reinitialize-task ((task t))
736
(defparameter mqtt::*close-tasks-p* t)
738
(defmethod spocq.i::reinitialize-task ((task mqtt:query))
739
;;;!!! need to check whether the transaction needs to be aborted
740
(if mqtt::*close-tasks-p*
741
(spocq.i::close-task task)
742
(spocq.i::setf-task-transaction nil task))
743
(setf (spocq.i::task-result-generator task) nil)
744
(spocq.i::initialize-task task)
747
(defmethod CHUNGA:CHUNKED-STREAM-STREAM ((stream broadcast-stream))
748
;; arises when transcribing, in which case the first of the targets is the http stream
749
(CHUNGA:CHUNKED-STREAM-STREAM (first (broadcast-stream-streams stream))))
751
(defmethod spocq.i::initiate-task ((task mqtt:query) (encoding mime:mime-type) &key (stream *standard-output*) &allow-other-keys)
752
"Given a query from an MQTT subscription, expect the request to include a revision window and arrange
753
to iterate over the designated revision to repeat the query for each revision.
754
wrap the results captured as a binary vector output stream to publish as a response message.
755
clear the output stream between passes in order to refresh the response headers - in particular the ETag.
756
finalize the query instance only once at the conclusion."
757
(with-task-environment (:task task)
758
(spocq.i::register-query task)
761
(spocq.i::initialize-task task)
762
(let* ((spocq.i::*thread-operations* (cons (list 'spocq.i::initiate-task (spocq.i::task-id task))
763
spocq.i::*thread-operations*))
764
(repository (spocq.i::task-repository task))
765
(dydra:*revision-windows*
766
(or dydra:*revision-windows*
767
(http:request-header http:*request* "Revision-Windows")
768
(http:request-header http:*request* "Windows")))
769
(windowed-revision (when dydra:*revision-windows*
770
(spocq.i::compute-repository-revision repository dydra:*revision-windows*)))
771
(acceptor tbnl:*acceptor*)
772
(response-location (http:request-header hunchentoot:*request* "Asynchronous-Location"))
773
(output-stream (chunga:chunked-stream-stream stream)))
774
(labels ((for-each-revision (min-record max-record)
775
(log-debug "initiate-task.mqtt: for-each-revision: ~a -- ~a" min-record max-record)
776
(unless max-record (setf max-record min-record))
777
(let* ((min-id (ORG.DATAGRAPH.RDF.LMDB.IMPLEMENTATION::REVISION-RECORD-UUID min-record))
778
(max-id (ORG.DATAGRAPH.RDF.LMDB.IMPLEMENTATION::REVISION-RECORD-UUID max-record))
779
(id (if (equalp min-id max-id) min-id (concatenate 'string min-id "-" max-id)))
780
(effective-revision (spocq.i::compute-repository-revision repository id)))
781
(run-task-for-revision effective-revision)
782
(spocq.i::reinitialize-task task)
783
;; clear the output stream to elminate any cached headers
784
(clear-output stream)))
785
(run-task-for-revision (effective-revision)
786
(let ((effective-revision-id (spocq.i::repository-revision-id effective-revision)))
787
(setf (spocq.i::task-revision task) effective-revision)
788
(setf (spocq.i::task-revision-id task) effective-revision-id)
789
(setf (http.i::response-state http:*response*) nil)
790
(setf (http:response-etag http:*response*) effective-revision-id)
791
(log-debug "initiate-task.mqtt: run-task-for-revision: ~a" effective-revision-id))
792
(multiple-value-prog1
794
(spocq.i::query-run-in-thread task (spocq.i::task-result-generator task))
795
(spocq.i::send-response-message (spocq.i::task-operation task) task
796
(http:response-content-stream http:*response*)
798
(generate-response)))
799
(generate-response ()
800
(channel-put (acceptor-message-queue acceptor)
802
:message ,(mqtt:make-publish-message response-location
803
;; get the sized vector and reset
804
(de.setf.utility.implementation::vector-stream-vector output-stream)
805
:qos (acceptor-qos acceptor))))))
806
(declare (dynamic-extent #'for-each-revision))
807
(if windowed-revision
808
(spocq.i::map-repository-revision-intervals #'for-each-revision windowed-revision)
809
(run-task-for-revision (spocq.i::task-revision task))))))
810
(spocq.i::finalize-task task))))
814
(handler-bind ((mqtt::revision-processed
816
;;(declare (ignore condition))
817
(print (list 'mqtt::revision-processed condition))
819
(setf output-stream (make-instance 'de.setf.utility.implementation::vector-output-stream))
820
(setf hunchentoot:*reply* (mqtt:make-response acceptor
821
:server-protocol (mqtt:acceptor-protocol acceptor)
822
;; this may need to be a bivalent stream
823
:content-stream (make-instance 'http:output-stream :real-stream output-stream)))
824
(setf http:*response* hunchentoot:*reply*)
825
(setf (http:response-request http:*response*) http:*request*)
826
(setf (http:request-response http:*request*) http:*response*)
827
(print (list 'mqtt::invoke-restart condition))
828
(invoke-restart 'mqtt::next-revision))))
829
(catch 'hunchentoot::request-processed
830
(print (list :respond hunchentoot:*request*))
832
(print (list :responded)))
833
(generate-response)))
836
(defgeneric mqtt:send-messages (acceptor)
838
"Work through the entries in the acceptor's message queue.
839
These are either publish messages, which are sent as responses to their respective location, or
840
puback messages, which serve to release some already sent message.
841
Should a published message receive no acknowledgement, it is resent")
842
(:method ((acceptor mqtt:acceptor))
843
(let ((sent-messages (make-hash-table :test 'eql)))
844
(labels ((manage-timeouts ()
845
(let ((now (get-universal-time)))
846
;; qos2 will require tracing send/receive publishes separately as they can share message ids
847
(maphash #'(lambda (mid entry)
848
(destructuring-bind (&key count message (timeout 0)) entry
849
(when (> now timeout)
850
(cond ((>= count (acceptor-send-limit acceptor))
851
(http:log-error *trace-output* "mqtt:send-messages: send limit exceeded: ~a: ~a" count message)
852
(remhash mid sent-messages))
855
(mqtt:send (mqtt:acceptor-broker-connection acceptor) message :timeout (acceptor-send-timeout acceptor))
856
(incf (getf entry :count))
857
(setf (getf entry :timeout) (+ now (acceptor-send-timeout acceptor))))
859
(http:log-error *trace-output* "mqtt:send-messages: send failed: ~a: ~a" c message)
860
(remhash mid sent-messages))))))))
862
(send-publish (message)
863
(http:log-debug *trace-output* "mqtt:send-messages: send publish: ~a" message)
865
(mqtt:send (mqtt:acceptor-broker-connection acceptor) message :timeout (acceptor-send-timeout acceptor))
866
(when (typep (mqtt:message-qos message) '(integer 1))
867
;; retain qos1 messages for confirmation
868
(setf (mqtt:message-dup message) 1)
869
(setf (gethash (mqtt:message-mid message) sent-messages)
870
`(:count 1 :message ,message :timeout ,(+ (get-universal-time) (acceptor-send-timeout acceptor))))))
872
(http:log-error *trace-output* "mqtt:send-messages: send failed: ~a: ~a" c message))))
873
(receive-publish (message)
874
(http:log-debug *trace-output* "mqtt:send-messages: receive publish: ~a" message)
875
(case (mqtt:message-qos message)
876
((0 2) ) ; qos0: no action, qos2: nyi
878
(handler-case (mqtt:send (mqtt:acceptor-broker-connection acceptor)
879
(mqtt:make-puback-message (mqtt:message-mid message))
880
:timeout (acceptor-send-timeout acceptor))
882
(http:log-error *trace-output* "mqtt:send-messages: send failed: ~a: ~a" c message))))))
883
(receive-puback (message)
884
(http:log-debug *trace-output* "mqtt:send-messages: receive puback: ~a" message)
885
(let* ((mid (mqtt:message-mid message)))
886
(destructuring-bind (&key message &allow-other-keys) (gethash mid sent-messages)
887
(http:log-debug *trace-output* "mqtt:send-messages: acknowledged: ~a" message)
888
(remhash mid sent-messages)))))
889
(loop for entry = (spocq.i::channel-get (acceptor-message-queue acceptor) :timeout (acceptor-send-timeout acceptor))
892
do (destructuring-bind (&key direction message) entry
895
(case (mqtt:message-type message)
896
(:publish (send-publish message))))
898
(case (mqtt:message-type message)
899
(:publish (receive-publish message))
900
(:puback (receive-puback message)))))))))))