Coverage report: /development/source/library/org/datagraph/spocq-shard/src/spocq-server/streams/websocket.lisp

KindCoveredAll%
expression0774 0.0
branch058 0.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.server.implementation; -*-
2
 ;;; (load #p"patches/model.lisp")
3
 
4
 (in-package :org.datagraph.spocq.server.implementation)
5
 ;;; (trace ws::process-websocket-frame http:respond-to-request tbnl::get-request-data hunchensocket::handle-handshake)
6
 ;;; (trace tbnl::START-LISTENING tbnl::EXECUTE-ACCEPTOR tbnl::HANDLE-INCOMING-CONNECTION tbnl::process-connection)
7
 
8
 ;;; (trace tbnl::process-connection http::respond-to-connection ws::websocket-request-loop ws::process-websocket-message spocq.si::propagation-server)
9
 
10
 #|
11
 Augment the execution control structure of the HTTP server to support WebSocket connections.
12
 This involves
13
 - extend the acceptor logic in process-connection to recognize an upgrade and
14
   allow that thread retain the connection to run a websocket request loop
15
   where ws::process-websocket-message serves as the equivalent to
16
   tbnl::process-connection, but using vector streams to present the input and
17
   capture the output.
18
 - otherwise, retain the standard response function dispatch mechanism to handle
19
   http requests in request/response mode as before
20
 - route replicated content to websocket output streams according to request
21
   "Content-Disposition" header values.
22
   - provide a control operator to register websocket response streams with the
23
     acceptor according to disposition
24
   - provide a replication side-effect for selected graph store operations, to
25
     use the request disposition to select websocket output streams and distribute
26
     requests to them
27
 - implement an in-memory patch operator which interprets the sections of a
28
   multipart mime document as DELETE/POST/PUT operations.
29
   (this could support section-specific dispositions and/or content-based logic
30
   for dynamic routing based on a computed disposition.)
31
 
32
 The extension is based on hunchensocket, which implements the framing and
33
 message un/marshalling, but it retains the http dispatch and message syntax of
34
 the core server.
35
 (https://github.com/joaotavora/hunchensocket)
36
 it adds just the propagation-server resource function which adds two resources
37
 - /ws is the target for the initial websocket upgrade request and responds with
38
   no content
39
 - /:account/:repository/disposition is an authenticated resource which sets the
40
   disposition for the respective response stream
41
 
42
 in order to provide the net-facing proxy, nginx is configured as for a spocq
43
 server, but with a long timeout
44
 
45
 location ^~ /ws {
46
   # when run on a specific server
47
   proxy_pass       http://spocq_ws;
48
   # otherwise just as normal
49
   # proxy_pass http://spocq;
50
   proxy_set_header Host      $host;
51
   proxy_set_header X-Real-IP $remote_addr;
52
   proxy_set_header DYDRA_SERVICE websocket;
53
   fastcgi_read_timeout 1d;
54
   proxy_read_timeout 1d;
55
 
56
   proxy_http_version 1.0; # no chunking
57
   proxy_set_header Upgrade $http_upgrade;
58
   proxy_set_header Connection "upgrade";
59
   # when run on a specific servier with a prefix
60
   # leave /ws alone, but elimiate a prefix elsewhere
61
   rewrite ^/ws(/.*)$ $1 break;
62
 }
63
 |#
64
 
65
 (defparameter ws:*class.request* 'ws:request)
66
 (defparameter ws:*class.response* 'ws:response)
67
 (defparameter ws:*request* nil)
68
 (defparameter ws:*response* nil)
69
 
70
 (defparameter *ws-request-limit* 16)
71
 (defparameter *ws-thread-limit* 8)
72
 (defparameter ws:*acceptor* nil)
73
 
74
 (defclass ws:acceptor (hunchensocket::websocket-acceptor spocq-acceptor) ; (tbnl:acceptor)
75
   ((version :initform :rfc-6455
76
             :reader acceptor-version)
77
    (protocol
78
     :initform "HTTP 1.0"
79
     :reader ws:acceptor-protocol
80
     :documentation "The effective http protocol disables chunking as the message is a single unit.")
81
    (request-limit
82
     :initform *ws-request-limit*
83
     :initarg :request-limit
84
     :reader acceptor-request-limit)
85
    (thread-limit
86
     :initform *ws-thread-limit*
87
     :initarg :thread-limit
88
     :reader acceptor-thread-limit)
89
    (threads-in-progress
90
     :initform 0
91
     :accessor acceptor-threads-in-progress)
92
    (wait-queue
93
     :initform (tbnl::make-condition-variable)
94
     :reader acceptor-wait-queue
95
     :documentation
96
     "A queue that we use to wait for a free connection.")
97
    (wait-lock
98
     :initform (bt:make-lock "acceptor-wait-lock")
99
     :reader acceptor-wait-lock
100
     :documentation
101
     "The lock for the connection wait queue.")
102
    (message-queue
103
     :initform (spocq.i::make-pool :name "ws message pool")
104
     :reader acceptor-message-queue
105
     :documentation
106
     "Passes message to the thread which emits and confirms receipts and responses.
107
      These will be the publish and confirmation messages read and passed through from the main acceptor thread
108
      and the response messages from the respective request processing thread.")
109
    (message-thread
110
     :accessor ws::acceptor-message-thread
111
     :documentation
112
     "A dependent thread to multiplex response messages over the acceptor connection, manage their timeouts,
113
      and correlate confirmation messages.")
114
    (propagation-lock
115
     :initform (bt:make-lock "acceptor-propagation-lock")
116
     :reader ws::acceptor-propagation-lock
117
     :documentation
118
     "The lock for the managing propagation.")
119
    (propagation-streams
120
     :initform (make-hash-table :test 'equal)
121
     :reader ws::acceptor-propagation-streams
122
     :documentation
123
     "A registry of streams to serve as propagation targets, indexed by disposition."))
124
   (:documentation
125
    "Combine ws-specific attributes with standard http acceptor behaviour and connection limits from
126
     task-manager"))
127
 
128
 (defmethod initialize-instance :after ((instance ws:acceptor) &key)
129
   (setf (tbnl::acceptor-output-chunking-p instance) nil))
130
 
131
 (defclass ws:request (spocq-request)
132
   ((hunchensocket::state
133
     :initform nil
134
     :reader acceptor-state
135
     :documentation "websocket frame state")
136
    (hunchensocket::pending-fragments
137
     :initform nil
138
     :reader ws::request-pending-fragments)
139
    (hunchensocket::pending-opcode :initform nil)
140
    (content
141
     :initarg :content :initform #()
142
     :reader ws::request-content
143
     :documentation
144
     "caches the websocket frame content")))
145
 
146
 (defun ws:make-request (&rest initargs)
147
   (apply #'make-instance ws:*class.request* :acceptor initargs))
148
 
149
 (defclass ws:response (spocq-response)
150
   ())
151
 
152
 (defun ws:make-response (&rest initargs)
153
   (apply #'make-instance ws:*class.response* :acceptor initargs))
154
 
155
 (defclass ws:output-stream (http:output-stream)
156
   ((write-lock
157
     :initform (bt:make-lock "stream-write-lock")
158
     :reader stream-write-lock
159
     :documentation
160
     "The lock to serialize socket output.")
161
    (disposition
162
     :initform nil
163
     :accessor ws::stream-disposition
164
     :documentation "registers patterns for terms for which the stream's client
165
      is to be informed")
166
    (node-address
167
     :initform nil
168
     :accessor ws::stream-node-address
169
     :documentation "Set from an ETag header provided with a request which sets
170
      the disposition to enable possible round-trip suppression."))
171
   (:documentation
172
    "Extend an http output stream with a lock to serialize output and a registry for client's terms
173
     as well as a dispositoon for routing and a location to suppress round-trip propagation."))
174
 
175
 (defmethod initialize-instance :after ((stream ws:output-stream) &key)
176
   (setf (chunga:chunked-stream-output-chunking-p stream) nil)
177
   )
178
   
179
 
180
 (defclass ws:query (dydra:query)
181
   ()
182
   (:metaclass org.datagraph.spocq.implementation::applicable-query-class)
183
   (:documentation "Distinguish ws queries in order to specialize spocq.i::initiate-task to iterate
184
  over revisions"))
185
 
186
 
187
 ;;; websocket splices on to
188
 ;;; - tbnl:process-connection
189
 ;;; - tbnl:respond-to-request
190
 ;;; see https://github.com/joaotavora/hunchensocket/blob/master/hunchensocket.lisp
191
 
192
 (defstruct (ws::frame (:constructor ws::make-frame))
193
   fin
194
   (resv1 0)
195
   (resv2 0)
196
   (resv3 0)
197
   (opcode 0)
198
   (mask 0)
199
   (length 0)
200
   (masking-key nil)
201
   (data #()))
202
 
203
 (defun ws::decode-frame (frame)
204
   (let* ((byte0 (aref frame 0))
205
          (byte1 (aref frame 1))
206
          (masked-bit (ldb (byte 1 7) byte1)))
207
   (ws::make-frame :fin (ldb (byte 1 7) byte0)
208
                   :opcode (ldb (byte 4 0) byte0)
209
                   :mask (eql masked-bit 1)
210
                   :length (case (ldb (byte 7 0) byte1)
211
                             (126 (+ (ash (aref frame 2) 8) (aref frame 3)))
212
                             (127 (reduce #'(lambda (l r) (+ (ash l 8) r)) frame :start 2 :end 10 :initial-value 0))
213
                             (t (ldb (byte 7 0) byte1)))
214
                   :masking-key (when (eql masked-bit 1) (subseq frame 10 13))
215
                   :data (subseq frame (if (eql masked-bit 1) 13 10)))))
216
                   
217
 
218
 (defgeneric ws:write-frame (stream opcode content)
219
   (:documentation
220
    "Send the frame to the websocket client.
221
  Serialize output with the stream's lock.")
222
   (:method ((stream ws:output-stream) opcode (content string))
223
     ;; if a strin gis passed, it is an error message
224
     (ws:write-frame stream opcode (map 'vector #'char-code content)))
225
   (:method ((stream ws:output-stream) opcode content)
226
     (bt:with-lock-held ((stream-write-lock stream))
227
       (hunchensocket::write-frame (CHUNGA:CHUNKED-STREAM-STREAM stream) opcode content)
228
       ;; (hunchensocket::write-frame stream opcode content)
229
       ))
230
   (:method ((stream stream) (opcode number) content)
231
     (hunchensocket::write-frame stream opcode content))
232
   (:method ((stream ws:output-stream) (media-type mime:mime-type) content)
233
     (ws:write-frame stream (ws:mime-type-opcode media-type) content)))
234
 ;;; tcpdump -A -s 0 -i lo tcp and port 8104
235
 
236
 #+(or)
237
 (let ((data (make-array 256))
238
       (stream (make-instance 'de.setf.utility.implementation::vector-output-stream)))
239
   (dotimes (x (length data)) (setf (aref data x) x))
240
   (ws:write-frame stream hunchensocket::+binary-frame+ data)
241
   (let ((vector (DE.SETF.UTILITY.IMPLEMENTATION::vector-stream-vector stream)))
242
     (values (ws::decode-frame vector)
243
             vector)))
244
 
245
 (defgeneric ws::close-connection (stream &key status reason)
246
   (:method ((stream ws:output-stream) &key (status 1011) reason)
247
     (ws:write-frame stream status reason)))
248
 
249
 (defgeneric ws:mime-type-opcode (media-type)
250
   (:method ((media-type null))
251
     hunchensocket::+binary-frame+)
252
   (:method ((media-type mime:mime-type))
253
     (if (binary-mime-type-p media-type)
254
         hunchensocket::+binary-frame+
255
         hunchensocket::+text-frame+)))
256
 
257
 
258
 (defgeneric ws::websocket-request-loop (acceptor request response &key version)
259
 
260
   (:method ((acceptor ws:acceptor) request response
261
             &key (version (acceptor-version acceptor)))
262
   "Implements the main WebSocket loop for supported protocol
263
 versions. Framing is handled automatically, CLIENT handles the actual
264
 payloads."
265
   (ecase version
266
     (:rfc-6455
267
      (handler-bind ((hunchensocket::websocket-error
268
                       #'(lambda (error)
269
                           (http:log-error "websocket-request-loop: websocket error in http response: [~a] ~a" (type-of error) error)
270
                           (ws::close-connection
271
                            (http:response-content-stream response)
272
                            :status (hunchensocket::websocket-error-status error)
273
                            :reason (format nil "Websocket error: [~a] ~a" (type-of error) error))))
274
                     (flexi-streams:external-format-error
275
                       #'(lambda (error)
276
                           (http:log-error "websocket-request-loop: flexistream error in http response: [~a] ~a" (type-of error) error)
277
                           (ws::close-connection
278
                              (http:response-content-stream response)
279
                              :status 1007
280
                              :reason "Bad UTF-8")))
281
                     (http:error
282
                      (lambda (c)
283
                        (http:log-error "websocket-request-loop: http error in http response: [~a] ~a" (type-of c) c)
284
                        (ws::close-connection
285
                         (http:response-content-stream response)
286
                         :status 1011
287
                         :reason (format nil "HTTP error: [~a] ~a" (type-of c) c))))
288
                     (error
289
                       #'(lambda (c)
290
                           (http:log-error "websocket-request-loop: error in http response: [~a] ~a" (type-of c) c)
291
                           (ws::close-connection
292
                              (http:response-content-stream response)
293
                              :status 1011
294
                              :reason (format nil "Error: [~a] ~a" (type-of c) c)))))
295
        (loop do (ws::process-websocket-frame acceptor request response)
296
          while (not (eq :closed (acceptor-state request)))))))))
297
 
298
 
299
 (defgeneric ws::process-websocket-message (acceptor request response body)
300
   (:documentation "For each websocket body, establish the processing context,
301
  in terms of *hunchentoot-stream* w/o chunking, wrap the content
302
  in an http stream, create a vector stream to capture output, parse the headers
303
  establish the request/response context
304
    tbnl::*request* : the reified request instance
305
    tbnl::*reply* : the reified response instance
306
  then proceed as with http processing via respond-to-request on the initial
307
  acceptor and the request and response instances.
308
  When the request completes, extract and send the response body.")
309
 
310
   (:method ((acceptor ws:acceptor) request acceptor-response body)
311
     (handler-bind 
312
         ((error (lambda (c)
313
                   (print (list :process-websocket-message c))
314
                   ;; do not handle it, just log
315
                   (format *error-output* "ws::process-websocket-message: error: [~a] ~a" (type-of c) c)
316
                   (format *error-output* "~%~a" (tbnl::get-backtrace)))))
317
     (http:log-debug "process-websocket-message: frame [~a]"
318
                     (map 'string #'code-char body))
319
     (let* ((frame-stream (make-instance 'de.setf.utility.implementation::vector-input-stream
320
                            :vector body))
321
            (tbnl::*hunchentoot-stream* frame-stream))
322
       (multiple-value-bind (headers-in method url-string protocol)
323
                            (tbnl::get-request-data tbnl::*hunchentoot-stream*)
324
         ;; check if there was a request at all
325
         (unless method
326
           (ws:write-frame (http:response-content-stream acceptor-response)
327
                           hunchensocket::+text-frame+
328
                           body)
329
           (return-from ws::process-websocket-message nil))
330
         (setf protocol :http/1.0)
331
         ;; bind per-request special variables, then process the
332
         ;; request - note that *ACCEPTOR* was bound by an aound method
333
         (let* ((tbnl:*acceptor* acceptor)
334
                (output-stream (make-instance 'de.setf.utility.implementation::vector-output-stream))
335
                (tbnl:*reply* (ws:make-response acceptor
336
                                                 :server-protocol protocol
337
                                                 ;; create the output stream which supports character output for the headers
338
                                                 ;; with the initial character encoding set to ascii
339
                                                 :content-stream (make-instance 'http:output-stream :real-stream output-stream)))
340
                (input-stream (make-instance 'http:input-stream :real-stream frame-stream))
341
                (tbnl:*request* (ws:make-request acceptor
342
                                                  ;; bogus, but ?
343
                                                  :socket nil ; (tbnl::request-socket request)
344
                                                  :headers-in headers-in
345
                                                  :content-stream input-stream
346
                                                  :content body
347
                                                  :method method
348
                                                  :uri url-string
349
                                                  :server-protocol protocol))
350
                (http:*request* tbnl::*request*)
351
                (http:*response* tbnl::*reply*)
352
                (tbnl::*tmp-files* nil)
353
                (tbnl::*session* nil)
354
                (dydra:*class.query* 'ws:query))
355
           (setf (http:response-request tbnl::*reply*) tbnl::*request*)
356
           (setf (http:request-response tbnl::*request*) tbnl::*reply*)
357
           (http:respond-to-request acceptor http:*request* http:*response*)
358
           (let ((content (de.setf.utility.implementation::vector-stream-vector
359
                           (chunga:chunked-stream-stream (http:response-content-stream http:*response*)))))
360
             ;; this needs to hold the acceptor's lock
361
             (ws:write-frame (http:response-content-stream acceptor-response)
362
                             (http:response-media-type http:*response*)
363
                             content)
364
             (tbnl::acceptor-log-access acceptor :return-code (http:response-status-code http:*response*)
365
                                        :format-control "~:[-~@[ (~A)~]~;~:*~A~@[ (~A)~]~] ~A [~A--~A] ~A \"~A ~A~@[?~A~] ~
366
                                                         ~A\" ~D ~:[-~;~:*~D~] \"~:[-~;~:*~A~]\"/ws \"~:[-~;~:*~A~]\"~%"))))))))
367
 
368
 
369
 (in-package :hunchensocket)
370
 
371
 (defgeneric ws::check-websocket-frame (acceptor request frame)
372
   (:method ((acceptor ws:acceptor) (request ws:request) frame)
373
     (let* ((length (frame-payload-length frame))
374
            (total (+ length
375
                      (reduce #'+ (mapcar
376
                                   #'frame-payload-length
377
                                   (ws::request-pending-fragments request))))))
378
       (cond ((> length #xffff) ; 65KiB
379
              (websocket-error 1009 "Message fragment too big"))
380
             ((> total #xfffff) ; 1 MiB
381
              (websocket-error 1009 "Total message too big"))))))
382
 
383
 (defun ws::process-websocket-frame (acceptor request response)
384
   (handler-bind 
385
       (#+(or)(hunchensocket::websocket-error (lambda (c)
386
                                          (print (list :seen-as-websocket-rror c))
387
                                          (signal c)))
388
        (error (lambda (c)
389
                 (print (list :seen-as-rror c))
390
                 ;; do not handle it, just log
391
                 (format *error-output* "http::process-websocket-frame: error in frame processing: [~a] ~a" (type-of c) c)
392
                 (format *error-output* "~%~a" (tbnl::get-backtrace)))))
393
     (with-slots (state pending-fragments pending-opcode) request
394
       (let ((frame nil))
395
         (loop for count below 3
396
           until frame
397
           do (handler-case (setf frame (hunchensocket::read-frame (http:request-content-stream request)))
398
                (sb-sys:io-timeout (c)
399
                  (declare (ignore c))
400
                  (ws:write-frame (http:response-content-stream response) +ping+
401
                                  (map 'vector #'char-code (format nil "ping ~d" count)))))
402
           finally (unless frame
403
                     (http:log-debug "ws:process-websocket-frame: io-timeout: ~a" request)))
404
                                   
405
         (cond (frame
406
             (with-slots (opcode finp payload-length masking-key) frame
407
               (flet ((maybe-accept-data-frame ()
408
                        (ws::check-websocket-frame acceptor request frame)
409
                        (read-application-data (http:request-content-stream request) frame)))
410
                 (cond
411
                  ((eq :awaiting-close state)
412
                   ;; We're waiting a close because we explicitly sent one to the
413
                   ;; client. Error out if the next message is not a close.
414
                   ;;
415
                   (unless (eq opcode +connection-close+)
416
                     (websocket-error
417
                      1002 "Expected connection close from client, got 0x~x" opcode))
418
                   (setq state :closed))
419
                  ((not finp)
420
                   ;; This is a non-FIN fragment Check opcode, append to client's
421
                   ;; fragments.
422
                   ;;
423
                   (cond ((and (= opcode +continuation-frame+)
424
                               (not pending-fragments))
425
                          (websocket-error
426
                           1002 "Unexpected continuation frame"))
427
                         ((control-frame-p opcode)
428
                          (websocket-error
429
                           1002 "Control frames can't be fragmented"))
430
                         ((and pending-fragments
431
                               (/= opcode +continuation-frame+))
432
                          (websocket-error
433
                           1002 "Not discarding initiated fragment sequence"))
434
                         (t
435
                          ;; A data frame, is either initiaing a new fragment sequence
436
                          ;; or continuing one
437
                          ;;
438
                          (maybe-accept-data-frame)
439
                          (cond ((= opcode +continuation-frame+)
440
                                 (push frame pending-fragments))
441
                                (t
442
                                 (setq pending-opcode opcode
443
                                       pending-fragments (list frame)))))))
444
                  ((and pending-fragments
445
                        (not (or (control-frame-p opcode)
446
                                 (= opcode +continuation-frame+))))
447
                   ;; This is a FIN fragment and (1) there are pending fragments and (2)
448
                   ;; this isn't a control or continuation frame. Error out.
449
                   ;;
450
                   (websocket-error
451
                    1002 "Only control frames can interleave fragment sequences."))
452
                  (t
453
                   ;; This is a final, FIN fragment. So first read the fragment's data
454
                   ;; into the `data' slot.
455
                   ;;
456
                   (cond
457
                    ((not (control-frame-p opcode))
458
                     ;; This is either a single-fragment data frame or a continuation
459
                     ;; frame. Join the fragments and keep on processing. Join any
460
                     ;; outstanding fragments and process the message.
461
                     ;;
462
                     (maybe-accept-data-frame)
463
                     (unless pending-opcode
464
                       (setq pending-opcode opcode))
465
                     (let* ((ordered-frames
466
                             (reverse (cons frame pending-fragments)))
467
                            (body (apply #'concatenate 'vector
468
                                         (mapcar #'frame-data
469
                                                 ordered-frames))))
470
                       (setf pending-fragments nil)
471
                       (handler-case
472
                           (ws::process-websocket-message acceptor request response body)
473
                         (error (e)
474
                           (websocket-error
475
                            1002 (format nil "Websocket message error: ~a" e))))))
476
                    ((eq +ping+ opcode)
477
                     ;; Reply to client-initiated ping with a server-pong with the
478
                     ;; same data
479
                     (ws:write-frame (http:response-content-stream response) +pong+ (frame-data frame)))
480
                    ((eq +connection-close+ opcode)
481
                     ;; Reply to client-initiated close with a server-close with the
482
                     ;; same data
483
                     ;;
484
                     (ws:write-frame (http:response-content-stream response) +connection-close+ (frame-data frame))
485
                     (setq state :closed))
486
                    ((eq +pong+ opcode)
487
                     ;; Probably just a heartbeat, don't do anything.
488
                     )
489
                    (t
490
                     (websocket-error
491
                      1002 "Client sent unknown opcode ~a" opcode))))))))
492
               (t
493
                (http:log-info "ws:process-websocket-frame: timeout: ~a" request)
494
                (setq state :closed)))))))
495
 
496
 
497
 ;;; toplevel with websocket support
498
 
499
 (in-package :org.datagraph.spocq.server.implementation)
500
 
501
 (defun ws:main (&rest args &key (init-name (or (getarg "--spocqinit") "init-websockets")) &allow-other-keys)
502
   "Provide the main entry point for a service with websocket support:
503
  - configure from --spocqinit
504
  - initialize spocq runtime to start logs and establish connection to store
505
  - run the ws service
506
  "
507
   (when (getarg "--spocqhelp") ;; --help is seen by sbcl
508
     (format *trace-output* "~a :~{~% ~a~}~%" (first (spocq.i::command-line-argument-list))
509
             (sort spocq.i::*getarg-options* #'string-lessp))
510
     (exit-lisp 0))
511
   (setq spocq.i:*configuration-pathname*
512
         (merge-pathnames init-name (make-pathname :directory '(:relative) :type "sxp")))
513
   (handler-case (spocq.i:initialize-spocq)
514
     (error (condition)
515
       (log-error "ws:main: termination due to condition: ~a" condition)
516
       (spocq.i::maybe-exit-on-error)))
517
   ;; avoid first initialization error
518
   (handler-case (make-instance 'spocq.i::query :sse-expression () :id "" :repository-id "system/system")
519
     (error (c) (warn "initial instantiation error: ~a" c))
520
     (:no-error (result) (format t "instantiated: ~a" result)))
521
   (apply #'ws:run args))
522
 
523
 (defun ws:run (&key (request-limit *mqtt-request-limit*)
524
                       (thread-limit *mqtt-thread-limit*)
525
                       (request-class nil request-class-supplied-p)
526
                       (response-class nil response-class-supplied-p)
527
                       (query-class nil query-class-supplied-p)
528
                       (host-name (dydra:server-host-name))
529
                       (host-package (or (find-package host-name)
530
                                         (make-package host-name :use ())))
531
                       (port *host-port*))
532
                       
533
   "Initiate the ws service with a background admin process.
534
  - create an acceptor
535
  - bind the response operators
536
  - start the acceptor
537
  "
538
   (spocq.i:enable-interrupt :sigterm #'spocq.i:sigterm-handler)
539
   (unless spocq.i:*start-timestamp*
540
     (setq spocq.i:*start-timestamp* (iso-time)))
541
   (setq spocq.i:*response-header-types* nil)  ; to be sure that no prefixes are sent out
542
   #+sbcl(sb-ext:gc :full t)
543
 
544
   (dydra:log-info "Start Websockets ~a." (iso-time))
545
 
546
   (when request-class-supplied-p
547
     (setq ws:*class.request* request-class))
548
   (when response-class-supplied-p
549
     (setq ws:*class.response* response-class))
550
   (when query-class-supplied-p
551
     (setq dydra:*class.query* query-class))
552
 
553
   (setq ws:*acceptor*
554
         (make-instance 'ws:acceptor
555
           :port port
556
           :address host-name
557
           :name (format nil "~a@~a" "dydra.spocq" (spocq.i::host-name))
558
           :request-class request-class
559
           :response-class response-class
560
           :thread-limit thread-limit
561
           :request-limit request-limit
562
           ))
563
   (setq *spocq-acceptor* ws:*acceptor*)
564
   (import (cons 'spocq.si::propagation-server
565
                 *response-functions*)
566
           host-package)
567
   (with-package-iterator (next host-package :internal)
568
     (loop (multiple-value-bind (symbol-p symbol) (next)
569
             (unless symbol-p (return))
570
             (export symbol host-package))))
571
   
572
   (setf (http:acceptor-dispatch-function ws:*acceptor*) host-package)
573
   
574
   (handler-case (http:start ws:*acceptor*)
575
     (error (c)
576
       (dydra:log-warn "Unable to initiate service: ~a" c)
577
       (spocq.i::maybe-exit-on-error)
578
       (break "Unable to initiate service: ~a" c)))
579
   (dydra:log-info "Accepting websockets on ~a." port)
580
   (spocq.i:run-processing-threads)
581
   )
582
 
583
 
584
 
585
 (in-package :tbnl)
586
 
587
 ;;; http integration via hunchentoot initiation crontrol-flow
588
 
589
 (defmethod start ((acceptor ws:acceptor))
590
   #+(or) ;; no extra thread
591
   (setf (ws::acceptor-message-thread acceptor)
592
         (bt:make-thread #'(lambda () (ws::send-messages acceptor))
593
                         :name "ws send-message thread"))
594
   (call-next-method))
595
 
596
 (defmethod stop-server ((acceptor ws:acceptor) &key soft)
597
   (declare (ignore soft))
598
   (call-next-method)
599
   #+(or)
600
   (bt:destroy-thread (ws::acceptor-message-thread acceptor)))
601
 
602
 (defmethod start-listening ((acceptor ws:acceptor))
603
   ;; unchanged from base class
604
   (call-next-method))
605
 
606
 (defmethod process-connection ((acceptor ws:acceptor) (socket t))
607
   "Given acceptor, a ws:acceptor, and socket, a connection socket,
608
  for each request, perform the websockt handshake and protocol switch.
609
  then delegate to sebsocke logic for subsequent message handling."
610
 
611
   (let ((socket-stream (make-socket-stream socket acceptor)))
612
     (unwind-protect
613
       ;; process requests until either the acceptor is shut down,
614
       ;; *CLOSE-HUNCHENTOOT-STREAM* has been set to T by the
615
       ;; handler, or the peer fails to send a request
616
       ;; use as the base stream either the original socket stream or, if the connector
617
       ;; supports ssl, a wrapped stream for ssl support
618
       (let* ((acceptor-stream (initialize-connection-stream acceptor socket-stream))
619
              (tbnl::*hunchentoot-stream* acceptor-stream)) ; provide the dynamic binding
620
           ;; establish http condition handlers and an error handler which mapps to internal-error
621
           (handler-bind
622
             (;; declared conditions are handled according to their report implementation
623
              (http:error (lambda (c)
624
                                (when tbnl::*reply*  ;; can happen while request is being parsed
625
                                  (http:send-condition tbnl::*reply* c)
626
                                  ;; log the condition as request completion
627
                                  (acceptor-log-access acceptor :return-code (http:response-status-code tbnl::*reply*)))
628
                                (http:log-error "process-connection: http error in http response: [~a] ~a" (type-of c) c)
629
                                ;;(describe tbnl::*reply*)
630
                                ;;(describe (http:response-content-stream tbnl::*reply*))
631
                                ;;(dotimes (x 100) (write-char #\. (http:response-content-stream tbnl::*reply*)))
632
                                ;;(finish-output (http:response-content-stream tbnl::*reply*))
633
                                ;;(dotimes (x 100) (write-byte (char-code #\,) acceptor-stream))
634
                                ;;(finish-output acceptor-stream)
635
                                ;; (format *trace-output*  "sent~%~a~%" c)
636
                                (return-from tbnl:process-connection
637
                                  (values nil c nil)))))
638
             (handler-bind
639
               ;; establish an additional level to permit a general handler which maps to http:condition
640
               (;; at this level decline to handle http:condition, to cause it to pass one level up
641
                (http:condition (lambda (c)
642
                                  (signal c)))
643
                ;; a connection error is suppressed by returning from the connection handler.
644
                ;; this does not try to continue as any stream's socket
645
                (usocket:connection-aborted-error (lambda (c) 
646
                                                    (http:log-error "process-connection: [~a] ~a" (type-of c) c)
647
                                                    (return-from tbnl:process-connection nil)))
648
                #+sbcl  ;; caused by a broken pipe
649
                (sb-int:simple-stream-error (lambda (c)
650
                                              (http:log-error "process-connection: [~a] ~a" (type-of c) c)
651
                                              (return-from tbnl:process-connection nil)))
652
                ;; while any other error is handled as per acceptor, where the default implementation
653
                ;; will be to log and re-signal as an http:internal-error, but other mapping are possible
654
                ;; as well as declining to handle in which the condition is re-signaled as an internal error
655
                (error (lambda (c)
656
                         (http:handle-condition acceptor c)
657
                         ;; if it remains unhandled, then resignal as an internal error
658
                         (http:log-error "process-connection: unhandled error in http response: [~a] ~a" (type-of c) c)
659
                         (http:log-error "~a" (get-backtrace))
660
                         ;; re-signal to the acceptor's general handler
661
                         (http:internal-error "process-connection: unhandled error in http response: [~a] ~a" (type-of c) c))))
662
             
663
               (loop
664
                 (let ((tbnl::*close-hunchentoot-stream* t))
665
                   (when (acceptor-shutdown-p acceptor)
666
                     (return))
667
                   (multiple-value-bind (headers-in method url-string protocol)
668
                                        (get-request-data tbnl::*hunchentoot-stream*)
669
                     ;; check if there was a request at all
670
                     (unless method
671
                       (return))
672
                     ;; bind per-request special variables, then process the
673
                     ;; request - note that *ACCEPTOR* was bound by an aound method
674
                     (let* ((output-stream (make-instance 'ws:output-stream :real-stream tbnl::*hunchentoot-stream*))
675
                            (tbnl::*reply* (ws:make-response acceptor
676
                                                         :server-protocol protocol
677
                                                         ;; create the output stream which supports character output for the headers
678
                                                         ;; with the initial character encoding set to ascii
679
                                                         :content-stream output-stream))
680
                            (input-stream (make-instance 'http:input-stream :real-stream tbnl::*hunchentoot-stream*))
681
                            (tbnl::*request* (ws:make-request acceptor
682
                                                          :socket socket
683
                                                          :headers-in headers-in
684
                                                          :content-stream input-stream
685
                                                          :method method
686
                                                          :uri url-string
687
                                                          :server-protocol protocol))
688
                            (tbnl::*tmp-files* nil)
689
                            (tbnl::*session* nil)
690
                            (transfer-encodings (cdr (assoc* :transfer-encoding headers-in))))
691
                       ;; instantiation must follow this order as any errors are recorded as side-effects on the response
692
                       ;; return code, which must be checked...
693
                       (setf (http:response-request tbnl::*reply*) tbnl::*request*)
694
                       (setf (http:request-response tbnl::*request*) tbnl::*reply*)
695
                       (when transfer-encodings
696
                         (setq transfer-encodings
697
                               (split "\\s*,\\s*" transfer-encodings))
698
                         (when (member "chunked" transfer-encodings :test #'equalp)
699
                           (cond ((acceptor-input-chunking-p acceptor)
700
                                  ;; turn chunking on before we read the request body
701
                                  (setf (chunked-stream-input-chunking-p input-stream) t))
702
                                 (t (http:bad-request "Client tried to use chunked encoding, but acceptor is configured to not use it.")))))
703
                       (if (eql +http-ok+ (return-code tbnl::*reply*))
704
                           ;; if initialization succeeded, process
705
                           (with-acceptor-request-count-incremented (acceptor)
706
                             ;; at this point thread counts as active wrt eventual soft shutdown (see stop)
707
                             (catch 'request-processed
708
                               (http::respond-to-connection acceptor tbnl::*request* tbnl::*reply*)))
709
                           ;; otherwise, report the error
710
                           (http:error :code (return-code tbnl::*reply*)))
711
                       ;; iff chunking, emit the last chunk and then the terminating chunk
712
                       (force-output output-stream)
713
                       ;; record content disposition filter in the output strram
714
                       (when (chunga:chunked-stream-output-chunking-p output-stream)
715
                         (setf (chunga:chunked-stream-output-chunking-p output-stream) nil))
716
                       (close output-stream)
717
                       ;;(reset-connection-stream *acceptor* (http:response-content-stream tbnl::*reply*))
718
                       ;; access log message
719
                       (acceptor-log-access acceptor :return-code (http:response-status-code tbnl::*reply*)))
720
                     ;; synchronize on the underlying stream
721
                     ;; (finish-output acceptor-stream)
722
                     (when tbnl::*close-hunchentoot-stream*
723
                       (return)))))))
724
         (close acceptor-stream :abort t)
725
         (setq socket-stream nil))
726
       (when socket-stream
727
         ;; as we are at the end of the request here, we ignore all
728
         ;; errors that may occur while flushing and/or closing the
729
         ;; stream.
730
         ;; as the socket stream is still bound, an error occurred - do not flush, just close
731
         (ignore-errors*
732
          (close socket-stream :abort t))))))
733
 
734
 
735
 ;;; adapted from hunchensocket:acceptor-dispatch-request
736
 (defmethod http::respond-to-connection ((acceptor ws:acceptor) request response)
737
   "Attempt WebSocket connection, else fall back to HTTP"
738
     (cond ((and (member "upgrade" (split "\\s*,\\s*" (header-in* :connection))
739
                         :test #'string-equal)
740
                 (string-equal "websocket" (header-in* :upgrade)))
741
            (hunchensocket::handle-handshake acceptor request response)
742
            ;; if that returns, the headers have been configured for a handshake reply
743
            (let ((stream (http:response-content-stream response))
744
                  (ws:*request* request)
745
                  (ws:*response* response))
746
              (http:send-headers response)
747
              (http::finish-header-output stream)
748
              (force-output stream)
749
              (catch 'websocket-done
750
                (handler-bind ((error #'(lambda (e)
751
                                          (maybe-invoke-debugger e)
752
                                          (log-message* :error "Error: ~a" e)
753
                                          (throw 'websocket-done nil))))
754
                  (let ((ws:*acceptor* acceptor))
755
                    (ws::websocket-request-loop acceptor request response))))))
756
           (t
757
            (let ((http:*request* tbnl::*request*)
758
                  (http:*response* tbnl::*reply*))
759
              ;; Client is not requesting websockets, let Hunchentoot do its HTTP
760
              ;; thing undisturbed.
761
              (http::respond-to-request acceptor request response)))))