Coverage report: /development/source/library/org/datagraph/spocq-shard/src/store/rdfcache/transaction-management.lisp
| Kind | Covered | All | % |
| expression | 310 | 489 | 63.4 |
| branch | 10 | 24 | 41.7 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
5
;;; collect all transaction-related functions and control operations in one place
7
(defparameter *transaction-trace-output* nil)
9
(defparameter *class.transaction* 'rdfcache-transaction)
12
;;; macros : in core;utilities.lisp
15
(progn ; remain in core;classes.lisp
16
(defmacro with-task ((variable &rest args) &body body)
18
`(flet ((,op (,variable)
19
(declare (ignorable ,variable))
21
(declare (dynamic-extent #',op))
22
(call-with-task #',op ,@args))))
24
(defmacro with-task-environment ((&key (query '*query*) (task query) normal-disposition abnormal-disposition)
26
"Bind the given query instance to the dynamic *query* variable, access and bind it's state variables,
27
and execute the body in this dynamic context."
29
`(flet ((,op () ,@body))
30
(declare (dynamic-extent #',op))
31
(call-with-task-environment #',op ,task
32
,@(when normal-disposition `(:normal-disposition ,normal-disposition))
33
,@(when abnormal-disposition `(:abnormal-disposition ,abnormal-disposition))))))
38
(defgeneric repository-make-transaction (repository &rest args)
39
(:method ((repository rdfcache-repository) &rest args &key
41
(revision-class (repository-revision-class repository))
42
(revision (if revision-id
43
(repository-revision revision-id :reference repository :revision-class revision-class)
44
(repository-revision repository :revision-class revision-class)))
46
(declare (dynamic-extent args))
47
(apply #'repository-make-transaction revision
48
:repository-id (repository-id repository)
51
(:method ((revision repository-revision) &rest args &key (operation 'spocq.a:|select|) &allow-other-keys)
52
(declare (dynamic-extent args))
53
(assert (or (operation-read-only-p operation)
54
(repository-revision-mutable-p revision)) ()
55
"Attempt to perform an update operation (~a) on an immutable revision: ~s: ~s: ~s."
57
(repository-revision-id revision)
58
(repository-revision-id (repository-revision-reference revision)))
59
(apply #'make-instance (repository-transaction-class revision)
64
(defgeneric repository-make-transaction (repository &rest args)
65
(:method ((repository rdfcache-repository) &rest args &key
67
(revision-class (repository-revision-class repository))
69
(declare (dynamic-extent args))
70
(apply #'repository-make-transaction
71
(make-instance revision-class :reference repository :revision-id revision-id)
74
(:method ((revision repository-revision) &rest args &key (operation 'spocq.a:|select|) &allow-other-keys)
75
(declare (dynamic-extent args))
76
(assert (or (operation-read-only-p operation)
77
(revision-mutable-p revision)) ()
78
"Attempt to modify an immutable revision: ~s." revision)
79
(apply #'make-instance 'rdfcache-transaction
84
(defgeneric repository-open-transaction (repository transaction)
86
(:method ((revision rdfcache-repository-revision) transaction)
87
(prog1 (transaction-open transaction)
88
(revision-uri revision)))
89
(:method repository-open-transaction ((repository repository) (transaction transaction))
90
(repository-open-transaction (transaction-revision transaction) transaction)))
93
(defgeneric repository-close-transaction (repository transaction disposition)
94
(:method :around (repository transaction disposition)
95
(trace-when *trace-output* "~%~s" (list :rct-before repository transaction disposition
96
(transaction-end-time transaction)))
97
(when-transaction-record (%record transaction)
98
(trace-when *trace-output* "~%~s" (list :rct-status (rdfcache:transaction-status %record))))
100
(trace-when *trace-output* "~%~s" (list :rct-after repository transaction disposition
101
(transaction-end-time transaction)))))
103
(defgeneric transaction-get-record-state (transaction)
104
(:method ((transaction transaction))
106
(:method ((transaction rdfcache-transaction))
107
(when-transaction-record (%record transaction)
108
(setf (transaction-delete-count transaction) (rdfcache:transaction-delete-count %record)
109
(transaction-insert-count transaction) (rdfcache:transaction-insert-count %record)))))
111
(defgeneric transaction-parent-p (transaction)
112
(:documentation "Given a transaction, return true iff it was created based on some other
113
transaction. If no predecessor transaction exists, then the repository is empty and
114
some operations ca be suppressed.")
115
(:method ((transaction rdfcache-transaction))
116
(rdfcache::transaction-parent-revision-p (transaction-record transaction))))
118
(defgeneric transaction-parent-revision-id (transaction)
119
(:documentation "Given a transaction, return true iff it was created based on some other
120
transaction. If no predecessor transaction exists, then the repository is empty and
121
some operations ca be suppressed.")
122
(:method ((transaction rdfcache-transaction))
123
(let ((%xaction (transaction-record transaction)))
124
(when (rdfcache::transaction-parent-revision-p %xaction)
125
(rdfcache::transaction-parent-revision-uuid-string %xaction)))))
127
(defmethod transaction-status ((transaction rdfcache-transaction))
128
(let ((record (transaction-record transaction)))
129
(when (typep record 'cffi:foreign-pointer)
130
(rdfcache:transaction-status record))))
132
(defmethod transaction-open ((transaction rdfcache-transaction) &key (if-does-not-exist :error))
133
(declare (ignore if-does-not-exist))
134
(when-transaction-record (%record transaction :error-p transaction-open)
135
(case (rdfcache::transaction-status %record)
136
((:initialized :uninitialized)
137
;; (rdfcache:print-transaction %record)
138
(rdfcache:begin-transaction %record)
139
(trace-transaction 'transaction-open %record)
140
;; generate the uri proactively
141
(transaction-uri transaction)
142
(revision-uri transaction)
143
(log-debug "transaction-open: transaction: ~a" transaction)
145
((:begun :begin :mutated) t)
147
;; in any other state attempt to cancel the thread. this is observed when a compilation
148
;; step fails for one node, 'simultaneously' with others just starting out
149
;; if that returns, which means there is nothing prepared for this, signal an error
150
(log-warn "transaction-open: open a completed transaction: ~a" transaction)
151
(cancel-thread (bt:current-thread))
152
(backtrace-thread (bt:current-thread))
153
(error "transaction-open: no cancel-thread handler: status ~a" (rdfcache::transaction-status %record))))))
155
(defmethod transaction-open :after ((transaction transaction) &key if-does-not-exist)
156
(declare (ignore if-does-not-exist))
158
(channel-put (task-transactions *task*) transaction)))
162
(defmethod transaction-close :around (transaction disposition)
163
(trace-when *trace-output* "~%~s" (list :rct-before transaction disposition
164
(transaction-end-time transaction)))
165
(when-transaction-record (%record transaction)
166
(trace-when *trace-output* "~%~s" (list :rct-status (rdfcache:transaction-status %record))))
168
(trace-when *trace-output* "~%~s" (list :rct-after transaction disposition
169
(transaction-end-time transaction))))
171
(defmethod transaction-close :before ((transaction rdfcache-transaction) (disposition t))
172
(log-info "transaction-close: (~a) ~a" transaction disposition))
174
(defmethod transaction-close ((transaction rdfcache-transaction) (disposition (eql :abort)))
175
;; There is no locking here, as the generic function handles it.
176
(when-transaction-record (%record transaction)
177
(block :abort-transaction
178
;; try to handle these more delicately if the transaction is being closed
179
;; than in the standard exit-on-error case
180
(handler-bind ((DYDRA-NDK:INPUT-OUTPUT-ERROR
183
(log-debug "transaction-close: (~a) ~a" (type-of c) c)
185
(return-from :abort-transaction))))
186
(case (rdfcache:transaction-status %record)
188
(rdfcache::abort-transaction %record)
189
(setf (rdfcache:transaction-status %record) :aborted)
190
(log-debug "transaction-close: (~a) aborted" transaction)))))
191
;; no end time if aborted
192
;; !! no copy out any useful data and destroy it
193
;; !! make sure it is locked
194
(trace-transaction 'repository-close-transaction.abort %record)
195
(transaction-get-record-state transaction)))
197
(defmethod transaction-close ((transaction t) (disposition (eql :begun)))
198
(transaction-close transaction :begin))
200
(defmethod transaction-close ((transaction rdfcache-transaction) (disposition (eql :begin)))
201
(let ((record (transaction-record transaction)))
203
(error "Attempt to begin a closed transaction: ~a." transaction))))
205
(defmethod transaction-close ((transaction rdfcache-transaction) (disposition (eql :commit))) ;;; (break "committing")
206
(when-transaction-record (%record transaction)
207
(case (rdfcache:transaction-status %record)
209
(rdfcache::commit-transaction %record)
210
(setf (rdfcache:transaction-status %record) :committed) ; did not seem to be the case
211
(transaction-write-event transaction)))
212
(setf (transaction-end-time transaction) (get-universal-time))
213
(transaction-get-record-state transaction)
214
(trace-transaction 'transaction-close %record)))
216
(defgeneric transaction-write-event (transaction)
217
(:method ((transaction t))
218
;; a stub until rlmdb is defined
223
(defmethod transaction-close ((transaction rdfcache-transaction) (disposition null))
224
"given a non-specific disposition, commit write transactions and abort reads"
225
(transaction-close transaction (if (transaction-read-only-p transaction) :abort :commit)))
228
(defmethod transaction-close ((transaction t) (disposition (eql :continue)))
229
;; no end time if continued -- this applies to read-only transactions
232
(defmethod transaction-modified-p ((transaction rdfcache-transaction))
233
(when-transaction-record (%record transaction :error-p transaction-modified-p)
234
(case (rdfcache::transaction-status %record)
239
(defgeneric transaction-open-p (transaction)
240
(:method ((transaction rdfcache-transaction))
241
(when-transaction-record (%record transaction :error-p nil)
242
(case (rdfcache::transaction-status %record)
243
((:begun :begin :mutated) t)
245
(:method ((no-transaction null))
248
(defmethod compute-transaction-uri ((transaction rdfcache-transaction))
249
(when-transaction-record (%record transaction)
250
(let ((id (rdfcache:transaction-uuid-string %record))
251
(*strict-vocabulary-terms* nil)) ; each is unique
253
(compute-transaction-uri id)))))
255
(defmethod compute-transaction-uri ((transaction-id string))
256
(intern-iri (format nil "urn:dydra:transaction:~a" transaction-id)))
258
(defgeneric make-rdfcache-transaction (instance id repository-id &key revision-id read-only-p operation)
259
(:method ((instance rdfcache-transaction) id repository-id &key revision-id read-only-p operation)
260
(rdfcache:%make-transaction id repository-id
261
;; 2017-09-13 @atomgraph.dydra.com dydra-ndk required lower case
262
:revision-id (if (equalp revision-id "HEAD")
263
(string-upcase revision-id)
264
(string-downcase revision-id))
265
:read-only-p read-only-p
266
:operation operation)))
268
(defmethod initialize-instance ((instance rdfcache-transaction) &rest initargs &key
270
(id (make-internal-task-id))
272
(repository-id (repository-id (repository-revision-reference revision)))
273
(repository (repository-revision-reference revision))
274
(revision-id (repository-revision-id revision))
276
(read-only-p (if operation (operation-read-only-p operation) t)))
277
;; nb. the transaction uses the abstract id, not the revision id
278
(declare (ignore api-key)) ; eventually pass that to rdfcache
279
(log-debug "initialize-instance (~a): ~s"
280
(class-of instance) initargs)
281
(let* (;;(reference-repository-id (repository-id (repository-revision-reference revision)))
282
(%record (make-rdfcache-transaction instance
284
;; 2017-09-13 @atomgraph.dydra.com dydra-ndk required lower case
285
:revision-id revision-id
286
:read-only-p read-only-p
287
:operation operation)))
288
;; double-check: was observed that an instance had nil as record
289
(assert-argument-type initialize-instance %record cffi:foreign-pointer)
290
(trace-transaction 'initialize-instance.rdfcache-repository %record
291
(if read-only-p :read-only :read-write))
292
(log-debug "initialize-instance (~a): id: ~a, record: ~a [~a]"
296
(with-output-to-string (stream) (rdfcache:print-transaction %record stream)))
297
(setf-transaction-record %record instance))
298
(apply #'call-next-method instance
300
:repository-id repository-id
301
:repository repository
302
:revision-id revision-id
303
:read-only-p read-only-p
306
(defgeneric probe-transaction (designator)
307
(:method ((designator string))
308
(let* ((revision-id (rdfcache:resolve-repository designator))
309
(uuid (make-task-id))
310
(%record (rdfcache:%make-transaction uuid designator
311
:revision-id (string-downcase revision-id)
315
(rdfcache:begin-transaction %record)
317
(rdfcache::abort-transaction %record)
318
(rdfcache:%destroy-transaction %record))))))
319
;;; (time (dotimes (x 100000) (probe-transaction "james/test"))) => 15.796/32.548
321
(defmethod destroy-transaction ((transaction rdfcache-transaction))
322
;; There is no locking here, as the generic function handles it.
323
(log-info "destroy transaction: ~s ~s."
325
(when-transaction-record (%record transaction)
326
(log-info "destroy-transaction: status: ~a" (rdfcache:transaction-status %record))
327
(setf-transaction-record nil transaction)
328
(case (rdfcache:transaction-status %record)
329
((:begun :mutated :initialized) (rdfcache::abort-transaction %record)))
330
(rdfcache:%destroy-transaction %record)))
333
(defmethod clone-transaction ((transaction rdfcache-transaction) &key api-key task-id)
334
(let ((old-record (transaction-record transaction)))
335
(assert-argument-type clone-transaction old-record cffi:foreign-pointer)
336
(let ((new-record (rdfcache:clone-transaction old-record task-id))
337
(new-transaction (make-instance 'rdfcache-transaction
338
:revision-id (transaction-revision-id transaction)
339
:repository-id (repository-id transaction)
340
:operation (transaction-operation transaction)
341
:api-key api-key :task-id task-id)))
342
(assert-argument-type clone-transaction new-record cffi:foreign-pointer)
343
(setf (rdfcache:transaction-status new-record) :initialized)
344
(log-trace "clone transaction: ~s ~s -> ~s."
345
*task* transaction new-transaction)
349
(defun repository-call-in-transaction (op repo xaction &rest args)
350
(log-warn "repository-call-in-transaction is deprecated!!!")
351
(apply #'call-with-revision-transaction op repo xaction args))
353
(defgeneric call-with-revision-transaction (operator revision transaction &key normal-disposition abnormal-disposition
355
(:documentation "Invoke the given operator, a function of one argument, the transaction, in a dynamic context
356
which closes the given transaction with the specified disposition,
357
either normal-disposition or abnormal-disposition, based whether the operation completes or effects an unspecific
358
non-local exit. Establishes two restarts for explicit dispositions:
359
abort-transaction : asserts abnormal disposition :abort and performs a non-local return nil.
360
commit-transaction : asserts normal disposition :commit, performs normal return t, which commits the transaction.")
361
(:argument-precedence-order revision transaction operator)
363
(:method ((operator function) (repository repository) (task t) &rest args)
364
(declare (dynamic-extent operator)
365
(dynamic-extent args))
366
(apply #'call-with-revision-transaction operator (repository-revision repository) task args))
368
(:method ((operator function) (revision repository-revision) (task data-task) &rest args)
369
(declare (dynamic-extent operator)
370
(dynamic-extent args))
371
;; continue with the task's transaction.
372
;; this causes the single transaction to be re-used by all operators of a given task
373
(apply #'call-with-revision-transaction operator revision (task-transaction task) args))
375
(:method ((operator function) (revision repository-revision) (transaction null) &rest args
376
&key id (read-only-p t) normal-disposition abnormal-disposition)
377
(declare (dynamic-extent operator))
378
(declare (dynamic-extent args))
379
(declare (ignore normal-disposition abnormal-disposition))
380
(let ((transaction (repository-make-transaction revision
381
:api-key (task-api-key *task*)
382
:id (or id (task-id *task*) (make-internal-task-id))
383
:read-only-p read-only-p)))
384
(unwind-protect (apply #'call-with-revision-transaction
385
operator revision transaction
387
(destroy-transaction transaction))))
389
(:method ((operator function) (*repository* repository-revision) (transaction transaction)
390
&rest args &key id normal-disposition abnormal-disposition)
391
"once the context arguments have been resolved, call the function with the context dynamically bound.
392
nb. this establishes the handlers even if nested, as the update clauses use nested transactions
393
specifically for commit. one could check the disposition nesting, but that would be more complex."
394
(declare (dynamic-extent operator))
395
(declare (dynamic-extent args))
396
(declare (ignore id))
397
(let ((*wildcard-identifier* (repository-wildcard-term (repository-revision-reference *repository*))))
398
(trace-algebra call-with-revision-transaction "entry: normal" normal-disposition " abmormal" abnormal-disposition)
399
(apply #'call-with-open-transaction operator transaction args))))
401
(defparameter *serialization-locks* (make-hash-table :test #'equal))
403
(defmethod call-with-open-transaction ((operator function) (repository repository) &rest args
405
(api-key (task-api-key *task*))
406
(id (make-internal-task-id))
408
(read-only-p t) ; given no other information
411
;; resolve the repository to its most recent revision and continue with that
412
;; is serialize is specified use a repo-specific lock ti limit access
413
(declare (dynamic-extent args))
414
(cond ((and *transaction*
415
(eql (repository-id *transaction*) (repository-id repository))
416
(or (not (transaction-read-only-p *transaction*))
418
(funcall operator *transaction*))
420
(let ((transaction (repository-make-transaction repository
422
:repository repository
423
:repository-id (repository-id repository)
424
:revision-id (or revision-id (rdfcache:resolve-repository (repository-id repository)))
425
:read-only-p read-only-p
427
:operation operation)))
428
(unwind-protect (if serialize
429
(let ((lock (bt:with-lock-held ((repository-lock repository))
430
(or (gethash (repository-id repository) *serialization-locks*)
431
(setf (gethash (repository-id repository) *serialization-locks*)
432
(bt:make-lock (format nil "serialize ~s" (repository-id repository))))))))
433
(bt:with-lock-held (lock)
434
(apply #'call-with-open-transaction operator transaction
436
(apply #'call-with-open-transaction operator transaction
438
(destroy-transaction transaction))))))
440
(defmethod call-with-open-transaction ((operator function) (transaction transaction) &key
441
id api-key operation revision-id if-does-not-exist
443
;; unless it is specified the normal disposition is to continue in order that read threads
444
;; leave the transaction running for the write thread to complete
445
(normal-disposition :continue)
446
(abnormal-disposition :abort)
448
"call the function with transaction the context dynamically bound.
449
nb. this establishes the handlers even if nested, as the update clauses use nested transactions
450
specifically for commit. one could check the disposition nesting, but that would be more complex.
451
nb. this does _not_ rebind the dynamic *transaction* value, but leaves that to the caller. see with-open-transaction"
452
(declare (dynamic-extent operator)
453
(ignore id api-key operation read-only-p revision-id serialize))
454
(trace-algebra call-with-open-transaction *task* (transaction-task-id transaction)
455
"entry: normal" normal-disposition " abnormal" abnormal-disposition)
457
(print (list :call-with-open-transaction *task* (transaction-task-id transaction)
458
"normal" normal-disposition " abnormal" abnormal-disposition))
459
;; commit only those transactions which the current task own
460
;; for scripting, allow for subtasks which would intend to commit, but for which the final commit
461
;; should be deferred until the root task commits and clamp read-only tasks to :continue
462
(let ((normal-disposition (if *task*
463
(if (eq (get-task-transaction *task*) transaction)
464
(if (transaction-read-only-p transaction)
465
(case normal-disposition
470
normal-disposition)))
471
#+(or) (print (list :cwot.1 transaction *transaction*))
472
#+(or) (describe transaction)
474
(multiple-value-prog1 (restart-case (progn (transaction-open transaction :if-does-not-exist if-does-not-exist)
475
#+(or) (print (list :cwot.2 transaction *transaction*))
476
(let ((*transaction* transaction)) ; carry over the lexical to dynamic
477
#+(or) (print (list :cwot.3 transaction *transaction*))
478
(unwind-protect (funcall operator transaction)
479
(setq *transaction* nil))))
480
(abort-transaction (&optional format-control &rest args)
481
(setf normal-disposition nil
482
abnormal-disposition :abort)
484
(log-info "abort transaction: ~a: ~?" transaction format-control args)
485
(log-info "abort transaction: ~a." transaction))
486
(return-from call-with-open-transaction nil))
487
(commit-transaction (&optional format-control &rest args)
488
(setf normal-disposition :commit)
490
(log-info "commit transaction: ~a: ~?" transaction format-control args))
492
(when normal-disposition
493
#+(or) (print (list 'call-with-open-transaction "exit normal: " normal-disposition))
494
(trace-algebra call-with-open-transaction "exit normal: " normal-disposition)
495
(transaction-close transaction normal-disposition)
496
(setf abnormal-disposition nil)))
497
(when abnormal-disposition
498
#+(or) (print (list 'call-with-open-transaction "exit abnormal: " abnormal-disposition))
499
(trace-algebra call-with-open-transaction "exit abnormal: " abnormal-disposition)
500
(transaction-close transaction abnormal-disposition)))))
503
(defmethod call-with-open-transaction (operation (repository repository) &rest args &key
504
(api-key (task-api-key *task*))
505
(id (make-internal-task-id))
508
(warn "unexpected call to (t repository) version of call-with-open-transaction")
509
(apply #'call-with-open-transaction operation
510
(repository-make-transaction repository :api-key api-key :id id :read-only-p read-only-p)
513
#+(or) ;; not valid, as this would permit autonomous revisions, while authorization pertains to repositories/graphs
514
(defmethod call-with-open-transaction ((operator function) (identifier spocq:uuid) &rest args &key api-key operation &allow-other-keys)
515
;; construct a new transaction instance for the given revision and continue with that
516
(declare (dynamic-extent args))
517
(let ((lexical-form (spocq:uuid-lexical-form identifier)))
518
(cond ((string-equal "uuid:" lexical-form :end2 (min (length lexical-form) 5))
519
(setf lexical-form (subseq lexical-form 5)))
520
((string-equal "urn:uuid:" lexical-form :end2 (min (length lexical-form) 9))
521
(setf lexical-form (subseq lexical-form 9))))
522
(apply #'call-with-open-transaction operator
523
(make-instance *class.transaction*
524
:revision-id lexical-form