Coverage report: /development/source/library/org/datagraph/spocq-shard/src/store/rlmdb/replicable-repository.lisp

KindCoveredAll%
expression0476 0.0
branch020 0.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 (:documentation "replicable repositories"
6
                 "A replicable repository manages eventual consistency among peers.
7
  Each revision is identified as +/- statements sets and propagated to peers with the
8
  local revision identifier. This is used by the recipient(s) to merge the changes into
9
  their respective replica.
10
  In addition to replication, the repository form permits ephemeral processing.
11
  In this case, the repository is created and then treated  as an in-memory cache.
12
  This is accomlished by operating on an LMDB database, for which LMDB_NOSYNC is specified.
13
  In this case, the storage is in /srv/dydra/runtime and is treated as temporary:
14
  anything which is no longer open is deleted.
15
 
16
  * classes
17
  - lmdb-replicable-repository : binds a primary rlmdb:repository instance
18
    as a reference to the respective lmdb environment and to its databases.
19
    Open the environment when initializing and close and delete it when finalizing.
20
    There is just one revision HEAD.
21
    Otherwise it behaves as an lmdb-repository.
22
  - lmdb-replicable-revision : spezialized for the transaction class
23
  - lmdb-replicable-transaction : specialized in order to avoid rdfcache interaction.
24
 
25
  The core of the replicability is the persistence identifier, which provides a global,
26
  universal identifier for each insertion/deletion operation.
27
  These are recorded as a 'position sequence' for each quad.
28
  That sequence merges all changes into a consistent view on each node.
29
  The query process locates a position (or interval) in the sequence and the
30
  insert/remove state indicator determines whether the quad is present respective
31
  the specific query process.
32
 
33
  Each identifer is the eqivalent of a v1 uuid. the high bit of the time component is used
34
  as a flag to indicate insertion (= 0) and deletion (= 1).
35
 
36
  wrt riak and crdt behaviour: https://aphyr.com/posts/285-jepsen-riak
37
  wrt tinc-mce and crdt: https://www.tiny.cloud/blog/real-time-collaboration-ot-vs-crdt/
38
 ")
39
 
40
 (defclass replica ()
41
   ((location
42
     :initform (error "replica-location is required.")
43
     :initarg :location
44
     :reader replica-location)
45
    (time
46
     :initform (get-universal-time)
47
     :accessor replica-time)))
48
 
49
 (defmethod revision-relation ((revision lmdb-replicable-revision))
50
   "The default relation ensures the statement visibility includes the specified location.
51
   Where just the start is specified, that serves also as the end."
52
   (or (call-next-method)
53
       (setf (revision-relation revision)
54
             (let ((min-id (revision-min-revision-id revision))
55
                   (max-id (revision-max-revision-id revision)))
56
               (if (null max-id)
57
                   `(|time|:|versionIncludes| ,min-id ,min-id)
58
                   `(|time|:|versionIncludes| ,min-id ,max-id))))))
59
 
60
 
61
 (defun make-replicable-repository (&rest args)
62
   (apply #'make-instance 'lmdb-replicable-repository
63
          args))
64
 
65
 (defgeneric ensure-replica (repository location)
66
   (:method ((repository replicable-repository) location)
67
     (let ((replicas (repository-replicas repository)))
68
       (or (get-registry location replicas)
69
           (setf (get-registry location replicas)
70
                 (make-instance 'replica :location location))))))
71
 
72
 (defmethod make-rdfcache-transaction ((instance lmdb-replicable-transaction) id repository-id
73
                                       &key revision-id read-only-p operation)
74
   (declare (ignore read-only-p))
75
   (rdfcache:%make-transaction id repository-id
76
                               ;; 2017-09-13 @atomgraph.dydra.com dydra-ndk required lower case
77
                               :revision-id (if (equalp revision-id "HEAD")
78
                                                (string-upcase revision-id)
79
                                                (string-downcase revision-id))
80
                               ;; the rdfcache transaction is read-only
81
                               :read-only-p t
82
                               :operation operation))
83
 
84
 (defmethod transaction-close ((transaction lmdb-replicable-transaction) (disposition (eql :commit)))
85
   ;; do not yield to the rdfcache method as rdfcache does not know about the transaction
86
   ;;(call-next-method)
87
   (let ((lmdb:transaction (bound-slot-value transaction 'lmdb-transaction)))
88
     (when (lmdb:open-p lmdb:transaction)
89
       (lmdb:commit-transaction lmdb:transaction))
90
     (setf-transaction-lmdb-transaction nil transaction)))
91
 
92
 ;;;
93
 
94
 #+(or)
95
 (defmethod initialize-instance ((instance lmdb-replicable-repository) &rest args)
96
   (call-next-method))
97
 
98
 (defmethod initialize-instance :after ((instance replicable-repository) &key)
99
   (let ((uri (puri:uri (repository-uri instance))))
100
     (setf (puri:uri-scheme uri)  :https)
101
     (setf (repository-replica-location instance)
102
           (concatenate 'string (iri-lexical-form uri) "/replica"))))
103
 
104
 (defmethod initialize-repository-storage ((repository-id string) (prototype lmdb-replicable-repository) &key)
105
   "Augment the rdfcache-specific storage for lmdb variants"
106
   (call-next-method)
107
   (let* ((location (repository-pathname repository-id))
108
          (env (lmdb:make-environment location :max-dbs 1)))
109
     (lmdb:with-environment (env)
110
       (lmdb:with-transaction ((txn (lmdb:make-transaction env :flags 0)))
111
           (flet ((ensure-db (name &key class)
112
                    (let ((db (lmdb:make-database name :class class)))
113
                      (lmdb:open-database db :if-does-not-exist :create))))
114
             (ensure-db rlmdb.i::*revision-sequence-database-name* :class 'rlmdb:revision-sequence-database))))))
115
 
116
 #|
117
 
118
                                 &key account-name repository-name)
119
   (let ((directory (merge-pathnames (make-pathname :directory `(:relative "replicable" ,account-name ,repository-name))
120
                                     *runtime-root-pathname*)))
121
     (ensure-directories-exist directory)
122
     (apply #'call-next-method instance
123
            :authorization-list nil
124
            :identifier (cons-v1-uuid)
125
            :pathname directory
126
            :if-does-not-exist nil
127
            args)
128
     (sb-ext:finalize instance (eval `(function
129
                                       (lambda ()
130
                                         (log-notice "deleted replicable repository: ~a." ,(repository-id instance))
131
                                         (lmdb::delete-environment ,(repository-lmdb-repository instance))))))))
132
 |#
133
 
134
 #+(or)
135
 (defmethod initialize-instance :after ((instance lmdb-replicable-repository) &key content)
136
   (when content (repository-insert-field instance content)))
137
 
138
 
139
 
140
 ;;; 
141
 (defmethod compute-reference-lmdb-repository ((reference lmdb-replicable-repository) &key
142
                                               &allow-other-keys)
143
   (call-next-method))
144
 #|
145
   ;; construct an lmdb environment with reference to this repository 
146
   ;; specific lmdb_nosync
147
   (make-instance (repository-storage-class reference)
148
     :repository reference
149
     :directory (repository-pathname reference)
150
     :open-arguments '(:if-does-not-exist :create)
151
     :open-flags (+ LIBLMDB:+NOTLS+ open-flags)))
152
 |#
153
 ;;; for ephemeral LIBLMDB:+NOSYNC+
154
 
155
 (defmethod resolve-repository-revision-id ((repository lmdb-replicable-repository) &key revision if-does-not-exist)
156
   (declare (ignore revision if-does-not-exist))
157
   (call-next-method))
158
 
159
 (defmethod intern-revision-relation ((repository lmdb-replicable-repository) (expression list))
160
   (flet ((intern-if-revision-designator (designator)
161
            (typecase designator
162
              (integer (string-to-uuid (or (rlmdb:find-revision-uuid repository designator)
163
                                           (spocq.e:revision-not-found-error :identifier designator))
164
                                       (make-uuid-vector)))
165
              (string (string-to-uuid designator (make-uuid-vector)))
166
              (t designator))))
167
       (declare (dynamic-extent #'intern-if-revision-designator))
168
       (map-tree #'intern-if-revision-designator expression)))
169
 
170
 #+(or)
171
 (defmethod rlmdb:find-revision-record ((repository lmdb-replicable-repository) revision-designator &rest args)
172
   (declare (dynamic-extent args) (ignore args))
173
   (let ((revision-id (repository-uuid repository)))
174
     (cond ((or (string-equal revision-designator :head)
175
                (string-equal revision-designator revision-id))
176
            (rlmdb.i::make-metadata-record :ordinal 1
177
                                           :uuid (iri-lexical-form revision-id)
178
                                           :timestamp (repository-creation-timestamp repository))))))
179
 
180
 #+(or)
181
 (defmethod compute-repository-revision ((reference lmdb-replicable-repository) (revision-designator string)
182
                                         &key  (if-does-not-exist :error)
183
                                         (class (repository-revision-class reference))
184
                                         (revision-class class))
185
   (let ((revision-uuid (iri-lexical-form (repository-uuid reference))))
186
     (cond ((or (string-equal revision-designator :head)
187
                (string-equal revision-designator revision-uuid))
188
            (let ((revision-key (list revision-uuid revision-uuid))
189
                  (record (rlmdb:find-revision-record reference revision-designator)))
190
              (or (get-registry revision-key *repositories*)
191
                  (setf (get-registry revision-key *repositories*)
192
                        (make-instance revision-class
193
                          :reference-revision-id revision-uuid
194
                          :revision-id revision-uuid :reference reference
195
                          :min-revision-record record :max-revision-record record
196
                          :min-revision-ordinal 1 :max-revision-ordinal 1)))))
197
           (if-does-not-exist
198
            (error 'spocq.e:revision-invalid-error :datum revision-designator)))))
199
 
200
 
201
 #+(or) ;; needs to follow the rdfcache method in order to the teh transaction is to use
202
 (defmethod call-with-open-transaction ((operator function) (transaction lmdb-replicable-transaction) &rest args)
203
   "Provide a thread-specific lmdb transaction.
204
  This is cloned from the respective repository's initial transaction and active
205
  with dynamic extent only. As the lmdb data path is read-only, the disposition
206
  as always to abort."
207
   (declare (ignore args))
208
   (let* ((revision (transaction-revision transaction))
209
          (rlmdb:repository (repository-lmdb-repository revision)))
210
     ;; if the transaction is read-only, reuse the global transaction
211
     (cond ((transaction-read-only-p transaction) 
212
            ;;;(call-next-method)
213
            (funcall operator nil)
214
            )
215
           ;; otherwise establish a new - thread-specific transaction over the repository environment
216
           (t
217
            (let ((lmdb:transaction (lmdb:make-transaction rlmdb:repository :flags 0)))
218
              (lmdb:with-transaction (lmdb:transaction :normal-disposition :commit)
219
                ;;;(call-next-method)
220
                (funcall operator nil)))))))
221
 
222
 (:documentation "direct lmdb update operations on replicable repositories"
223
                 "An replicable repository is always revisioned.
224
  Update operations modify the respective quad state vector to reflect the insert/delete mode for the operation.
225
  The modification merges the active occurrence uuid into the statements vector with a flag masked in the
226
  time lsb as insert = 0, delete = 1, to effect deletion-takes-precedence semantics for reconciliation.")
227
                 
228
 
229
 (defun %test-uuid-visibility (%uuid %vector length)
230
   "Locate the highest uuid less that that given.
231
  Iff it indicates insertion, then the quad is visible in that revision.
232
  Handle the degenerate cases without searching."
233
   (labels ((test-position (position)
234
              (%compare-v1-uuid-date (cffi:mem-aptr %vector '(:struct v1-uuid) position) %uuid))
235
            (binary-search (start end)
236
              (let* ((test-position (ash (+ start end) -1))
237
                     (result (test-position test-position)))
238
                ;; (format t "~&next: start ~s end ~s test-position ~s result ~s~%" start end test-position result)
239
                (ecase result
240
                  (0 test-position)
241
                  (-1 (if (= test-position start)
242
                          start
243
                          (binary-search test-position end)))
244
                  (+1 (if (= test-position start)
245
                          nil
246
                          (binary-search start test-position)))))))
247
     #+(or)
248
     (print (list (%uuid-to-string %uuid)
249
                  (let ((vector (make-array length)))
250
                    (loop for i below length
251
                      do (setf (aref vector i)
252
                               (%uuid-to-string (cffi:mem-aptr %vector '(:struct v1-uuid) i))))
253
                    vector)
254
                  length))
255
     (case length
256
       (0 t)
257
       (1 (and (>= (%compare-v1-uuid-date %uuid %vector) 0)
258
               (insert-uuid-p %vector)))
259
       (t (let ((position (binary-search 0 length)))
260
            (when position
261
              (values (insert-uuid-p (cffi:mem-aptr %vector '(:struct v1-uuid) position))
262
                      position)))))))
263
 
264
 
265
 (defun combine-revision-state (state-vector uuid-vector operation)
266
   (setf (aref uuid-vector 9)
267
         (ecase operation
268
           (:insert (logand (aref uuid-vector 9) #xfe))
269
           (:delete (logior (aref uuid-vector 9) #x01))))
270
   (let ((result (make-array (1+ (length state-vector)))))
271
     (loop with i = 0
272
       for state-uuid across state-vector
273
       when (uuid-greater-p state-uuid uuid-vector)
274
       do (progn (when state-uuid
275
                   (setf (aref result i) (shiftf uuid-vector nil))
276
                   (incf i))
277
            (setf (aref result i) state-uuid)
278
            (incf i))
279
       finally (when (< i (length result))
280
                 (assert uuid-vector () "combine-revision-state: lost uuid")
281
                 (setf (aref result i) uuid-vector)))
282
     result))
283
 
284
 (defun state-record-to-state-vector (%record state-vector)
285
   (let* ((count (length state-vector))
286
          (vector (make-array count)))
287
     (loop with offset = 0
288
       for i below count
289
       for uuid-vector = (make-uuid-vector)
290
       do (loop for uuid-offset below 16
291
            do (setf (aref uuid-vector uuid-offset) (cffi:mem-aref %record :uint8 offset)
292
                     offset (1+ offset)))
293
       do (setf (aref state-vector i) uuid-vector))
294
     vector))
295
 
296
 (defun state-vector-to-state-record (state-vector %record)
297
   (let* ((count (length state-vector)))
298
     (loop with offset = 0
299
       for i below count
300
       for uuid-vector across state-vector
301
       do (loop for uuid-offset below 16
302
            do (setf (cffi:mem-aref %record :uint8 offset) (aref uuid-vector uuid-offset)
303
                     offset (1+ offset))))
304
     %record))
305
 
306
 
307
 
308
 (defmethod repository-insert-field ((repository lmdb-replicable-repository) (solution-field array))
309
   (repository-insert-field (repository-lmdb-repository repository) solution-field))
310
 
311
 (defmethod repository-insert-field ((repository lmdb-replicable-repository) (solution-field cons))
312
   (repository-insert-field (repository-lmdb-repository repository) solution-field))
313
 
314
 (defmethod repository-insert-field ((repository lmdb-replicable-repository) (solution-field solution-field))
315
   (repository-insert-field (repository-lmdb-repository repository) solution-field))
316
 
317
 (defmethod repository-insert-field ((repository rlmdb:repository) (solution-field cons))
318
   (repository-insert-field repository (term-number-field solution-field)))
319
 
320
 
321
 (defmethod repository-insert-field ((transaction lmdb-replicable-transaction) (solution-field array))
322
   (let* ((repository (transaction-repository transaction))
323
          (rlmdb-repository (repository-lmdb-repository repository))
324
          (uuid (copy-uuid-vector (transaction-id transaction))))
325
     (rlmdb:replicable-repository-update-field rlmdb-repository solution-field
326
                                               (set-uuid-state uuid :insert))))
327
          
328
 (defmethod repository-delete-field ((transaction lmdb-replicable-transaction) (solution-field array))
329
   (let* ((repository (transaction-repository transaction))
330
          (rlmdb-repository (repository-lmdb-repository repository))
331
          (uuid (copy-uuid-vector (transaction-id transaction))))
332
     (rlmdb:replicable-repository-update-field rlmdb-repository solution-field
333
                                               (set-uuid-state uuid :delete))))
334
 
335
 (defmethod repository-accept-field ((transaction lmdb-replicable-transaction) (source function))
336
   (let* ((repository (transaction-repository transaction))
337
          (rlmdb-repository (repository-lmdb-repository repository)))
338
     (rlmdb::repository-accept-field rlmdb-repository source)))
339
 
340
 
341
 (defmethod rlmdb::repository-accept-field ((repository lmdb-replicable-repository) (source t))
342
   (rlmdb::repository-accept-field (spocq.i::repository-lmdb-repository repository) source))
343
   
344
 (defmethod rlmdb::dump-repository ((repository spocq.i::lmdb-replicable-repository) &rest args)
345
     (apply #'rlmdb::dump-repository (spocq.i::repository-lmdb-repository repository) args))
346
 
347
 
348
 
349
 
350
 (defmethod repository-encode-content ((repository spocq.i::lmdb-replicable-repository) 
351
                                       revision-id content-type stream
352
                                       &key content-encoding (context |urn:dydra|:|all|)
353
                                       (pattern (make-quad :graph context)))
354
   "The replica method wraps a generator around a statement scan and delegates
355
  the encoding to send-response-message to handle media types.
356
  Iff the content encoding is gzip, wrap the export process with a compressor."
357
   ;; match the repository content emit the output to the stream and return nil
358
   (let ((revision (compute-repository-revision repository revision-id)))
359
     (labels ((generate-statements (generator)
360
                (let* ((destination (spocq.i::solution-generator-channel generator))
361
                      (result-page nil)
362
                      (result-page-length (channel-page-length destination))
363
                      (result-index *field-page-length*)
364
                      (result-count 0))
365
                  (labels ((collect-solution (%quad)
366
                             (trace-bgp graph-store-get-graph-content.collect-solution)
367
                             (incf result-count)
368
                             (next-solution-location)
369
                             (setf (aref result-page result-index 0) (spocq.i::%quad-subject %quad)
370
                                   (aref result-page result-index 1) (spocq.i::%quad-predicate %quad)
371
                                   (aref result-page result-index 2) (spocq.i::%quad-object %quad)
372
                                   (aref result-page result-index 3) (spocq.i::%quad-context %quad))
373
                             result-count)
374
                           (next-solution-location ()
375
                             ;; return a page (possible newly created) and the next free location in that page
376
                             (when (>= (incf result-index) result-page-length)
377
                               (when result-page (put-result result-page))
378
                               (setf result-page (new-field-page destination result-page-length 4)
379
                                     result-index 0))
380
                             (values result-page result-index))
381
                           (complete-solutions ()
382
                             (trace-bgp graph-store-get-graph-content.complete-solutions result-count)
383
                             (incf spocq.i::*match-requests* 1)
384
                             (incf spocq.i::*match-responses* result-count)
385
                             #+(or) ; no graph tracking
386
                             (when (plusp (hash-table-count graph-ids-read))
387
                               (with-locked-cache ((transaction-read-graph-ids transaction))
388
                                 (loop for id being each hash-key of graph-ids-read
389
                                   do (setf (transaction-graph-id-read transaction id) t))))
390
                             (log-debug "bgp matches+counts: ~s: solutions: ~s"
391
                                        (repository-id repository) result-count)
392
                             (when result-page
393
                               (let ((page-result-count (1+ result-index)))
394
                                 (when (< page-result-count result-page-length)
395
                                   (setf result-page
396
                                         (adjust-page result-page (list page-result-count 4)))))
397
                               (put-result result-page))
398
                             (complete-field destination)
399
                             (incf-stat spocq.i::*solutions-constructed* result-count)
400
                             (return-from generate-statements result-count))
401
                           (put-result (page)
402
                             (trace-bgp graph-store-get-graph-content.put destination 4 page)
403
                             (put-field-page destination page)))
404
                    (rlmdb:map-repository-statements #'collect-solution
405
                                                     revision
406
                                                     pattern)
407
                    ;(print (list :index result-index :count result-count))
408
                    (complete-solutions))))
409
              (make-generator ()
410
                (let* ((base-dimensions spocq.i::*quad-dimensions*)
411
                       (result-channel (make-channel :name (list 'spocq.a:|service| (repository-id repository))
412
                                                     :dimensions base-dimensions
413
                                                     :size 10
414
                                                     :page-length 128)))
415
                  (make-solution-generator :operator 'spocq.a:|service|
416
                                           :dimensions base-dimensions
417
                                           :expression nil
418
                                           :channel result-channel
419
                                           :constituents nil)))
420
              (graph-export (stream)
421
                (let ((generator (make-generator)))
422
                  (flet ((generate-graph-export (transaction)
423
                           (declare (ignore transaction))
424
                           (send-response-message :service generator stream content-type)))
425
                    (bt:make-thread #'(lambda () (generate-statements generator)))
426
                    (call-with-revision-transaction #'generate-graph-export revision nil)))))
427
                
428
        (case content-encoding
429
         ((nil) (graph-export stream))
430
         (:gzip (setf (http:stream-media-type stream) mime:application/gzip)
431
                (typecase stream
432
                  (broadcast-stream (setf stream (first (broadcast-stream-streams stream)))))
433
                (let ((compressor (run-program "/bin/gzip" '() :output stream :input :stream :wait nil)))
434
                  (cond (compressor
435
                         (unwind-protect (graph-export (run-program-input compressor))
436
                           (close (run-program-input compressor))
437
                           (run-program-wait compressor)
438
                           (case (run-program-exit-code compressor)
439
                             ((0 nil))
440
                             (t (http:bad-request "Graph export compression failed.")))
441
                           (run-program-close compressor)))
442
                        (t
443
                         (http:internal-error "Graph export compression not started.")))))
444
          (t (http:not-acceptable "Unsupported content encoding: ~a" content-encoding))))))
445
 
446
 
447
 ;;; compute size
448
 
449
 (defmethod compute-repository-size ((repository lmdb-replicable-repository) &rest args)
450
   (apply #'compute-repository-size (repository-lmdb-repository repository) args))
451