Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/matrix-operators/aggregate.lisp

Kind        Covered  All  %
expression  0        282  0.0
branch      0        12   0.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 (:documentation "This file defines the matrix AGGREGATE operator for the 'org.datagraph.spocq' RDF SPARQL engine."
6
 
7
  (copyright
8
   "Copyright 2015 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved.")
9
 
10
  (:long-description
11
   "Where a select query form includes a group aggregation clause, this is implemented in phases.
12
  First, the base solutions are augmented with any additional keys.
13
  Second, each extended solution is examined in turn, the respective group is identified and
14
  the solution is incorporated into the respective group aggregate.
15
  Once all extended solutions are processed, each aggregate is reduced to produce the result solution.
16
  This file implements the aggregation step. See select.lisp and group.lisp for the other aspects.
17
 
18
  The specified evaluation order is
19
  - group by
20
  - select expressions
21
  - solution modifiers, particularly, order and slice
22
  by which the groups can be processed unordered and only the reduced field must be ordered."))
23
 
24
 
25
 (defmethod spocq.e:aggregate ((source-field matrix-field) projection key-bindings &rest arguments &key start end)
26
   "Constuct a result field and aggregate the source directly"
27
   (declare (dynamic-extent arguments))
28
   (let* ((length (if (or start end)
29
                    (- (or end (field-length source-field)) (or start 0))
30
                    *field-page-length*))
31
          (group-dimensions (bindings-variables key-bindings))
32
          (result-dimensions (union-dimensions group-dimensions (solution-field-dimensions source-field)))
33
          (width (length result-dimensions))
34
          (%data (rdfcache:make-matrix length width))
35
          (result-field (clone-matrix-field source-field :data %data
36
                                            :dimensions result-dimensions
37
                                            ;; no sort dimensions as the aggregate value are computed
38
                                            :sort-dimensions ())))
39
     (apply #'matrix-aggregate result-field source-field projection key-bindings arguments)))
40
 
41
 (defun matrix-aggregate (result-field source-field projection key-bindings &rest arguments)
42
   (declare (dynamic-extent arguments))
43
   (let* ((result-dimensions (solution-field-dimensions result-field))
44
          (group-dimensions (bindings-variables key-bindings))
45
          (operator (apply #'matrix-aggregate-operator
46
                           result-dimensions
47
                           (solution-field-dimensions source-field)
48
                           projection
49
                           key-bindings
50
                           arguments)))
51
     (apply operator result-field source-field arguments)
52
     (values result-field
53
             (solution-field-length source-field)
54
             (solution-field-length result-field))))
55
          
56
 (defmethod process-aggregate ((result-field matrix-page-channel)
57
                               (source-field matrix-page-channel)
58
                               base-dimensions
59
                               projection
60
                               key-bindings
61
                               &rest args)
62
     (declare (dynamic-extent args))
63
     (let ((operator (apply #'matrix-aggregate-operator
64
                            (channel-dimensions result-field)
65
                            (channel-dimensions source-field)
66
                            projection
67
                            key-bindings
68
                            args)))
69
       (apply operator result-field source-field args)
70
       (values (solution-field-length source-field)
71
               (solution-field-length result-field))))
72
 
73
 
74
 (defun matrix-aggregate-operator (result-dimensions source-dimensions projection group-dimensions &rest args)
75
   (declare (dynamic-extent args))
76
   (setf projection (sort (copy-list projection) #'string-lessp :key #'first))
77
   (let* ((variables (union-dimensions result-dimensions source-dimensions))
78
         (abstract-variables (loop for v in variables 
79
                                   for count from 0 collect (cons-symbol *variable-package* "v" (princ-to-string count))))
80
         (abstract-map (loop for v in variables for av in abstract-variables collect (cons v av)))
81
         (abstract-result-dimensions (sublis abstract-map result-dimensions))
82
         (abstract-source-dimensions (sublis abstract-map source-dimensions))
83
         (abstract-group-dimensions (sublis abstract-map group-dimensions))
84
         (abstract-projection  (sublis abstract-map projection)))
85
     (values (ensure-matrix-operator 'aggregate
86
                                     :source-dimensions abstract-source-dimensions
87
                                     :group-dimensions abstract-group-dimensions
88
                                     :projection abstract-projection
89
                                     :result-dimensions abstract-result-dimensions
90
                                     :slice (not (null args)))
91
             result-dimensions)))
92
 
93
 ;;; Superseded implementation, kept for reference only. The #+(or) feature expression is never
 ;;; true, so the reader skips the following form and this method is never defined.
 ;;; It made a single pass over the source and completed the current group whenever a row's key
 ;;; differed from the cached key of the preceding rows, so the rows of each group had to be
 ;;; contiguous - hence the requirement for a sorted field.
 #+(or);;  the old method, required a sorted field
 (defmethod compute-matrix-operator-lambda ((operator (eql 'aggregate)) &key
                                            source-dimensions group-dimensions projection result-dimensions slice)
   "Return an operator which iterates over a stream of solutions to aggregate grouped solutions.
    Each aggregation is computed by a distinct closure, which is generated on demand."
   (let ((result-column-count (length result-dimensions))
         (source-column-count (length source-dimensions))
         (group-column-count (length group-dimensions)))
     (multiple-value-bind (initialization-lambda step-lambda reduction-lambda group-predicate-lambda group-projection-lambda)
                          (compute-matrix-aggregate-lambdas source-dimensions group-dimensions projection result-dimensions)
       `(lambda (result-field source-field ,@(when slice '(&key (start 0) end)))
          ,(format nil "aggregation operator for projection: ~s group: ~s." projection group-dimensions)
          (let ((%source-data (cffi::null-pointer))
                (%result-data (cffi::null-pointer))
                (source-row 0)
                (result-row 0))
            (declare (foreign-type (foreign-array ,+matrix-element-type+ (* ,source-column-count)) %source-data)
                     (foreign-type (foreign-array ,+matrix-element-type+ (* ,result-column-count)) %result-data)
                     (type sb-sys:system-area-pointer %source-data %result-data)
                     (type fixnum source-row result-row)
                     )
            ;; %cache-data holds the key of the group in progress; it starts filled with #xff bytes,
            ;; presumably so that it matches no actual key - TODO confirm
            (cffi:with-foreign-pointer (%cache-data ,(* group-column-count +matrix-element-size+))
              (declare (foreign-type (foreign-array ,+matrix-element-type+ (* ,group-column-count)) %cache-data))
              (loop for i below ,(* group-column-count +matrix-element-size+)
                    do (setf (cffi:mem-ref %cache-data :uchar i) #xff))
              ;; one set of accumulation variables, reset by INITIALIZE between groups
              (let (,@(mapcar #'rest (remove-if-not #'(lambda (form) (and (consp form) (eq (first form) 'setf))) initialization-lambda))
                    ,@(when slice '((result-count 0)))
                    (group-state :initial))
                ,@(when slice '((declare (type fixnum result-count))))
                (macrolet ((result-value (name form)
                             `(let ((value (catch :skip ,form)))
                                (typecase value
                                  (condition (load-time-value (spocq:make-unbound-variable ',name)))
                                  (t (case value
                                       ((t) spocq.a:|true|)
                                       ((nil) spocq.a:|false|)
                                       (t value)))))))
                  (labels ((initialize ()
                             ,@(cddr initialization-lambda))
                           (step-in-group ()
                             ,@(cddr step-lambda)
                             (setf group-state :step))
                           (reduce-group ()
                             (setf group-state :reduce)
                             ,@(cddr reduction-lambda))
                           (cache-group-key ()
                             ,@(cddr group-projection-lambda))
                           (test-group-key ()
                             ,@(cddr group-predicate-lambda))
                           ;; emit the result row for the finished group (only within the slice, if any)
                           (complete-group ()
                             (,@(if slice
                                  '(when (> (incf result-count) start))
                                  '(progn))
                              (setf (values %result-data result-row) (new-field-row result-field))
                              (trace-matrix "~&aggregate.next-result ~@{~a ~}"
                                            :source source-row :result result-row ,@(when slice '(:result-count result-count)))
                              (reduce-group))))
                    (handler-bind ((error (lambda (c)
                                            ;; should an error occur, throw it back to a waiting point where
                                            ;; it will be noted to suppress further processing of a given binding
                                            (throw :skip c))))
                      (setf (values %source-data source-row) (next-field-row source-field))
                      (loop until (cffi:null-pointer-p %source-data)
                            ,@(when slice '(until (and end (>= result-count (the fixnum end)))))
                            do (progn (trace-matrix "~&aggregate.next ~@{~a ~}" :source source-row :result result-row)
                                      ;; perform grouped aggregates when there is a group key
                                      ,@(when group-dimensions
                                          '((ecase group-state
                                              (:initial
                                               (cache-group-key))
                                              ((:step :reduce)
                                               ;; a key change ends the current group
                                               (unless (test-group-key)
                                                 (cache-group-key)
                                                 (complete-group)
                                                 (initialize))))))
                                      (step-in-group)
                                      (setf (values %source-data source-row) (next-field-row source-field)))
                            ;; perform a last aggregation iff a group was started.
                            finally (when (eq group-state :step)
                                      (complete-group))))))
                result-field)))))))
175
 
176
 (defmethod compute-matrix-operator-lambda ((operator (eql 'aggregate)) &key
177
                                            source-dimensions group-dimensions projection result-dimensions slice)
178
   "Return an operator which iterates over a stream of solutions to aggregate grouped so;utions.
179
    each aggregation is computed by a distinct closure, which is generated on demand."
180
   (let ((result-column-count (length result-dimensions))
181
         (source-column-count (length source-dimensions))
182
         (group-column-count (length group-dimensions)))
183
     (multiple-value-bind (initialization-lambda step-lambda reduction-lambda group-predicate-lambda group-key-lambda)
184
                          (compute-matrix-aggregate-lambdas source-dimensions group-dimensions projection result-dimensions)
185
       `(lambda (result-field source-field ,@(when slice '(&key (start 0) end)))
186
          ,(format nil "aggregation operator for projection: ~s group: ~s." projection group-dimensions)
187
          
188
          (let ((%source-data (cffi::null-pointer))
189
                (%result-data (cffi::null-pointer))
190
                (source-row 0)
191
                (result-row 0)
192
                (group-operators (make-hash-table :test #'equal)))
193
            (declare (foreign-type (foreign-array ,+matrix-element-type+ (* ,source-column-count)) %source-data)
194
                     (foreign-type (foreign-array ,+matrix-element-type+ (* ,result-column-count)) %result-data)
195
                     (type sb-sys:system-area-pointer %source-data %result-data)
196
                     (type fixnum source-row result-row)
197
                     )
198
            (labels ((make-group-operators ()
199
                       (macrolet ((result-value (name form)
200
                                    `(let ((value (catch :skip ,form)))
201
                                       (typecase value
202
                                         (condition (load-time-value (spocq:make-unbound-variable ',name)))
203
                                         (t (case value
204
                                              ((t) spocq.a:|true|)
205
                                              ((nil) spocq.a:|false|)
206
                                              (t value)))))))
207
                         (let ((group-state :initial)
208
                               ,@(mapcar #'rest (remove-if-not #'(lambda (form) (and (consp form) (eq (first form) 'setf))) initialization-lambda)))
209
                           (flet ((step-group ()
210
                                    (setf group-state :step)
211
                                    ,@(cddr step-lambda))
212
                                  (reduce-group ()
213
                                    (setf group-state :reduce)
214
                                    (setf (values %result-data result-row) (new-field-row result-field))
215
                                    ,@(cddr reduction-lambda)))
216
                             (cons #'step-group #'reduce-group)))))
217
                     (get-group-key () ,@(cddr group-key-lambda))
218
                     (test-group-key () ,@(cddr group-predicate-lambda)) ;; ? not used
219
                     (get-step-group (key)
220
                       (or (first (gethash key group-operators))
221
                           (first (setf (gethash (copy-list key) group-operators) (make-group-operators))))))
222
                     
223
                (let ((cache-data (make-list ,group-column-count))
224
                      ,@(when slice '((result-count 0))))
225
                  ,@(when slice '((declare (type fixnum result-count))))
226
                        (handler-bind ((error (lambda (c)
227
                                                 ;; should an error occur, throw it back to a waiting point where
228
                                                 ;; it will be noted to suppress further processing of a given binding
229
                                                 (throw :skip c))))
230
                           
231
                           (loop until (cffi:null-pointer-p (setf (values %source-data source-row) (next-field-row source-field)))
232
                                 for group-key = (get-group-key cache-data %source-data source-row)
233
                                 for step-group = (get-step-group group-key)
234
                                 do (progn
235
                                      (trace-matrix "~&aggregate.next ~@{~a ~}" :key group-key :source source-row)
236
                                      ;; perform grouped aggregates when there is a group key
237
                                      (funcall step-group)))
238
                           (loop for (nil reduce-group) being the hash-values of group-operators
239
                                 do (funcall reduce-group))
240
                           (complete-field result-field)))))))))
241
 
242
 ;;; (compile nil (compute-matrix-operator-lambda 'aggregate :result-dimensions '(v2max v1sum) :source-dimensions '(v1 v2 v3) :projection '((v2max (spocq.a:|max| v2)) (v1sum (spocq.a:|sum| v1))) :group-dimensions '(v3)))
243
 ;;; (compile nil (compute-matrix-operator-lambda 'aggregate :result-dimensions '(v2max v1sum) :source-dimensions '(v1 v2 v3) :projection '((v2max (spocq.a:|min| v2)) (v1sum (spocq.a:|avg| v1))) :group-dimensions '(v3)))
244
 ;;; (compile nil (compute-matrix-operator-lambda 'aggregate :result-dimensions '(v2max v1sum) :source-dimensions '(v1 v2 v3) :projection '((v2max (spocq.a:|sample| v2)) (v1sum (spocq.a:|count| v1))) :group-dimensions '(v3)))
245
 ;;; (compile nil (compute-matrix-operator-lambda 'aggregate :result-dimensions '(v2count v1sum) :source-dimensions '(v1 v2 v3) :projection '((v2count (spocq.a:|count|  v2 :distinct t)) (v1sum (spocq.a:|sum| v1))) :group-dimensions '(v3)))
246
 
247
 
248
 (defun compute-matrix-aggregate-lambdas (source-dimensions key-dimensions projection result-dimensions)
249
   "returns six operators which close over a lexical environment with accumulation state:
250
  - initialization, to reset the state
251
  - step (%source-data source-index) to perform the iterative step
252
  - reduce (%result-data result-row %source-data source-index group-key-values), to produce the aggregate solution
253
  - key-predicate (%source-data source-row)
254
  - key-projection (%source-data source-row)"
255
   
256
   (let* ((aggregate-variables (bindings-variables projection))
257
          (condition-variables (loop for variable in aggregate-variables collect (cons-symbol nil variable "-condition")))
258
          (aggregate-forms (bindings-value-forms projection))
259
          (temporary-bindings ())         ; ((<variable> <form>) ... ) to allow specific initial values
260
          (iteration-forms ())
261
          (result-map ()))
262
     (labels ((abstract-aggregate-form (condition-variable expression)
263
                ;; for each expression,
264
                ;; - locate the aggregation forms,
265
                ;; - note for each the requisite temporary variables
266
                ;; - note an iteration form
267
                ;; - replace the form with a reduction form
268
                (typecase expression
269
                  (cons (cond ((aggregate-operator-p (first expression))
270
                               (assert (notany #'aggregate-expression-p (rest expression)) ()
271
                                       "Invalid aggregate term (nested): ~a" expression)
272
                               (let ((existing-result (rest (assoc expression result-map :test #'equalp))))
273
                                 (or existing-result
274
                                     (multiple-value-bind (t-bindings i-form r-form)
275
                                                          (apply #'aggregation-form-components expression)
276
                                       (setf temporary-bindings (append temporary-bindings t-bindings))
277
                                       (push `(unless ,condition-variable
278
                                                (when (typep (catch :skip ,i-form) 'condition)
279
                                                  (setf ,condition-variable t)))
280
                                             iteration-forms)
281
                                       (let ((r-variable (gensym "RESULT-")))
282
                                         (push `(,r-variable nil) temporary-bindings)
283
                                         (push (cons expression r-variable) result-map)
284
                                         `(setf ,r-variable ,r-form))))))
285
                              (t
286
                               (cons (first expression)
287
                                     (loop for form in (rest expression) collect (abstract-aggregate-form condition-variable form))))))
288
                  (t expression)))
289
              (abstract-aggregate-condition (condition-variable term)
290
                (cond ((variable-p term) ;; (member term key-dimensions)
291
                       ;; allow the key variable to appear as a sample
292
                       (let ((r-variable (gensym "SAMPLE-")))
293
                         (push (list r-variable nil) temporary-bindings)
294
                         (setf iteration-forms (append iteration-forms `((spocq.e::sample-step (,r-variable) ,term))))
295
                         (push (cons term r-variable) result-map)
296
                         r-variable))
297
                      ((or (aggregate-expression-p term) (null key-dimensions))
298
                       (abstract-aggregate-form condition-variable term))
299
                      (t
300
                       (error "Invalid selection specification: invalid aggregation form: ~a." term))))
301
 
302
              (handled-form (form) `(handler-case ,form (error () nil))))
303
       ;; iterate over the aggregation bindings and collect from each
304
       ;; - any initialization forms fir temporary variables
305
       ;; - any iteration forms, with the field reference replaced by a reference to a temporary interation state
306
       ;; - any reduction forms, with references to the temporary interation state
307
       (setf aggregate-forms
308
             (loop for form in aggregate-forms
309
                   for condition-variable in condition-variables
310
                   collect (abstract-aggregate-condition condition-variable form)))      
311
       (let* ((result-macros (loop for v in aggregate-variables
312
                                   for v-index = (position v result-dimensions)
313
                                   when v-index
314
                                   collect `(,v (foreign-array-named-object-ref ,v %result-data result-row ,v-index))))
315
              (result-bindings (loop for v in aggregate-variables
316
                                     for v-index = (position v result-dimensions)
317
                                     unless v-index
318
                                     collect `(,v nil)))
319
              (base-macros (cons
320
                            ;; make complete solution available as the list of term numbers
321
                            ;; for use in (distinct *) 
322
                            `(solution (list ,@(loop for v in source-dimensions
323
                                                     for v-index from 0
324
                                                     collect `(foreign-array-ref %source-data source-row ,v-index))))
325
                            ;; and each binding available as the interned value
326
                            (loop for v in source-dimensions
327
                                  for v-index from 0
328
                                  collect `(,v (foreign-array-named-object-ref ,v %source-data source-row ,v-index)))))
329
              (initialization-lambda
330
               `(lambda ()
331
                  ,@(mapcar #'(lambda (variable) `(setf ,variable nil)) condition-variables)
332
                  ,@(mapcar #'(lambda (binding) `(setf ,@binding)) temporary-bindings)))
333
              (step-lambda
334
               `(lambda (%source-data source-row)
335
                  (symbol-macrolet ,base-macros
336
                    ,@iteration-forms)))
337
              (reduction-lambda
338
               `(lambda (%result-data result-row)
339
                  (let ,result-bindings          ; for intermediates not projected
340
                    (symbol-macrolet ,result-macros
341
                      (progn (trace-data reduce-aggregation-before)
342
                             ,@(loop for variable in aggregate-variables
343
                                     for form in aggregate-forms
344
                                     for condition-variable in condition-variables
345
                                     collect `(setf ,variable
346
                                                    (if ,condition-variable
347
                                                      (load-time-value (spocq:make-unbound-variable ',variable))
348
                                                      (result-value ,variable ,form))))
349
                             (trace-data reduce-aggregation-after %result-data result-row))))))
350
              (key-predicate
351
               `(lambda (%cache-data %source-data source-row)
352
                  (and ,@(loop for dimension in key-dimensions
353
                               for source-index from 0
354
                               for key-index = (position dimension key-dimensions)
355
                               when key-index
356
                               collect `(= (foreign-array-ref %cache-data 0 ,key-index)
357
                                           (foreign-array-ref %source-data source-row ,source-index))))))
358
              (key-projection
359
               `(lambda (cache-data %source-data source-row)
360
                  (let ((cache-data cache-data))
361
                    ,@(loop for dimension in key-dimensions
362
                        for source-index from 0
363
                        for key-index = (position dimension key-dimensions)
364
                        when key-index
365
                        collect `(progn (setf (first cache-data)
366
                                              (foreign-array-ref %source-data source-row ,source-index))
367
                                   (pop cache-data))))
368
                  cache-data)))
369
         (values initialization-lambda
370
                 step-lambda
371
                 reduction-lambda
372
                 key-predicate
373
                 key-projection)))))
374
         
375
 
376
 ;;; (compute-matrix-aggregate-lambdas '(v1 v2 v3) '(v3) '((v2max (spocq.a:|max| v2)) (v1sum (spocq.a:|sum| v1))) '(v2max v1sum))