Coverage report: /development/source/library/org/datagraph/spocq-shard/src/store/rlmdb/replicable-repository.lisp
| Kind | Covered | All | % |
| expression | 0 | 476 | 0.0 |
| branch | 0 | 20 | 0.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
5
(:documentation "replicable repositories"
6
"A replicable repository manages eventual consistency among peers.
7
Each revision is identified as +/- statements sets and propagated to peers with the
8
local revision identifier. This is used by the recipient(s) to merge the changes into
9
their respective replica.
10
In addition to replication, the repository form permits ephemeral processing.
11
In this case, the repository is created and then treated as an in-memory cache.
12
This is accomplished by operating on an LMDB database, for which LMDB_NOSYNC is specified.
13
In this case, the storage is in /srv/dydra/runtime and is treated as temporary:
14
anything which is no longer open is deleted.
17
- lmdb-replicable-repository : binds a primary rlmdb:repository instance
18
as a reference to the respective lmdb environment and to its databases.
19
Open the environment when initializing and close and delete it when finalizing.
20
There is just one revision HEAD.
21
Otherwise it behaves as an lmdb-repository.
22
- lmdb-replicable-revision : specialized for the transaction class
23
- lmdb-replicable-transaction : specialized in order to avoid rdfcache interaction.
25
The core of the replicability is the persistence identifier, which provides a global,
26
universal identifier for each insertion/deletion operation.
27
These are recorded as a 'position sequence' for each quad.
28
That sequence merges all changes into a consistent view on each node.
29
The query process locates a position (or interval) in the sequence and the
30
insert/remove state indicator determines whether the quad is present with respect to
31
the specific query process.
33
Each identifier is the equivalent of a v1 uuid. The high bit of the time component is used
34
as a flag to indicate insertion (= 0) and deletion (= 1).
36
wrt riak and crdt behaviour: https://aphyr.com/posts/285-jepsen-riak
37
wrt tinymce and crdt: https://www.tiny.cloud/blog/real-time-collaboration-ot-vs-crdt/
42
:initform (error "replica-location is required.")
44
:reader replica-location)
46
:initform (get-universal-time)
47
:accessor replica-time)))
49
(defmethod revision-relation ((revision lmdb-replicable-revision))
50
"The default relation ensures the statement visibility includes the specified location.
51
Where just the start is specified, that serves also as the end."
52
(or (call-next-method)
53
(setf (revision-relation revision)
54
(let ((min-id (revision-min-revision-id revision))
55
(max-id (revision-max-revision-id revision)))
57
`(|time|:|versionIncludes| ,min-id ,min-id)
58
`(|time|:|versionIncludes| ,min-id ,max-id))))))
61
(defun make-replicable-repository (&rest args)
62
(apply #'make-instance 'lmdb-replicable-repository
65
;;; Look up the replica registered for LOCATION among the repository's
;;; replicas; when there is none, instantiate a `replica` for that location,
;;; register it under LOCATION and return it.
(defgeneric ensure-replica (repository location)
  (:method ((repository replicable-repository) location)
    (let* ((registry (repository-replicas repository))
           (known-replica (get-registry location registry)))
      (cond (known-replica)
            (t
             ;; first request for this location: create and register
             (setf (get-registry location registry)
                   (make-instance 'replica :location location)))))))
72
(defmethod make-rdfcache-transaction ((instance lmdb-replicable-transaction) id repository-id
73
&key revision-id read-only-p operation)
74
(declare (ignore read-only-p))
75
(rdfcache:%make-transaction id repository-id
76
;; 2017-09-13 @atomgraph.dydra.com dydra-ndk required lower case
77
:revision-id (if (equalp revision-id "HEAD")
78
(string-upcase revision-id)
79
(string-downcase revision-id))
80
;; the rdfcache transaction is read-only
82
:operation operation))
84
(defmethod transaction-close ((transaction lmdb-replicable-transaction) (disposition (eql :commit)))
85
;; do not yield to the rdfcache method as rdfcache does not know about the transaction
87
(let ((lmdb:transaction (bound-slot-value transaction 'lmdb-transaction)))
88
(when (lmdb:open-p lmdb:transaction)
89
(lmdb:commit-transaction lmdb:transaction))
90
(setf-transaction-lmdb-transaction nil transaction)))
95
(defmethod initialize-instance ((instance lmdb-replicable-repository) &rest args)
98
(defmethod initialize-instance :after ((instance replicable-repository) &key)
  "Derive the replica location from the repository uri: the uri with its scheme
 forced to https and \"/replica\" appended to its lexical form."
  ;; NOTE(review): if `repository-uri` already yields a uri object, `puri:uri`
  ;; may hand back that same object and the scheme change below would then be
  ;; visible through the repository's own uri - confirm that this is intended.
  (let ((replica-uri (puri:uri (repository-uri instance))))
    (setf (puri:uri-scheme replica-uri) :https
          (repository-replica-location instance)
          (concatenate 'string (iri-lexical-form replica-uri) "/replica"))))
104
(defmethod initialize-repository-storage ((repository-id string) (prototype lmdb-replicable-repository) &key)
105
"Augment the rdfcache-specific storage for lmdb variants"
107
(let* ((location (repository-pathname repository-id))
108
(env (lmdb:make-environment location :max-dbs 1)))
109
(lmdb:with-environment (env)
110
(lmdb:with-transaction ((txn (lmdb:make-transaction env :flags 0)))
111
(flet ((ensure-db (name &key class)
112
(let ((db (lmdb:make-database name :class class)))
113
(lmdb:open-database db :if-does-not-exist :create))))
114
(ensure-db rlmdb.i::*revision-sequence-database-name* :class 'rlmdb:revision-sequence-database))))))
118
&key account-name repository-name)
119
(let ((directory (merge-pathnames (make-pathname :directory `(:relative "replicable" ,account-name ,repository-name))
120
*runtime-root-pathname*)))
121
(ensure-directories-exist directory)
122
(apply #'call-next-method instance
123
:authorization-list nil
124
:identifier (cons-v1-uuid)
126
:if-does-not-exist nil
128
(sb-ext:finalize instance (eval `(function
130
(log-notice "deleted replicable repository: ~a." ,(repository-id instance))
131
(lmdb::delete-environment ,(repository-lmdb-repository instance))))))))
135
(defmethod initialize-instance :after ((instance lmdb-replicable-repository) &key content)
  "If initial CONTENT was supplied, insert it into the new repository."
  (and content
       (repository-insert-field instance content)))
141
(defmethod compute-reference-lmdb-repository ((reference lmdb-replicable-repository) &key
145
;; construct an lmdb environment with reference to this repository
146
;; specific lmdb_nosync
147
(make-instance (repository-storage-class reference)
148
:repository reference
149
:directory (repository-pathname reference)
150
:open-arguments '(:if-does-not-exist :create)
151
:open-flags (+ LIBLMDB:+NOTLS+ open-flags)))
153
;;; for ephemeral LIBLMDB:+NOSYNC+
155
(defmethod resolve-repository-revision-id ((repository lmdb-replicable-repository) &key revision if-does-not-exist)
156
(declare (ignore revision if-does-not-exist))
159
(defmethod intern-revision-relation ((repository lmdb-replicable-repository) (expression list))
160
(flet ((intern-if-revision-designator (designator)
162
(integer (string-to-uuid (or (rlmdb:find-revision-uuid repository designator)
163
(spocq.e:revision-not-found-error :identifier designator))
165
(string (string-to-uuid designator (make-uuid-vector)))
167
(declare (dynamic-extent #'intern-if-revision-designator))
168
(map-tree #'intern-if-revision-designator expression)))
171
(defmethod rlmdb:find-revision-record ((repository lmdb-replicable-repository) revision-designator &rest args)
  "Return the metadata record for the repository's single revision.

 REVISION-DESIGNATOR matches when it is string-equal to :head or to the
 repository uuid. The uuid is compared by its lexical form, which is the same
 value stored in the record's :uuid and the same form that
 compute-repository-revision compares against. In that case the result is a
 record with ordinal 1, the uuid's lexical form and the repository creation
 timestamp. Any other designator yields NIL.
 ARGS are accepted for signature compatibility and ignored."
  (declare (dynamic-extent args) (ignore args))
  (let* ((repository-uuid (repository-uuid repository))
         (revision-id (iri-lexical-form repository-uuid)))
    (when (or (string-equal revision-designator :head)
              (string-equal revision-designator revision-id)
              ;; retain the original comparison against the raw uuid, but only
              ;; where it is a string designator, as string-equal would
              ;; otherwise signal a type error for a uuid object.
              (and (typep repository-uuid '(or string symbol character))
                   (string-equal revision-designator repository-uuid)))
      (rlmdb.i::make-metadata-record :ordinal 1
                                     :uuid revision-id
                                     :timestamp (repository-creation-timestamp repository)))))
181
(defmethod compute-repository-revision ((reference lmdb-replicable-repository) (revision-designator string)
182
&key (if-does-not-exist :error)
183
(class (repository-revision-class reference))
184
(revision-class class))
185
(let ((revision-uuid (iri-lexical-form (repository-uuid reference))))
186
(cond ((or (string-equal revision-designator :head)
187
(string-equal revision-designator revision-uuid))
188
(let ((revision-key (list revision-uuid revision-uuid))
189
(record (rlmdb:find-revision-record reference revision-designator)))
190
(or (get-registry revision-key *repositories*)
191
(setf (get-registry revision-key *repositories*)
192
(make-instance revision-class
193
:reference-revision-id revision-uuid
194
:revision-id revision-uuid :reference reference
195
:min-revision-record record :max-revision-record record
196
:min-revision-ordinal 1 :max-revision-ordinal 1)))))
198
(error 'spocq.e:revision-invalid-error :datum revision-designator)))))
201
#+(or) ;; needs to follow the rdfcache method in order to the teh transaction is to use
202
(defmethod call-with-open-transaction ((operator function) (transaction lmdb-replicable-transaction) &rest args)
203
"Provide a thread-specific lmdb transaction.
204
This is cloned from the respective repository's initial transaction and active
205
with dynamic extent only. As the lmdb data path is read-only, the disposition
207
(declare (ignore args))
208
(let* ((revision (transaction-revision transaction))
209
(rlmdb:repository (repository-lmdb-repository revision)))
210
;; if the transaction is read-only, reuse the global transaction
211
(cond ((transaction-read-only-p transaction)
212
;;;(call-next-method)
213
(funcall operator nil)
215
;; otherwise establish a new - thread-specific transaction over the repository environment
217
(let ((lmdb:transaction (lmdb:make-transaction rlmdb:repository :flags 0)))
218
(lmdb:with-transaction (lmdb:transaction :normal-disposition :commit)
219
;;;(call-next-method)
220
(funcall operator nil)))))))
222
(:documentation "direct lmdb update operations on replicable repositories"
223
"A replicable repository is always revisioned.
224
Update operations modify the respective quad state vector to reflect the insert/delete mode for the operation.
225
The modification merges the active occurrence uuid into the statements vector with a flag masked in the
226
time lsb as insert = 0, delete = 1, to effect deletion-takes-precedence semantics for reconciliation.")
229
(defun %test-uuid-visibility (%uuid %vector length)
230
"Locate the highest uuid less than that given.
231
Iff it indicates insertion, then the quad is visible in that revision.
232
Handle the degenerate cases without searching."
233
(labels ((test-position (position)
234
(%compare-v1-uuid-date (cffi:mem-aptr %vector '(:struct v1-uuid) position) %uuid))
235
(binary-search (start end)
236
(let* ((test-position (ash (+ start end) -1))
237
(result (test-position test-position)))
238
;; (format t "~&next: start ~s end ~s test-position ~s result ~s~%" start end test-position result)
241
(-1 (if (= test-position start)
243
(binary-search test-position end)))
244
(+1 (if (= test-position start)
246
(binary-search start test-position)))))))
248
(print (list (%uuid-to-string %uuid)
249
(let ((vector (make-array length)))
250
(loop for i below length
251
do (setf (aref vector i)
252
(%uuid-to-string (cffi:mem-aptr %vector '(:struct v1-uuid) i))))
257
(1 (and (>= (%compare-v1-uuid-date %uuid %vector) 0)
258
(insert-uuid-p %vector)))
259
(t (let ((position (binary-search 0 length)))
261
(values (insert-uuid-p (cffi:mem-aptr %vector '(:struct v1-uuid) position))
265
(defun combine-revision-state (state-vector uuid-vector operation)
266
(setf (aref uuid-vector 9)
268
(:insert (logand (aref uuid-vector 9) #xfe))
269
(:delete (logior (aref uuid-vector 9) #x01))))
270
(let ((result (make-array (1+ (length state-vector)))))
272
for state-uuid across state-vector
273
when (uuid-greater-p state-uuid uuid-vector)
274
do (progn (when state-uuid
275
(setf (aref result i) (shiftf uuid-vector nil))
277
(setf (aref result i) state-uuid)
279
finally (when (< i (length result))
280
(assert uuid-vector () "combine-revision-state: lost uuid")
281
(setf (aref result i) uuid-vector)))
284
(defun state-record-to-state-vector (%record state-vector)
285
(let* ((count (length state-vector))
286
(vector (make-array count)))
287
(loop with offset = 0
289
for uuid-vector = (make-uuid-vector)
290
do (loop for uuid-offset below 16
291
do (setf (aref uuid-vector uuid-offset) (cffi:mem-aref %record :uint8 offset)
293
do (setf (aref state-vector i) uuid-vector))
296
(defun state-vector-to-state-record (state-vector %record)
297
(let* ((count (length state-vector)))
298
(loop with offset = 0
300
for uuid-vector across state-vector
301
do (loop for uuid-offset below 16
302
do (setf (cffi:mem-aref %record :uint8 offset) (aref uuid-vector uuid-offset)
303
offset (1+ offset))))
308
(defmethod repository-insert-field ((repository lmdb-replicable-repository) (solution-field array))
  "Delegate the insertion of an array field to the repository's lmdb repository."
  (let ((store (repository-lmdb-repository repository)))
    (repository-insert-field store solution-field)))
311
(defmethod repository-insert-field ((repository lmdb-replicable-repository) (solution-field cons))
  "Delegate the insertion of a list field to the repository's lmdb repository."
  (let ((store (repository-lmdb-repository repository)))
    (repository-insert-field store solution-field)))
314
(defmethod repository-insert-field ((repository lmdb-replicable-repository) (solution-field solution-field))
  "Delegate the insertion of a solution-field instance to the repository's lmdb repository."
  (let ((store (repository-lmdb-repository repository)))
    (repository-insert-field store solution-field)))
317
(defmethod repository-insert-field ((repository rlmdb:repository) (solution-field cons))
  "Convert the list field with term-number-field and insert the result into the same repository."
  (let ((number-field (term-number-field solution-field)))
    (repository-insert-field repository number-field)))
321
(defmethod repository-insert-field ((transaction lmdb-replicable-transaction) (solution-field array))
  "Apply SOLUTION-FIELD as an insertion to the lmdb repository behind the
 transaction's repository. The update is identified by a copy of the
 transaction id which has been passed through set-uuid-state with :insert."
  (let* ((store (repository-lmdb-repository (transaction-repository transaction)))
         ;; work on a copy of the transaction id
         (operation-id (set-uuid-state (copy-uuid-vector (transaction-id transaction))
                                       :insert)))
    (rlmdb:replicable-repository-update-field store solution-field operation-id)))
328
(defmethod repository-delete-field ((transaction lmdb-replicable-transaction) (solution-field array))
  "Apply SOLUTION-FIELD as a deletion to the lmdb repository behind the
 transaction's repository. The update is identified by a copy of the
 transaction id which has been passed through set-uuid-state with :delete."
  (let* ((store (repository-lmdb-repository (transaction-repository transaction)))
         ;; work on a copy of the transaction id
         (operation-id (set-uuid-state (copy-uuid-vector (transaction-id transaction))
                                       :delete)))
    (rlmdb:replicable-repository-update-field store solution-field operation-id)))
335
(defmethod repository-accept-field ((transaction lmdb-replicable-transaction) (source function))
  "Pass the SOURCE function to rlmdb::repository-accept-field for the lmdb
 repository behind the transaction's repository."
  (rlmdb::repository-accept-field
   (repository-lmdb-repository (transaction-repository transaction))
   source))
341
(defmethod rlmdb::repository-accept-field ((repository lmdb-replicable-repository) (source t))
  "Forward SOURCE, whatever its type, to the repository's lmdb repository."
  (let ((store (spocq.i::repository-lmdb-repository repository)))
    (rlmdb::repository-accept-field store source)))
344
(defmethod rlmdb::dump-repository ((repository spocq.i::lmdb-replicable-repository) &rest args)
  "Dump the repository's lmdb repository, passing ARGS through unchanged."
  (let ((store (spocq.i::repository-lmdb-repository repository)))
    (apply #'rlmdb::dump-repository store args)))
350
(defmethod repository-encode-content ((repository spocq.i::lmdb-replicable-repository)
351
revision-id content-type stream
352
&key content-encoding (context |urn:dydra|:|all|)
353
(pattern (make-quad :graph context)))
354
"The replica method wraps a generator around a statement scan and delegates
355
the encoding to send-response-message to handle media types.
356
Iff the content encoding is gzip, wrap the export process with a compressor."
357
;; match the repository content emit the output to the stream and return nil
358
(let ((revision (compute-repository-revision repository revision-id)))
359
(labels ((generate-statements (generator)
360
(let* ((destination (spocq.i::solution-generator-channel generator))
362
(result-page-length (channel-page-length destination))
363
(result-index *field-page-length*)
365
(labels ((collect-solution (%quad)
366
(trace-bgp graph-store-get-graph-content.collect-solution)
368
(next-solution-location)
369
(setf (aref result-page result-index 0) (spocq.i::%quad-subject %quad)
370
(aref result-page result-index 1) (spocq.i::%quad-predicate %quad)
371
(aref result-page result-index 2) (spocq.i::%quad-object %quad)
372
(aref result-page result-index 3) (spocq.i::%quad-context %quad))
374
(next-solution-location ()
375
;; return a page (possibly newly created) and the next free location in that page
376
(when (>= (incf result-index) result-page-length)
377
(when result-page (put-result result-page))
378
(setf result-page (new-field-page destination result-page-length 4)
380
(values result-page result-index))
381
(complete-solutions ()
382
(trace-bgp graph-store-get-graph-content.complete-solutions result-count)
383
(incf spocq.i::*match-requests* 1)
384
(incf spocq.i::*match-responses* result-count)
385
#+(or) ; no graph tracking
386
(when (plusp (hash-table-count graph-ids-read))
387
(with-locked-cache ((transaction-read-graph-ids transaction))
388
(loop for id being each hash-key of graph-ids-read
389
do (setf (transaction-graph-id-read transaction id) t))))
390
(log-debug "bgp matches+counts: ~s: solutions: ~s"
391
(repository-id repository) result-count)
393
(let ((page-result-count (1+ result-index)))
394
(when (< page-result-count result-page-length)
396
(adjust-page result-page (list page-result-count 4)))))
397
(put-result result-page))
398
(complete-field destination)
399
(incf-stat spocq.i::*solutions-constructed* result-count)
400
(return-from generate-statements result-count))
402
(trace-bgp graph-store-get-graph-content.put destination 4 page)
403
(put-field-page destination page)))
404
(rlmdb:map-repository-statements #'collect-solution
407
;(print (list :index result-index :count result-count))
408
(complete-solutions))))
410
(let* ((base-dimensions spocq.i::*quad-dimensions*)
411
(result-channel (make-channel :name (list 'spocq.a:|service| (repository-id repository))
412
:dimensions base-dimensions
415
(make-solution-generator :operator 'spocq.a:|service|
416
:dimensions base-dimensions
418
:channel result-channel
420
(graph-export (stream)
421
(let ((generator (make-generator)))
422
(flet ((generate-graph-export (transaction)
423
(declare (ignore transaction))
424
(send-response-message :service generator stream content-type)))
425
(bt:make-thread #'(lambda () (generate-statements generator)))
426
(call-with-revision-transaction #'generate-graph-export revision nil)))))
428
(case content-encoding
429
((nil) (graph-export stream))
430
(:gzip (setf (http:stream-media-type stream) mime:application/gzip)
432
(broadcast-stream (setf stream (first (broadcast-stream-streams stream)))))
433
(let ((compressor (run-program "/bin/gzip" '() :output stream :input :stream :wait nil)))
435
(unwind-protect (graph-export (run-program-input compressor))
436
(close (run-program-input compressor))
437
(run-program-wait compressor)
438
(case (run-program-exit-code compressor)
440
(t (http:bad-request "Graph export compression failed.")))
441
(run-program-close compressor)))
443
(http:internal-error "Graph export compression not started.")))))
444
(t (http:not-acceptable "Unsupported content encoding: ~a" content-encoding))))))
449
(defmethod compute-repository-size ((repository lmdb-replicable-repository) &rest args)
  "Compute the size of the repository's lmdb repository, passing ARGS through unchanged."
  (let ((store (repository-lmdb-repository repository)))
    (apply #'compute-repository-size store args)))