Coverage report: /development/source/library/org/datagraph/spocq-shard/src/spocq-server/replication.lisp
| Kind | Covered | All | % |
| expression | 0 | 432 | 0.0 |
| branch | 0 | 20 | 0.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.server.implementation; -*-
2
;;; Copyright 2018 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved
4
(in-package :org.datagraph.spocq.server.implementation)
6
(:documentation "rdf replication protocol"
8
"This file is the http layer for a crdt-based replication server.
11
;;; (load "/development/source/library/org/datagraph/spocq/src/spocq-server/replication.lisp")
14
;; Resource class for the replication endpoint of a single repository. It
;; specializes |/:account/:repository| and is defined with the
;; spocq.i::persistent-class metaclass.
;; NOTE(review): listing line 16 is absent from this view, so the remainder of
;; the mime:mime-type slot specification is not visible - confirm against the file.
(defclass |/:account/:repository/replication| (|/:account/:repository|)
15
((mime:mime-type :initform (mime:mime-type "application/json; charset=utf-8")
17
(:documentation "the replication endpoint for a given repository")
18
(:metaclass spocq.i::persistent-class))
20
;; Add the replication-server symbol to *response-functions*.
;; pushnew (rather than push) keeps the registration idempotent, so loading
;; this file again does not accumulate duplicate entries in the list.
(pushnew 'replication-server *response-functions*)
22
;; HTTP resource function for the replication endpoint.
;; Visible clauses: five :auth steps, two :encode declarations, handlers for
;; :head, :delete, :patch, :post and :put, and a generic :decode clause.
;; NOTE(review): this listing omits several source lines of the definition
;; (e.g. 58-61, 68, 80, 92, 104, 118 and 123-127), among them the value-producing
;; form of each multiple-value-bind; the comments describe only what is visible.
(http:def-resource-function replication-server (resource-id request response)
23
(:documentation "Handle replication patch requests.
24
- support input as PUT PATCH and POST methods to add content.
25
- support DELETE to clear.
26
The first two replace and append the content body, respectively.
27
A PATCH operation expects a multipart body, of which each part indicates
28
which operation is to be performed with the part content.
29
The following headers controls this process
31
X-HTTP-Method-Override : specifies the part method
32
Content-Type : specifies the part content type
33
Revision : identifies the remote revision. if not present, the revision is local.
34
this can be present in the request headers only, but not in the section headers.")
;; Authentication alternatives (password, token, session, location) followed by
;; the authorization step.
38
(:auth http:authenticate-request-password)
39
(:auth http:authenticate-request-token)
40
(:auth http:authenticate-request-session)
41
(:auth http:authenticate-request-location)
43
(:auth http:authorize-request)
;; Response encodings declared for this resource function.
45
(:encode mime:application/rdf+json)
46
(:encode mime:text/turtle)
;; HEAD: set the CORS and content-type response headers; when the request
;; carries a Location header, ensure a replica record for that location and
;; stamp it with the current universal time.
;; NOTE(review): Access-Control-Allow-Credentials is set to "*" here and in the
;; other handlers; the CORS specification defines only "true" as a meaningful
;; value, and credentials are not honoured with a wildcard Allow-Origin -
;; confirm the intent.
48
(:head ((resource |/:account/:repository/replication|) request response request-type response-type)
49
(setf (http:response-header response :access-control-allow-origin) "*")
50
(setf (http:response-header response :access-control-allow-credentials) "*")
51
(setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
52
(setf (http:response-content-type-header response) response-type)
53
(let* ((repository (resource-repository resource))
54
(request-location (http:request-header request :location)))
55
(when request-location
56
(let ((request-replica (spocq.i::ensure-replica repository request-location)))
57
(setf (spocq.i::replica-time request-replica) (get-universal-time)))))
;; DELETE: same response headers as HEAD, then replication-delete followed by
;; replication-propagate on the bound pathname and effective content type.
62
(:delete ((resource |/:account/:repository/replication|) request response request-type response-type)
63
(setf (http:response-header response :access-control-allow-origin) "*")
64
(setf (http:response-header response :access-control-allow-credentials) "*")
65
(setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
66
(setf (http:response-content-type-header response) response-type)
67
(multiple-value-bind (pathname effective-content-type)
69
(replication-delete resource request response pathname effective-content-type)
70
(replication-propagate resource request response pathname effective-content-type))
;; PATCH: same response headers, then replication-patch followed by
;; replication-propagate.
74
(:patch ((resource |/:account/:repository/replication|) request response request-type response-type)
75
(setf (http:response-header response :access-control-allow-origin) "*")
76
(setf (http:response-header response :access-control-allow-credentials) "*")
77
(setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
78
(setf (http:response-content-type-header response) response-type)
79
(multiple-value-bind (pathname effective-content-type)
81
(replication-patch resource request response pathname effective-content-type)
82
(replication-propagate resource request response pathname effective-content-type)
;; POST: same response headers, then replication-post followed by
;; replication-propagate.
86
(:post ((resource |/:account/:repository/replication|) request response request-type response-type)
87
(setf (http:response-header response :access-control-allow-origin) "*")
88
(setf (http:response-header response :access-control-allow-credentials) "*")
89
(setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
90
(setf (http:response-content-type-header response) response-type)
91
(multiple-value-bind (pathname effective-content-type)
93
(replication-post resource request response pathname effective-content-type)
94
(replication-propagate resource request response pathname effective-content-type))
;; PUT: same response headers, then replication-put followed by
;; replication-propagate.
98
(:put ((resource |/:account/:repository/replication|) request response request-type response-type)
99
(setf (http:response-header response :access-control-allow-origin) "*")
100
(setf (http:response-header response :access-control-allow-credentials) "*")
101
(setf (http:response-header response :access-control-allow-headers) "Authorization, Content-Type, X-Requested-With")
102
(setf (http:response-content-type-header response) response-type)
103
(multiple-value-bind (pathname effective-content-type)
105
(replication-put resource request response pathname effective-content-type)
106
(replication-propagate resource request response pathname effective-content-type))
;; Decode (any request and response type): copy the request body into a
;; temporary import file for the repository and return that pathname together
;; with the request type. Content longer than (spocq.e:import-limit) is rejected
;; through http:request-entity-too-large.
110
(:decode ((resource |/:account/:repository/replication|) request response (request-type t) (response-type t))
111
"Perform the base receive of the request data into a file and return the pathname to
112
be used directly or to be trancoded."
113
(let* ((repository (resource-repository resource))
114
(pathname (tmp-import-pathname (dydra:account (dydra:repository-account repository))
115
(dydra:repository repository)))
116
(content-length (http:request-content-length request))
117
(import-limit (spocq.e:import-limit)))
119
(unless (<= content-length import-limit)
120
(http:request-entity-too-large "Content exceeds length limit: ~s." import-limit)))
;; When no content length is known, the copy is bounded by the import limit.
121
(http:copy-stream (http:request-content-stream request) pathname :length (or content-length import-limit))
122
(values pathname request-type)))
128
- buffer entire file in order to be able to push it to other participants
129
- permit other than nquads for non-multipart
130
- collect the changeset (+/-)
132
- implement client support as triggers on changed graphs/resources
134
(defgeneric replication-delete (resource request response source content-type)
  (:documentation "Handle a DELETE of RDF content: hand SOURCE to replication-modify
 with the :delete method, which selects the delete operation for the supplied statements.
 RESOURCE, REQUEST and RESPONSE are the http resource, request and response;
 SOURCE is the received content and CONTENT-TYPE its rdf media type.")
  (:method ((resource http:resource) (request http:request) (response http:response) (source t) (content-type mime:rdf))
    ;; replication-modify takes six required arguments; RESPONSE must be passed
    ;; through, exactly as replication-post and replication-put do.
    (replication-modify resource request response source content-type :delete)))
138
(defgeneric replication-post (resource request response source content-type)
  (:documentation "Handle a POST of RDF content by handing SOURCE to replication-modify
 with the :post method.")
  (:method ((endpoint http:resource)
            (http-request http:request)
            (http-response http:response)
            (content-source t)
            (media-type mime:rdf))
    ;; pure delegation: the arguments are forwarded unchanged, tagged with :post
    (replication-modify endpoint http-request http-response content-source media-type :post)))
142
(defgeneric replication-put (resource request response source content-type)
  (:documentation "Handle a PUT of RDF content by handing SOURCE to replication-modify
 with the :put method.")
  (:method ((endpoint http:resource)
            (http-request http:request)
            (http-response http:response)
            (content-source t)
            (media-type mime:rdf))
    ;; pure delegation: the arguments are forwarded unchanged, tagged with :put
    (replication-modify endpoint http-request http-response content-source media-type :put)))
146
;; replication-modify: shared implementation behind the single-dataset
;; operations (delete, post, put).
;; NOTE(review): this listing omits a number of source lines inside the
;; definition (for example around listing lines 179-180, 189, 193, 197-199 and
;; 208-211), so the comments describe only the forms that are visible.
(defgeneric replication-modify (resource request response source content-type method)
147
(:documentation "the abstract modify operator implements those methods which involve a single
148
dataset, which is a set of quads. this covers delete, post and put.")
;; Pathname source: open the file and re-dispatch on the resulting stream.
150
(:method ((resource http:resource) (request http:request) (response http:response) (source pathname) (content-type t) (method t))
151
(with-open-file (stream source :direction :input :element-type :default)
152
(replication-modify resource request response stream content-type method)))
;; Stream source with an rdf content type: process the statements inside one
;; repository transaction.
154
(:method ((resource http:resource) (request http:request) (response http:response) (input-stream stream) (request-type mime:rdf) method)
155
(let* ((graph-name (or (resource-graph resource) nil))
156
(repository (resource-repository resource))
157
(repository-id (dydra:repository-id repository))
158
(configuration-list (request-configuration-list request))
159
(parsed-configuration-list (parse-http-configuration configuration-list))
160
(revision-id (or (http:request-query-argument request "revision-id")
161
(http:request-header request "Revision")))
162
(separator (mime:mime-type-boundary request-type)))
163
(declare (ignore separator)) ;; not used
;; A revision id taken from the request ("revision-id" query argument or
;; "Revision" header) that already has a revision record ends the call with
;; NIL; when the request supplies none, a fresh revision id is generated.
165
(cond (revision-id ; skip known revisions
166
(when (rlmdb:get-revision-record repository revision-id)
167
(return-from replication-modify nil)))
168
(t ;; if not remote, then local
169
(setf revision-id (dydra:make-revision-id))))
170
(with-http-configuration (list* :repository-id repository-id
171
:task-id (dydra:make-task-id)
172
parsed-configuration-list)
;; Disabled repositories are refused with a bad-request response.
173
(when (find repository-id spocq.i::*disabled-repositories* :test #'string-equal)
174
(http:bad-request "The repository has been disabled: ~s." repository-id))
175
(let ((spocq.i::*repository* repository)
176
(spocq.i::*repository-id* repository-id)
;; NOTE(review): this rebinds input-stream to the request content stream and so
;; shadows the stream argument (e.g. the file opened by the pathname method) -
;; confirm that this is intended.
177
(input-stream (http:request-content-stream request))
178
(start (spocq.e:unix-now))
181
;; in the mode, the revision id is supplied, not generated by the transaction itself
182
(dydra:with-open-transaction (repository-id :id revision-id :revision-id "HEAD" :normal-disposition :commit)
183
(let* ((transaction-uuid (spocq.i::transaction-id *transaction*))
184
(insert-operation (spocq.i::set-uuid-state (spocq.i::string-to-uuid transaction-uuid (spocq.i::make-uuid-vector)) :insert))
185
(delete-operation (spocq.i::set-uuid-state (spocq.i::string-to-uuid transaction-uuid (spocq.i::make-uuid-vector)) :delete))
186
(ordinal (rlmdb:get-metadata-ordinal repository)))
;; read-next-line skips leading whitespace and reads a line unless the stream is
;; at its end. supply-statements parses each line as an N-Quads statement and
;; applies accept-quad to it, prefixing graph-name when the statement has no
;; fourth element; :post and :put map to insert-operation, :delete to
;; delete-operation.
187
(labels ((read-next-line ()
188
(when (peek-char t input-stream nil)
190
(read-line input-stream)))
191
(supply-statements (set-operation accept-quad)
192
(funcall set-operation
194
((:post :put) insert-operation)
195
(:delete delete-operation)))
196
(loop for line = (read-next-line)
198
do (let ((statement (dydra:parse-nquads-statement line)))
200
(error "invalid statement[~d]: ~s" line-number line))
201
(incf statement-count)
202
(if (cdddr statement)
203
(apply accept-quad statement)
204
(apply accept-quad graph-name statement))))))
;; Feed the statements to the repository, then record the transaction uuid, the
;; incremented ordinal and the end time as repository metadata.
205
(rlmdb::repository-accept-field repository #'supply-statements)
206
(rlmdb:put-repository-metadata repository :uuid transaction-uuid
207
:ordinal (1+ ordinal)
208
:end (spocq.e:unix-now)
212
;; Debug switch read by replication-patch: when true, the repository is dumped
;; (verbose) to the response content stream after a patch has been applied.
;; NOTE(review): the default is T, so the dump is active unless this is rebound -
;; confirm that this is intended outside of development.
(defparameter *replication-patch.debug* t)
214
;;;!!! this should permit deletion of a graph by specifying an empty section and either a concrete graph identifier
215
;;;!!! or an abstract one - all, default, named
217
;; replication-patch: apply a multipart/related request body to the repository.
;; NOTE(review): listing lines 234, 248 and 251 are absent from this view (the
;; end of the let* bindings and two arguments of the patch call), so the
;; comments describe only the forms that are visible.
(defgeneric replication-patch (resource request response source content-type)
218
(:documentation "the patch method accepts mutipart mime documentation with per-section methods
219
and incorporates those in incremental delete/post/put operations.")
;; Pathname source: open the file and re-dispatch on the resulting stream.
221
(:method ((resource http:resource) (request http:request) (response http:response) (source pathname) (content-type mime:multipart/related))
222
(with-open-file (stream source :direction :input :element-type :default)
223
(replication-patch resource request response stream content-type)))
;; Stream source: validate the repository, then hand the content to
;; repository-patch-multipart-content.
225
(:method ((resource http:resource) (request http:request) (response http:response) (input-stream stream) (content-type mime:multipart/related))
226
(let* ((original-graph-name (or (resource-graph resource) nil))
227
(repository (resource-repository resource))
228
(repository-id (dydra:repository-id repository))
229
(configuration-list (request-configuration-list request))
230
(parsed-configuration-list (parse-http-configuration configuration-list))
231
(revision-id (or (http:request-query-argument request "revision-id")
232
(http:request-header request "Revision")))
233
(destination (http:response-content-stream response))
;; A revision id taken from the request that already has a revision record ends
;; the call with NIL; when the request supplies none, a fresh one is generated.
235
(cond (revision-id ; skip known revisions
236
(when (rlmdb:get-revision-record repository revision-id)
237
(return-from replication-patch nil)))
238
(t ;; if not remote, then local
239
(setf revision-id (dydra:make-revision-id))))
240
(with-http-configuration (list* :repository-id repository-id
241
:task-id (dydra:make-task-id)
242
parsed-configuration-list)
;; A missing repository yields not-found; a disabled one yields bad-request.
243
(unless (dydra:repository-exists-p repository)
244
(http:not-found "Repository not found: ~s." repository-id))
245
(when (find repository-id spocq.i::*disabled-repositories* :test #'string-equal)
246
(http:bad-request "The repository has been disabled: ~s." repository-id))
;; multiple-value-prog1 returns the values of the patch call; the optional
;; debug dump to the response content stream runs after it.
247
(multiple-value-prog1 (repository-patch-multipart-content repository
249
:context original-graph-name
250
:content-type content-type
252
:client-request-id (request-client-request-id request))
253
(when *replication-patch.debug*
254
(rlmdb::dump-repository repository :verbose t :stream destination)))))))
256
;;; classify locations as peer or client
257
;;; peers receive full replicated data
258
;;; clients should receive just the modifications which relate to the resources in data which they have made.
259
;;; - sparql results and updates are tracked as for describe and all resources present are flagged.
260
;;; - graph store updates and responses are filtered to tag resources in passing
262
;; replication-propagate: send the received content (the file at PATHNAME) to
;; every replica of the repository other than the one the request came from.
;; NOTE(review): listing line 278 is absent from this view (an argument of
;; tbnl::call-with-open-request-stream, presumably the target location -
;; confirm). The content-type argument is not referenced in the visible code;
;; the outgoing content type is the fixed string "multipart/related".
(defmethod replication-propagate ((resource http:resource) (request http:request) (response t) pathname content-type)
263
(let* ((repository (resource-repository resource))
;; The originating replica is identified by the request's Location header,
;; falling back to the repository's own replica location.
264
(request-location (or (http:request-header request :location)
265
(spocq.i::repository-replica-location repository)))
266
(request-replica (spocq.i::ensure-replica repository request-location)))
;; Refresh the originating replica's time stamp.
267
(setf (spocq.i::replica-time request-replica) (get-universal-time))
268
(loop for replica-location being each hash-key of (spocq.i::repository-replicas repository)
269
using (hash-value replica)
;; Skip the originating replica; for each other one, look up the basic
;; authentication credentials registered for the replica's authority.
270
do (unless (eq request-replica replica)
271
(let* ((replica-authority (spocq.i::authority (spocq.i::parse-url-authority replica-location)))
272
(replica-authorization (getf (spocq.i::agent-request-authentication replica-authority)
273
:basic-authentication)))
;; The request body is produced by copying the file at PATHNAME to the
;; outgoing stream.
274
(flet ((propagate-request (stream)
275
(http:copy-stream pathname stream)))
276
(declare (dynamic-extent #'propagate-request))
;; The outgoing request reuses the method of the incoming request; the
;; credential string is split at ":" for the basic authorization argument.
277
(tbnl::call-with-open-request-stream #'propagate-request
279
:BASIC-AUTHORIZATION (spocq.i::split-string replica-authorization ":" :strict t)
280
:method (http:request-method request)
281
:content-type "multipart/related")))))))
286
;;; service operations are implemented as side-effects on graph store updates
287
;;; each is expressed as a single sparql query which yields solutions and an operator.
288
;;; all queries are compiled into an activation graph which is applied to each update field.
289
;;; the process must distinguish adds from deletes
292
"select ?op ?resource ?agent ?mode
293
where { [] acl:agent ?agent; acl:accessTo ?resource; acl:mode ?mode.
296
"select ?op ?resource ?agent ?mode
297
where { [] acl:agent ?agent; acl:accessTo ?resource; acl:mode ?mode.
298
bind acl:deleteACL ?op
303
;;; +/- column in schema
307
;; graph-store-match-graph-content for replicable lmdb repositories: build a
;; quad pattern from SUBJECT, PREDICATE and OBJECT (the graph defaults to
;; |urn:dydra|:|all| when CONTEXT is nil) and delegate to
;; graph-store-get-graph-content. The TIMEOUT keyword is accepted but ignored.
;; NOTE(review): listing line 314 is absent from this view; presumably it passes
;; the pattern (and context) keywords to the delegate - confirm.
(defmethod graph-store-match-graph-content ((repository spocq.i::lmdb-replicable-repository) stream
308
context subject predicate object
309
&key content-type revision-id content-encoding timeout)
310
(declare (ignore timeout))
311
(let ((pattern (spocq.i::make-quad :subject subject :predicate predicate :object object
312
:graph (or context |urn:dydra|:|all|))))
313
(graph-store-get-graph-content repository revision-id content-type stream
315
:content-encoding content-encoding)))
318
(defmethod graph-store-get-graph-content ((repository spocq.i::lmdb-replicable-repository)
319
revision-id content-type stream
321
&key content-encoding context pattern)
322
(declare (ignore content-encoding pattern))
323
(apply #'spocq.i::repository-encode-content repository revision-id content-type stream
324
:context (or context |urn:dydra|:|all|)