Coverage report: /development/source/library/org/datagraph/spocq-shard/src/spocq-server/propagation.lisp
| Kind | Covered | All | % |
| expression | 0 | 191 | 0.0 |
| branch | 0 | 12 | 0.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.server.implementation; -*-
2
;;; Copyright 2019 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved
4
(in-package :org.datagraph.spocq.server.implementation)
6
(:documentation "rdf propagation protocol"
8
"This file is the http layer for a simple propagation server.
9
It implements propagation by just propagating requests verbatim among connected clients.
10
It neither resolves conflicts nor limits propagation by subject.
15
each participating replica must have an active websocket connection.
16
the initial request on that connection is just a GET with the protocol upgrade.
17
it includes neither headers nor content.
18
subsequent websocket requests can be used for control operations.
20
head: "content-disposition: replicate=patterns" indicates the patterns for identifiers to be
21
replicated to the respective ws connection.
23
all other gsp should be http with a "content-disposition: replicate=identifiers".
24
this content is replicated to all clients interested in the given identifiers.
27
the registration process creates an entry pattern-string -> (scanner . connections)
28
relevance is determined by exhaustive scan of registration entries and scanner match results
29
if a connection fails, it is removed from the respective list
30
when new patterns are specified, the old patterns are used to flush the connection before adding it to the new scanners.
35
;;; (load "/development/source/library/org/datagraph/spocq/src/spocq-server/propagation.lisp")
37
;;; header names which replicate-patch filters out of the incoming request headers
;;; (compared with string-equal) before it writes them into the propagated frame.
(defparameter *propagation-suppressed-headers* '("authorization" "location"))
38
;;; path segment which write-propagation-headers places after the repository id
;;; in the request line of a propagated request.
(defparameter *propagation-endpoint-name* "propagate")
41
(defclass websocket-resource ()
43
:initform (make-hash-table :test #'equal)
44
:accessor resource-connections
45
:documentation "Holds the output streams to propagate content to clients.
46
Keyed by request identifier for the initial connection request.")
48
:initform (bt:make-lock "resource lock")
49
:reader resource-lock))
50
(:metaclass spocq.i::persistent-class))
52
(defclass |/:account/:repository/disposition| (websocket-resource |/:account/:repository|)
53
((mime:mime-type :initform (mime:mime-type "application/json; charset=utf-8")
55
(:documentation "the disposition control operator for a given repository")
56
(:metaclass spocq.i::persistent-class))
58
;;; register the resource function by name.
;;; pushnew keeps the entry unique when this file is loaded again into a running image;
;;; a plain push would add another copy of the symbol on every load.
(pushnew 'propagation-server *response-functions*)
60
(http:def-resource-function propagation-server (resource-id request response)
61
(:documentation "Handle propagation requests.
62
- support input as PUT PATCH and POST methods to add content.
63
- support DELETE to clear.
64
The first two replace and append the content body, respectively.
65
A PATCH operation expects a multipart body, of which each part indicates
66
which operation is to be performed with the part content.
67
The following headers control this process
69
X-HTTP-Method-Override : specifies the part method
70
Content-Type : specifies the part content type
71
Revision : identifies the remote revision. if not present, the revision is local.
72
this can be present in the request headers only, but not in the section headers.")
76
(:auth http:authenticate-request-password)
77
(:auth http:authenticate-request-token)
78
(:auth http:authenticate-request-session)
79
(:auth http:authenticate-request-location)
81
(:auth http:authorize-request)
83
(:encode mime:application/rdf+json)
84
(:encode mime:text/turtle)
86
(:get ((resource |/ws|) request response request-type response-type)
87
"Respond to the initial websocket handshake request with nothing"
91
#+(or) ;; superseded by /disposition
92
(:head ((resource :|/|) request response request-type response-type)
93
(let ((content-disposition (http:request-header request "Content-Disposition"))
94
(etag (http:request-header request "ETag")))
95
;; update propagation specification, if present
96
(when (and content-disposition ws:*response*)
97
(multiple-value-bind (mode pattern)
98
(parse-replication-disposition content-disposition)
99
(when (string-equal mode *content-disposition-replicate-mode*)
100
;; use a given disposition to register the websocket response stream
101
(let ((disposition-stream (http:response-content-stream ws:*response*)))
102
(setf (ws::stream-disposition disposition-stream) pattern)
103
(when (= (length etag) 48)
104
(setf (ws::stream-node-address disposition-stream) (subseq etag 24)))))))))
106
(:put ((resource |/:account/:repository/disposition|) request response request-type response-type)
107
;; first perform the ldf query
108
(let ((content-disposition (http:request-header request "Content-Disposition"))
109
(etag (http:request-header request "ETag")))
110
;; update propagation specification, if present
111
(when (and content-disposition ws:*response*)
112
(multiple-value-bind (mode pattern)
113
(parse-replication-disposition content-disposition)
114
(when (string-equal mode *content-disposition-replicate-mode*)
115
;; use a given disposition to register the websocket response stream
116
(let ((disposition-stream (http:response-content-stream ws:*response*)))
117
(setf (ws::stream-disposition disposition-stream) pattern)
118
(when (= (length etag) 48)
119
(setf (ws::stream-node-address disposition-stream) (subseq etag 24)))))))
121
(setf (http:response-header response :access-control-allow-origin) "*")
122
(setf (http:response-header response :access-control-allow-credentials) "*")
123
(setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
124
(setf (http:response-content-type-header response) response-type)
125
(setf (http:response-content-length response) 0)
126
(http:send-headers response)
127
(http::finish-header-output (http:response-content-stream response))
131
(:delete ((resource |/:account/:repository/propagation|) request response request-type response-type)
132
;; first perform the ldf query
133
(register-propagation-connection (resource-repository resource) ws:*request* ws:*response*)
134
(setf (http:response-header response :access-control-allow-origin) "*")
135
(setf (http:response-header response :access-control-allow-credentials) "*")
136
(setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
137
(setf (http:response-content-type-header response) response-type)
138
(multiple-value-bind (pathname effective-content-type)
140
(propagation-delete resource request response pathname effective-content-type)
141
(propagation-propagate resource request response pathname effective-content-type))
145
(:patch ((resource |/:account/:repository/propagation|) request response (request-type mime:multipart/*) response-type)
146
;; handle a patch. process the request stream directly, without passing it through a file.
147
;; it is already buffered, compression is not possible and the media type permits no blanket conversion
148
(register-propagation-connection resource ws:*request* ws:*response*)
149
(let* ((location (http:request-header request "Location")))
150
(log-graph-store-service-response resource request location)
151
;; operate direct from input stream - likely a websocket vector stream
152
(setf (http:response-content-type-header response) response-type)
153
(graph-store-patch-multipart-content resource request response (http:request-content-stream request) request-type)
154
(propagate-request resource request response (ws::request-content ws:*request*))))
156
(:post ((resource |/:account/:repository/propagation|) request response request-type response-type)
157
;; first perform the ldf query
158
(setf (http:response-header response :access-control-allow-origin) "*")
159
(setf (http:response-header response :access-control-allow-credentials) "*")
160
(setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
161
(setf (http:response-content-type-header response) response-type)
162
(multiple-value-bind (pathname effective-content-type)
164
(propagation-post resource request response pathname effective-content-type)
165
(propagation-propagate resource request response pathname effective-content-type))
169
(:put ((resource |/:account/:repository/propagation|) request response request-type response-type)
170
;; first perform the ldf query
171
(setf (http:response-header response :access-control-allow-origin) "*")
172
(setf (http:response-header response :access-control-allow-credentials) "*")
173
(setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
174
(setf (http:response-content-type-header response) response-type)
175
(multiple-value-bind (pathname effective-content-type)
177
(propagation-put resource request response pathname effective-content-type)
178
(propagation-propagate resource request response pathname effective-content-type))
182
(:decode ((resource |/:account/:repository/propagation|) request response (request-type t) (response-type t))
183
"Perform the base receive of the request data into a file and return the pathname to
184
be used directly or to be transcoded."
185
(let* ((repository (resource-repository resource))
186
(pathname (tmp-import-pathname (dydra:account (dydra:repository-account repository))
187
(dydra:repository repository)))
188
(content-length (http:request-content-length request))
189
(import-limit (spocq.e:import-limit)))
191
(unless (<= content-length import-limit)
192
(http:request-entity-too-large "Content exceeds length limit: ~s." import-limit)))
193
(http:copy-stream (http:request-content-stream request) pathname :length (or content-length import-limit))
194
(values pathname request-type)))
198
(defgeneric replicate-patch (resource request response content disposition)
199
(:documentation "forward the request content and minimal headers to all clients
200
which are connected to the repository.")
201
(:method ((resource repository-resource) (request http:request) (response http:response) (source pathname) disposition)
202
(replicate-patch resource request response (spocq.i::read-file source) disposition))
203
(:method ((resource repository-resource) (request http:request) (response http:response) (content string) disposition)
204
(replicate-patch resource request response (map 'vector #'char-code content) disposition))
205
(:method ((resource repository-resource) (request http:request) (response http:response) (content sequence) disposition)
206
(let* ((repository (resource-repository resource))
207
(headers (remove-if #'(lambda (key) (member key *propagation-suppressed-headers* :test #'string-equal))
208
(tbnl:headers-in request)
210
(header-string (with-output-to-string (stream) (write-propagation-headers stream repository request headers)))
211
(frame-content (concatenate 'vector (map 'vector #'char-code header-string) content))
212
(streams (propagation-disposition-streams disposition)))
213
(labels ((propagate-to-stream (stream)
214
(when (open-stream-p stream)
216
(ws:write-frame stream (http:request-media-type request) frame-content)
218
(error (c) (declare (ignore c)) nil)))))
219
(let ((failed-streams (loop for stream in streams
220
unless (or ;; need some indicatore to curb reflection (eq stream (http:response-content-stream ws:*response*))
221
(propagate-to-stream stream))
223
(loop for stream in failed-streams
224
do (setf (ws::stream-disposition stream) nil))))
227
(defgeneric write-propagation-headers (header-stream repository request headers)
228
(:method ((destination stream) repository request headers)
229
(format destination "~a /~A/~A~@[?~{~@[~a=~]~a~^&~}~] HTTP/1.1~C~C"
230
(http:request-method request)
231
(repository-id repository)
232
*propagation-endpoint-name*
233
(let ((parameters (http:request-query-argument-list request)))
234
(loop for (name . value) in parameters
235
collect name collect value))
237
(loop for (key . value) in headers
239
do (hunchentoot::write-header-line (chunga:as-capitalized-string key) value destination))
240
(format destination "~C~C" #\Return #\Linefeed)
243
;;; these are not symmetric in that the reader operates on concrete dispositions
244
;;; while the writer expects regex patterns, which it compiles into test functions
246
(defun propagation-disposition-streams (disposition)
  "Return the streams of every registration entry whose scanner matches DISPOSITION.
 Each entry in the acceptor's propagation table is a cons (scanner . streams);
 the scanner is called on the full extent of DISPOSITION.
 The table is read while the acceptor's propagation lock is held.
 Returns nil when no acceptor is bound or nothing matches."
  (let ((acceptor ws:*acceptor*))
    ;; just in case: there may be no acceptor
    (unless (null acceptor)
      (bt:with-lock-held ((ws::acceptor-propagation-lock acceptor))
        (let ((collected '()))
          (maphash #'(lambda (pattern entry)
                       (declare (ignore pattern))
                       (let ((scanner (first entry))
                             (matched-streams (rest entry)))
                         (when (funcall scanner disposition 0 (length disposition))
                           ;; each stream should be in just one set
                           (setf collected (append collected matched-streams)))))
                   (ws::acceptor-propagation-streams acceptor))
          collected)))))
255
(defgeneric acceptor-disposition-streams (acceptor disposition)
256
(:method ((acceptor ws:acceptor) disposition)
257
(bt:with-lock-held ((ws::acceptor-propagation-lock acceptor))
258
(rest (gethash disposition (ws::acceptor-propagation-streams acceptor)))))
259
(:method ((acceptor tbnl:acceptor) disposition)
262
(defun (setf acceptor-disposition-streams) (streams acceptor disposition)
263
"associate the stream with the known propagation patterns.
264
per pattern store the filter once (as all are equivalent scanners)
265
add the stream to the disposition set"
266
(bt:with-lock-held ((ws::acceptor-propagation-lock acceptor))
267
(let* ((table (ws::acceptor-propagation-streams acceptor))
268
(entry (gethash disposition table)))
270
(setf (rest entry) streams))
272
(let ((scanner (cl-ppcre:create-scanner disposition)))
273
(setf (gethash disposition table)
274
(cons scanner streams)))
277
(defmethod (setf ws::stream-disposition) :before (new-disposition stream)
  "Keep the acceptor's disposition index in step with the stream's disposition:
 remove STREAM from the set recorded for its current disposition, if it has one,
 and add it to the set recorded for NEW-DISPOSITION, if that is non-null."
  (let ((previous (ws::stream-disposition stream)))
    (unless (null previous)
      ;; withdraw the stream from the set for the disposition it is leaving
      (let ((remaining (remove stream (acceptor-disposition-streams ws:*acceptor* previous))))
        (setf (acceptor-disposition-streams ws:*acceptor* previous) remaining)))
    (unless (null new-disposition)
      ;; enter the stream at the head of the set for the new disposition
      (setf (acceptor-disposition-streams ws:*acceptor* new-disposition)
            (cons stream (acceptor-disposition-streams ws:*acceptor* new-disposition))))))
289
- buffer entire file in order to be able to push it to other participants
290
- permit other than nquads for non-multipart
291
- collect the changeset (+/-)
293
- implement client support as triggers on changed graphs/resources
297
(defgeneric propagation-delete (resource request response source content-type)
  (:documentation "Process the rdf content from SOURCE as a delete request.
 Delegates to propagation-modify with the method designator :delete
 and returns whatever that returns.")
  (:method ((resource http:resource) (request http:request) (response http:response) (source t) (content-type mime:rdf))
    ;; propagation-modify requires six arguments:
    ;;   (resource request response source content-type method)
    ;; the response must be passed through, as propagation-post and propagation-put do;
    ;; without it the call has too few arguments and no method can apply.
    (propagation-modify resource request response source content-type :delete)))
301
(defgeneric propagation-post (resource request response source content-type)
  (:documentation "Process the rdf content from SOURCE as a post request
 by delegating to propagation-modify with the method designator :post."))

(defmethod propagation-post ((resource http:resource) (request http:request) (response http:response)
                             source (content-type mime:rdf))
  ;; the source is passed on as given; propagation-modify dispatches on its type
  (propagation-modify resource request response source content-type :post))
305
(defgeneric propagation-put (resource request response source content-type)
  (:documentation "Process the rdf content from SOURCE as a put request
 by delegating to propagation-modify with the method designator :put."))

(defmethod propagation-put ((resource http:resource) (request http:request) (response http:response)
                            source (content-type mime:rdf))
  ;; the source is passed on as given; propagation-modify dispatches on its type
  (propagation-modify resource request response source content-type :put))
309
(defgeneric propagation-modify (resource request response source content-type method)
310
(:method ((resource http:resource) (request http:request) (response http:response) (source pathname) (content-type t) (method t))
311
(with-open-file (stream source :direction :input :element-type :default)
312
(propagation-modify resource request response stream content-type method)))
314
(:method ((resource http:resource) (request http:request) (response http:response) (input-stream stream) (request-type mime:rdf) method)
315
(let* ((graph-name (or (resource-graph resource) nil))
316
(repository (resource-repository resource))
317
(repository-id (dydra:repository-id repository))
318
(configuration-list (request-configuration-list request))
319
(parsed-configuration-list (parse-http-configuration configuration-list))
320
(revision-id (or (http:request-query-argument request "revision-id")
321
(http:request-header request "Revision")))
322
(separator (mime:mime-type-boundary request-type)))
323
(declare (ignore separator)) ;; not used
325
(cond (revision-id ; skip known revisions
326
(when (rlmdb:get-revision-record repository revision-id)
327
(return-from propagation-modify nil)))
328
(t ;; if not remote, then local
329
(setf revision-id (dydra:make-revision-id))))
330
(with-http-configuration (list* :repository-id repository-id
331
:task-id (dydra:make-task-id)
332
parsed-configuration-list)
333
(when (find repository-id spocq.i::*disabled-repositories* :test #'string-equal)
334
(http:bad-request "The repository has been disabled: ~s." repository-id))
335
(let ((spocq.i::*repository* repository)
336
(spocq.i::*repository-id* repository-id)
337
(input-stream (http:request-content-stream request))
338
(start (spocq.e:unix-now))
341
;; in the mode, the revision id is supplied, not generated by the transaction itself
342
(dydra:with-open-transaction (repository-id :id revision-id :revision-id "HEAD" :normal-disposition :commit)
343
(let* ((transaction-uuid (spocq.i::transaction-id *transaction*))
344
(insert-operation (spocq.i::set-uuid-state (spocq.i::string-to-uuid transaction-uuid (spocq.i::make-uuid-vector)) :insert))
345
(delete-operation (spocq.i::set-uuid-state (spocq.i::string-to-uuid transaction-uuid (spocq.i::make-uuid-vector)) :delete))
346
(ordinal (rlmdb:get-metadata-ordinal repository)))
347
(labels ((read-next-line ()
348
(when (peek-char t input-stream nil)
350
(read-line input-stream)))
351
(supply-statements (set-operation accept-quad)
352
(funcall set-operation
354
((:post :put) insert-operation)
355
(:delete delete-operation)))
356
(loop for line = (read-next-line)
358
do (let ((statement (dydra:parse-nquads-statement line)))
360
(error "invalid statement[~d]: ~s" line-number line))
361
(incf statement-count)
362
(if (cdddr statement)
363
(apply accept-quad statement)
364
(apply accept-quad graph-name statement))))))
365
(rlmdb::repository-accept-field repository #'supply-statements)
366
(rlmdb:put-repository-metadata repository :uuid transaction-uuid
367
:ordinal (1+ ordinal)
368
:end (spocq.e:unix-now)
372
;;; debugging switch for propagation-patch: when true, the repository is dumped
;;; (rlmdb::dump-repository with :verbose t) to the response content stream.
;;; NOTE(review): the default is t, so the dump is enabled unless this is rebound - confirm that is intended.
(defparameter *propagation-patch.debug* t)
374
;;;!!! this should permit deletion of a graph by specifying an empty section and either a concrete graph identifier
375
;;;!!! or an abstract one - all, default, named
377
(defgeneric propagation-patch (resource request response source content-type)
378
(:method ((resource http:resource) (request http:request) (response http:response) (source pathname) (content-type mime:multipart/related))
379
(with-open-file (stream source :direction :input :element-type :default)
380
(propagation-patch resource request response stream content-type)))
382
(:method ((resource http:resource) (request http:request) (response http:response) (input-stream stream) (content-type mime:multipart/related))
383
(let* ((original-graph-name (or (resource-graph resource) nil))
384
(repository (resource-repository resource))
385
(repository-id (dydra:repository-id repository))
386
(configuration-list (request-configuration-list request))
387
(parsed-configuration-list (parse-http-configuration configuration-list))
388
(revision-id (or (http:request-query-argument request "revision-id")
389
(http:request-header request "Revision")))
390
(separator (mime:mime-type-boundary content-type))
391
(destination (http:response-content-stream response))
393
(cond (revision-id ; skip known revisions
394
(when (rlmdb:get-revision-record repository revision-id)
395
(return-from propagation-patch nil)))
396
(t ;; if not remote, then local
397
(setf revision-id (dydra:make-revision-id))))
398
(with-http-configuration (list* :repository-id repository-id
399
:task-id (dydra:make-task-id)
400
parsed-configuration-list)
401
(when (find repository-id spocq.i::*disabled-repositories* :test #'string-equal)
402
(http:bad-request "The repository has been disabled: ~s." repository-id))
403
(let ((spocq.i::*repository* repository)
404
(spocq.i::*repository-id* repository-id)
405
(start (spocq.e:unix-now))
408
;; Use the supplied revision id as the transaction id, rather than generating a new one
409
;; in order to place the content at the given position in the revision space.
410
(dydra:with-open-transaction (repository-id :id revision-id :revision-id "HEAD" :normal-disposition :commit)
411
(let* ((transaction-uuid (spocq.i::transaction-id *transaction*))
412
(insert-operation (spocq.i::set-uuid-state (spocq.i::string-to-uuid transaction-uuid (spocq.i::make-uuid-vector)) :insert))
413
(delete-operation (spocq.i::set-uuid-state (spocq.i::string-to-uuid transaction-uuid (spocq.i::make-uuid-vector)) :delete))
414
(operation insert-operation)
415
(ordinal (rlmdb:get-metadata-ordinal repository))
416
(graph-name original-graph-name))
417
(labels ((read-next-line ()
418
(when (peek-char t input-stream nil)
420
(read-line input-stream)))
422
(setf graph-name original-graph-name)
423
(let* ((CHUNGA:*ACCEPT-BOGUS-EOLS* t)
424
(headers (tbnl::read-http-headers input-stream *trace-output*)))
425
(loop for (keyword . value) in headers
426
do (incf line-number)
428
;; content type alternatives NYI
429
(:content-type (unless (typep (mime:mime-type value) 'mime:application/n-quads)
430
(error "invalid content-type[~d]: ~s" line-number value)))
431
(:x-http-method-override (cond ((or (equalp value "put") (equalp value "post"))
432
(setf operation insert-operation))
433
((equalp value "delete")
434
(setf operation delete-operation))))
435
(:graph (setf graph-name (dydra:intern-iri value)))
439
(supply-statements (set-operation accept-quad)
440
(loop for line = (read-next-line)
442
do (cond ((zerop (length line)) )
443
((string= "--" line :end2 (min 2 (length line)))
444
(if (and (>= (length line) (+ (length separator) 2))
445
(string-equal separator line
447
:end2 (+ (length separator) 2)))
448
(when (and (>= (length line) (+ (length separator) 2))
449
(string= "--" line :start2 (- (length line) 2)))
451
(http:bad-request "Invalid mutipart separator: ~s"
454
(funcall set-operation operation))
456
(let ((statement (dydra:parse-nquads-statement line)))
458
(error "invalid statement[~d]: ~s" line-number line))
459
(incf statement-count)
460
(if (cdddr statement)
461
(funcall accept-quad statement)
462
(funcall accept-quad (append statement (list graph-name))))))))))
463
(rlmdb::repository-accept-field repository #'supply-statements)
464
(rlmdb:put-repository-metadata repository :uuid transaction-uuid
465
:ordinal (1+ ordinal)
466
:end (spocq.e:unix-now)
469
(when *propagation-patch.debug*
470
(rlmdb::dump-repository repository :verbose t :stream destination))
474
;;; classify locations as peer or client
475
;;; peers receive full replicated data
476
;;; clients should receive just the modifications which relate to the resources in data which they have made.
477
;;; - sparql results and updates are tracked as for describe and all resources present are flagged.
478
;;; - graph store updates and responses are filtered to tag resources in passing
481
(defmethod propagation-propagate ((resource http:resource) (request http:request) (response t) pathname content-type)
482
(let* ((repository (resource-repository resource))
483
(request-location (or (http:request-header request :location)
484
(spocq.i::repository-replica-location repository)))
485
(request-replica (spocq.i::ensure-replica repository request-location)))
486
(setf (spocq.i::replica-time request-replica) (get-universal-time))
487
(loop for replica-location being each hash-key of (spocq.i::repository-replicas repository)
488
using (hash-value replica)
489
do (unless (eq request-replica replica)
490
(let* ((replica-authority (spocq.i::authority (spocq.i::parse-url-authority replica-location)))
491
(replica-authorization (getf (spocq.i::agent-request-authentication replica-authority)
492
:basic-authentication)))
493
(flet ((propagate-request (stream)
494
(http:copy-stream pathname stream)))
495
(declare (dynamic-extent #'propagate-request))
496
(tbnl::call-with-open-request-stream #'propagate-request
498
:BASIC-AUTHORIZATION (spocq.i::split-string replica-authorization ":" :strict t)
499
:method (http:request-method request)
500
:content-type "multipart/related")))))))
505
;;; service operations are implemented as side-effects on graph store updates
506
;;; each is expressed as a single sparql query which yields solutions and an operator.
507
;;; all queries are compiled into an activation graph which is applied to each update field.
508
;;; the process must distinguish adds from deletes
511
"select ?op ?resource ?agent ?mode
512
where { [] acl:agent ?agent; acl:accessTo ?resource; acl:mode ?mode.
515
"select ?op ?resource ?agent ?mode
516
where { [] acl:agent ?agent; acl:accessTo ?resource; acl:mode ?mode.
517
bind acl:deleteACL ?op
522
;;; +/- column in schema
528
(defmethod graph-store-match-graph-content ((repository spocq.i::lmdb-replicable-repository) stream
529
context subject predicate object
530
&key content-type revision-id content-encoding timeout)
531
(declare (ignore timeout))
532
(let ((pattern (spocq.i::make-quad :subject subject :predicate predicate :object object
533
:graph (or context |urn:dydra|:|all|))))
534
(graph-store-get-graph-content repository revision-id content-type stream
536
:content-encoding content-encoding)))
539
(defmethod graph-store-get-graph-content ((repository spocq.i::lmdb-replicable-repository)
540
revision-id content-type stream
542
&key content-encoding context pattern)
543
(declare (ignore content-encoding pattern))
544
(apply #'spocq.i::repository-encode-content repository revision-id content-type stream
545
:context (or context |urn:dydra|:|all|)