Coverage report: /development/source/library/org/datagraph/spocq-shard/src/spocq-server/streams/websocket.lisp
| Kind | Covered | All | % |
| expression | 0 | 774 | 0.0 |
| branch | 0 | 58 | 0.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.server.implementation; -*-
2
;;; (load #p"patches/model.lisp")
4
(in-package :org.datagraph.spocq.server.implementation)
5
;;; (trace ws::process-websocket-frame http:respond-to-request tbnl::get-request-data hunchensocket::handle-handshake)
6
;;; (trace tbnl::START-LISTENING tbnl::EXECUTE-ACCEPTOR tbnl::HANDLE-INCOMING-CONNECTION tbnl::process-connection)
8
;;; (trace tbnl::process-connection http::respond-to-connection ws::websocket-request-loop ws::process-websocket-message spocq.si::propagation-server)
11
Augment the execution control structure of the HTTP server to support WebSocket connections.
13
- extend the acceptor logic in process-connection to recognize an upgrade and
14
allow that thread retain the connection to run a websocket request loop
15
where ws::process-websocket-message serves as the equivalent to
16
tbnl::process-connection, but using vector streams to present the input and
18
- otherwise, retain the standard response function dispatch mechanism to handle
19
http requests in request/response mode as before
20
- route replicated content to websocket output streams according to request
21
"Content-Disposition" header values.
22
- provide a control operator to register websocket response streams with the
23
acceptor according to disposition
24
- provide a replication side-effect for selected graph store operations, to
25
use the request disposition to select websocket output streams and distribute
27
- implement an in-memory patch operator which interprets the sections of a
28
multipart mime document as DELETE/POST/PUT operations.
29
(this could support section-specific dispositions and/or content-based logic
30
for dynamic routing based on a computed disposition.)
32
The extension is based on hunchensocket, which implements the framing and
33
message un/marshalling, but it retains the http dispatch and message syntax of
35
(https://github.com/joaotavora/hunchensocket)
36
it adds just the propagation-server resource function which adds two resources
37
- /ws is the target for the initial websocket upgrade request and responds with
39
- /:account/:repository/disposition is an authenticated resource which sets the
40
disposition for the respective response stream
42
in order to provide the net-facing proxy, nginx is configured as for a spocq
43
server, but with a long timeout
46
# when run on a specific server
47
proxy_pass http://spocq_ws;
48
# otherwise just as normal
49
# proxy_pass http://spocq;
50
proxy_set_header Host $host;
51
proxy_set_header X-Real-IP $remote_addr;
52
proxy_set_header DYDRA_SERVICE websocket;
53
fastcgi_read_timeout 1d;
54
proxy_read_timeout 1d;
56
proxy_http_version 1.0; # no chunking
57
proxy_set_header Upgrade $http_upgrade;
58
proxy_set_header Connection "upgrade";
59
# when run on a specific servier with a prefix
60
# leave /ws alone, but elimiate a prefix elsewhere
61
rewrite ^/ws(/.*)$ $1 break;
65
(defparameter ws:*class.request* 'ws:request)
66
(defparameter ws:*class.response* 'ws:response)
67
(defparameter ws:*request* nil)
68
(defparameter ws:*response* nil)
70
(defparameter *ws-request-limit* 16)
71
(defparameter *ws-thread-limit* 8)
72
(defparameter ws:*acceptor* nil)
74
(defclass ws:acceptor (hunchensocket::websocket-acceptor spocq-acceptor) ; (tbnl:acceptor)
75
((version :initform :rfc-6455
76
:reader acceptor-version)
79
:reader ws:acceptor-protocol
80
:documentation "The effective http protocol disables chunking as the message is a single unit.")
82
:initform *ws-request-limit*
83
:initarg :request-limit
84
:reader acceptor-request-limit)
86
:initform *ws-thread-limit*
87
:initarg :thread-limit
88
:reader acceptor-thread-limit)
91
:accessor acceptor-threads-in-progress)
93
:initform (tbnl::make-condition-variable)
94
:reader acceptor-wait-queue
96
"A queue that we use to wait for a free connection.")
98
:initform (bt:make-lock "acceptor-wait-lock")
99
:reader acceptor-wait-lock
101
"The lock for the connection wait queue.")
103
:initform (spocq.i::make-pool :name "ws message pool")
104
:reader acceptor-message-queue
106
"Passes message to the thread which emits and confirms receipts and responses.
107
These will be the publish and confirmation messages read and passed through from the main acceptor thread
108
and the response messages from the respective request processing thread.")
110
:accessor ws::acceptor-message-thread
112
"A dependent thread to multiplex response messages over the acceptor connection, manage their timeouts,
113
and correlate confirmation messages.")
115
:initform (bt:make-lock "acceptor-propagation-lock")
116
:reader ws::acceptor-propagation-lock
118
"The lock for the managing propagation.")
120
:initform (make-hash-table :test 'equal)
121
:reader ws::acceptor-propagation-streams
123
"A registry of streams to serve as propagation targets, indexed by disposition."))
125
"Combine ws-specific attributes with standard http acceptor behaviour and connection limits from
128
(defmethod initialize-instance :after ((instance ws:acceptor) &key)
129
(setf (tbnl::acceptor-output-chunking-p instance) nil))
131
(defclass ws:request (spocq-request)
132
((hunchensocket::state
134
:reader acceptor-state
135
:documentation "websocket frame state")
136
(hunchensocket::pending-fragments
138
:reader ws::request-pending-fragments)
139
(hunchensocket::pending-opcode :initform nil)
141
:initarg :content :initform #()
142
:reader ws::request-content
144
"caches the websocket frame content")))
146
(defun ws:make-request (&rest initargs)
147
(apply #'make-instance ws:*class.request* :acceptor initargs))
149
(defclass ws:response (spocq-response)
152
(defun ws:make-response (&rest initargs)
153
(apply #'make-instance ws:*class.response* :acceptor initargs))
155
(defclass ws:output-stream (http:output-stream)
157
:initform (bt:make-lock "stream-write-lock")
158
:reader stream-write-lock
160
"The lock to serialize socket output.")
163
:accessor ws::stream-disposition
164
:documentation "registers patterns for terms for which the stream's client
168
:accessor ws::stream-node-address
169
:documentation "Set from an ETag header provided with a request which sets
170
the disposition to enable possible round-trip suppression."))
172
"Extend an http output stream with a lock to serialize output and a registry for client's terms
173
as well as a dispositoon for routing and a location to suppress round-trip propagation."))
175
(defmethod initialize-instance :after ((stream ws:output-stream) &key)
176
(setf (chunga:chunked-stream-output-chunking-p stream) nil)
180
(defclass ws:query (dydra:query)
182
(:metaclass org.datagraph.spocq.implementation::applicable-query-class)
183
(:documentation "Distinguish ws queries in order to specialize spocq.i::initiate-task to iterate
187
;;; websocket splices on to
188
;;; - tbnl:process-connection
189
;;; - tbnl:respond-to-request
190
;;; see https://github.com/joaotavora/hunchensocket/blob/master/hunchensocket.lisp
192
(defstruct (ws::frame (:constructor ws::make-frame))
203
(defun ws::decode-frame (frame)
204
(let* ((byte0 (aref frame 0))
205
(byte1 (aref frame 1))
206
(masked-bit (ldb (byte 1 7) byte1)))
207
(ws::make-frame :fin (ldb (byte 1 7) byte0)
208
:opcode (ldb (byte 4 0) byte0)
209
:mask (eql masked-bit 1)
210
:length (case (ldb (byte 7 0) byte1)
211
(126 (+ (ash (aref frame 2) 8) (aref frame 3)))
212
(127 (reduce #'(lambda (l r) (+ (ash l 8) r)) frame :start 2 :end 10 :initial-value 0))
213
(t (ldb (byte 7 0) byte1)))
214
:masking-key (when (eql masked-bit 1) (subseq frame 10 13))
215
:data (subseq frame (if (eql masked-bit 1) 13 10)))))
218
(defgeneric ws:write-frame (stream opcode content)
220
"Send the frame to the websocket client.
221
Serialize output with the stream's lock.")
222
(:method ((stream ws:output-stream) opcode (content string))
223
;; if a strin gis passed, it is an error message
224
(ws:write-frame stream opcode (map 'vector #'char-code content)))
225
(:method ((stream ws:output-stream) opcode content)
226
(bt:with-lock-held ((stream-write-lock stream))
227
(hunchensocket::write-frame (CHUNGA:CHUNKED-STREAM-STREAM stream) opcode content)
228
;; (hunchensocket::write-frame stream opcode content)
230
(:method ((stream stream) (opcode number) content)
231
(hunchensocket::write-frame stream opcode content))
232
(:method ((stream ws:output-stream) (media-type mime:mime-type) content)
233
(ws:write-frame stream (ws:mime-type-opcode media-type) content)))
234
;;; tcpdump -A -s 0 -i lo tcp and port 8104
237
(let ((data (make-array 256))
238
(stream (make-instance 'de.setf.utility.implementation::vector-output-stream)))
239
(dotimes (x (length data)) (setf (aref data x) x))
240
(ws:write-frame stream hunchensocket::+binary-frame+ data)
241
(let ((vector (DE.SETF.UTILITY.IMPLEMENTATION::vector-stream-vector stream)))
242
(values (ws::decode-frame vector)
245
(defgeneric ws::close-connection (stream &key status reason)
246
(:method ((stream ws:output-stream) &key (status 1011) reason)
247
(ws:write-frame stream status reason)))
249
(defgeneric ws:mime-type-opcode (media-type)
250
(:method ((media-type null))
251
hunchensocket::+binary-frame+)
252
(:method ((media-type mime:mime-type))
253
(if (binary-mime-type-p media-type)
254
hunchensocket::+binary-frame+
255
hunchensocket::+text-frame+)))
258
(defgeneric ws::websocket-request-loop (acceptor request response &key version)
260
(:method ((acceptor ws:acceptor) request response
261
&key (version (acceptor-version acceptor)))
262
"Implements the main WebSocket loop for supported protocol
263
versions. Framing is handled automatically, CLIENT handles the actual
267
(handler-bind ((hunchensocket::websocket-error
269
(http:log-error "websocket-request-loop: websocket error in http response: [~a] ~a" (type-of error) error)
270
(ws::close-connection
271
(http:response-content-stream response)
272
:status (hunchensocket::websocket-error-status error)
273
:reason (format nil "Websocket error: [~a] ~a" (type-of error) error))))
274
(flexi-streams:external-format-error
276
(http:log-error "websocket-request-loop: flexistream error in http response: [~a] ~a" (type-of error) error)
277
(ws::close-connection
278
(http:response-content-stream response)
280
:reason "Bad UTF-8")))
283
(http:log-error "websocket-request-loop: http error in http response: [~a] ~a" (type-of c) c)
284
(ws::close-connection
285
(http:response-content-stream response)
287
:reason (format nil "HTTP error: [~a] ~a" (type-of c) c))))
290
(http:log-error "websocket-request-loop: error in http response: [~a] ~a" (type-of c) c)
291
(ws::close-connection
292
(http:response-content-stream response)
294
:reason (format nil "Error: [~a] ~a" (type-of c) c)))))
295
(loop do (ws::process-websocket-frame acceptor request response)
296
while (not (eq :closed (acceptor-state request)))))))))
299
(defgeneric ws::process-websocket-message (acceptor request response body)
300
(:documentation "For each websocket body, establish the processing context,
301
in terms of *hunchentoot-stream* w/o chunking, wrap the content
302
in an http stream, create a vector stream to capture output, parse the headers
303
establish the request/response context
304
tbnl::*request* : the reified request instance
305
tbnl::*reply* : the reified response instance
306
then proceed as with http processing via respond-to-request on the initial
307
acceptor and the request and response instances.
308
When the request completes, extract and send the response body.")
310
(:method ((acceptor ws:acceptor) request acceptor-response body)
313
(print (list :process-websocket-message c))
314
;; do not handle it, just log
315
(format *error-output* "ws::process-websocket-message: error: [~a] ~a" (type-of c) c)
316
(format *error-output* "~%~a" (tbnl::get-backtrace)))))
317
(http:log-debug "process-websocket-message: frame [~a]"
318
(map 'string #'code-char body))
319
(let* ((frame-stream (make-instance 'de.setf.utility.implementation::vector-input-stream
321
(tbnl::*hunchentoot-stream* frame-stream))
322
(multiple-value-bind (headers-in method url-string protocol)
323
(tbnl::get-request-data tbnl::*hunchentoot-stream*)
324
;; check if there was a request at all
326
(ws:write-frame (http:response-content-stream acceptor-response)
327
hunchensocket::+text-frame+
329
(return-from ws::process-websocket-message nil))
330
(setf protocol :http/1.0)
331
;; bind per-request special variables, then process the
332
;; request - note that *ACCEPTOR* was bound by an aound method
333
(let* ((tbnl:*acceptor* acceptor)
334
(output-stream (make-instance 'de.setf.utility.implementation::vector-output-stream))
335
(tbnl:*reply* (ws:make-response acceptor
336
:server-protocol protocol
337
;; create the output stream which supports character output for the headers
338
;; with the initial character encoding set to ascii
339
:content-stream (make-instance 'http:output-stream :real-stream output-stream)))
340
(input-stream (make-instance 'http:input-stream :real-stream frame-stream))
341
(tbnl:*request* (ws:make-request acceptor
343
:socket nil ; (tbnl::request-socket request)
344
:headers-in headers-in
345
:content-stream input-stream
349
:server-protocol protocol))
350
(http:*request* tbnl::*request*)
351
(http:*response* tbnl::*reply*)
352
(tbnl::*tmp-files* nil)
353
(tbnl::*session* nil)
354
(dydra:*class.query* 'ws:query))
355
(setf (http:response-request tbnl::*reply*) tbnl::*request*)
356
(setf (http:request-response tbnl::*request*) tbnl::*reply*)
357
(http:respond-to-request acceptor http:*request* http:*response*)
358
(let ((content (de.setf.utility.implementation::vector-stream-vector
359
(chunga:chunked-stream-stream (http:response-content-stream http:*response*)))))
360
;; this needs to hold the acceptor's lock
361
(ws:write-frame (http:response-content-stream acceptor-response)
362
(http:response-media-type http:*response*)
364
(tbnl::acceptor-log-access acceptor :return-code (http:response-status-code http:*response*)
365
:format-control "~:[-~@[ (~A)~]~;~:*~A~@[ (~A)~]~] ~A [~A--~A] ~A \"~A ~A~@[?~A~] ~
366
~A\" ~D ~:[-~;~:*~D~] \"~:[-~;~:*~A~]\"/ws \"~:[-~;~:*~A~]\"~%"))))))))
369
(in-package :hunchensocket)
371
(defgeneric ws::check-websocket-frame (acceptor request frame)
372
(:method ((acceptor ws:acceptor) (request ws:request) frame)
373
(let* ((length (frame-payload-length frame))
376
#'frame-payload-length
377
(ws::request-pending-fragments request))))))
378
(cond ((> length #xffff) ; 65KiB
379
(websocket-error 1009 "Message fragment too big"))
380
((> total #xfffff) ; 1 MiB
381
(websocket-error 1009 "Total message too big"))))))
383
(defun ws::process-websocket-frame (acceptor request response)
385
(#+(or)(hunchensocket::websocket-error (lambda (c)
386
(print (list :seen-as-websocket-rror c))
389
(print (list :seen-as-rror c))
390
;; do not handle it, just log
391
(format *error-output* "http::process-websocket-frame: error in frame processing: [~a] ~a" (type-of c) c)
392
(format *error-output* "~%~a" (tbnl::get-backtrace)))))
393
(with-slots (state pending-fragments pending-opcode) request
395
(loop for count below 3
397
do (handler-case (setf frame (hunchensocket::read-frame (http:request-content-stream request)))
398
(sb-sys:io-timeout (c)
400
(ws:write-frame (http:response-content-stream response) +ping+
401
(map 'vector #'char-code (format nil "ping ~d" count)))))
402
finally (unless frame
403
(http:log-debug "ws:process-websocket-frame: io-timeout: ~a" request)))
406
(with-slots (opcode finp payload-length masking-key) frame
407
(flet ((maybe-accept-data-frame ()
408
(ws::check-websocket-frame acceptor request frame)
409
(read-application-data (http:request-content-stream request) frame)))
411
((eq :awaiting-close state)
412
;; We're waiting a close because we explicitly sent one to the
413
;; client. Error out if the next message is not a close.
415
(unless (eq opcode +connection-close+)
417
1002 "Expected connection close from client, got 0x~x" opcode))
418
(setq state :closed))
420
;; This is a non-FIN fragment Check opcode, append to client's
423
(cond ((and (= opcode +continuation-frame+)
424
(not pending-fragments))
426
1002 "Unexpected continuation frame"))
427
((control-frame-p opcode)
429
1002 "Control frames can't be fragmented"))
430
((and pending-fragments
431
(/= opcode +continuation-frame+))
433
1002 "Not discarding initiated fragment sequence"))
435
;; A data frame, is either initiaing a new fragment sequence
438
(maybe-accept-data-frame)
439
(cond ((= opcode +continuation-frame+)
440
(push frame pending-fragments))
442
(setq pending-opcode opcode
443
pending-fragments (list frame)))))))
444
((and pending-fragments
445
(not (or (control-frame-p opcode)
446
(= opcode +continuation-frame+))))
447
;; This is a FIN fragment and (1) there are pending fragments and (2)
448
;; this isn't a control or continuation frame. Error out.
451
1002 "Only control frames can interleave fragment sequences."))
453
;; This is a final, FIN fragment. So first read the fragment's data
454
;; into the `data' slot.
457
((not (control-frame-p opcode))
458
;; This is either a single-fragment data frame or a continuation
459
;; frame. Join the fragments and keep on processing. Join any
460
;; outstanding fragments and process the message.
462
(maybe-accept-data-frame)
463
(unless pending-opcode
464
(setq pending-opcode opcode))
465
(let* ((ordered-frames
466
(reverse (cons frame pending-fragments)))
467
(body (apply #'concatenate 'vector
470
(setf pending-fragments nil)
472
(ws::process-websocket-message acceptor request response body)
475
1002 (format nil "Websocket message error: ~a" e))))))
477
;; Reply to client-initiated ping with a server-pong with the
479
(ws:write-frame (http:response-content-stream response) +pong+ (frame-data frame)))
480
((eq +connection-close+ opcode)
481
;; Reply to client-initiated close with a server-close with the
484
(ws:write-frame (http:response-content-stream response) +connection-close+ (frame-data frame))
485
(setq state :closed))
487
;; Probably just a heartbeat, don't do anything.
491
1002 "Client sent unknown opcode ~a" opcode))))))))
493
(http:log-info "ws:process-websocket-frame: timeout: ~a" request)
494
(setq state :closed)))))))
497
;;; toplevel with websocket support
499
(in-package :org.datagraph.spocq.server.implementation)
501
(defun ws:main (&rest args &key (init-name (or (getarg "--spocqinit") "init-websockets")) &allow-other-keys)
502
"Provide the main entry point for a service with websocket support:
503
- configure from --spocqinit
504
- initialize spocq runtime to start logs and establish connection to store
507
(when (getarg "--spocqhelp") ;; --help is seen by sbcl
508
(format *trace-output* "~a :~{~% ~a~}~%" (first (spocq.i::command-line-argument-list))
509
(sort spocq.i::*getarg-options* #'string-lessp))
511
(setq spocq.i:*configuration-pathname*
512
(merge-pathnames init-name (make-pathname :directory '(:relative) :type "sxp")))
513
(handler-case (spocq.i:initialize-spocq)
515
(log-error "ws:main: termination due to condition: ~a" condition)
516
(spocq.i::maybe-exit-on-error)))
517
;; avoid first initialization error
518
(handler-case (make-instance 'spocq.i::query :sse-expression () :id "" :repository-id "system/system")
519
(error (c) (warn "initial instantiation error: ~a" c))
520
(:no-error (result) (format t "instantiated: ~a" result)))
521
(apply #'ws:run args))
523
(defun ws:run (&key (request-limit *mqtt-request-limit*)
524
(thread-limit *mqtt-thread-limit*)
525
(request-class nil request-class-supplied-p)
526
(response-class nil response-class-supplied-p)
527
(query-class nil query-class-supplied-p)
528
(host-name (dydra:server-host-name))
529
(host-package (or (find-package host-name)
530
(make-package host-name :use ())))
533
"Initiate the ws service with a background admin process.
535
- bind the response operators
538
(spocq.i:enable-interrupt :sigterm #'spocq.i:sigterm-handler)
539
(unless spocq.i:*start-timestamp*
540
(setq spocq.i:*start-timestamp* (iso-time)))
541
(setq spocq.i:*response-header-types* nil) ; to be sure that no prefixes are sent out
542
#+sbcl(sb-ext:gc :full t)
544
(dydra:log-info "Start Websockets ~a." (iso-time))
546
(when request-class-supplied-p
547
(setq ws:*class.request* request-class))
548
(when response-class-supplied-p
549
(setq ws:*class.response* response-class))
550
(when query-class-supplied-p
551
(setq dydra:*class.query* query-class))
554
(make-instance 'ws:acceptor
557
:name (format nil "~a@~a" "dydra.spocq" (spocq.i::host-name))
558
:request-class request-class
559
:response-class response-class
560
:thread-limit thread-limit
561
:request-limit request-limit
563
(setq *spocq-acceptor* ws:*acceptor*)
564
(import (cons 'spocq.si::propagation-server
565
*response-functions*)
567
(with-package-iterator (next host-package :internal)
568
(loop (multiple-value-bind (symbol-p symbol) (next)
569
(unless symbol-p (return))
570
(export symbol host-package))))
572
(setf (http:acceptor-dispatch-function ws:*acceptor*) host-package)
574
(handler-case (http:start ws:*acceptor*)
576
(dydra:log-warn "Unable to initiate service: ~a" c)
577
(spocq.i::maybe-exit-on-error)
578
(break "Unable to initiate service: ~a" c)))
579
(dydra:log-info "Accepting websockets on ~a." port)
580
(spocq.i:run-processing-threads)
587
;;; http integration via hunchentoot initiation crontrol-flow
589
(defmethod start ((acceptor ws:acceptor))
590
#+(or) ;; no extra thread
591
(setf (ws::acceptor-message-thread acceptor)
592
(bt:make-thread #'(lambda () (ws::send-messages acceptor))
593
:name "ws send-message thread"))
596
(defmethod stop-server ((acceptor ws:acceptor) &key soft)
597
(declare (ignore soft))
600
(bt:destroy-thread (ws::acceptor-message-thread acceptor)))
602
(defmethod start-listening ((acceptor ws:acceptor))
603
;; unchanged from base class
606
(defmethod process-connection ((acceptor ws:acceptor) (socket t))
607
"Given acceptor, a ws:acceptor, and socket, a connection socket,
608
for each request, perform the websockt handshake and protocol switch.
609
then delegate to sebsocke logic for subsequent message handling."
611
(let ((socket-stream (make-socket-stream socket acceptor)))
613
;; process requests until either the acceptor is shut down,
614
;; *CLOSE-HUNCHENTOOT-STREAM* has been set to T by the
615
;; handler, or the peer fails to send a request
616
;; use as the base stream either the original socket stream or, if the connector
617
;; supports ssl, a wrapped stream for ssl support
618
(let* ((acceptor-stream (initialize-connection-stream acceptor socket-stream))
619
(tbnl::*hunchentoot-stream* acceptor-stream)) ; provide the dynamic binding
620
;; establish http condition handlers and an error handler which mapps to internal-error
622
(;; declared conditions are handled according to their report implementation
623
(http:error (lambda (c)
624
(when tbnl::*reply* ;; can happen while request is being parsed
625
(http:send-condition tbnl::*reply* c)
626
;; log the condition as request completion
627
(acceptor-log-access acceptor :return-code (http:response-status-code tbnl::*reply*)))
628
(http:log-error "process-connection: http error in http response: [~a] ~a" (type-of c) c)
629
;;(describe tbnl::*reply*)
630
;;(describe (http:response-content-stream tbnl::*reply*))
631
;;(dotimes (x 100) (write-char #\. (http:response-content-stream tbnl::*reply*)))
632
;;(finish-output (http:response-content-stream tbnl::*reply*))
633
;;(dotimes (x 100) (write-byte (char-code #\,) acceptor-stream))
634
;;(finish-output acceptor-stream)
635
;; (format *trace-output* "sent~%~a~%" c)
636
(return-from tbnl:process-connection
637
(values nil c nil)))))
639
;; establish an additional level to permit a general handler which maps to http:condition
640
(;; at this level decline to handle http:condition, to cause it to pass one level up
641
(http:condition (lambda (c)
643
;; a connection error is suppressed by returning from the connection handler.
644
;; this does not try to continue as any stream's socket
645
(usocket:connection-aborted-error (lambda (c)
646
(http:log-error "process-connection: [~a] ~a" (type-of c) c)
647
(return-from tbnl:process-connection nil)))
648
#+sbcl ;; caused by a broken pipe
649
(sb-int:simple-stream-error (lambda (c)
650
(http:log-error "process-connection: [~a] ~a" (type-of c) c)
651
(return-from tbnl:process-connection nil)))
652
;; while any other error is handled as per acceptor, where the default implementation
653
;; will be to log and re-signal as an http:internal-error, but other mapping are possible
654
;; as well as declining to handle in which the condition is re-signaled as an internal error
656
(http:handle-condition acceptor c)
657
;; if it remains unhandled, then resignal as an internal error
658
(http:log-error "process-connection: unhandled error in http response: [~a] ~a" (type-of c) c)
659
(http:log-error "~a" (get-backtrace))
660
;; re-signal to the acceptor's general handler
661
(http:internal-error "process-connection: unhandled error in http response: [~a] ~a" (type-of c) c))))
664
(let ((tbnl::*close-hunchentoot-stream* t))
665
(when (acceptor-shutdown-p acceptor)
667
(multiple-value-bind (headers-in method url-string protocol)
668
(get-request-data tbnl::*hunchentoot-stream*)
669
;; check if there was a request at all
672
;; bind per-request special variables, then process the
673
;; request - note that *ACCEPTOR* was bound by an aound method
674
(let* ((output-stream (make-instance 'ws:output-stream :real-stream tbnl::*hunchentoot-stream*))
675
(tbnl::*reply* (ws:make-response acceptor
676
:server-protocol protocol
677
;; create the output stream which supports character output for the headers
678
;; with the initial character encoding set to ascii
679
:content-stream output-stream))
680
(input-stream (make-instance 'http:input-stream :real-stream tbnl::*hunchentoot-stream*))
681
(tbnl::*request* (ws:make-request acceptor
683
:headers-in headers-in
684
:content-stream input-stream
687
:server-protocol protocol))
688
(tbnl::*tmp-files* nil)
689
(tbnl::*session* nil)
690
(transfer-encodings (cdr (assoc* :transfer-encoding headers-in))))
691
;; instantiation must follow this order as any errors are recorded as side-effects on the response
692
;; return code, which must be checked...
693
(setf (http:response-request tbnl::*reply*) tbnl::*request*)
694
(setf (http:request-response tbnl::*request*) tbnl::*reply*)
695
(when transfer-encodings
696
(setq transfer-encodings
697
(split "\\s*,\\s*" transfer-encodings))
698
(when (member "chunked" transfer-encodings :test #'equalp)
699
(cond ((acceptor-input-chunking-p acceptor)
700
;; turn chunking on before we read the request body
701
(setf (chunked-stream-input-chunking-p input-stream) t))
702
(t (http:bad-request "Client tried to use chunked encoding, but acceptor is configured to not use it.")))))
703
(if (eql +http-ok+ (return-code tbnl::*reply*))
704
;; if initialization succeeded, process
705
(with-acceptor-request-count-incremented (acceptor)
706
;; at this point thread counts as active wrt eventual soft shutdown (see stop)
707
(catch 'request-processed
708
(http::respond-to-connection acceptor tbnl::*request* tbnl::*reply*)))
709
;; otherwise, report the error
710
(http:error :code (return-code tbnl::*reply*)))
711
;; iff chunking, emit the last chunk and then the terminating chunk
712
(force-output output-stream)
713
;; record content disposition filter in the output strram
714
(when (chunga:chunked-stream-output-chunking-p output-stream)
715
(setf (chunga:chunked-stream-output-chunking-p output-stream) nil))
716
(close output-stream)
717
;;(reset-connection-stream *acceptor* (http:response-content-stream tbnl::*reply*))
718
;; access log message
719
(acceptor-log-access acceptor :return-code (http:response-status-code tbnl::*reply*)))
720
;; synchronize on the underlying stream
721
;; (finish-output acceptor-stream)
722
(when tbnl::*close-hunchentoot-stream*
724
(close acceptor-stream :abort t)
725
(setq socket-stream nil))
727
;; as we are at the end of the request here, we ignore all
728
;; errors that may occur while flushing and/or closing the
730
;; as the socket stream is still bound, an error occurred - do not flush, just close
732
(close socket-stream :abort t))))))
735
;;; adapted from hunchensocket:acceptor-dispatch-request
736
(defmethod http::respond-to-connection ((acceptor ws:acceptor) request response)
737
"Attempt WebSocket connection, else fall back to HTTP"
738
(cond ((and (member "upgrade" (split "\\s*,\\s*" (header-in* :connection))
739
:test #'string-equal)
740
(string-equal "websocket" (header-in* :upgrade)))
741
(hunchensocket::handle-handshake acceptor request response)
742
;; if that returns, the headers have been configured for a handshake reply
743
(let ((stream (http:response-content-stream response))
744
(ws:*request* request)
745
(ws:*response* response))
746
(http:send-headers response)
747
(http::finish-header-output stream)
748
(force-output stream)
749
(catch 'websocket-done
750
(handler-bind ((error #'(lambda (e)
751
(maybe-invoke-debugger e)
752
(log-message* :error "Error: ~a" e)
753
(throw 'websocket-done nil))))
754
(let ((ws:*acceptor* acceptor))
755
(ws::websocket-request-loop acceptor request response))))))
757
(let ((http:*request* tbnl::*request*)
758
(http:*response* tbnl::*reply*))
759
;; Client is not requesting websockets, let Hunchentoot do its HTTP
760
;; thing undisturbed.
761
(http::respond-to-request acceptor request response)))))