Coverage report: /development/source/library/org/datagraph/spocq-shard/src/store/rlmdb/spocq-mutation.lisp
| Kind | Covered | All | % |
| expression | 0 | 800 | 0.0 |
| branch | 0 | 38 | 0.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
6
write operations with lmdb environment and databases.
8
three tasks are central to mutation
9
- decode as per media type, to produce term objects
10
this is implemented elsewhere, in core/encoding/...
11
- intern, to register terms with the term and string databases, to produce term identifiers.
12
this has been handled through the rdfcache+dydra-ndk api (intern-term, lookup-term ...)
13
it is to be implemented here through direct operations on the lmdb term dictionary
14
- modify the repository, to add entries to the repository indices
15
this has been handled through the rdfcache+dydra-ndk api (insert-statement, ...)
16
it is implemented in rlmdb mutation&co for each of the possible index patterns
18
the three operations - parse the stream, intern the terms, and mutate the repository,
19
permit eight different arrangements between two threads, whereby just a restricted set of
20
alternatives satisfy the lmdb constraints on write transactions.
21
in addition, the media types and operation protocols introduce their own issues.
23
- lmdb requires that the intern and the mutate operations be in different threads.
24
- the mutation must happen in a single transaction.
25
- the write operation must permit delete, patch, put and post modes
26
- the media type can specify modes by including them within the content.
28
four alternative arrangements satisfy the lmdb constraint:
36
1-1-2 : parent thread decodes the stream, interns terms and passes the field to
37
the child thread to write to the repository
38
1-2-1 : parent thread decodes the stream, sends the value field to the child
39
thread, receives back the id field and writes it to the repository
40
2-1-2 : is symmetrical to 1-2-1
41
2-2-1 : is symmetrical to 1-1-2
43
1-1-2 and 2-2-1 require commands accompany the term id fields to represent the
44
in-line media type operations and/or request delete/patch/post/put method.
45
while that adds to the data passed, it offers the advantages that
46
a) when it becomes possible to parse directly to a term identifier, that is
47
without the intermediate interned term object, this would reduce space and
48
time as the parse-intern control-flow can be inline without intermediate storage.
49
b) decoupling the intern process through passed messages, allows to abstract
50
the process over storage class.
52
of those two, the 1-1-2 arrangement leaves the parent process in control
53
of any initial resources (eg. streams), which are active.
55
for the present, where rdfcache handles term interning, as the import
56
transaction exists in the child thread, there must be a secondary transaction
58
this can be on an arbitrary repository, for example the respective system
60
should the mutation thread fail, the mutation transaction will be rolled back,
61
but the terms should remain.
62
if the mutation transaction succeeds, where rdfcache handles write transactions,
64
once the mutation process is independent, that thread must write the transaction
65
record and metadata properties as appropriate for the storage form.
68
;;; Task class for repository mutation: a data-task specialization which adds
;;; no slots of its own. It serves as the dispatch type for the
;;; query-dynamic-bindings and task-operation-access-mode methods which follow,
;;; and it is instantiated by the rdfcache-repository method of
;;; repository-mutate-field to establish that method's task environment.
(defclass mutation-task (data-task) ())
70
(defmethod QUERY-DYNAMIC-BINDINGS ((task mutation-task))
73
(defmethod TASK-OPERATION-ACCESS-MODE ((task mutation-task) (operation t))
76
;;; Batch size for imports: once the statement count accumulated by
;;; repository-mutate's accept-quad reaches this limit (tested with >=),
;;; the pending statements are handed to accept-field as one field.
(defparameter *import-field-length-limit* 512)
78
(defgeneric repository-mutate (repository source media-type &key agent context provenance skolemize method)
80
"Accept a dataset source, convert it into quads and add/remove them to/from a repository
81
as per the given method.
82
Arrange for cooperating threads to do this in order to satisfy transaction restrictions:
83
term interning and index modifications must be in distinct threads.
84
This operator implemented the 1-1-2 pattern, above.
87
:put with quad content clears the entire repository
88
:put with triple content clears just the indicated graph")
90
(:method ((repository repository) (source pathname) (content-type t) &rest args)
91
(with-open-file (input source :direction :input)
92
(apply #'repository-mutate repository input content-type args)))
94
;;;!!! needs to establish a transaction and then segment and mutate as per part
95
(:method ((repository repository) (source stream) (media-type mime:multipart/*)
97
(declare (ignore args))
98
(error "NYI mime:multipart/*"))
100
(:method ((repository repository) (source stream) (media-type t)
105
(skolemize (spocq.i::skolemize-insertions-p)))
106
(declare (ignore provenance))
107
(setf method (intern (string-upcase method) :keyword))
108
(let* ((id-channel (sb-concurrency:make-mailbox :name "id-fields"))
109
(mutate-thread (bt:make-thread #'(lambda ()
110
(let ((*package* (find-package :spocq.i)))
111
(repository-mutate-field repository id-channel :agent agent)))))
115
(with-open-repository ("system/null") ;;can be read-only - just for terms
116
(labels ((accept-quad (quad)
117
(when (>= count *import-field-length-limit*)
118
(accept-field (nreverse quads))
123
(accept-field (term-object-statements)
124
(unless (bt:thread-alive-p mutate-thread)
125
(return-from :mutate nil))
126
;;!! interim solution: rely on rdfcache to intern terms
127
(let ((term-id-field (repository-intern-statements *transaction* term-object-statements
130
(sb-concurrency:send-message id-channel (list method term-id-field)))))
131
(declare (dynamic-extent #'accept-quad))
134
(:put (setf method :post)
135
(sb-concurrency:send-message id-channel (list :delete :all))))
136
(call-with-n-quads #'accept-quad source media-type)
137
(when (> count 0) ;; pending quads...
138
(accept-field (nreverse quads))))
139
(sb-concurrency:send-message id-channel nil)))))
140
(let ((result (bt:join-thread mutate-thread)))
142
(condition (signal result))
146
(defgeneric repository-mutate-field (repository source &key)
148
"Accept a stream of mutation operations, perform them on the given repository
149
and return the modified statement count")
151
;; in general use, this is not invoked from the service api as the spocq.si access path
152
;; defines a method which uses the external import program
153
(:method ((repository rdfcache-repository) (source sb-concurrency:mailbox)
154
&key ((:agent *agent*) *agent*))
155
"A repository which relies on rdfcache must establish a task environment to
156
create its transaction and then use rdfcache operators to insert/delete
157
according to given arguments."
158
(handler-case (let ((task (make-instance 'mutation-task :repository repository :agent *agent*)))
159
(with-task-environment (:task task :abnormal-disposition :abort :normal-disposition :commit)
160
(loop for (operation data) = (sb-concurrency:receive-message source)
161
;;do (print (list operation data))
162
until (null operation)
164
(array (ecase operation
165
(:post (repository-insert-field *transaction* data))
166
(:delete (repository-delete-field *transaction* data)))
167
(array-dimension data 0))
168
(keyword (ecase operation
169
(:delete (repository-clear-graph *transaction* data)
173
(log-warn "repository-mutate-field: field operation signaled error: ~a" c)
174
(return-from repository-mutate-field c))))
176
(:method ((repository lmdb-repository) (source sb-concurrency:mailbox)
177
&key ((:agent *agent*) *agent*))
178
"A direct lmdb repository establishes a direct transaction context, performs
179
insert/delete operations directly and returns the modified statement count
181
Failure returns the condition."
183
(let ((rlmdb:repository (repository-lmdb-repository repository))
184
(*break-on-signals* 'error))
185
(labels ((mutate-repository ()
186
(let ((count (loop for (operation data) = (sb-concurrency:receive-message source)
187
until (null operation)
188
sum (rlmdb:repository-mutate-field rlmdb:repository lmdb:*transaction* operation data))))
190
(lmdb:with-transaction ((transaction (lmdb:make-transaction rlmdb:repository :flags 0))
191
:initial-disposition :begin
192
:normal-disposition :commit
193
:error-disposition :abort)
194
(mutate-repository))))
196
(log-warn "repository-mutate-field: field operation signaled error: ~a" c)
197
(return-from repository-mutate-field c)))))
199
;;; Wrap entry into a transaction so that the start can be bounded by
;;; rlmdb:transaction-timeout (see the bt:with-timeout form in the else branch).
;;;
;;; NOTE(review): the test form uses LOGIOR, which always returns an integer,
;;; and every integer - including 0 - is true in Common Lisp. As written the
;;; condition can never be false, so the else branch with bt:with-timeout is
;;; unreachable and no timeout is ever applied. If the intent is "read-only
;;; transactions enter directly, write transactions enter under a timeout"
;;; (as the docstring suggests), the test should presumably be
;;;   (logtest (rlmdb:transaction-flags transaction) liblmdb:+rdonly+)
;;; - confirm against the then-branch before changing.
(defmethod rlmdb:enter-transaction ((transaction rlmdb:transaction) (disposition t))
200
"timeout write transaction start"
201
(if (logior (rlmdb:transaction-flags transaction) liblmdb:+rdonly+)
203
(bt:with-timeout ((rlmdb:transaction-timeout transaction)) (call-next-method))))
207
(defmethod repository-insert-field ((transaction lmdb-transaction) (solution-field array))
208
(rlmdb:repository-mutate-field (repository-lmdb-repository (transaction-repository transaction))
209
(transaction-lmdb-transaction transaction)
212
(defmethod repository-delete-field ((transaction lmdb-transaction) (solution-field array))
213
(rlmdb:repository-mutate-field (repository-lmdb-repository (transaction-repository transaction))
214
(transaction-lmdb-transaction transaction)
217
(defgeneric call-with-n-quads (operator source media-type)
219
"Parse the content as per the media type.
220
Pass the decoded (but not interned) statement input, segmented into quads,
222
Non-quad media is transformed into a quad/triple stream for parsing and delivery.")
224
(:method ((operator t) (source pathname) (media-type t))
225
(with-open-file (stream source :direction :input)
226
(call-with-n-quads operator stream media-type)))
228
(:method ((operator t) (source stream) (media-type mime:application/n-triples))
229
(call-with-n-quads operator source mime:application/n-quads))
231
(:method ((operator t) (source stream) (media-type mime:application/n-quads))
232
"Continue with the respective parsed statements modeled as triple/quad instances
233
depending on whether the graph term was present."
234
(loop for count from 0
235
for line = (read-line source nil nil)
237
when (find-if (complement #'whitespace-p) line)
238
do (let ((terms (parse-nquads-statement line)))
240
(funcall operator (cons (ecase (length terms)
241
(3 'spocq.a:|triple|)
242
(4 'spocq.a:|quad|)) terms))
243
(spocq.e:request-error "repository-import: syntax error (~a) @~s: ~s"
247
finally (return count)))
249
(:method ((operator t) (source stream) (media-type t))
250
(flet ((do-call (translated-stream)
251
(call-with-n-quads operator translated-stream mime:application/n-quads)))
252
(declare (dynamic-extent #'do-call))
253
(call-with-n-quads-stream #'do-call source media-type))))
257
(defgeneric call-with-n-quads-stream (operator source media-type)
259
"Transform the stream from the given media type into nquads and invoke the given
260
operator with it as the single argument.")
262
(:method ((operator t) (source stream) (media-type mime:application/n-quads))
264
(funcall operator source))
266
(:method ((operator t) (source stream) (media-type mime:application/n-triples))
268
(funcall operator source))
270
(:method ((operator t) (source stream) (media-type mime:application/trig))
271
(let* ((process (run-program spocq.i::*executable-pathname.rapper*
272
`("-q" "-i" "trig" "-o" "nquads" "/dev/stdin" "-")
273
:environment () ;; isolate rapper from dydra libraries
277
(input (when process (run-program-output process))))
278
(unwind-protect (when input (funcall operator input))
279
(when process (run-program-close process)))))
281
(:method ((operator t) (source stream) (media-type mime:text/turtle))
282
(let* ((process (run-program spocq.i::*executable-pathname.rapper*
283
`("-q" "-i" "turtle" "-o" "ntriples" "/dev/stdin" ,(spocq.i::iri-lexical-form *base-iri*))
284
:environment () ;; isolate rapper from dydra libraries
288
(input (when process (run-program-output process))))
289
(unwind-protect (when input (funcall operator input))
290
(when process (run-program-close process)))))
292
(:method ((operator t) (source stream) (media-type mime:application/xhtml+xml))
293
(let* ((process (run-program spocq.i::*executable-pathname.rapper*
294
`("-q" "-i" "rdfa" "-o" "ntriples" "/dev/stdin" "-")
295
:environment () ;; isolate rapper from dydra libraries
299
(input (when process (run-program-output process))))
300
(unwind-protect (when input (funcall operator input))
301
(when process (run-program-close process)))))
303
(:method ((operator t) (source stream) (media-type mime:application/rdf+json))
304
(let* ((process (run-program spocq.i::*executable-pathname.rapper*
305
`("-q" "-i" "json" "-o" "ntriples" "/dev/stdin" "-")
306
:environment () ;; isolate rapper from dydra libraries
310
(input (when process (run-program-output process))))
311
(unwind-protect (when input (funcall operator input))
312
(when process (run-program-close process)))))
314
(:method ((operator t) (source stream) (media-type mime:application/rdf+xml))
315
(let* ((process (run-program spocq.i::*executable-pathname.rapper*
316
`("-q" "-i" "rdfxml" "-o" "ntriples" "/dev/stdin" "-")
317
:environment () ;; isolate rapper from dydra libraries
321
(input (when process (run-program-output process))))
322
(unwind-protect (when input (funcall operator input))
323
(when process (run-program-close process)))))
325
(:method ((operator t) (source stream) (media-type mime:application/ld+json))
326
(let* ((process (run-program spocq.i::*executable-pathname.jsonld* `(*output-format.jsonld*
328
"--uri" ,(spocq.i::iri-lexical-form *base-iri*)
330
:environment () ;; isolate from dydra libraries
333
(input (when process (run-program-output process))))
334
(unwind-protect (when input (funcall operator input))
335
(when process (run-program-close process)))))
339
(defmethod make-rdfcache-transaction ((instance bitemporal-transaction) id repository-id &rest args
340
&key revision-id read-only-p operation)
341
(declare (ignore revision-id operation read-only-p))
342
(call-next-method instance id repository-id
344
;; maybe coerce the operation ?
347
(defmethod transaction-close ((transaction bitemporal-transaction) (disposition (eql :commit))) ;;; (break "committing")
348
(when-transaction-record (%record transaction)
349
(case (rdfcache:transaction-status %record)
351
(transaction-close transaction :abort)
352
;; skip (rdfcache::commit-transaction %record)
353
(setf (rdfcache:transaction-status %record) :committed) ; did not seem to be the case
354
(transaction-write-event transaction)
355
(lmdb:commit-transaction (transaction-lmdb-transaction transaction))))
356
(setf (transaction-end-time transaction) (get-universal-time))
357
(transaction-get-record-state transaction)
358
(trace-transaction 'transaction-close %record)))
360
;; needs to be an lmdb-transaction in order to get the ordinal
361
(defmethod transaction-write-event ((transaction bitemporal-transaction))
362
"Override the lmdb- method, to generate the transaction event from the instance itself,
363
since the rdfcache transaction is not used to perform the write."
364
(let* ((revision-id (transaction-id transaction))
365
(repository (transaction-repository transaction))
366
(agent (when *task* (task-agent *task*)))
367
(agent-id (agent-name agent))
368
(agent-tag (when *task* (task-user-tag *task*)))
369
(uuid (transaction-id transaction))
370
(ordinal (transaction-next-ordinal transaction))
371
(end-timestamp (spocq.e::unix-now))
372
(start-timestamp (transaction-timestamp transaction))
373
(inserted (transaction-inserted-count transaction))
374
(removed (transaction-deleted-count transaction)))
375
(rlmdb:put-repository-metadata repository
377
;; carry over the current transaction ordinal
380
(store-transaction-event :revision-id revision-id
381
:task-id (or (task-id *task*) revision-id)
382
:timestamp-start start-timestamp
383
:timestamp end-timestamp
384
:repository repository
389
(let ((mr (rlmdb.i::make-metadata-record :timestamp end-timestamp
392
(rlmdb:put-metadata-record transaction mr))
393
(when (repository-is-revisioned repository)
394
(let* ((rlr (rlmdb.i::make-revision-log-record :uuid uuid
396
:timestamp-begun start-timestamp
397
:timestamp end-timestamp
398
:inserted-count inserted
399
:removed-count removed)))
400
(rlmdb:put-revision-record transaction rlr)))))
404
alternative situations
405
- read from an import stream the terms would be lines of quad/triple terms.
406
if the library interns nquads encoding, those should be provided directly
407
rather than first parsed.
408
- if the library interns term structures, the terms can be parsed, instantiated
409
and converted or they could be parsed directly to term structs.
410
- during hdt processing the terms will be canonical strings
412
this indicates that the passed argument should be an external array of either
413
external term strings or external term records.
415
;;; interim solution uses rdfcache
416
;;; needs to open its own rdfcache transaction
417
(defmethod repository-intern-statements ((transaction lmdb-transaction) quad-or-triple-data &rest args
419
(declare (ignore args))
425
(x-record (when transaction (transaction-record transaction))))
426
(labels ((map-node (node)
427
(rest (or (assoc node node-map)
428
(first (push (cons node (cons-global-blank-node :transaction x-record))
432
((nil :undef) (error "repository-intern-statements: invalid term: ~s." term))
433
(|urn:dydra|:|all| *true-all-context-term-number*)
434
(|urn:dydra|:|default| *true-default-context-term-number*)
435
(|urn:dydra|:|named| *true-named-context-term-number*)
436
(t (cond ((undistinguished-variable-p term)
437
(rlmdb:value-term-number (map-node term)))
438
((spocq:blank-node-p term)
440
(rlmdb:value-term-number (map-node term))
441
(rlmdb:value-term-number term)))
443
(rlmdb:value-term-number term))
445
(error "Invalid term: ~s." term)))))))
446
(flet ((intern-triples (triples)
447
(let ((field (make-page (length triples) 3)))
448
(loop for statement in triples
449
for (s p o) = (triple-terms statement)
451
do (setf (aref field index 0) (map-term s)
452
(aref field index 1) (map-term p)
453
(aref field index 2) (map-term o)))
455
(intern-cspos (cspos)
456
(let ((field (make-page (length cspos) 4)))
457
(loop for statement in cspos
458
for (c s p o) = (quad-terms statement)
460
;;!!! rdf 1.1 intends to allow blank nodes for graph terms
461
;;!!! a. optionally skolemize
462
;;!!! b. exclude other than iri and blank node
463
do (progn (setf (aref field index 0) (map-term s)
464
(aref field index 1) (map-term p)
465
(aref field index 2) (map-term o)
466
;; no blank node allowed
467
(aref field index 3) (rlmdb:value-term-number c))))
469
(intern-quads (quads)
470
(let ((field (make-page (length quads) 4)))
471
(loop for statement in quads
472
for (s p o c) = (quad-terms statement)
474
do (progn (setf (aref field index 0) (map-term s)
475
(aref field index 1) (map-term p)
476
(aref field index 2) (map-term o)
477
;; no blank node allowed
478
(aref field index 3) (rlmdb:value-term-number c))))
481
(destructuring-bind (graph triples) (rest form)
482
(let ((field (make-page (length triples) 4))
483
;; no blank node allowed
484
(graph-id (rlmdb:value-term-number graph)))
485
(loop for statement in triples
486
for (s p o) = (triple-terms statement)
488
;; place in the same order as an sexp-quad
489
do (setf (aref field index 0) (map-term s)
490
(aref field index 1) (map-term p)
491
(aref field index 2) (map-term o)
492
(aref field index 3) graph-id))
494
(if (graph-form-p quad-or-triple-data)
495
(push quad-or-triple-data graphs)
496
(dolist (expression quad-or-triple-data)
497
(cond ((graph-form-p expression)
498
(push expression graphs))
499
((triple-form-p expression)
500
(push expression triples))
501
((quad-form-p expression)
502
(push expression quads))
504
(push expression cspos))
506
(error "Invalid statement: ~s." expression)))))
507
(let ((interned (append (when triples (list (intern-triples (reverse triples))))
508
(when quads (list (intern-quads (reverse quads))))
509
(when cspos (list (intern-cspos (reverse cspos))))
510
(mapcar #'intern-graph (reverse graphs)))))
511
(values (if (rest interned)
512
(coerce interned 'vector)
517
;;; generate a u32:term database
518
;;; ? what is the size
519
;;; ? how much slower is access
521
;;; /srv/dydra/storage/strings.mdb
522
;;; (decode-db-name "736861313a753332") : "sha1:u32"
523
;;; (decode-db-name "7533323a63737472") : "u32:cstr"
524
;;; (decode-db-name "7533323a73686131") : "u32:sha1"
525
;;; /srv/dydra/storage/terms : vector of term records
526
;;; /srv/dydra/storage/terms.mdb : sha1:u32
528
;;; (with-open-transaction ("test/test-revisioned-repository" :revision-id "HEAD" :normal-disposition :commit) *transaction*)
529
(defmethod repository-make-transaction ((repository lmdb-repository) &rest args &key
531
(revision-class (repository-revision-class repository))
532
(revision (if revision-id
533
(repository-revision revision-id :reference repository :revision-class revision-class)
534
(repository-revision repository :revision-class revision-class)))
536
"Given any form of lmdb repository (other than rdfcache-lmdb-repository, delegate to its revision"
537
(declare (dynamic-extent args))
538
(apply #'repository-make-transaction revision
539
:repository-id (repository-id repository)
543
;;; interface used from the http api
544
(defmethod repository-clear-graph ((repository lmdb-repository) (graph-designator t) &rest args)
545
(declare (dynamic-extent args))
547
(apply #'repository-clear-graph *transaction* graph-designator args)
548
;; even though writing, rdfcache transaction must be read-only
549
(with-open-transaction ((repository-id repository) :revision-id "HEAD" :normal-disposition :abort :read-only-p t)
550
(apply #'repository-clear-graph *transaction* graph-designator args))))
551
;;; (with-open-transaction ("test/test-revisioned-repository" :revision-id "HEAD" :normal-disposition :commit) *transaction*)
553
(defmethod repository-clear-graph ((transaction lmdb-transaction) (graph-designator t) &key if-does-not-exist)
554
"Use the transaction to register graph modification, but also establish
555
a native lmdb transaction to cover the changes.
556
NB. rlmdb:clear-repository does the wrong thing for revisioned repositories.
557
it is used by delete statements for wildcard patterns for non-revisioned repos"
558
(let ((rlmdb:repository (repository-lmdb-repository transaction)))
559
(typecase graph-designator
560
((or keyword boolean)
561
(lmdb:with-transaction ((transaction (lmdb:make-transaction rlmdb:repository :flags 0))
562
:initial-disposition :begin :normal-disposition :commit
563
:error-disposition :abort)
564
(or (ecase graph-designator
565
((nil) ;; clear the repostory including metadata
566
(rlmdb:clear-repository rlmdb:repository :type t))
568
(when (plusp (rlmdb::repository-delete-statements rlmdb:repository rlmdb:*default-context-number* 0 0 0))
569
(setf (transaction-graph-id-modified transaction rlmdb:*default-context-number*) t)
572
;; do not just clear all index databases
573
;; revisioned dbs must mark, not clear (rlmdb:clear-repository rlmdb:repository :type 'rlmdb::index-database)
574
(plusp (rlmdb::repository-delete-statements rlmdb::repository 0 0 0 0)))
576
;;;!!! this does not work as it creates one transaction for each graph
578
(do-repository-contexts (context :transaction transaction :default nil)
579
(rlmdb::repository-delete-statements rlmdb::repository context 0 0 0)
580
(setf (transaction-graph-id-modified transaction context) t)
583
(ecase if-does-not-exist
584
(:error (spocq.e::graph-not-found-error
585
:identifier (ecase graph-designator
586
((nil :default |urn:dydra|:|default|) |urn:dydra|:|default|)
587
((:all t |urn:dydra|:|all|) |urn:dydra|:|all|)
588
((:named |urn:dydra|:|named|) |urn:dydra|:|named|))
589
:operation 'repository-clear-graph))
592
(case graph-designator
593
(|urn:dydra|:|default| (repository-clear-graph transaction :default :if-does-not-exist if-does-not-exist))
594
(|urn:dydra|:|named| (repository-clear-graph transaction :named :if-does-not-exist if-does-not-exist))
595
(|urn:dydra|:|all| (repository-clear-graph transaction :all :if-does-not-exist if-does-not-exist))
597
(let ((graph-term-number (rlmdb:value-term-number graph-designator)))
598
(cond ((plusp (rlmdb::repository-delete-statements rlmdb:repository graph-term-number 0 0 0))
599
(setf (transaction-graph-id-modified transaction graph-term-number) t)
602
(ecase if-does-not-exist
603
(:error (spocq.e::graph-not-found-error :identifier graph-designator :operation 'repository-clear-graph))
606
(ecase if-does-not-exist
607
(:error (spocq.e::invalid-graph-error :identifier graph-designator :operation 'repository-clear-graph))