Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/filter.lisp

Kind        Covered  All  %
expression  297      494  60.1
branch      36       90   40.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 (:documentation "This file defines the FILTER operator for the 'org.datagraph.spocq' RDF engine."
6
 
7
  (copyright
8
   "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved."))
9
 
10
 
11
 (defun fold-filter (expression)
12
   "Given a filter expression, where possible, substitute constraints from filter clauses into the matching
13
  patterns. This requires equality predicates and an iri value. If all test expressions are
14
  folded, return just the solution field form. If any are left over - eg. if no substitutions were
15
  possible, reconstitute the filter.
16
  Return an a-list as the second value to specifiy variables to be bound to their constriants."
17
 
18
   (destructuring-bind (field test-expression) (rest expression)
19
     (let ((variables (expression-projected-variables field)))
20
       (labels ((same-term-p (form)
21
                  (and (consp form) (member (first form) '(spocq.a:= spocq.a:|sameTerm|))))
22
                (iri-equality-match (form)
23
                  (when (same-term-p form)
24
                    (destructuring-bind (arg1 arg2) (rest form)
25
                      (cond ((and (member arg1 variables(iri-p arg2))
26
                             (cons arg1 arg2))
27
                            ((and (member arg2 variables(iri-p arg1))
28
                             (cons arg2 arg1)))))))
29
         (cond ((null *fold-agp-filters*)
30
                expression)
31
               ((not (bgp-form-p field))
32
                expression)
33
               ((and (consp test-expression)
34
                     (eq (first test-expression) 'spocq.a:|exprlist|))
35
                (let* ((tests (rest test-expression))
36
                       (multiple-bindings (mapcar #'iri-equality-match tests))
37
                       (substitutions (remove nil multiple-bindings))
38
                       (unfolded-tests (loop for binding in multiple-bindings
39
                                             for test in tests
40
                                             unless binding collect test)))
41
                 (cond (substitutions
42
                        (setf field (sublis substitutions field))
43
                        (values (if unfolded-tests
44
                                  `(spocq.a:|filter| ,field
45
                                                     ,(if (rest unfolded-tests)
46
                                                        `(spocq.a:|exprlist| ,@unfolded-tests)
47
                                                        (first unfolded-tests)))
48
                                  field)
49
                                substitutions))
50
                       (t
51
                        expression))))
52
               ((and (same-term-p test-expression)
53
                     (not (select-form-p field)))
54
                (let ((substitution (iri-equality-match test-expression)))
55
                  (if substitution
56
                    (values (sublis (list substitution) field)
57
                            (list substitution))
58
                    expression)))
59
               (t
60
                expression))))))
61
 
62
 ;; When true, push-filter-test also tries to push a filter test into the operands of a
 ;; join form and, when neither operand accepts it, attaches it to the join as its :test.
 (defparameter *push-join-filters* t)
63
 
64
 (defparameter *TASK-DYNAMIC-BINDINGS* ())
65
 (defmethod TASK-DYNAMIC-BINDINGS ((task null))
66
   *TASK-DYNAMIC-BINDINGS*)
67
 
68
 (defun push-filter-test (test-expression field-expression)
   "Given a filter over some field, examine the particular field form
    and decide whether it is possible to push the filter down into it.
    If so, return the rewritten field form.
    If not, return NIL.

    A test which contains an EXISTS operator is never pushed. Otherwise the test is
    considered only when each of its variables is either bound in FIELD-EXPRESSION or
    among the variables of the current task's dynamic bindings.
    NB. for an agp generator form the test is added destructively to the agp's filters
    and FIELD-EXPRESSION itself is returned."
   (unless (find 'spocq.a:|exists| (expression-algebra-operators test-expression))
     (let ((test-variables (expression-variables test-expression))
           (field-variables (expression-variables field-expression))
           (dynamic-variables (first (task-dynamic-bindings *task*))))
       ;(print (list test-variables field-variables dynamic-variables))
       ;; the test may move only if the field, or the dynamic context, supplies all of its variables
       (unless (set-difference (set-difference test-variables field-variables) dynamic-variables)
         (cond ((version-constraint-p test-expression)
                ;; extract all expressions which refer to the revision date
                `(spocq.e:with-version-constraint ,test-expression
                   ,field-expression))
               ;; extract other temporal constraints.
               ((bgp-form-p field-expression)
                ;; push just the filter and the slice into the bgp.
                ;;; should try to fold it
                `(spocq.a:|bgp| (spocq.a:|filter| ,test-expression)
                          ,@(rest field-expression)))
               ((agp-generator-form-p field-expression)
                ;; record the test among the agp's own filters - a side effect on the agp object -
                ;; and answer the unchanged form.
                ;; NOTE(review): the version-constraint-p alternative below looks unreachable, as the
                ;; first cond clause already handles every version constraint.
                (let* ((agp (second field-expression))
                       (agp-dimensions (agp-projection-dimensions agp)))
                  (when (or (null (set-difference (set-difference test-variables agp-dimensions) dynamic-variables))
                            (version-constraint-p test-expression))
                    (push `(spocq.a:|filter| ,test-expression)
                          (agp-filters agp))
                    ; (print (list agp (agp-filters agp) (agp-statements agp) (agp-projection-dimensions agp)))
                    field-expression)))
               ((and *push-join-filters* (join-form-p field-expression))
                ;; push into whichever operands accept the test. when neither does, conjoin the test
                ;; with any existing :test argument of the join; this clause never answers nil.
                ;; the new :test precedes ,@args, so it takes precedence over a :test already in args.
                (destructuring-bind (operator left-field-expression right-field-expression &rest args) field-expression
                  (let ((pushed-left-field-expression (push-filter-test test-expression left-field-expression))
                        (pushed-right-field-expression (push-filter-test test-expression right-field-expression)))
                    (if (or pushed-left-field-expression pushed-right-field-expression)
                        `(,operator ,(or pushed-left-field-expression left-field-expression)
                                    ,(or pushed-right-field-expression right-field-expression)
                                    ,@args)
                        (let ((test (getf args :test)))
                          (when test 
                            (setf test-expression 
                                  (if (exprlist-form-p test)
                                      `(spocq.a:|exprlist| ,test-expression ,@(rest test))
                                      `(spocq.a:|exprlist| ,test-expression ,test))))
                          `(,operator ,left-field-expression ,right-field-expression
                                      :test ,test-expression
                                      ,@args))))))
               ((union-form-p field-expression)
                ;; a union accepts the test only when both branches do
                (destructuring-bind (operator left-field-expression right-field-expression &rest args) field-expression
                  (let ((pushed-left-field-expression (push-filter-test test-expression left-field-expression))
                        (pushed-right-field-expression (push-filter-test test-expression right-field-expression)))
                    (when (and pushed-left-field-expression pushed-right-field-expression)
                      `(,operator ,(or pushed-left-field-expression left-field-expression)
                                  ,(or pushed-right-field-expression right-field-expression)
                                  ,@args)))))
               ((leftjoin-form-p field-expression)
                ;; can push into the main clause only
                (destructuring-bind (operator left-field-expression right-field-expression &rest args) field-expression
                  (let ((pushed-left-field-expression (push-filter-test test-expression left-field-expression)))
                    (when pushed-left-field-expression
                      `(,operator ,pushed-left-field-expression
                                  ,right-field-expression
                                  ,@args)))))
               ((minus-form-p field-expression)
                ;; only the base field is eligible; the subtracted field is left as is
                (destructuring-bind (operator base-field difference-field &rest args) field-expression
                  (let ((pushed-base-field (push-filter-test test-expression base-field)))
                    (when pushed-base-field
                      `(,operator ,pushed-base-field ,difference-field ,@args)))))
               ((graph-form-p field-expression)
                ;; push into the graph form's pattern (expects exactly (operator graph pattern))
                (destructuring-bind (operator graph bgp) field-expression
                  (let ((pushed-bgp (push-filter-test test-expression bgp)))
                    (when pushed-bgp
                      `(,operator ,graph ,pushed-bgp)))))
               ((extend-form-p field-expression)
                ;; when compact-extend (given #'temporal-value-p) reduces the extend to a bgp, rebuild
                ;; that bgp with the filter and with bind clauses for the variables/values it extracted.
                ;; NOTE(review): compact-extend's exact contract is defined elsewhere - confirm.
                (multiple-value-bind (new-field variables values) (compact-extend field-expression #'temporal-value-p)
                  (when (bgp-form-p new-field)
                    `(spocq.e::with-reference-dimensions ,variables
                       (spocq.a:|bgp|
                                (spocq.a:|filter| ,test-expression)
                                ,@(loop for variable in variables for value in values
                                    collect `(spocq.a:|bind| ,variable ,value))
                                ,@(rest new-field))))))
               ((distinct-form-p field-expression)
                ;; push into the operand of the distinct form
                (destructuring-bind (operator distinct-field-expression &rest args) field-expression
                  (let ((pushed-field (push-filter-test test-expression distinct-field-expression)))
                    (when pushed-field
                      `(,operator ,pushed-field ,@args)))))
               ((select-form-p field-expression)
                ;; only when the select projects every test variable (modulo dynamic variables)
                (destructuring-bind (operator select-field-expression select-variables &rest args) field-expression
                  (when (null (set-difference (set-difference test-variables select-variables) dynamic-variables))
                    (let ((pushed-field (push-filter-test test-expression select-field-expression)))
                    (when pushed-field
                      `(,operator ,pushed-field ,select-variables ,@args))))))
               (t
                nil))))))
163
 
164
 
165
 (defun push-filter (field exprlist-form args)
166
   "Given a filter expression, where possible, push respective predicate forms closer to the location
167
  at which the tested terms are bound.
168
  In the simple case, the solution-field form generates solutions which pass through the predicate filter.
169
  If, however, some subexpression, eg. a pattern in a consitutent bgp, can be filtered directly, the filter is
170
  pushed closer to that expression."
171
   (let ((test-expressions (if (or (exprlist-form-p exprlist-form)
172
                                   (&&-form-p exprlist-form))
173
                               (rest exprlist-form)
174
                               (list exprlist-form))))
175
     (let ((remaining-test-expressions
176
            (loop for test-expression in test-expressions
177
              for new-field = (push-filter-test test-expression field)
178
              if new-field
179
              do (setf field new-field)
180
              else collect test-expression)))
181
       (cond (args
182
              nil)
183
             ((equal remaining-test-expressions test-expressions)
184
              nil)
185
             (remaining-test-expressions
186
              `(spocq.e:filter ,field '(spocq.a:|exprlist| ,@remaining-test-expressions)))
187
             (t
188
              field)))))
189
 
190
 
191
 (defmacro spocq.a:|filter| (solution-field test-expression &rest args &key count end offset start)
192
   "( ( solutionField (function (solution) xsd:boolean) ) solutionField )
193
 A FILTER form applies a predicate function to each solution in a given field
194
 to generate a new solution field which includes only those input solutions
195
 which satisfy the predicate.
196
 
197
 Where possible, the evaluation process folds any constant values into the argument
198
 patterns and pushes predicate to limit the matching and combination
199
 processing in the constitutent solution field."
200
 
201
   ;; nb. a pattern-based universal distribution mechanism is not correct as applicability depends on
202
   ;; dimensionality - thus the push-filter implementation which constraints motion based on sufficient
203
   ;; bindings.
204
   (declare (ignore count end offset start)
205
            (dynamic-extent args))
206
   (apply #'macroexpand-filter solution-field test-expression args))
207
 
208
 (defun macroexpand-filter (solution-field test-expression &rest args)
209
   "push filter into any bgp which covers the test expression."
210
   (setf args (apply #'canonicalize-algebra-arguments args))
211
   (or (push-filter solution-field test-expression args)
212
       `(spocq.e:filter ,solution-field ',test-expression ,@args)))
213
 
214
 #+(or)
 ;;; this version both pushed and folded.
 ;;; now replaced with a version which just pushes and leaves folding for bgp macro-expansion
 ;;; NOTE(review): #+(or) makes the reader skip the following definition, so it is never compiled.
 ;;; It would not work if re-enabled as is: it expects a leading FORM argument which the
 ;;; spocq.a:|filter| macro does not pass, it calls push-filter with one argument although
 ;;; push-filter takes (field exprlist-form args), and both slice templates lack a comma
 ;;; before pushed-filter / folded-filter, so they would embed those symbols, not their values.
 (defun macroexpand-filter (form solution-field test-expression &rest args)
   "first, attempt to directly fold any matching filters - but just in bgp forms
    then, if anything is left, try to push it down further."
   (declare (ignore solution-field test-expression))
   (setf args (apply #'canonicalize-algebra-arguments args))
   (multiple-value-bind (folded-filter substitutions) (fold-filter form)
     (let ((expansion (if (filter-form-p folded-filter)
                          (let ((pushed-filter (push-filter folded-filter)))
                            (if (filter-form-p pushed-filter)
                                ;; the only case where some filter is left over
                                (destructuring-bind (solution-field test-expression) (rest pushed-filter)
                                  (let ((variables (expression-free-dimensions test-expression)))
                                    ;; the presence in a filter is sufficient to require the value, as it could be
                                    ;; a test for a simple effective boolean value (cf boolean-effective-value/dawg-bev-5_spec.rb)
                                    (setf (variable-opacity variables) :transparent)
                                    `(spocq.e:filter (spocq.e::with-reference-dimensions ,variables ,solution-field)
                                                     ',test-expression
                                                     ,@args)))
                                (if args
                                    `(spocq.a:|slice| pushed-filter ,@args)
                                    pushed-filter)))
                          (if args
                            `(spocq.a:|slice| folded-filter ,@args)
                            folded-filter))))
         ;; use extend as, join would be wrong
         ;; no need to mark variables as the bindings are all to constant terms.
         (if substitutions
             `(spocq.e:extend ,expansion ',(loop for (variable . value) in substitutions
                                             collect (list variable value)))
             expansion))))
247
 ;;; (parse-sparql "select * where {?s ?p ?o . filter(?o = 0)}")
248
 ;;; (macroexpand-1 '(spocq.a:|filter| (spocq.a:|bgp| (spocq.a:|triple| ?::s ?::p ?::o)) (spocq.a:= ?::o 0)))
249
 ;;; (macroexpand-1 '(spocq.a:|filter| (spocq.a:|bgp| (spocq.a:|triple| ?::s ?::p ?::o)) (spocq.a:= ?::o <http://test.org>)))
250
 ;;; with a symbolic service form, the filter is folded
251
 ;;; (macroexpand-1 '(spocq.a:|filter| (spocq.a:|service| ?::iri (spocq.a:|bgp| (spocq.a:|triple| ?::s ?::p ?::o))) (spocq.a:= ?::o <http://test.org>)))
252
 ;;; (macroexpand-1 (second (parse-sparql "select * where { service ?iri {?s ?p ?o} filter(?o = 0) }")))
253
 
254
 (defgeneric spocq.e:filter (solution-field test &rest args &key end start)
255
   (:documentation "Given a solution field and a predicate, return a new field of those solutions
256
     which satisfy the predicate.")
257
 
258
   (:method :before ((field t) test &key end start)
259
     (assert-argument-types spocq.e:filter
260
       (start (or null (integer 0)))
261
       (end (or null (integer 0))))
262
     (trace-algebra spocq.e:filter field :start start :end end :test test))
263
 
264
   (:method ((field-generator solution-generator) test &rest args)
265
     (declare (dynamic-extent args))
266
     (apply #'filter-generator field-generator test args)))
267
 
268
 
269
 (defun filter-generator (field-generator test &rest args &key end start)
270
   (let* ((base-dimensions (solution-generator-dimensions field-generator))
271
          (result-channel (make-channel :name (list 'spocq.a:|filter| (task-id *query*))
272
                                        :dimensions base-dimensions
273
                                        :size (effective-channel-size :start start :end end)
274
                                        :page-length (effective-page-length :start start :end end))))
275
     ;; return the binding function to the combination operator
276
     (make-solution-generator :operator 'spocq.a:|filter|
277
                              :dimensions base-dimensions
278
                              :expression (list #'run-filter-thread result-channel field-generator
279
                                                test args)
280
                              :channel result-channel
281
                              :constituents (list field-generator))))
282
 
283
 
284
 (defun run-filter-thread (result-channel field-generator test args)
285
   (let* ((base-dimensions (solution-generator-dimensions field-generator))
286
          (base-channel (solution-generator-channel field-generator))
287
          (base-expression (solution-generator-expression field-generator))
288
          (*thread-operations* (cons (list* 'spocq.a:|filter| (task-id *task*)
289
                                            base-dimensions test args)
290
                                     *thread-operations*)))
291
     (push 'spocq.a:|filter| (channel-name base-channel))
292
     (query-run-in-thread *query* base-expression)
293
     (setf (channel-size result-channel) (min (channel-size base-channel)
294
                                              (channel-size result-channel))
295
           (channel-page-length result-channel) (min (channel-page-length base-channel)
296
                                                     (channel-page-length result-channel)))
297
     (apply #'process-filter result-channel base-channel
298
            base-dimensions
299
            test
300
            args)
301
     'spocq.a:|filter|))
302
 
303
 
304
 (defmethod process-filter ((destination array-page-channel)
                            (base-source array-page-channel)
                            base-dimensions test &key (start 0) end)
   "Read pages of solutions from BASE-SOURCE, keep those which satisfy TEST, and write them in
  pages to DESTINATION. Apply slice (start, end) constraints on-the-fly to solutions which pass
  the test: the first START passing solutions are skipped, and processing stops once END
  passing solutions have been counted.
  Returns two values: the number of solutions read and the number which passed the test
  (the latter includes any skipped because of START)."

   (declare (list base-dimensions))
   (assert-argument-types process-filter
     (base-dimensions list))
   (incf-stat *algebra-operations*)

   (multiple-value-bind (predicate collector)
                        (compute-filter-operators base-dimensions test)
     (declare (type (function (array fixnum) boolean) predicate)
              (type (function (array fixnum array fixnum) t) collector))
     (let* ((result-page-width (channel-page-width destination))
            (result-page-length (channel-page-length destination))
            (result-page nil)
            ;; starts at the page length so that the first solution allocates a page
            (result-index result-page-length)
            (result-count 0)
            #+trace-solution-count (next-count 0)
            (solution-count 0))
       (assert (= (length base-dimensions) result-page-width) ()
               "Channel and operation dimensions do not match: ~a: ~a." destination base-dimensions)
       (labels ((base-processor (base-page)
                  ;; test each row of BASE-PAGE; copy rows which pass and lie past START
                  (dotimes (base-index (array-dimension base-page 0))
                    ;; (print (list base-index (funcall predicate base-page base-index)))
                    (when (and (funcall predicate base-page base-index)
                               (> (incf result-count) start))
                      (next-solution-location)
                      (funcall collector result-page result-index base-page base-index)
                      (when (and end (>= result-count end)) (complete-solutions)))))
                (next-solution-location ()
                  ;; advance result-index to the next free slot, emitting the current page and
                  ;; allocating a fresh one when the current page is full (or absent)
                  (when (>= (incf result-index) result-page-length)
                    (when result-page (put-page result-page))
                    (setf result-page (new-field-page destination result-page-length result-page-width)
                          result-index 0)))
                (complete-solutions ()
                  ;; emit the last, possibly partial, page; close the destination; record stats;
                  ;; and return from process-filter
                  (when result-page
                    (let ((page-result-count (1+ result-index)))
                      (when (< page-result-count result-page-length)
                        (setf result-page
                              (adjust-page result-page (list page-result-count result-page-width)))))
                    (put-page result-page))
                  (complete-field destination)
                  (incf-stat *solutions-processed* solution-count)
                  (incf-stat *solutions-constructed* result-count)
                  (return-from process-filter (values solution-count result-count)))
              (put-page (page)
                (trace-data process-filter.put destination base-dimensions (term-value-field page))
                (put-field-page destination page)))
         ;; nothing can be produced for a zero page length or an empty slice
         (unless (and (plusp result-page-length) (or (null end) (> end start))) (complete-solutions))
         (rlmdb:with-string-database (db)
           (do-pages (solutions base-source)
                     (check-query-status *query*)
                     (incf solution-count (array-dimension solutions 0))
                     (trace-data process-filter.get base-source base-dimensions (term-value-field solutions))
                     (base-processor solutions)))
         (complete-solutions)))))
365
 
366
 
367
 (defun compute-filter-operators (base-dimensions  test)
368
   (values (compute-unary-predicate test base-dimensions)
369
           (compute-unary-collector base-dimensions base-dimensions)))
370
 
371
 
372
 ;;; the exprlist and table operators do not appear in the sse descriptions, but the examples suggest that:
373
 ;;; - exprlist is intended to collect a sequence of filter clauses, for which it acts as a conjunction
374
 ;;; - table constructs a solution field, with the 'unit argument indicating a single empty field
375
 
376
 (defmacro spocq.a:|exprlist| (&rest expressions)
377
   `(spocq.a:|&&| ,@expressions))