Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/revision.lisp

KindCoveredAll%
expression0552 0.0
branch028 0.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 ;;;
3
 ;;; for testing on stage with a dev build this file suffices, as the bnf changes are already
4
 ;;; present. otherwise, it must be loaded from the patches/20160414-revision.lisp patch file
5
 ;;;
6
 ;;; (load "patches/20160414-revision/revision.lisp")
7
 ;;; (sb-ext:save-lisp-and-die "sbcl-spocq.core.patched-for-revisions")
8
 ;;; mv sbcl-spocq.core.patched-for-revisions sbcl-spocq.core
9
 
10
 (in-package :org.datagraph.spocq.implementation)
11
 
12
 (:documentation "This file defines the REVISION operator for the 'org.datagraph.spocq'
13
  RDF engine."
14
 
15
  (copyright
16
   "Copyright 2016 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved.")
17
 
18
  (long-description
19
   "The revision operator involves running a subselect with the specified revision.
20
  It permits either a constant iri or a variable. In case the revision is a constant designator,
21
  that single revision constitutes the dataset. If it is a variable, that variable then renges over
22
  the known revisions.
23
    See [1] for the designatos syntax.
24
  ---
25
  [1] : http://api.dydra.com/sparql/revisioning.html
26
  "))
27
 
28
 
29
 (defmacro spocq.a:|revision| (revision-designator group-graph-pattern &rest args)
30
   "Perform the sub-query respective the revision and return the result field."
31
   (apply #'macroexpand-revision revision-designator group-graph-pattern args))
32
 
33
 (defun macroexpand-revision (revision-designator group-graph-pattern &rest args)
34
   "Expand a revision form into a contour which rebinds the current revision the one specified
35
  and arranges to run the sub-expression with that target.
36
  Allow one of three alternative forms:
37
  - constant : given a constant term, interpre that in the current dynamic context.
38
    if it is a revision identifier, establish that one for the sub-select. If it is a relative
39
    designator, combine that relative to the active revision and continue with that to be resolved
40
    to a concrete revision when the sub-select transaction is created.
41
  - unbound variable : enumerate the repository revisions and iterate over that set, running the
42
    subseqlect once for each revision. Extend the result field to include a binding for the variable .
43
  - bound variable : Propagate the values for the variable from source form results, run a sub-select
44
    for each binding, extend the results with a binding for the variable and join the revision sub-select
45
    results with the source form results.
46
    
47
  It should arrange to compile the sub-expression in a context which is revision-non-specific,
48
  as the argument may be a variable, but that is NYI."
49
   ;; for now reiterate the options even though the result is the same
50
   ;; in the second case, at som epoint 
51
   (cond ((or (stringp revision-designator) (iri-p revision-designator))
52
          `(spocq.e:revision ',revision-designator
53
                             (quote ,(if (select-form-p group-graph-pattern)
54
                                         group-graph-pattern
55
                                         `(spocq.a:|select| ,group-graph-pattern ,(expression-dimensions group-graph-pattern))))))
56
         ((variable-p revision-designator)
57
          ;; if a source field is passed, use that, otherwise invoke the elementary operator
58
          ;; with the variable sd the argument
59
          (destructuring-bind (&optional source-field) args
60
            (if source-field
61
                `(spocq.e::revision-join ',revision-designator
62
                             (quote ,(if (select-form-p group-graph-pattern)
63
                                         group-graph-pattern
64
                                         `(spocq.a:|select| ,group-graph-pattern ,(expression-dimensions group-graph-pattern))))
65
                             ,source-field)
66
                `(spocq.e:revision ',revision-designator
67
                                   (quote ,(if (select-form-p group-graph-pattern)
68
                                               group-graph-pattern
69
                                               `(spocq.a:|select| ,group-graph-pattern ,(expression-dimensions group-graph-pattern))))))))
70
         (t
71
          (error "Invalid revision designator : ~s" revision-designator))))
