Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/revision.lisp
| Kind | Covered | All | % |
| expression | 0 | 552 | 0.0 |
| branch | 0 | 28 | 0.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
;;; for testing on stage with a dev build this file suffices, as the bnf changes are already
4
;;; present. otherwise, it must be loaded from the patches/20160414-revision.lisp patch file
6
;;; (load "patches/20160414-revision/revision.lisp")
7
;;; (sb-ext:save-lisp-and-die "sbcl-spocq.core.patched-for-revisions")
8
;;; mv sbcl-spocq.core.patched-for-revisions sbcl-spocq.core
10
(in-package :org.datagraph.spocq.implementation)
12
(:documentation "This file defines the REVISION operator for the 'org.datagraph.spocq'
16
"Copyright 2016 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved.")
19
"The revision operator involves running a subselect with the specified revision.
20
It permits either a constant iri or a variable. In case the revision is a constant designator,
21
that single revision constitutes the dataset. If it is a variable, that variable then ranges over
23
See [1] for the designator syntax.
25
[1] : http://api.dydra.com/sparql/revisioning.html
29
(defmacro spocq.a:|revision| (revision-designator group-graph-pattern &rest args)
  "Algebra-level REVISION operator.
 The expansion is computed by MACROEXPAND-REVISION, which receives the revision designator,
 the group graph pattern and any additional arguments unchanged."
  ;; gather the complete argument list first, then hand it to the expander in one step
  (let ((expander-arguments (list* revision-designator group-graph-pattern args)))
    (apply #'macroexpand-revision expander-arguments)))
33
(defun macroexpand-revision (revision-designator group-graph-pattern &rest args)
34
"Expand a revision form into a contour which rebinds the current revision to the one specified
35
and arranges to run the sub-expression with that target.
36
Allow one of three alternative forms:
37
- constant : given a constant term, interpret that in the current dynamic context.
38
if it is a revision identifier, establish that one for the sub-select. If it is a relative
39
designator, combine that relative to the active revision and continue with that to be resolved
40
to a concrete revision when the sub-select transaction is created.
41
- unbound variable : enumerate the repository revisions and iterate over that set, running the
42
sub-select once for each revision. Extend the result field to include a binding for the variable.
43
- bound variable : Propagate the values for the variable from source form results, run a sub-select
44
for each binding, extend the results with a binding for the variable and join the revision sub-select
45
results with the source form results.
47
It should arrange to compile the sub-expression in a context which is revision-non-specific,
48
as the argument may be a variable, but that is NYI."
49
;; for now reiterate the options even though the result is the same
50
;; in the second case, at some point
51
(cond ((or (stringp revision-designator) (iri-p revision-designator))
52
`(spocq.e:revision ',revision-designator
53
(quote ,(if (select-form-p group-graph-pattern)
55
`(spocq.a:|select| ,group-graph-pattern ,(expression-dimensions group-graph-pattern))))))
56
((variable-p revision-designator)
57
;; if a source field is passed, use that, otherwise invoke the elementary operator
58
;; with the variable as the argument
59
(destructuring-bind (&optional source-field) args
61
`(spocq.e::revision-join ',revision-designator
62
(quote ,(if (select-form-p group-graph-pattern)
64
`(spocq.a:|select| ,group-graph-pattern ,(expression-dimensions group-graph-pattern))))
66
`(spocq.e:revision ',revision-designator
67
(quote ,(if (select-form-p group-graph-pattern)
69
`(spocq.a:|select| ,group-graph-pattern ,(expression-dimensions group-graph-pattern))))))))
71
(error "Invalid revision designator : ~s" revision-designator))))
74
(defgeneric spocq.e::revision (revision-designator query-expression)
75
(:documentation "Given a revision and a query expression - as an sse,
76
evaluate the sse pattern with the revision as the target.")
78
(:method ((revision-designator t) query-expression)
79
(error "Invalid revision designator: ~s." revision-designator))
81
(:method ((revision-designator spocq:iri) query-expression)
82
(spocq.e::revision (spocq:iri-lexical-form revision-designator) query-expression))
84
(:method ((revision-designator string) query-expression)
85
"given a string revision designator, allow for both typed uuid values and relative values"
86
(cond ((>= (length revision-designator) 1)
87
(cond ((string-equal "urn:uuid:" revision-designator :end2 (min (length revision-designator) 9))
88
(setf revision-designator (subseq revision-designator 9)))
89
((eql (char revision-designator 0) #\~)
90
;; anything which binds the revision runs as a sub-query, which means its resolved
91
;; revision is available from the target repository revision instance
92
(setf revision-designator (concatenate 'string (repository-revision-id *repository*)
93
revision-designator))))
94
(let ((revision (repository-revision revision-designator
95
:reference *repository*
96
:if-does-not-exist nil)))
98
(spocq.e::revision revision query-expression)
99
(null-generator (expression-dimensions query-expression)))))
101
(null-generator (expression-dimensions query-expression)))))
103
(:method ((revision repository-revision) query-expression)
104
"Given a constant revision designator, run a sub-query with that revision as the target"
105
(let ((sub-task (make-query :sse-expression query-expression
106
:id (make-service-task-id)
107
:dynamic-bindings (query-dynamic-bindings *task*)
108
:repository (repository-revision-reference revision)
110
:revision-id (repository-revision-id revision)
111
:user-id (task-user-id *task*)
112
:sparql-expression query-expression
113
:agent (task-agent *task*))))
114
(with-task-environment (:task sub-task)
115
;;(update-repository-revision-id (task-revision sub-task))
116
(initialize-task sub-task)
117
(flet ((retask (&rest args)
118
; interposes an extra thread
119
(query-run-in-thread sub-task args)
120
'spocq.a:|revision|))
121
(push #'retask (solution-generator-expression (task-result-generator sub-task)))
122
(log-debug "revision: wrapped for retask ~a -> ~a" *task* sub-task)
123
(task-result-generator sub-task)))))
125
(:method ((revision-variable symbol) query-expression)
126
"Given a variable revision, if the variable is bound, use that.
127
Otherwise, generate the complete revision set and iterate over
128
repeated subquery executions with the respective revision."
129
(let ((revision-value (query-binding-value revision-variable)))
130
(if (spocq:unbound-variable-p revision-value)
131
(let ((sub-task (make-query :sse-expression query-expression
132
:id (make-service-task-id)
133
:dynamic-bindings (query-dynamic-bindings *task*)
134
:repository *repository*
136
:user-id (task-user-id *task*)
137
:sparql-expression query-expression
138
:agent (task-agent *task*))))
139
(with-task-environment (:task sub-task)
140
;;(update-repository-revision-id (task-revision sub-task))
141
(initialize-task sub-task)
142
(let* ((sub-task-generator (task-result-generator sub-task))
143
(source-dimensions (solution-generator-dimensions sub-task-generator))
144
(result-dimensions (union-dimensions (list revision-variable) source-dimensions))
145
(result-channel (make-channel :name (list 'spocq.a:|revision| (task-id *query*))
146
:dimensions result-dimensions)))
147
(make-solution-generator :operator 'spocq.a:|revision|
148
:dimensions result-dimensions
149
:expression (list #'run-revision-thread result-channel sub-task)
150
:channel result-channel
151
:constituents (list sub-task-generator)))))
152
(spocq.e::revision revision-value query-expression)))))
154
(defun run-revision-thread (result-channel sub-task)
155
(process-revision result-channel sub-task)
158
(defmethod process-revision ((destination array-page-channel) (sub-task query) &key (start 0) end)
159
"Implement the revision clause case where the revision is a variable by iterating
160
over all known revisions and re-executing the group clause for each in turn, emitting
161
the union of the results - augmented by the revision identifier _as an uuid_"
162
(let* ((revision-id-list (repository-revision-ids *repository*))
163
(result-page-width (channel-page-width destination))
164
(result-page-length (channel-page-length destination))
166
(result-index result-page-length)
168
(result-dimensions (channel-dimensions destination))
169
(source-dimensions (solution-generator-dimensions (task-result-generator sub-task)))
170
(extend-dimension (or (first (set-difference result-dimensions source-dimensions))
171
(error "revision: no extend dimension present: ~s ~s" result-dimensions source-dimensions)))
172
(extend-index (position extend-dimension result-dimensions))
174
(collector (compute-unary-collector result-dimensions source-dimensions))
175
(revision-uuid-term-number 0))
176
(labels ((base-processor (collector base-page)
177
(let ((solution-count (array-dimension base-page 0)))
178
(dotimes (base-index solution-count)
179
(collect-solution collector base-page base-index))))
180
(collect-solution (collector base-page base-index)
181
(declare (type (function (array fixnum array fixnum) t) collector))
182
(when (> (incf result-count) start)
183
(next-solution-location)
184
(funcall collector result-page result-index base-page base-index)
185
(setf (aref result-page result-index extend-index) revision-uuid-term-number)
186
(when (and end (>= result-count end)) (complete-solutions))))
187
(next-solution-location ()
188
;; return a page (possibly newly created) and the next free location in that page
189
(when (>= (incf result-index) result-page-length)
190
(when result-page (put-page result-page))
191
(setf result-page (new-field-page destination result-page-length result-page-width)
193
(complete-solutions ()
195
(let ((page-result-count (1+ result-index)))
196
(when (< page-result-count result-page-length)
198
(adjust-page result-page (list page-result-count result-page-width)))))
199
(put-page result-page))
200
(complete-field destination)
201
(incf-stat *solutions-processed* solution-count)
202
(incf-stat *solutions-constructed* result-count)
203
(trace-algebra process-revision.complete destination)
204
(return-from process-revision (values solution-count result-count)))
206
(trace-data process-revision destination result-dimensions (term-value-field page))
207
(put-field-page destination page)))
208
(loop for revision-designator in revision-id-list
209
do (let* ((revision (handler-case (repository-revision revision-designator :reference *repository* :if-does-not-exist nil)
210
(error (c) (declare (ignore c)) nil))))
212
(let* ((revision-id-string (repository-revision-id revision))
213
(revision-id-uuid (intern-uuid revision-id-string)))
214
(setf (task-revision sub-task) revision)
215
(setf-task-transaction nil sub-task)
216
(initialize-task sub-task)
217
;; (print (list :xaction revision-designator (task-transaction sub-task)))
218
(setf revision-uuid-term-number (rlmdb:value-term-number revision-id-uuid))
219
(query-run-in-thread sub-task (solution-generator-expression (task-result-generator sub-task)))
220
(let ((source (solution-generator-channel (task-result-generator sub-task))))
222
;;(print (solution-generator-expression (task-result-generator sub-task)))
223
;; (print (list :page revision-id))
224
(do-pages (solutions source)
225
(check-query-status *query*)
226
(incf solution-count (array-dimension solutions 0))
227
(base-processor collector solutions)))))
229
(log-warn "revision: revision not found: ~s: ~s" *repository* revision-designator)))))
230
(complete-solutions ))))
233
(defgeneric spocq.e::revision-join (revision query-expression source-field)
  (:documentation "Given a variable for a revision, a query expression - as an sse,
 and a source field for the variable values, evaluate the sse pattern as a sub-select once
 for each revision value.
 The variable is expected to be among the source field dimensions. If it is not, the
 source field is instead joined with the field produced by the elementary revision operator.
 For each solution in the source field, evaluate the sub-select and join the result field with that solution.")

  (:method ((revision symbol) query-expression source-field)
    (if (find revision (solution-generator-dimensions source-field))
        ;; if the revision can be taken from the source field, extract solutions
        ;; and join each with the results of the respective sub-select
        (revision-join-generator revision query-expression source-field)
        ;; otherwise fall back to joining a field which either uses a dynamic value
        ;; or enumerates the repository revisions with the source field
        (let ((revision-value (query-binding-value revision)))
          ;; the query expression is one sse form and must be passed as a single argument.
          ;; the earlier (apply #'spocq.e:revision ... query-expression) spread the elements
          ;; of the form as separate arguments to the two-argument generic function.
          (spocq.e:join source-field
                        (spocq.e:revision (if (spocq:unbound-variable-p revision-value)
                                              revision
                                              revision-value)
                                          query-expression))))))
252
(defun revision-join-generator (revision query-expression base-field-generator)
253
"Delegate to a join operation supplying it two generators:
254
- one yields the solutions from the base field - after each has provided a revision value;
255
- the other yields union of the solutions from the iterated revision sub-queries."
256
(assert (variable-p revision) ()
257
"Invalid revision join revision variable: ~a" revision)
258
(let* ((base-dimensions (solution-generator-dimensions base-field-generator))
259
(query-dimensions (expression-dimensions query-expression))
260
(extended-service-dimensions (union-dimensions (list revision) query-dimensions))
261
(base-channel (make-channel :name (list 'spocq.a::|revision-base| (task-id *query*))
262
:dimensions base-dimensions))
263
(revision-channel (make-channel :name (list 'spocq.a:|revision| (task-id *query*))
264
:dimensions (list revision)))
265
(query-channel (make-channel :name (list 'spocq.a::|revision-query| (task-id *query*))
266
:dimensions query-dimensions))
267
;; the base field wrapper involves an operator which accepts the base channel and the sip channel to the service operator,
268
;; reads base solutions and for each one, writes the revision to the sip channel and the original solution
269
;; to the wrapped base channel
270
(base-field-generator-wrapped (make-solution-generator :operator 'spocq.a::|revision-copy|
271
:dimensions base-dimensions
272
:expression (list #'run-revision-join-base
277
:channel base-channel
278
:constituents (list base-field-generator)))
279
;; the revision field generator involves an operator which accepts a channel from which to read revisions
280
;; and for each one, initiates the revision sub-query and transcribes solutions
281
(revision-field-generator (make-solution-generator :operator 'spocq.a::|revision|
282
:dimensions extended-service-dimensions
283
:channel query-channel
284
:expression (list #'run-revision-join-subquery
289
;; delegate to join the two fields
290
(spocq.e:join base-field-generator-wrapped revision-field-generator)))
293
(defun run-revision-join-base (destination revision-variable base-generator revision-channel)
  "Start the base generator and relay its solution pages to DESTINATION.
 While reading, the term found in the REVISION-VARIABLE column of each solution is written
 to REVISION-CHANNEL the first time it is encountered; repeated terms are suppressed.
 Each source page is copied into a fresh page before it is put to DESTINATION.
 When the source is exhausted, NIL is written to REVISION-CHANNEL and DESTINATION is completed."

  (let ((source-channel (solution-generator-channel base-generator))
        ;; terms which have already been forwarded to the revision channel
        (forwarded-terms (make-hash-table :test 'eql))
        (term-column (position revision-variable (solution-generator-dimensions base-generator))))
    (query-run-in-thread *query* (solution-generator-expression base-generator))
    (do-pages (solutions source-channel)
      ;; first pass over the page : announce each not-yet-seen revision term
      (dotimes (row (array-dimension solutions 0))
        (let ((term (aref solutions row term-column)))
          (unless (gethash term forwarded-terms)
            (setf (gethash term forwarded-terms) term)
            (channel-put revision-channel term))))
      ;; then pass a copy of the complete page on
      (let* ((row-count (array-dimension solutions 0))
             (column-count (array-dimension solutions 1))
             (page-copy (make-page row-count column-count)))
        (copy-page solutions page-copy)
        (put-field-page destination page-copy)))
    ;; NIL marks the end of the revision terms
    (channel-put revision-channel nil)
    (complete-field destination)))
314
(defun run-revision-join-subquery (destination revision-variable revision-channel query-expression)
315
(loop for revision-term = (channel-get revision-channel)
316
until (null revision-term)
317
for revision-object = (rlmdb:term-number-value revision-term)
318
do (let* ((revision-generator (spocq.e::revision revision-object query-expression))
319
(revision-dimensions (solution-generator-dimensions revision-generator))
320
(revision-source (solution-generator-channel revision-generator))
321
(revision-index (position revision-variable (solution-generator-dimensions revision-generator))))
322
(query-run-in-thread *query* (solution-generator-expression revision-generator))
324
;; just copy the pages
325
(do-pages (solutions revision-source)
326
(check-query-status *query*)
327
(let* ((result-page (make-page (array-dimension solutions 0) (array-dimension solutions 1))))
328
(copy-page solutions result-page)
329
(put-field-page destination result-page)))
330
;; otherwise, extend the page with the respective revision
331
(let ((extended-revision-dimensions (union-dimensions (list revision-variable) revision-dimensions)))
332
(setf revision-index (position revision-variable extended-revision-dimensions))
333
(do-pages (solutions revision-source)
334
(check-query-status *query*)
335
(let* ((length (array-dimension solutions 0))
336
(width (array-dimension solutions 1))
337
(result-page (make-page length (1+ width))))
338
(assert (<= revision-index width) ()
339
"run-revision-join-subselect: revision-index exceeds page width: ~a, ~a: ~a: ~a"
341
revision-variable revision-generator)
342
(loop for solution-index below length
344
(loop for value-index below revision-index
345
do (setf (aref result-page solution-index value-index)
346
(aref solutions solution-index value-index)))
347
(setf (aref result-page solution-index revision-index)
349
(loop for from-index from revision-index below width
350
do (setf (aref result-page solution-index (1+ from-index))
351
(aref solutions solution-index from-index)))))
352
(put-field-page destination result-page)))))))
353
(complete-field destination))