Coverage report: /development/source/library/org/datagraph/spocq-shard/src/spocq-server/propagation.lisp

KindCoveredAll%
expression0191 0.0
branch012 0.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.server.implementation; -*-
2
 ;;;  Copyright 2019 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved
3
 
4
 (in-package :org.datagraph.spocq.server.implementation)
5
 
6
 (:documentation "rdf propagation protocol"
7
 
8
 "This file is the http layer for a simple propagation server.
9
 It implements propagation by just propagating requests verbatim among connected clients.
10
 It neither resolves conflicts nor limits propagation by subject.
11
 ")
12
 
13
 #|
14
 data propagation
15
 each participating replica must have an active websocket connection.
16
 the initial request on that connection is just a GET with the protocol upgrade.
17
 it includes neither headers nor content.
18
 subsequent websocket requests can be used for control operations.
19
 
20
 head: "content-disposition: replicate=patterns" indicates the patterns for identifiers to be
21
 replicated to the respective ws connection.
22
 
23
 all other gsp should be http with a "content-disposition: replicate=identifiers".
24
 this content is replicated to all clients interested in the given identifiers.
25
 
26
 
27
 the registration process creates an entry pattern-string -> (scanner . connections)
28
 relevance is determined by exhaustive scan of registration entries and scanner match results
29
 if a connection fails, it is removed from the respective list
30
 when new patterns are specified, the old patters are used to flush the connection before additi ig to new scaanners.
31
 
32
 a gsp head wtih 
33
 |#
34
 
35
 ;;; (load "/development/source/library/org/datagraph/spocq/src/spocq-server/propagation.lisp")
36
 
37
 (defparameter *propagation-suppressed-headers* '("authorization" "location"))
38
 (defparameter *propagation-endpoint-name* "propagate")
39
 
40
 
41
 (defclass websocket-resource ()
42
   ((connections
43
     :initform (make-hash-table :test #'equal)
44
     :accessor resource-connections
45
     :documentation "Holds the output streams to propagate content to clients.
46
      Keyed by request identifier for the initial connection request.")
47
    (lock
48
     :initform (bt:make-lock "resource lock")
49
     :reader resource-lock))
50
   (:metaclass spocq.i::persistent-class))
51
 
52
 (defclass |/:account/:repository/disposition| (websocket-resource |/:account/:repository|)
53
   ((mime:mime-type :initform (mime:mime-type "application/json; charset=utf-8")
54
                    :allocation :class))
55
   (:documentation "the disposition control operator for a given repository")
56
   (:metaclass spocq.i::persistent-class))
57
 
58
 (push 'propagation-server *response-functions*)
59
 
60
 (http:def-resource-function propagation-server (resource-id request response)
61
   (:documentation "Handle propagation requests.
62
 - support input as PUT PATCH and POST methods to add content.
63
 - support DELETE to clear.
64
  The first two replace and append the content body, respectively.
65
  A PATCH operation expects a multipart body, of which each part indicates
66
  which operation is to be performed with the part content.
67
  The following headers controls this process
68
 
69
     X-HTTP-Method-Override : specifies the part method
70
     Content-Type : specifies the part content type
71
     Revision : identifies the remote revision. if not present, the revision is local.
72
       this can be present in the request headers only, but not in the section headers.")
73
 
74
   (:log )
75
 
76
   (:auth http:authenticate-request-password)
77
   (:auth http:authenticate-request-token)
78
   (:auth http:authenticate-request-session)
79
   (:auth http:authenticate-request-location)
80
 
81
   (:auth http:authorize-request)
82
   
83
   (:encode mime:application/rdf+json)
84
   (:encode mime:text/turtle)
85
 
86
   (:get ((resource |/ws|) request response request-type response-type)
87
     "Respond to the initial websocket handshake request with nothing"
88
     (http:no-content)
89
     nil)
90
 
91
   #+(or) ;; superseded by /disposition
92
   (:head ((resource :|/|) request response request-type response-type)
93
     (let ((content-disposition (http:request-header request "Content-Disposition"))
94
           (etag (http:request-header request "ETag")))
95
       ;; update propagation specification, if present
96
       (when (and content-disposition ws:*response*)
97
         (multiple-value-bind (mode pattern)
98
                              (parse-replication-disposition content-disposition)
99
           (when (string-equal mode *content-disposition-replicate-mode*)
100
             ;; use a given disposition to register the websocket response stream
101
             (let ((disposition-stream (http:response-content-stream ws:*response*)))
102
               (setf (ws::stream-disposition disposition-stream) pattern)
103
               (when (= (length etag) 48)
104
                 (setf (ws::stream-node-address disposition-stream) (subseq etag 24)))))))))
105
 
106
   (:put ((resource |/:account/:repository/disposition|) request response request-type response-type)
107
         ;; first perform the ldf query
108
     (let ((content-disposition (http:request-header request "Content-Disposition"))
109
           (etag (http:request-header request "ETag")))
110
       ;; update propagation specification, if present
111
       (when (and content-disposition ws:*response*)
112
         (multiple-value-bind (mode pattern)
113
                              (parse-replication-disposition content-disposition)
114
           (when (string-equal mode *content-disposition-replicate-mode*)
115
             ;; use a given disposition to register the websocket response stream
116
             (let ((disposition-stream (http:response-content-stream ws:*response*)))
117
               (setf (ws::stream-disposition disposition-stream) pattern)
118
               (when (= (length etag) 48)
119
                 (setf (ws::stream-node-address disposition-stream) (subseq etag 24)))))))
120
 
121
       (setf (http:response-header response :access-control-allow-origin) "*")
122
       (setf (http:response-header response :access-control-allow-credentials) "*")
123
       (setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
124
       (setf (http:response-content-type-header response) response-type)
125
       (setf (http:response-content-length response) 0)
126
       (http:send-headers response)
127
       (http::finish-header-output (http:response-content-stream response))   
128
       nil
129
       ))
130
   #|
131
   (:delete ((resource |/:account/:repository/propagation|) request response request-type response-type)
132
         ;; first perform the ldf query
133
         (register-propagation-connection (resource-repository resource) ws:*request* ws:*response*)
134
         (setf (http:response-header response :access-control-allow-origin) "*")
135
         (setf (http:response-header response :access-control-allow-credentials) "*")
136
         (setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
137
         (setf (http:response-content-type-header response) response-type)
138
         (multiple-value-bind (pathname effective-content-type)
139
                              (call-next-method)
140
           (propagation-delete resource request response pathname effective-content-type)
141
           (propagation-propagate resource request response pathname effective-content-type))
142
         nil
143
         )
144
 
145
   (:patch ((resource |/:account/:repository/propagation|) request response (request-type mime:multipart/*) response-type)
146
     ;; handle a patch. process the request stream directly, without passing it through a file.
147
     ;; it is already buffered, compression is not possible and the media type permits no blanket conversion
148
     (register-propagation-connection resource ws:*request* ws:*response*)
149
     (let* ((location (http:request-header request "Location")))
150
       (log-graph-store-service-response resource request location)
151
       ;; operate direct from input stream - likely a websocket vector stream
152
       (setf (http:response-content-type-header response) response-type)
153
       (graph-store-patch-multipart-content resource request response (http:request-content-stream request) request-type)
154
       (propagate-request resource request response (ws::request-content ws:*request*))))
155
 
156
   (:post ((resource |/:account/:repository/propagation|) request response request-type response-type)
157
         ;; first perform the ldf query
158
         (setf (http:response-header response :access-control-allow-origin) "*")
159
         (setf (http:response-header response :access-control-allow-credentials) "*")
160
         (setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
161
         (setf (http:response-content-type-header response) response-type)
162
         (multiple-value-bind (pathname effective-content-type)
163
                              (call-next-method)
164
           (propagation-post resource request response pathname effective-content-type)
165
           (propagation-propagate resource request response pathname effective-content-type))
166
         nil
167
         )
168
 
169
   (:put ((resource |/:account/:repository/propagation|) request response request-type response-type)
170
         ;; first perform the ldf query
171
         (setf (http:response-header response :access-control-allow-origin) "*")
172
         (setf (http:response-header response :access-control-allow-credentials) "*")
173
         (setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
174
         (setf (http:response-content-type-header response) response-type)
175
         (multiple-value-bind (pathname effective-content-type)
176
                              (call-next-method)
177
           (propagation-put resource request response pathname effective-content-type)
178
           (propagation-propagate resource request response pathname effective-content-type))
179
         nil
180
         )
181
 
182
   (:decode ((resource |/:account/:repository/propagation|) request response (request-type t) (response-type t))
183
            "Perform the base receive of the request data into a file and return the pathname to
184
         be used directly or to be trancoded."
185
     (let* ((repository (resource-repository resource))
186
            (pathname (tmp-import-pathname (dydra:account (dydra:repository-account repository))
187
                                           (dydra:repository repository)))
188
            (content-length (http:request-content-length request))
189
            (import-limit (spocq.e:import-limit)))
190
       (when content-length 
191
         (unless (<= content-length import-limit)
192
           (http:request-entity-too-large "Content exceeds length limit: ~s." import-limit)))
193
       (http:copy-stream (http:request-content-stream request) pathname :length (or content-length import-limit))
194
       (values pathname request-type)))
195
   |#
196
   )
197
 
198
 (defgeneric replicate-patch (resource request response content disposition)
199
   (:documentation "forward the request content and minimal headers to all clients
200
  which are connected to the repository.")
201
   (:method ((resource repository-resource) (request http:request) (response http:response) (source pathname) disposition)
202
     (replicate-patch resource request response (spocq.i::read-file source) disposition))
203
   (:method ((resource repository-resource) (request http:request) (response http:response) (content string) disposition)
204
     (replicate-patch resource request response (map 'vector #'char-code content) disposition))
205
   (:method ((resource repository-resource) (request http:request) (response http:response) (content sequence) disposition)
206
     (let* ((repository (resource-repository resource))
207
            (headers (remove-if #'(lambda (key) (member key *propagation-suppressed-headers* :test #'string-equal))
208
                                (tbnl:headers-in request)
209
                                :key #'first))
210
            (header-string (with-output-to-string (stream) (write-propagation-headers stream repository request headers)))
211
            (frame-content (concatenate 'vector (map 'vector #'char-code header-string) content))
212
            (streams (propagation-disposition-streams disposition)))
213
       (labels ((propagate-to-stream (stream)
214
                  (when (open-stream-p stream)
215
                    (handler-case (progn
216
                                    (ws:write-frame stream (http:request-media-type request) frame-content)
217
                                    stream)
218
                      (error (c) (declare (ignore c)) nil)))))
219
         (let ((failed-streams (loop for stream in streams
220
                                 unless (or ;; need some indicatore to curb reflection (eq stream (http:response-content-stream ws:*response*))
221
                                            (propagate-to-stream stream))
222
                                        collect stream)))
223
           (loop for stream in failed-streams
224
             do (setf (ws::stream-disposition stream) nil))))
225
       frame-content)))
226
 
227
 (defgeneric write-propagation-headers (header-stream repository request headers)
228
   (:method ((destination stream) repository request headers)
229
     (format destination "~a /~A/~A~@[?~{~@[~a=~]~a~^&~}~] HTTP/1.1~C~C"
230
             (http:request-method request)
231
             (repository-id repository)
232
             *propagation-endpoint-name*
233
             (let ((parameters (http:request-query-argument-list request)))
234
                (loop for (name . value) in parameters
235
                         collect name collect value))
236
             #\Return #\Linefeed)
237
     (loop for (key . value) in headers
238
       when value
239
       do (hunchentoot::write-header-line (chunga:as-capitalized-string key) value destination))
240
     (format destination "~C~C" #\Return #\Linefeed)
241
     nil))
242
 
243
 ;;; these are not symmetric in that the reader operates on concrete dispositions
244
 ;;; while the writer expects regex patterns, which it compiles into test functions
245
 
246
 (defun propagation-disposition-streams (disposition)
247
   (when ws:*acceptor* ;; just in case
248
     (bt:with-lock-held ((ws::acceptor-propagation-lock ws:*acceptor*))
249
       (loop for entry being each hash-value in (ws::acceptor-propagation-streams ws:*acceptor*)
250
         for (scanner . matched-streams) = entry
251
         when (funcall scanner disposition 0 (length disposition))
252
         ;; each stream should be in just one set
253
         append matched-streams))))
254
 
255
 (defgeneric acceptor-disposition-streams (acceptor disposition)
256
   (:method ((acceptor ws:acceptor) disposition)
257
     (bt:with-lock-held ((ws::acceptor-propagation-lock acceptor))
258
       (rest (gethash disposition (ws::acceptor-propagation-streams acceptor)))))
259
   (:method ((acceptor tbnl:acceptor) disposition)
260
     ()))
261
 
262
 (defun (setf acceptor-disposition-streams) (streams acceptor disposition)
263
   "associate the stream with the known propagation patterns.
264
  per pattern store the filter once (as all are equivalent scanners)
265
  add the stream to the disposition set"
266
   (bt:with-lock-held ((ws::acceptor-propagation-lock acceptor))
267
     (let* ((table (ws::acceptor-propagation-streams acceptor))
268
            (entry (gethash disposition table)))
269
       (cond (entry
270
              (setf (rest entry) streams))
271
             (t
272
              (let ((scanner (cl-ppcre:create-scanner disposition)))
273
                (setf (gethash disposition table)
274
                      (cons scanner streams)))
275
              streams)))))
276
 
277
 (defmethod (setf ws::stream-disposition) :before (new-disposition stream)
278
   "Associate the stream with the disposition in the acceptor's index"
279
   (let ((old-disposition (ws::stream-disposition stream)))
280
     (when old-disposition
281
       (setf (acceptor-disposition-streams ws:*acceptor* old-disposition)
282
             (remove stream (acceptor-disposition-streams ws:*acceptor* old-disposition))))
283
     (when new-disposition
284
       (push stream (acceptor-disposition-streams ws:*acceptor* new-disposition)))))
285
 
286
 
287
 #|
288
 todo
289
 - buffer entire file in order to be able to push it to other participants
290
   - permit other than nquads for non-multipart
291
 - collect the changeset (+/-)
292
 !!!
293
 - implement client support as triggers on changed graphs/resources
294
 |#
295
 
296
 #|
297
 (defgeneric propagation-delete (resource request response source content-type)
298
   (:method ((resource http:resource) (request http:request) (response http:response) (source t) (content-type mime:rdf))
299
     (propagation-modify resource request source content-type :delete)))
300
 
301
 (defgeneric propagation-post (resource request response source content-type)
302
   (:method ((resource http:resource) (request http:request) (response http:response) (source t) (content-type mime:rdf))
303
     (propagation-modify resource request response source content-type :post)))
304
 
305
 (defgeneric propagation-put (resource request response source content-type)
306
   (:method ((resource http:resource) (request http:request) (response http:response) (source t) (content-type mime:rdf))
307
     (propagation-modify resource request response source content-type :put)))
308
 
309
 (defgeneric propagation-modify (resource request response source content-type method)
310
   (:method ((resource http:resource) (request http:request) (response http:response) (source pathname) (content-type t) (method t))
311
     (with-open-file (stream source :direction :input :element-type :default)
312
       (propagation-modify resource request response stream content-type method)))
313
   
314
   (:method ((resource http:resource) (request http:request) (response http:response) (input-stream stream) (request-type mime:rdf) method)
315
     (let* ((graph-name (or (resource-graph resource) nil))
316
            (repository (resource-repository resource))
317
            (repository-id (dydra:repository-id repository))
318
            (configuration-list (request-configuration-list request))
319
            (parsed-configuration-list (parse-http-configuration configuration-list))
320
            (revision-id (or (http:request-query-argument request "revision-id")
321
                             (http:request-header request "Revision")))
322
            (separator (mime:mime-type-boundary request-type)))
323
       (declare (ignore separator)) ;; not used
324
 
325
       (cond (revision-id ; skip known revisions
326
              (when (rlmdb:get-revision-record repository revision-id)
327
                (return-from propagation-modify nil)))
328
             (t ;; if not remote, then local
329
              (setf revision-id (dydra:make-revision-id))))
330
       (with-http-configuration (list* :repository-id repository-id
331
                                       :task-id (dydra:make-task-id)
332
                                       parsed-configuration-list)
333
         (when (find repository-id spocq.i::*disabled-repositories* :test #'string-equal)
334
           (http:bad-request "The repository has been disabled: ~s." repository-id))
335
         (let ((spocq.i::*repository* repository)
336
               (spocq.i::*repository-id* repository-id)
337
               (input-stream (http:request-content-stream request))
338
               (start (spocq.e:unix-now))
339
               (line-number 0)
340
               (statement-count 0))
341
           ;; in the mode, the revision id is supplied, not generated by the transaction itself
342
           (dydra:with-open-transaction (repository-id :id revision-id :revision-id "HEAD" :normal-disposition :commit)
343
             (let* ((transaction-uuid (spocq.i::transaction-id *transaction*))
344
                    (insert-operation (spocq.i::set-uuid-state (spocq.i::string-to-uuid transaction-uuid (spocq.i::make-uuid-vector)) :insert))
345
                    (delete-operation (spocq.i::set-uuid-state (spocq.i::string-to-uuid transaction-uuid (spocq.i::make-uuid-vector)) :delete))
346
                    (ordinal (rlmdb:get-metadata-ordinal repository)))
347
               (labels ((read-next-line ()
348
                          (when (peek-char t input-stream nil)
349
                            (incf line-number)
350
                            (read-line input-stream)))
351
                        (supply-statements (set-operation accept-quad)
352
                          (funcall set-operation
353
                                   (ecase method
354
                                     ((:post :put) insert-operation)
355
                                     (:delete delete-operation)))
356
                          (loop for line = (read-next-line)
357
                            until (null line)
358
                            do (let ((statement (dydra:parse-nquads-statement line)))
359
                                 (unless statement
360
                                   (error "invalid statement[~d]: ~s" line-number line))
361
                                 (incf statement-count)
362
                                 (if (cdddr statement)
363
                                     (apply accept-quad statement)
364
                                     (apply accept-quad graph-name statement))))))
365
                 (rlmdb::repository-accept-field repository #'supply-statements)
366
                 (rlmdb:put-repository-metadata repository :uuid transaction-uuid
367
                                                :ordinal (1+ ordinal)
368
                                                :end (spocq.e:unix-now)
369
                                                :start start)
370
                 nil))))))))
371
 
372
 (defparameter *propagation-patch.debug* t)
373
 
374
 ;;;!!! this should permit deletion of a graph by specifying an empty section and either a concrete graph identifier
375
 ;;;!!! or an abstract one - all, default, named
376
 
377
 (defgeneric propagation-patch (resource request response source content-type)
378
   (:method ((resource http:resource) (request http:request) (response http:response) (source pathname) (content-type mime:multipart/related))
379
     (with-open-file (stream source :direction :input :element-type :default)
380
       (propagation-patch resource request response stream content-type)))
381
 
382
   (:method ((resource http:resource) (request http:request) (response http:response) (input-stream stream) (content-type mime:multipart/related))
383
     (let* ((original-graph-name (or (resource-graph resource) nil))
384
            (repository (resource-repository resource))
385
            (repository-id (dydra:repository-id repository))
386
            (configuration-list (request-configuration-list request))
387
            (parsed-configuration-list (parse-http-configuration configuration-list))
388
            (revision-id (or (http:request-query-argument request "revision-id")
389
                             (http:request-header request "Revision")))
390
            (separator (mime:mime-type-boundary content-type))
391
            (destination (http:response-content-stream response))
392
            )
393
       (cond (revision-id ; skip known revisions
394
              (when (rlmdb:get-revision-record repository revision-id)
395
                (return-from propagation-patch nil)))
396
             (t ;; if not remote, then local
397
              (setf revision-id (dydra:make-revision-id))))
398
       (with-http-configuration (list* :repository-id repository-id
399
                                       :task-id (dydra:make-task-id)
400
                                       parsed-configuration-list)
401
         (when (find repository-id spocq.i::*disabled-repositories* :test #'string-equal)
402
           (http:bad-request "The repository has been disabled: ~s." repository-id))
403
         (let ((spocq.i::*repository* repository)
404
               (spocq.i::*repository-id* repository-id)
405
               (start (spocq.e:unix-now))
406
               (line-number 0)
407
               (statement-count 0))
408
           ;; Use the supplied revision id as the transaction id, rather than ganerating a new one
409
           ;; in order to place the content at the given position in the revision space.
410
           (dydra:with-open-transaction (repository-id :id revision-id :revision-id "HEAD" :normal-disposition :commit)
411
             (let* ((transaction-uuid (spocq.i::transaction-id *transaction*))
412
                    (insert-operation (spocq.i::set-uuid-state (spocq.i::string-to-uuid transaction-uuid (spocq.i::make-uuid-vector)) :insert))
413
                    (delete-operation (spocq.i::set-uuid-state (spocq.i::string-to-uuid transaction-uuid (spocq.i::make-uuid-vector)) :delete))
414
                    (operation insert-operation)
415
                    (ordinal (rlmdb:get-metadata-ordinal repository))
416
                    (graph-name original-graph-name))
417
               (labels ((read-next-line ()
418
                          (when (peek-char t input-stream nil)
419
                            (incf line-number)
420
                            (read-line input-stream)))
421
                        (read-part-header ()
422
                          (setf graph-name original-graph-name)
423
                          (let* ((CHUNGA:*ACCEPT-BOGUS-EOLS* t)
424
                                 (headers (tbnl::read-http-headers input-stream *trace-output*)))
425
                            (loop for (keyword . value) in headers
426
                              do (incf line-number)
427
                              do (case keyword
428
                                   ;; content type alternatives NYI
429
                                   (:content-type (unless (typep (mime:mime-type value) 'mime:application/n-quads)
430
                                                     (error "invalid content-type[~d]: ~s" line-number value)))
431
                                   (:x-http-method-override (cond ((or (equalp value "put") (equalp value "post"))
432
                                                                   (setf operation insert-operation))
433
                                                                  ((equalp value "delete")
434
                                                                   (setf operation delete-operation))))
435
                                   (:graph (setf graph-name (dydra:intern-iri value)))
436
                                   (t ))))
437
                          ;; blank line
438
                          (incf line-number))
439
                        (supply-statements (set-operation accept-quad)
440
                          (loop for line = (read-next-line)
441
                            until (null line)
442
                            do (cond ((zerop (length line)) )
443
                                     ((string= "--" line :end2 (min 2 (length line)))
444
                                      (if (and (>= (length line) (+ (length separator) 2))
445
                                                   (string-equal separator line
446
                                                            :start2 2
447
                                                            :end2 (+ (length separator) 2)))
448
                                          (when (and (>= (length line) (+ (length separator) 2))
449
                                                     (string= "--" line :start2 (- (length line) 2)))
450
                                            (return))
451
                                          (http:bad-request "Invalid mutipart separator: ~s"
452
                                                            line))
453
                                      (read-part-header)
454
                                      (funcall set-operation operation))
455
                                     (t
456
                                      (let ((statement (dydra:parse-nquads-statement line)))
457
                                        (unless statement
458
                                          (error "invalid statement[~d]: ~s" line-number line))
459
                                        (incf statement-count)
460
                                        (if (cdddr statement)
461
                                            (funcall accept-quad statement)
462
                                            (funcall accept-quad (append statement (list graph-name))))))))))
463
                 (rlmdb::repository-accept-field repository #'supply-statements)
464
                 (rlmdb:put-repository-metadata repository :uuid transaction-uuid
465
                                                :ordinal (1+ ordinal)
466
                                                :end (spocq.e:unix-now)
467
                                                :start start)))))
468
           )
469
       (when *propagation-patch.debug*
470
         (rlmdb::dump-repository repository :verbose t :stream destination))
471
       nil)))
472
 
473
 #|
474
 ;;; classify locations as peer or client
475
 ;;; peers receive full replicated data
476
 ;;; clients should receive just the modications which relate to the resources in data which they have made.
477
 ;;; - sparql results and updates are tracked as for describe and all resources present are flagged.
478
 ;;; - graph store updats and responses are filtered to tag resources in passing 
479
 
480
 
481
 (defmethod propagation-propagate ((resource http:resource) (request http:request) (response t) pathname content-type)
482
   (let* ((repository (resource-repository resource))
483
          (request-location (or (http:request-header request :location)
484
                                (spocq.i::repository-replica-location repository)))
485
          (request-replica (spocq.i::ensure-replica repository request-location)))
486
     (setf (spocq.i::replica-time request-replica) (get-universal-time))
487
     (loop for replica-location being each hash-key of (spocq.i::repository-replicas repository)
488
       using (hash-value replica)
489
       do (unless (eq request-replica replica)
490
            (let* ((replica-authority (spocq.i::authority (spocq.i::parse-url-authority replica-location)))
491
                   (replica-authorization (getf (spocq.i::agent-request-authentication replica-authority)
492
                                                :basic-authentication)))
493
              (flet ((propagate-request (stream)
494
                        (http:copy-stream pathname stream)))
495
                (declare (dynamic-extent #'propagate-request))
496
                (tbnl::call-with-open-request-stream #'propagate-request
497
                       replica-location
498
                       :BASIC-AUTHORIZATION (spocq.i::split-string replica-authorization ":" :strict t)
499
                       :method (http:request-method request)
500
                       :content-type "multipart/related")))))))
501
    |#
502
                
503
 
504
 
505
 ;;; service operations are implemented as side-effects on graph store updates
506
 ;;; each is expressed as a single sparql query which yields solutions and an operator.
507
 ;;; all queries are compiled into an activation graph which is applied to each update field.
508
 ;;; the process must distinguish adds from deletes
509
 #(
510
 ("acl"
511
  "select ?op ?resource ?agent ?mode
512
   where { [] acl:agent ?agent; acl:accessTo ?resource; acl:mode ?mode.
513
           bind acl:addACL ?op
514
   }"
515
  "select ?op ?resource ?agent ?mode
516
   where { [] acl:agent ?agent; acl:accessTo ?resource; acl:mode ?mode.
517
           bind acl:deleteACL ?op
518
   }")
519
 ;;; data source
520
 ;;; +/- source
521
 ;;; +/- schema
522
 ;;; +/- column in schema
523
   ;; 
524
 )
525
 
526
 
527
 
528
 (defmethod graph-store-match-graph-content ((repository spocq.i::lmdb-replicable-repository) stream
529
                                             context subject predicate object
530
                                             &key content-type revision-id content-encoding timeout)
531
   (declare (ignore timeout))
532
   (let ((pattern (spocq.i::make-quad :subject subject :predicate predicate :object object
533
                                      :graph (or context |urn:dydra|:|all|))))
534
     (graph-store-get-graph-content repository revision-id content-type stream
535
                                    :pattern pattern
536
                                    :content-encoding content-encoding)))
537
   
538
 
539
 (defmethod graph-store-get-graph-content ((repository spocq.i::lmdb-replicable-repository) 
540
                                           revision-id content-type stream
541
                                           &rest args
542
                                           &key content-encoding context pattern)
543
   (declare (ignore content-encoding pattern))
544
   (apply #'spocq.i::repository-encode-content repository revision-id content-type stream
545
          :context (or context |urn:dydra|:|all|)
546
          args))
547
 |#
548