72
 
73
 
74
 (defgeneric spocq.e::revision (revision-designator query-expression)
75
   (:documentation "Given a revision and a query expression - as an sse,
76
     evaluate the sse pattern with the revision as the target.")
77
 
78
   (:method ((revision-designator t) query-expression)
79
     (error "Invalid revision designator: ~s." revision-designator))
80
 
81
   (:method ((revision-designator spocq:iri) query-expression)
82
     (spocq.e::revision (spocq:iri-lexical-form revision-designator) query-expression))
83
 
84
   (:method ((revision-designator string) query-expression)
85
     "given a string revision designator, allow for both typed uuid values and relative values"
86
     (cond ((>= (length revision-designator) 1)
87
            (cond ((string-equal "urn:uuid:" revision-designator :end2 (min (length revision-designator) 9))
88
                   (setf revision-designator (subseq revision-designator 9)))
89
                  ((eql (char revision-designator 0) #\~)
90
                   ;; anything which binds the revision runs as a sub-query, which means its resolved
91
                   ;; revision is available from the target repository revision instance
92
                   (setf revision-designator (concatenate 'string (repository-revision-id *repository*)
93
                                                          revision-designator))))
94
            (let ((revision (repository-revision revision-designator 
95
                                                 :reference *repository*
96
                                                 :if-does-not-exist nil)))
97
              (if revision
98
                  (spocq.e::revision revision query-expression)
99
                  (null-generator (expression-dimensions query-expression)))))
100
           (t
101
            (null-generator (expression-dimensions query-expression)))))
102
 
103
   (:method ((revision repository-revision) query-expression)
104
     "Given a constant revision designator, run a sub-query with that revision as the target"
105
     (let ((sub-task (make-query :sse-expression query-expression
106
                                     :id (make-service-task-id)
107
                                     :dynamic-bindings (query-dynamic-bindings *task*)
108
                                     :repository (repository-revision-reference revision)
109
                                     :revision revision
110
                                     :revision-id (repository-revision-id revision)
111
                                     :user-id (task-user-id *task*)
112
                                     :sparql-expression query-expression
113
                                     :agent (task-agent *task*))))
114
       (with-task-environment (:task sub-task)
115
         ;;(update-repository-revision-id (task-revision sub-task))
116
         (initialize-task sub-task)
117
         (flet ((retask (&rest args)
118
                  ; interposes an extra thread
119
                  (query-run-in-thread sub-task args)
120
                  'spocq.a:|revision|))
121
           (push #'retask (solution-generator-expression (task-result-generator sub-task)))
122
           (log-debug "revision: wrapped for retask ~a -> ~a" *task* sub-task)
123
           (task-result-generator sub-task)))))
124
 
125
   (:method ((revision-variable symbol) query-expression)
126
     "Given a variable revision, if the variable is bound, use that.
127
      Otherwise, generate the complete revision set and iterate over
128
      repeated subquery executions with the respective revision."
129
     (let ((revision-value (query-binding-value revision-variable)))
130
       (if (spocq:unbound-variable-p revision-value)
131
           (let ((sub-task (make-query :sse-expression query-expression
132
                                       :id (make-service-task-id)
133
                                       :dynamic-bindings (query-dynamic-bindings *task*)
134
                                       :repository *repository*
135
                                       :revision-id "HEAD"
136
                                       :user-id (task-user-id *task*)
137
                                       :sparql-expression query-expression
138
                                       :agent (task-agent *task*))))
139
             (with-task-environment (:task sub-task)
140
               ;;(update-repository-revision-id (task-revision sub-task))
141
               (initialize-task sub-task)
142
               (let* ((sub-task-generator (task-result-generator sub-task))
143
                      (source-dimensions (solution-generator-dimensions sub-task-generator))
144
                      (result-dimensions (union-dimensions (list revision-variable) source-dimensions))
145
                      (result-channel (make-channel :name (list 'spocq.a:|revision| (task-id *query*))
146
                                                    :dimensions result-dimensions)))
147
                 (make-solution-generator :operator 'spocq.a:|revision|
148
                                          :dimensions result-dimensions
149
                                          :expression (list #'run-revision-thread result-channel sub-task)
150
                                          :channel result-channel
151
                                          :constituents (list sub-task-generator)))))
152
           (spocq.e::revision revision-value query-expression)))))
153
 
154
 (defun run-revision-thread (result-channel sub-task)
155
   (process-revision result-channel sub-task)
156
   'spocq.a:|revision|)
157
 
158
 (defmethod process-revision ((destination array-page-channel) (sub-task query) &key (start 0) end)
159
   "Implement the revision clause case where the revision is a variable by iterative
160
    over all known revisions and re-executing the groups cluse for each in turn emitting
161
    the union of the results - augmented by the revision identifier _as an uuid_"
162
   (let* ((revision-id-list (repository-revision-ids *repository*))
163
          (result-page-width (channel-page-width destination))
164
          (result-page-length (channel-page-length destination))
165
          (result-page nil)
166
          (result-index result-page-length)
167
          (result-count 0)
168
          (result-dimensions (channel-dimensions destination))
169
          (source-dimensions (solution-generator-dimensions (task-result-generator sub-task)))
170
          (extend-dimension (or (first (set-difference result-dimensions source-dimensions))
171
                               (error "revision: no extend dimension present: ~s ~s" result-dimensions source-dimensions)))
172
          (extend-index (position extend-dimension result-dimensions))
173
          (solution-count 0)
174
          (collector (compute-unary-collector result-dimensions source-dimensions))
175
          (revision-uuid-term-number 0))
176
     (labels ((base-processor (collector base-page)
177
                (let ((solution-count (array-dimension base-page 0)))
178
                  (dotimes (base-index solution-count)
179
                    (collect-solution collector base-page base-index))))
180
              (collect-solution (collector base-page base-index)
181
                (declare (type (function (array fixnum array fixnum) t) collector))
182
                (when (> (incf result-count) start)
183
                  (next-solution-location)
184
                  (funcall collector result-page result-index base-page base-index)
185
                  (setf (aref result-page result-index extend-index) revision-uuid-term-number)
186
                  (when (and end (>= result-count end)) (complete-solutions))))
187
              (next-solution-location ()
188
                ;; return a page (possible newly created) and the next free location in that page
189
                (when (>= (incf result-index) result-page-length)
190
                  (when result-page (put-page result-page))
191
                  (setf result-page (new-field-page destination result-page-length result-page-width)
192
                        result-index 0)))
193
              (complete-solutions ()
194
                (when result-page
195
                  (let ((page-result-count (1+ result-index)))
196
                    (when (< page-result-count result-page-length)
197
                      (setf result-page
198
                            (adjust-page result-page (list page-result-count result-page-width)))))
199
                  (put-page result-page))
200
                (complete-field destination)
201
                (incf-stat *solutions-processed* solution-count)
202
                (incf-stat *solutions-constructed* result-count)
203
                (trace-algebra process-revision.complete destination)
204
                (return-from process-revision (values solution-count result-count)))
205
              (put-page (page)
206
                (trace-data process-revision destination result-dimensions (term-value-field page))
207
                (put-field-page destination page)))
208
       (loop for revision-designator in revision-id-list
209
         do (let* ((revision (handler-case (repository-revision revision-designator :reference *repository* :if-does-not-exist nil)
210
                               (error (c) (declare (ignore c)) nil))))
211
              (cond (revision
212
                     (let* ((revision-id-string (repository-revision-id revision))
213
                            (revision-id-uuid (intern-uuid revision-id-string)))
214
                       (setf (task-revision sub-task) revision)
215
                       (setf-task-transaction nil sub-task)
216
                       (initialize-task sub-task)
217
                       ;; (print (list :xaction revision-designator (task-transaction sub-task)))
218
                       (setf revision-uuid-term-number (rlmdb:value-term-number revision-id-uuid))
219
                       (query-run-in-thread sub-task (solution-generator-expression (task-result-generator sub-task)))
220
                       (let ((source (solution-generator-channel (task-result-generator sub-task))))
221
                         ;;(describe source)
222
                         ;;(print (solution-generator-expression (task-result-generator sub-task)))
223
                         ;; (print (list :page revision-id))
224
                         (do-pages (solutions source)
225
                                   (check-query-status *query*)
226
                                   (incf solution-count (array-dimension solutions 0))
227
                                   (base-processor collector solutions)))))
228
                    (t
229
                     (log-warn "revision: revision not found: ~s: ~s" *repository* revision-designator)))))
230
       (complete-solutions ))))
231
 
232
 
233
 (defgeneric spocq.e::revision-join (revision query-expression source-field)
234
   (:documentation "Given a variable for a revision, a query expression - as an sse,
235
     and a source field for the variable values, evaluate the sse pattern as a sub-select once
236
     for each reviion value.
237
     the variable must be amoung the source field dimensions.
238
     for each solution in the source field, evaluate the sub-select and join the result field with that solution.")
239
 
240
   (:method ((revision symbol) query-expression source-field)
241
     (if (find revision (solution-generator-dimensions source-field))
242
         ;; if the revision can be taken from the source field, extract solutions
243
         ;; and join each with the results of the respective service
244
         (revision-join-generator revision query-expression source-field)
245
         ;; otherwise fall back to joining a field which either uses a dynamic value
246
         ;; or enumerates the repository revisions with the source field
247
         (let ((revision-value (query-binding-value revision)))
248
           (if (spocq:unbound-variable-p revision-value )
249
               (spocq.e:join source-field (apply #'spocq.e:revision revision query-expression))
250
               (spocq.e:join source-field (apply #'spocq.e:revision revision-value query-expression)))))))
251
 
252
 (defun revision-join-generator (revision query-expression base-field-generator)
253
   "Delegate to a join operation supplying it two generators:
254
    - one yields the solutions from the base field - after each has provided a revision value;
255
    - the other yields union of the solutions from the iterated revision sub-queries."
256
   (assert (variable-p revision) ()
257
           "Invalid revision join revision variable: ~a" revision)
258
   (let* ((base-dimensions (solution-generator-dimensions base-field-generator))
259
          (query-dimensions (expression-dimensions query-expression))
260
          (extended-service-dimensions (union-dimensions (list revision) query-dimensions))
261
          (base-channel (make-channel :name (list 'spocq.a::|revision-base| (task-id *query*))
262
                                      :dimensions base-dimensions))
263
          (revision-channel (make-channel :name (list 'spocq.a:|revision| (task-id *query*))
264
                                          :dimensions (list revision)))
265
          (query-channel (make-channel :name (list 'spocq.a::|revision-query| (task-id *query*))
266
                                      :dimensions query-dimensions))
267
          ;; the base field wrapper involves an operator which accepts the base channel and the sip channel to the service operator,
268
          ;; reads base solutions and for each one, writes the revision to the sip channel and the original solution
269
          ;; to the wrapped base channel
270
          (base-field-generator-wrapped (make-solution-generator :operator 'spocq.a::|revision-copy|
271
                                                                 :dimensions base-dimensions
272
                                                                 :expression (list #'run-revision-join-base
273
                                                                                   base-channel
274
                                                                                   revision
275
                                                                                   base-field-generator
276
                                                                                   revision-channel)
277
                                                                 :channel base-channel
278
                                                                 :constituents (list base-field-generator)))
279
          ;; the revision field jenerator involves an operator which accepts a channel from which to read revisions
280
          ;; and for each one, initiates the revision sub-query and transcribes solutions
281
          (revision-field-generator (make-solution-generator :operator 'spocq.a::|revision|
282
                                                            :dimensions extended-service-dimensions
283
                                                            :channel query-channel
284
                                                            :expression (list #'run-revision-join-subquery
285
                                                                              query-channel
286
                                                                              revision
287
                                                                              revision-channel
288
                                                                              query-expression))))
289
     ;; delegate to join the two fields
290
     (spocq.e:join base-field-generator-wrapped revision-field-generator)))
291
 
292
 
293
 (defun run-revision-join-base (destination revision-variable base-generator revision-channel)
294
   "initiate the source processing, read each solution and propagate each unique value to the
295
  revision processor before passing the complete page to the destination."
296
 
297
   (let ((base-channel (solution-generator-channel base-generator))
298
         (revision-cache (make-hash-table :test'eql))
299
         (revision-index (position revision-variable (solution-generator-dimensions base-generator))))
300
     (query-run-in-thread *query* (solution-generator-expression base-generator))
301
     (do-pages (solutions base-channel)
302
               (loop for index from 0 below (array-dimension solutions 0)
303
                 for revision-term = (aref solutions index revision-index)
304
                 unless (gethash revision-term revision-cache)
305
                 do (progn (setf (gethash revision-term revision-cache) revision-term)
306
                      (channel-put revision-channel revision-term)))
307
               (let ((result-page (make-page (array-dimension solutions 0) (array-dimension solutions 1))))
308
                 (copy-page solutions result-page)
309
                 (put-field-page destination result-page)))
310
     (channel-put revision-channel nil)
311
     (complete-field destination)))
312
 
313
 
314
 (defun run-revision-join-subquery (destination revision-variable revision-channel query-expression)
315
   (loop for revision-term = (channel-get revision-channel)
316
     until (null revision-term)
317
     for revision-object = (rlmdb:term-number-value revision-term)
318
     do (let* ((revision-generator (spocq.e::revision revision-object query-expression))
319
               (revision-dimensions (solution-generator-dimensions revision-generator))
320
               (revision-source (solution-generator-channel revision-generator))
321
               (revision-index (position revision-variable (solution-generator-dimensions revision-generator))))
322
          (query-run-in-thread *query* (solution-generator-expression revision-generator))
323
          (if revision-index
324
              ;; just copy the pages
325
              (do-pages (solutions revision-source)
326
                        (check-query-status *query*)
327
                        (let* ((result-page (make-page (array-dimension solutions 0) (array-dimension solutions 1))))
328
                          (copy-page solutions result-page)
329
                          (put-field-page destination result-page)))
330
              ;; otherwise, extend the page with the respective revision
331
              (let ((extended-revision-dimensions (union-dimensions (list revision-variable) revision-dimensions)))
332
                (setf revision-index (position revision-variable extended-revision-dimensions))
333
                (do-pages (solutions revision-source)
334
                          (check-query-status *query*)
335
                          (let* ((length (array-dimension solutions 0))
336
                                 (width (array-dimension solutions 1))
337
                                 (result-page (make-page length (1+ width))))
338
                            (assert (<= revision-index width) ()
339
                                    "run-revision-join-subselect: revision-index exceeds page width: ~a, ~a:  ~a: ~a"
340
                                    revision-index width
341
                                    revision-variable revision-generator)
342
                            (loop for solution-index below length
343
                              do (progn
344
                                   (loop for value-index below revision-index
345
                                     do (setf (aref result-page solution-index value-index)
346
                                              (aref solutions solution-index value-index)))
347
                                   (setf (aref result-page solution-index revision-index)
348
                                         revision-term)
349
                                   (loop for from-index from revision-index below width
350
                                     do (setf (aref result-page solution-index (1+ from-index))
351
                                              (aref solutions solution-index from-index)))))
352
                            (put-field-page destination result-page)))))))
353
   (complete-field destination))
354