Coverage report: /development/source/library/org/datagraph/spocq-shard/src/spocq-server/replication.lisp

KindCoveredAll%
expression0432 0.0
branch020 0.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.server.implementation; -*-
2
 ;;;  Copyright 2018 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved
3
 
4
 (in-package :org.datagraph.spocq.server.implementation)
5
 
6
 (:documentation "rdf replication protocol"
7
 
8
 "This file is the http layer for a crdt-based replication server.
9
 ")
10
 
11
 ;;; (load "/development/source/library/org/datagraph/spocq/src/spoca-server/replication.lisp")
12
 
13
 
14
 (defclass |/:account/:repository/replication| (|/:account/:repository|)
15
   ((mime:mime-type :initform (mime:mime-type "application/json; charset=utf-8")
16
                    :allocation :class))
17
   (:documentation "the replication endpoint for a given repository")
18
   (:metaclass spocq.i::persistent-class))
19
 
20
 (push 'replication-server *response-functions*)
21
 
22
 (http:def-resource-function replication-server (resource-id request response)
23
   (:documentation "Handle replication patch requests.
24
 - support input as PUT PATCH and POST methods to add content.
25
 - support DELETE to clear.
26
  The first two replace and append the content body, respectively.
27
  A PATCH operation expects a multipart body, of which each part indicates
28
  which operation is to be performed with the part content.
29
  The following headers controls this process
30
 
31
     X-HTTP-Method-Override : specifies the part method
32
     Content-Type : specifies the part content type
33
     Revision : identifies the remote revision. if not present, the revision is local.
34
       this can be present in the request headers only, but not in the section headers.")
35
 
36
   (:log )
37
 
38
   (:auth http:authenticate-request-password)
39
   (:auth http:authenticate-request-token)
40
   (:auth http:authenticate-request-session)
41
   (:auth http:authenticate-request-location)
42
 
43
   (:auth http:authorize-request)
44
   
45
   (:encode mime:application/rdf+json)
46
   (:encode mime:text/turtle)
47
 
48
   (:head ((resource |/:account/:repository/replication|) request response request-type response-type)
49
         (setf (http:response-header response :access-control-allow-origin) "*")
50
         (setf (http:response-header response :access-control-allow-credentials) "*")
51
         (setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
52
         (setf (http:response-content-type-header response) response-type)
53
         (let* ((repository (resource-repository resource))
54
                (request-location (http:request-header request :location)))
55
           (when request-location
56
             (let ((request-replica (spocq.i::ensure-replica repository request-location)))
57
               (setf (spocq.i::replica-time request-replica) (get-universal-time)))))
58
    
59
         nil
60
         )
61
 
62
   (:delete ((resource |/:account/:repository/replication|) request response request-type response-type)
63
         (setf (http:response-header response :access-control-allow-origin) "*")
64
         (setf (http:response-header response :access-control-allow-credentials) "*")
65
         (setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
66
         (setf (http:response-content-type-header response) response-type)
67
         (multiple-value-bind (pathname effective-content-type)
68
                              (call-next-method)
69
           (replication-delete resource request response pathname effective-content-type)
70
           (replication-propagate resource request response pathname effective-content-type))
71
         nil
72
         )
73
 
74
   (:patch ((resource |/:account/:repository/replication|) request response request-type response-type)
75
         (setf (http:response-header response :access-control-allow-origin) "*")
76
         (setf (http:response-header response :access-control-allow-credentials) "*")
77
         (setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
78
         (setf (http:response-content-type-header response) response-type)
79
         (multiple-value-bind (pathname effective-content-type)
80
                              (call-next-method)
81
           (replication-patch resource request response pathname effective-content-type)
82
           (replication-propagate resource request response pathname effective-content-type)
83
           nil
84
           ))
85
 
86
   (:post ((resource |/:account/:repository/replication|) request response request-type response-type)
87
         (setf (http:response-header response :access-control-allow-origin) "*")
88
         (setf (http:response-header response :access-control-allow-credentials) "*")
89
         (setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
90
         (setf (http:response-content-type-header response) response-type)
91
         (multiple-value-bind (pathname effective-content-type)
92
                              (call-next-method)
93
           (replication-post resource request response pathname effective-content-type)
94
           (replication-propagate resource request response pathname effective-content-type))
95
         nil
96
         )
97
 
98
   (:put ((resource |/:account/:repository/replication|) request response request-type response-type)
99
         (setf (http:response-header response :access-control-allow-origin) "*")
100
         (setf (http:response-header response :access-control-allow-credentials) "*")
101
         (setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
102
         (setf (http:response-content-type-header response) response-type)
103
         (multiple-value-bind (pathname effective-content-type)
104
                              (call-next-method)
105
           (replication-put resource request response pathname effective-content-type)
106
           (replication-propagate resource request response pathname effective-content-type))
107
         nil
108
         )
109
 
110
   (:decode ((resource |/:account/:repository/replication|) request response (request-type t) (response-type t))
111
            "Perform the base receive of the request data into a file and return the pathname to
112
         be used directly or to be trancoded."
113
     (let* ((repository (resource-repository resource))
114
            (pathname (tmp-import-pathname (dydra:account (dydra:repository-account repository))
115
                                           (dydra:repository repository)))
116
            (content-length (http:request-content-length request))
117
            (import-limit (spocq.e:import-limit)))
118
       (when content-length 
119
         (unless (<= content-length import-limit)
120
           (http:request-entity-too-large "Content exceeds length limit: ~s." import-limit)))
121
       (http:copy-stream (http:request-content-stream request) pathname :length (or content-length import-limit))
122
       (values pathname request-type)))
123
   )
124
 
125
 
126
 #|
127
 todo
128
 - buffer entire file in order to be able to push it to other participants
129
   - permit other than nquads for non-multipart
130
 - collect the changeset (+/-)
131
 !!!
132
 - implement client support as triggers on changed graphs/resources
133
 |#
134
 (defgeneric replication-delete (resource request response source content-type)
135
   (:method ((resource http:resource) (request http:request) (response http:response) (source t) (content-type mime:rdf))
136
     (replication-modify resource request source content-type :delete)))
137
 
138
 (defgeneric replication-post (resource request response source content-type)
139
   (:method ((resource http:resource) (request http:request) (response http:response) (source t) (content-type mime:rdf))
140
     (replication-modify resource request response source content-type :post)))
141
 
142
 (defgeneric replication-put (resource request response source content-type)
143
   (:method ((resource http:resource) (request http:request) (response http:response) (source t) (content-type mime:rdf))
144
     (replication-modify resource request response source content-type :put)))
145
 
146
 (defgeneric replication-modify (resource request response source content-type method)
147
   (:documentation "the abstract modify operator implements those methods which involve a single
148
    dataset, which is a set of quads. this covers delete, post and put.")
149
 
150
   (:method ((resource http:resource) (request http:request) (response http:response) (source pathname) (content-type t) (method t))
151
     (with-open-file (stream source :direction :input :element-type :default)
152
       (replication-modify resource request response stream content-type method)))
153
   
154
   (:method ((resource http:resource) (request http:request) (response http:response) (input-stream stream) (request-type mime:rdf) method)
155
     (let* ((graph-name (or (resource-graph resource) nil))
156
            (repository (resource-repository resource))
157
            (repository-id (dydra:repository-id repository))
158
            (configuration-list (request-configuration-list request))
159
            (parsed-configuration-list (parse-http-configuration configuration-list))
160
            (revision-id (or (http:request-query-argument request "revision-id")
161
                             (http:request-header request "Revision")))
162
            (separator (mime:mime-type-boundary request-type)))
163
       (declare (ignore separator)) ;; not used
164
 
165
       (cond (revision-id ; skip known revisions
166
              (when (rlmdb:get-revision-record repository revision-id)
167
                (return-from replication-modify nil)))
168
             (t ;; if not remote, then local
169
              (setf revision-id (dydra:make-revision-id))))
170
       (with-http-configuration (list* :repository-id repository-id
171
                                       :task-id (dydra:make-task-id)
172
                                       parsed-configuration-list)
173
         (when (find repository-id spocq.i::*disabled-repositories* :test #'string-equal)
174
           (http:bad-request "The repository has been disabled: ~s." repository-id))
175
         (let ((spocq.i::*repository* repository)
176
               (spocq.i::*repository-id* repository-id)
177
               (input-stream (http:request-content-stream request))
178
               (start (spocq.e:unix-now))
179
               (line-number 0)
180
               (statement-count 0))
181
           ;; in the mode, the revision id is supplied, not generated by the transaction itself
182
           (dydra:with-open-transaction (repository-id :id revision-id :revision-id "HEAD" :normal-disposition :commit)
183
             (let* ((transaction-uuid (spocq.i::transaction-id *transaction*))
184
                    (insert-operation (spocq.i::set-uuid-state (spocq.i::string-to-uuid transaction-uuid (spocq.i::make-uuid-vector)) :insert))
185
                    (delete-operation (spocq.i::set-uuid-state (spocq.i::string-to-uuid transaction-uuid (spocq.i::make-uuid-vector)) :delete))
186
                    (ordinal (rlmdb:get-metadata-ordinal repository)))
187
               (labels ((read-next-line ()
188
                          (when (peek-char t input-stream nil)
189
                            (incf line-number)
190
                            (read-line input-stream)))
191
                        (supply-statements (set-operation accept-quad)
192
                          (funcall set-operation
193
                                   (ecase method
194
                                     ((:post :put) insert-operation)
195
                                     (:delete delete-operation)))
196
                          (loop for line = (read-next-line)
197
                            until (null line)
198
                            do (let ((statement (dydra:parse-nquads-statement line)))
199
                                 (unless statement
200
                                   (error "invalid statement[~d]: ~s" line-number line))
201
                                 (incf statement-count)
202
                                 (if (cdddr statement)
203
                                     (apply accept-quad statement)
204
                                     (apply accept-quad graph-name statement))))))
205
                 (rlmdb::repository-accept-field repository #'supply-statements)
206
                 (rlmdb:put-repository-metadata repository :uuid transaction-uuid
207
                                                :ordinal (1+ ordinal)
208
                                                :end (spocq.e:unix-now)
209
                                                :start start)
210
                 nil))))))))
211
 
212
 (defparameter *replication-patch.debug* t)
213
 
214
 ;;;!!! this should permit deletion of a graph by specifying an empty section and either a concrete graph identifier
215
 ;;;!!! or an abstract one - all, default, named
216
 
217
 (defgeneric replication-patch (resource request response source content-type)
218
   (:documentation "the patch method accepts mutipart mime documentation with per-section methods
219
    and incorporates those in incremental delete/post/put operations.")
220
 
221
   (:method ((resource http:resource) (request http:request) (response http:response) (source pathname) (content-type mime:multipart/related))
222
     (with-open-file (stream source :direction :input :element-type :default)
223
       (replication-patch resource request response stream content-type)))
224
 
225
   (:method ((resource http:resource) (request http:request) (response http:response) (input-stream stream) (content-type mime:multipart/related))
226
     (let* ((original-graph-name (or (resource-graph resource) nil))
227
            (repository (resource-repository resource))
228
            (repository-id (dydra:repository-id repository))
229
            (configuration-list (request-configuration-list request))
230
            (parsed-configuration-list (parse-http-configuration configuration-list))
231
            (revision-id (or (http:request-query-argument request "revision-id")
232
                             (http:request-header request "Revision")))
233
            (destination (http:response-content-stream response))
234
            )
235
       (cond (revision-id ; skip known revisions
236
              (when (rlmdb:get-revision-record repository revision-id)
237
                (return-from replication-patch nil)))
238
             (t ;; if not remote, then local
239
              (setf revision-id (dydra:make-revision-id))))
240
       (with-http-configuration (list* :repository-id repository-id
241
                                       :task-id (dydra:make-task-id)
242
                                       parsed-configuration-list)
243
         (unless (dydra:repository-exists-p repository)
244
           (http:not-found "Repository not found: ~s." repository-id))
245
         (when (find repository-id spocq.i::*disabled-repositories* :test #'string-equal)
246
           (http:bad-request "The repository has been disabled: ~s." repository-id))
247
         (multiple-value-prog1 (repository-patch-multipart-content repository
248
                                                                   input-stream
249
                                                                   :context original-graph-name
250
                                                                   :content-type content-type
251
                                                                   :request request
252
                                                                   :client-request-id (request-client-request-id request))
253
           (when *replication-patch.debug*
254
             (rlmdb::dump-repository repository :verbose t :stream destination)))))))
255
 
256
 ;;; classify locations as peer or client
257
 ;;; peers receive full replicated data
258
 ;;; clients should receive just the modications which relate to the resources in data which they have made.
259
 ;;; - sparql results and updates are tracked as for describe and all resources present are flagged.
260
 ;;; - graph store updats and responses are filtered to tag resources in passing 
261
 
262
 (defmethod replication-propagate ((resource http:resource) (request http:request) (response t) pathname content-type)
263
   (let* ((repository (resource-repository resource))
264
          (request-location (or (http:request-header request :location)
265
                                (spocq.i::repository-replica-location repository)))
266
          (request-replica (spocq.i::ensure-replica repository request-location)))
267
     (setf (spocq.i::replica-time request-replica) (get-universal-time))
268
     (loop for replica-location being each hash-key of (spocq.i::repository-replicas repository)
269
       using (hash-value replica)
270
       do (unless (eq request-replica replica)
271
            (let* ((replica-authority (spocq.i::authority (spocq.i::parse-url-authority replica-location)))
272
                   (replica-authorization (getf (spocq.i::agent-request-authentication replica-authority)
273
                                                :basic-authentication)))
274
              (flet ((propagate-request (stream)
275
                        (http:copy-stream pathname stream)))
276
                (declare (dynamic-extent #'propagate-request))
277
                (tbnl::call-with-open-request-stream #'propagate-request
278
                       replica-location
279
                       :BASIC-AUTHORIZATION (spocq.i::split-string replica-authorization ":" :strict t)
280
                       :method (http:request-method request)
281
                       :content-type "multipart/related")))))))
282
    
283
                
284
 
285
 
286
 ;;; service operations are implemented as side-effects on graph store updates
287
 ;;; each is expressed as a single sparql query which yields solutions and an operator.
288
 ;;; all queries are compiled into an activation graph which is applied to each update field.
289
 ;;; the process must distinguish adds from deletes
290
 #(
291
 ("acl"
292
  "select ?op ?resource ?agent ?mode
293
   where { [] acl:agent ?agent; acl:accessTo ?resource; acl:mode ?mode.
294
           bind acl:addACL ?op
295
   }"
296
  "select ?op ?resource ?agent ?mode
297
   where { [] acl:agent ?agent; acl:accessTo ?resource; acl:mode ?mode.
298
           bind acl:deleteACL ?op
299
   }")
300
 ;;; data source
301
 ;;; +/- source
302
 ;;; +/- schema
303
 ;;; +/- column in schema
304
   ;; 
305
 )
306
 
307
 (defmethod graph-store-match-graph-content ((repository spocq.i::lmdb-replicable-repository) stream
308
                                             context subject predicate object
309
                                             &key content-type revision-id content-encoding timeout)
310
   (declare (ignore timeout))
311
   (let ((pattern (spocq.i::make-quad :subject subject :predicate predicate :object object
312
                                      :graph (or context |urn:dydra|:|all|))))
313
     (graph-store-get-graph-content repository revision-id content-type stream
314
                                    :pattern pattern
315
                                    :content-encoding content-encoding)))
316
   
317
 
318
 (defmethod graph-store-get-graph-content ((repository spocq.i::lmdb-replicable-repository) 
319
                                           revision-id content-type stream
320
                                           &rest args
321
                                           &key content-encoding context pattern)
322
   (declare (ignore content-encoding pattern))
323
   (apply #'spocq.i::repository-encode-content repository revision-id content-type stream
324
          :context (or context |urn:dydra|:|all|)
325
          args))
326
 
327
 
328