Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/group.lisp
| Kind | Covered | All | % |
| expression | 0 | 239 | 0.0 |
| branch | 0 | 16 | 0.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
5
(:documentation "This file defines the group (solution segmentation) operator for the 'org.datagraph.spocq' RDF engine."
8
"Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved.")
11
"The select query form may include a group aggregation clause. This is implemented in two phases, first
12
the solutions are segmented into keyed groups, second each group is aggregated to generate a solution.
13
This file implements the segmentation step. See select.lisp and aggregate.lisp for the other aspects."))
16
(defmacro spocq.a:|group| (solution-field key-bindings &rest args &key count end offset start)
17
"( ( solutionField ( variable (function (solution) RDFTerm) )* ) ( RDFTerm solutionField )* )
18
A GROUP form extends a given solution field with keys which are composed of a set of computed bindings and then
19
segments the field according to those keys.
22
(declare (ignore count end offset start)
23
(dynamic-extent args))
24
(apply #'macroexpand-group solution-field key-bindings args))
27
(defun macroexpand-group (solution-field key-bindings &rest args)
28
(setf args (apply #'canonicalize-algebra-arguments args))
29
(let ((sliced-expression (when args
30
(apply #'compute-expression-slice solution-field args))))
32
`(spocq.e:group ,sliced-expression ',key-bindings)
33
`(spocq.e:group ,solution-field ',key-bindings ,@args))))
37
;;; Generic entry point for the segmentation phase of grouping.
;;; Two methods are defined inline:
;;;  - a :BEFORE method on AGP fields which validates START/END, bumps the
;;;    *ALGEBRA-OPERATIONS* statistic and emits an algebra trace record;
;;;  - a primary method on SOLUTION-GENERATOR fields which forwards all of its
;;;    arguments to SPOCQ.E:STREAM-GROUP.
(defgeneric spocq.e:group (solution-field key-bindings &key end start)
38
(:documentation "group a solution field as specified by the given group key specifications:
39
SOLUTION-FIELD : a solution field stream
40
KEY-BINDINGS : a list of (variable form) bindings, each form of which is applied iteratively to each
41
solution to yield a cache key.
42
The result is a field with the structure (key . (list solutions)).")
44
;; Argument checking and bookkeeping: START and END must each be either NIL
;; or a non-negative integer.
;; NOTE(review): the assertion and the trace record are tagged PROCESS-GROUP
;; rather than GROUP -- confirm that this label is intended.
;; NOTE(review): this method specializes on AGP while the primary method
;; specializes on SOLUTION-GENERATOR; the relationship between the two classes
;; is not visible in this file -- confirm that this :BEFORE method is applicable
;; to the fields handled by the primary method.
(:method :before ((field agp) key-bindings &key end start)
45
(assert-argument-types process-group
46
(start (or null (integer 0)))
47
(end (or null (integer 0))))
48
(incf-stat *algebra-operations*)
49
(trace-algebra process-group field key-bindings
50
:start start :end end))
52
;; Primary method: the keyword arguments are captured as a &REST list (declared
;; dynamic-extent) and passed through unchanged to SPOCQ.E:STREAM-GROUP.
(:method ((field solution-generator) key-bindings &rest args)
53
(declare (dynamic-extent args))
54
(apply #'spocq.e:stream-group field key-bindings args)))
57
(defun spocq.e:stream-group (field-generator key-bindings &rest args &key end start)
58
(let* ((base-dimensions (solution-generator-dimensions field-generator))
59
(result-dimensions base-dimensions)
60
(result-channel (make-channel :name (list 'spocq.a:|group| (task-id *query*))
61
:dimensions result-dimensions
62
:size (effective-channel-size :start start :end end)
63
:page-length (effective-page-length :start start :end end))))
64
(labels ((run-group-thread (result-channel field-generator key-bindings)
65
(let ((base-dimensions (solution-generator-dimensions field-generator))
66
(base-channel (solution-generator-channel field-generator))
67
(base-expression (solution-generator-expression field-generator))
68
(*thread-operations* (cons (list* 'spocq.a:|group| base-dimensions
70
*thread-operations*)))
71
(push 'spocq.a:|group| (channel-name base-channel))
72
(query-run-in-thread *query* base-expression)
73
(setf (channel-size result-channel) (min (channel-size base-channel)
74
(channel-size result-channel))
75
(channel-page-length result-channel) (min (channel-page-length base-channel)
76
(channel-page-length result-channel)))
77
(apply #'process-group result-channel base-channel
82
(make-solution-generator :operator 'spocq.a:|group|
83
;; coerce elements to lists
84
:dimensions result-dimensions
85
:expression (list #'run-group-thread result-channel field-generator key-bindings)
86
:channel result-channel
87
:constituents (list field-generator)))))
90
(defmethod process-group ((destination array-page-channel)
91
(base-source array-page-channel)
92
base-dimensions key-bindings &key (start 0) end)
93
"Generate a stream of grouped solutions. The stream is segmented into (key . (solutions)) messages
94
where the key is a list or atom which characterizes the group as per the given by-group specification."
96
(declare (list base-dimensions key-bindings))
97
(assert-argument-types process-group
98
(base-dimensions list)
101
(multiple-value-bind (cache-operator collector)
102
(compute-group-operators base-dimensions key-bindings)
103
(declare (type (function (array fixnum hash-table) t) cache-operator)
104
(type (function (array fixnum array fixnum) t) collector))
105
(let* ((result-page-width (channel-page-width destination))
106
(result-page-length (channel-page-length destination))
108
(result-index result-page-length)
109
(cache (if (null (set-difference (bindings-value-forms key-bindings) base-dimensions))
110
(make-term-id-cache :single-thread t)
111
(make-solution-cache :single-thread t)))
115
(declare (type fixnum result-page-width result-page-length result-index result-count)
116
(type list group-pages))
117
(assert (= (length base-dimensions) result-page-width) ()
118
"Channel and operation dimensions do not match: ~a: ~a." destination base-dimensions)
119
(labels ((base-processor (base-page)
120
(dotimes (base-index (array-dimension base-page 0))
121
(funcall cache-operator base-page base-index cache)))
122
(collect-group (group-key base-page-list)
123
(when (> (incf result-count) start)
124
(loop for (base-page . base-index) in base-page-list
125
do (collect-solution base-page base-index))
126
(complete-group group-key)
127
(when (and end (>= result-count end)) (complete-solutions))))
128
(collect-solution (base-page base-index)
129
(next-solution-location)
130
(funcall collector result-page result-index base-page base-index))
131
(next-solution-location ()
132
;; return a page (possibly newly created) and the next free location in that page
133
(when (>= (incf result-index) result-page-length)
134
(when result-page (push result-page group-pages))
135
(setf result-page (new-field-page destination result-page-length result-page-width)
137
(complete-group (group-key)
139
(let ((page-result-count (1+ result-index)))
140
(when (< page-result-count result-page-length)
141
(setf result-page (adjust-page result-page (list page-result-count result-page-width)))))
142
(put-page (cons group-key (nreverse (cons result-page group-pages)))))
145
result-index result-page-length))
146
(complete-solutions ()
147
(complete-field destination)
148
(incf-stat *solutions-processed* solution-count)
149
(incf-stat *solutions-constructed* result-count)
150
(return-from process-group (values solution-count result-count)))
152
(trace-data process-group destination base-dimensions (term-value-field page))
153
(put-field-page destination page)))
154
(declare (dynamic-extent #'collect-group))
155
(unless (and (plusp result-page-length) (or (null end) (> end start))) (complete-solutions))
156
(do-pages (solutions base-source)
157
(check-query-status *query*)
158
(incf solution-count (array-dimension solutions 0))
159
(base-processor solutions))
160
(maphash #'collect-group cache)
161
(complete-solutions)))))
164
;;; Return, as two values, the operators used by PROCESS-GROUP:
;;;  1. the result of COMPUTE-EXTENDED-WRITE-CACHE-OP applied to BASE-DIMENSIONS
;;;     and KEY-BINDINGS (bound as CACHE-OPERATOR by the caller);
;;;  2. the result of COMPUTE-UNARY-COLLECTOR with BASE-DIMENSIONS passed as both
;;;     arguments (bound as COLLECTOR by the caller).
(defun compute-group-operators (base-dimensions key-bindings)
165
(values (compute-extended-write-cache-op base-dimensions key-bindings)
166
;; the same dimension list is passed for both arguments
(compute-unary-collector base-dimensions base-dimensions)))
169
;;; (compute-extended-write-cache-op '(?::s ?::v ?::w) '((?::X (spocq.a::|coalesce| ?::w "a"))))