Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/matrix-operators/aggregate.lisp
| Kind | Covered | All | % |
| expression | 0 | 282 | 0.0 |
| branch | 0 | 12 | 0.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
5
(:documentation "This file defines the matrix AGGREGATE operator for the 'org.datagraph.spocq' RDF SPARQL engine."
8
"Copyright 2015 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved.")
11
"Where a select query form includes a group aggregation clause, this is implemented in phases.
12
First, the base solutions are augmented with any additional keys.
13
Second, each extended solution is examined in turn, the respective group is identified and
14
the solution is incorporated into the respective group aggregate.
15
Once all extended solutions are processed, each aggregate is reduced to produce the result solution.
16
This file implements the aggregation step. See select.lisp and group.lisp for the other aspects.
18
The specified evaluation order is
21
- solution modifiers, particularly, order and slice
22
by which the groups can be processed unordered and only the reduced field must be ordered."))
25
(defmethod spocq.e:aggregate ((source-field matrix-field) projection key-bindings &rest arguments &key start end)
26
"Construct a result field and aggregate the source directly"
27
(declare (dynamic-extent arguments))
28
(let* ((length (if (or start end)
29
(- (or end (field-length source-field)) (or start 0))
31
(group-dimensions (bindings-variables key-bindings))
32
(result-dimensions (union-dimensions group-dimensions (solution-field-dimensions source-field)))
33
(width (length result-dimensions))
34
(%data (rdfcache:make-matrix length width))
35
(result-field (clone-matrix-field source-field :data %data
36
:dimensions result-dimensions
37
;; no sort dimensions as the aggregate values are computed
38
:sort-dimensions ())))
39
(apply #'matrix-aggregate result-field source-field projection key-bindings arguments)))
41
(defun matrix-aggregate (result-field source-field projection key-bindings &rest arguments)
42
(declare (dynamic-extent arguments))
43
(let* ((result-dimensions (solution-field-dimensions result-field))
44
(group-dimensions (bindings-variables key-bindings))
45
(operator (apply #'matrix-aggregate-operator
47
(solution-field-dimensions source-field)
51
(apply operator result-field source-field arguments)
53
(solution-field-length source-field)
54
(solution-field-length result-field))))
56
(defmethod process-aggregate ((result-field matrix-page-channel)
57
(source-field matrix-page-channel)
62
(declare (dynamic-extent args))
63
(let ((operator (apply #'matrix-aggregate-operator
64
(channel-dimensions result-field)
65
(channel-dimensions source-field)
69
(apply operator result-field source-field args)
70
(values (solution-field-length source-field)
71
(solution-field-length result-field))))
74
(defun matrix-aggregate-operator (result-dimensions source-dimensions projection group-dimensions &rest args)
75
(declare (dynamic-extent args))
76
(setf projection (sort (copy-list projection) #'string-lessp :key #'first))
77
(let* ((variables (union-dimensions result-dimensions source-dimensions))
78
(abstract-variables (loop for v in variables
79
for count from 0 collect (cons-symbol *variable-package* "v" (princ-to-string count))))
80
(abstract-map (loop for v in variables for av in abstract-variables collect (cons v av)))
81
(abstract-result-dimensions (sublis abstract-map result-dimensions))
82
(abstract-source-dimensions (sublis abstract-map source-dimensions))
83
(abstract-group-dimensions (sublis abstract-map group-dimensions))
84
(abstract-projection (sublis abstract-map projection)))
85
(values (ensure-matrix-operator 'aggregate
86
:source-dimensions abstract-source-dimensions
87
:group-dimensions abstract-group-dimensions
88
:projection abstract-projection
89
:result-dimensions abstract-result-dimensions
90
:slice (not (null args)))
93
#+(or);; the old method, which required a sorted field
94
(defmethod compute-matrix-operator-lambda ((operator (eql 'aggregate)) &key
95
source-dimensions group-dimensions projection result-dimensions slice)
96
"Return an operator which iterates over a stream of solutions to aggregate grouped solutions.
97
each aggregation is computed by a distinct closure, which is generated on demand."
98
(let ((result-column-count (length result-dimensions))
99
(source-column-count (length source-dimensions))
100
(group-column-count (length group-dimensions)))
101
(multiple-value-bind (initialization-lambda step-lambda reduction-lambda group-predicate-lambda group-projection-lambda)
102
(compute-matrix-aggregate-lambdas source-dimensions group-dimensions projection result-dimensions)
103
`(lambda (result-field source-field ,@(when slice '(&key (start 0) end)))
104
,(format nil "aggregation operator for projection: ~s group: ~s." projection group-dimensions)
106
(let ((%source-data (cffi::null-pointer))
107
(%result-data (cffi::null-pointer))
110
(declare (foreign-type (foreign-array ,+matrix-element-type+ (* ,source-column-count)) %source-data)
111
(foreign-type (foreign-array ,+matrix-element-type+ (* ,result-column-count)) %result-data)
112
(type sb-sys:system-area-pointer %source-data %result-data)
113
(type fixnum source-row result-row)
115
(cffi:with-foreign-pointer (%cache-data ,(* group-column-count +matrix-element-size+))
116
(declare (foreign-type (foreign-array ,+matrix-element-type+ (* ,group-column-count)) %cache-data))
117
(loop for i below ,(* group-column-count +matrix-element-size+)
118
do (setf (cffi:mem-ref %cache-data :uchar i) #xff))
119
(let (,@(mapcar #'rest (remove-if-not #'(lambda (form) (and (consp form) (eq (first form) 'setf))) initialization-lambda))
120
,@(when slice '((result-count 0)))
121
(group-state :initial))
122
,@(when slice '((declare (type fixnum result-count))))
123
(macrolet ((result-value (name form)
124
`(let ((value (catch :skip ,form)))
126
(condition (load-time-value (spocq:make-unbound-variable ',name)))
129
((nil) spocq.a:|false|)
131
(labels ((initialize ()
132
,@(cddr initialization-lambda))
135
(setf group-state :step))
137
(setf group-state :reduce)
138
,@(cddr reduction-lambda))
140
,@(cddr group-projection-lambda))
142
,@(cddr group-predicate-lambda))
145
'(when (> (incf result-count) start))
147
(setf (values %result-data result-row) (new-field-row result-field))
148
(trace-matrix "~&aggregate.next-result ~@{~a ~}"
149
:source source-row :result result-row ,@(when slice '(:result-count result-count)))
151
(handler-bind ((error (lambda (c)
152
;; should an error occur, throw it back to a waiting point where
153
;; it will be noted to suppress further processing of a given binding
155
(setf (values %source-data source-row) (next-field-row source-field))
156
(loop until (cffi:null-pointer-p %source-data)
157
,@(when slice '(until (and end (>= result-count (the fixnum end)))))
158
do (progn (trace-matrix "~&aggregate.next ~@{~a ~}" :source source-row :result result-row)
159
;; perform grouped aggregates when there is a group key
160
,@(when group-dimensions
165
(unless (test-group-key)
170
(setf (values %source-data source-row) (next-field-row source-field)))
171
;; perform a last aggregation iff a group was started.
172
finally (when (eq group-state :step)
173
(complete-group))))))
176
(defmethod compute-matrix-operator-lambda ((operator (eql 'aggregate)) &key
177
source-dimensions group-dimensions projection result-dimensions slice)
178
"Return an operator which iterates over a stream of solutions to aggregate grouped solutions.
179
each aggregation is computed by a distinct closure, which is generated on demand."
180
(let ((result-column-count (length result-dimensions))
181
(source-column-count (length source-dimensions))
182
(group-column-count (length group-dimensions)))
183
(multiple-value-bind (initialization-lambda step-lambda reduction-lambda group-predicate-lambda group-key-lambda)
184
(compute-matrix-aggregate-lambdas source-dimensions group-dimensions projection result-dimensions)
185
`(lambda (result-field source-field ,@(when slice '(&key (start 0) end)))
186
,(format nil "aggregation operator for projection: ~s group: ~s." projection group-dimensions)
188
(let ((%source-data (cffi::null-pointer))
189
(%result-data (cffi::null-pointer))
192
(group-operators (make-hash-table :test #'equal)))
193
(declare (foreign-type (foreign-array ,+matrix-element-type+ (* ,source-column-count)) %source-data)
194
(foreign-type (foreign-array ,+matrix-element-type+ (* ,result-column-count)) %result-data)
195
(type sb-sys:system-area-pointer %source-data %result-data)
196
(type fixnum source-row result-row)
198
(labels ((make-group-operators ()
199
(macrolet ((result-value (name form)
200
`(let ((value (catch :skip ,form)))
202
(condition (load-time-value (spocq:make-unbound-variable ',name)))
205
((nil) spocq.a:|false|)
207
(let ((group-state :initial)
208
,@(mapcar #'rest (remove-if-not #'(lambda (form) (and (consp form) (eq (first form) 'setf))) initialization-lambda)))
209
(flet ((step-group ()
210
(setf group-state :step)
211
,@(cddr step-lambda))
213
(setf group-state :reduce)
214
(setf (values %result-data result-row) (new-field-row result-field))
215
,@(cddr reduction-lambda)))
216
(cons #'step-group #'reduce-group)))))
217
(get-group-key () ,@(cddr group-key-lambda))
218
(test-group-key () ,@(cddr group-predicate-lambda)) ;; ? not used
219
(get-step-group (key)
220
(or (first (gethash key group-operators))
221
(first (setf (gethash (copy-list key) group-operators) (make-group-operators))))))
223
(let ((cache-data (make-list ,group-column-count))
224
,@(when slice '((result-count 0))))
225
,@(when slice '((declare (type fixnum result-count))))
226
(handler-bind ((error (lambda (c)
227
;; should an error occur, throw it back to a waiting point where
228
;; it will be noted to suppress further processing of a given binding
231
(loop until (cffi:null-pointer-p (setf (values %source-data source-row) (next-field-row source-field)))
232
for group-key = (get-group-key cache-data %source-data source-row)
233
for step-group = (get-step-group group-key)
235
(trace-matrix "~&aggregate.next ~@{~a ~}" :key group-key :source source-row)
236
;; perform grouped aggregates when there is a group key
237
(funcall step-group)))
238
(loop for (nil reduce-group) being the hash-values of group-operators
239
do (funcall reduce-group))
240
(complete-field result-field)))))))))
242
;;; (compile nil (compute-matrix-operator-lambda 'aggregate :result-column-count 2 :source-dimensions '(v1 v2 v3) :projection '((v2max (spocq.a:|max| v2)) (v1sum (spocq.a:|sum| v1))) :group-dimensions '(v3)))
243
;;; (compile nil (compute-matrix-operator-lambda 'aggregate :result-column-count 2 :source-dimensions '(v1 v2 v3) :projection '((v2max (spocq.a:|min| v2)) (v1sum (spocq.a:|avg| v1))) :group-dimensions '(v3)))
244
;;; (compile nil (compute-matrix-operator-lambda 'aggregate :result-column-count 2 :source-dimensions '(v1 v2 v3) :projection '((v2max (spocq.a:|sample| v2)) (v1sum (spocq.a:|count| v1))) :group-dimensions '(v3)))
245
;;; (compile nil (compute-matrix-operator-lambda 'aggregate :result-column-count 2 :source-dimensions '(v1 v2 v3) :projection '((v2count (spocq.a:|count| v2 :distinct t)) (v1sum (spocq.a:|sum| v1))) :group-dimensions '(v3)))
248
(defun compute-matrix-aggregate-lambdas (source-dimensions key-dimensions projection result-dimensions)
249
"returns six operators which close over a lexical environment with accumulation state:
250
- initialization, to reset the state
251
- step (%source-data source-index) to perform the iterative step
252
- reduce (%result-data result-row %source-data source-index group-key-values), to produce the aggregate solution
253
- key-predicate (%source-data source-row)
254
- key-projection (%source-data source-row)"
256
(let* ((aggregate-variables (bindings-variables projection))
257
(condition-variables (loop for variable in aggregate-variables collect (cons-symbol nil variable "-condition")))
258
(aggregate-forms (bindings-value-forms projection))
259
(temporary-bindings ()) ; ((<variable> <form>) ... ) to allow specific initial values
262
(labels ((abstract-aggregate-form (condition-variable expression)
263
;; for each expression,
264
;; - locate the aggregation forms,
265
;; - note for each the requisite temporary variables
266
;; - note an iteration form
267
;; - replace the form with a reduction form
269
(cons (cond ((aggregate-operator-p (first expression))
270
(assert (notany #'aggregate-expression-p (rest expression)) ()
271
"Invalid aggregate term (nested): ~a" expression)
272
(let ((existing-result (rest (assoc expression result-map :test #'equalp))))
274
(multiple-value-bind (t-bindings i-form r-form)
275
(apply #'aggregation-form-components expression)
276
(setf temporary-bindings (append temporary-bindings t-bindings))
277
(push `(unless ,condition-variable
278
(when (typep (catch :skip ,i-form) 'condition)
279
(setf ,condition-variable t)))
281
(let ((r-variable (gensym "RESULT-")))
282
(push `(,r-variable nil) temporary-bindings)
283
(push (cons expression r-variable) result-map)
284
`(setf ,r-variable ,r-form))))))
286
(cons (first expression)
287
(loop for form in (rest expression) collect (abstract-aggregate-form condition-variable form))))))
289
(abstract-aggregate-condition (condition-variable term)
290
(cond ((variable-p term) ;; (member term key-dimensions)
291
;; allow the key variable to appear as a sample
292
(let ((r-variable (gensym "SAMPLE-")))
293
(push (list r-variable nil) temporary-bindings)
294
(setf iteration-forms (append iteration-forms `((spocq.e::sample-step (,r-variable) ,term))))
295
(push (cons term r-variable) result-map)
297
((or (aggregate-expression-p term) (null key-dimensions))
298
(abstract-aggregate-form condition-variable term))
300
(error "Invalid selection specification: invalid aggregation form: ~a." term))))
302
(handled-form (form) `(handler-case ,form (error () nil))))
303
;; iterate over the aggregation bindings and collect from each
304
;; - any initialization forms for temporary variables
305
;; - any iteration forms, with the field reference replaced by a reference to a temporary iteration state
306
;; - any reduction forms, with references to the temporary iteration state
307
(setf aggregate-forms
308
(loop for form in aggregate-forms
309
for condition-variable in condition-variables
310
collect (abstract-aggregate-condition condition-variable form)))
311
(let* ((result-macros (loop for v in aggregate-variables
312
for v-index = (position v result-dimensions)
314
collect `(,v (foreign-array-named-object-ref ,v %result-data result-row ,v-index))))
315
(result-bindings (loop for v in aggregate-variables
316
for v-index = (position v result-dimensions)
320
;; make complete solution available as the list of term numbers
321
;; for use in (distinct *)
322
`(solution (list ,@(loop for v in source-dimensions
324
collect `(foreign-array-ref %source-data source-row ,v-index))))
325
;; and each binding available as the interned value
326
(loop for v in source-dimensions
328
collect `(,v (foreign-array-named-object-ref ,v %source-data source-row ,v-index)))))
329
(initialization-lambda
331
,@(mapcar #'(lambda (variable) `(setf ,variable nil)) condition-variables)
332
,@(mapcar #'(lambda (binding) `(setf ,@binding)) temporary-bindings)))
334
`(lambda (%source-data source-row)
335
(symbol-macrolet ,base-macros
338
`(lambda (%result-data result-row)
339
(let ,result-bindings ; for intermediates not projected
340
(symbol-macrolet ,result-macros
341
(progn (trace-data reduce-aggregation-before)
342
,@(loop for variable in aggregate-variables
343
for form in aggregate-forms
344
for condition-variable in condition-variables
345
collect `(setf ,variable
346
(if ,condition-variable
347
(load-time-value (spocq:make-unbound-variable ',variable))
348
(result-value ,variable ,form))))
349
(trace-data reduce-aggregation-after %result-data result-row))))))
351
`(lambda (%cache-data %source-data source-row)
352
(and ,@(loop for dimension in key-dimensions
353
for source-index from 0
354
for key-index = (position dimension key-dimensions)
356
collect `(= (foreign-array-ref %cache-data 0 ,key-index)
357
(foreign-array-ref %source-data source-row ,source-index))))))
359
`(lambda (cache-data %source-data source-row)
360
(let ((cache-data cache-data))
361
,@(loop for dimension in key-dimensions
362
for source-index from 0
363
for key-index = (position dimension key-dimensions)
365
collect `(progn (setf (first cache-data)
366
(foreign-array-ref %source-data source-row ,source-index))
369
(values initialization-lambda
376
;;; (compute-matrix-aggregate-lambdas '(v1 v2 v3) '((v2max (spocq.a:|max| v2)) (v1sum (spocq.a:|sum| v1))) '(v3))