Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/group.lisp

KindCoveredAll%
expression0239 0.0
branch016 0.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 (:documentation "This file defines the four query form operators for the 'org.datagraph.spocq' RDF engine."
6
 
7
  (copyright
8
   "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved.")
9
 
10
  (:long-description
11
   "The select query form may include a group aggregation clause. This is implemented in two phases, first
12
  the solutions are segmented into keyed groups, second each group is aggregated to generate a solution.
13
  This file implements the segmentation step. see select.lisp and aggregate.lisp for the other apsects."))
14
 
15
 
16
 (defmacro spocq.a:|group| (solution-field key-bindings &rest args &key count end offset start)
17
   "( ( solutionField ( variable (function (solution) RDFTerm) )* ) ( RDFTerm solutionField )* )
18
 A GROUP form extends a given solution field with keys which are composed of set of computed bindings and then
19
 segments the field according to those keys.
20
 "
21
 
22
   (declare (ignore count end offset start)
23
            (dynamic-extent args))
24
   (apply #'macroexpand-group solution-field key-bindings args))
25
 
26
 
27
 (defun macroexpand-group (solution-field key-bindings &rest args)
28
   (setf args (apply #'canonicalize-algebra-arguments args))
29
   (let ((sliced-expression (when args
30
                              (apply #'compute-expression-slice solution-field args))))
31
     (if sliced-expression
32
       `(spocq.e:group ,sliced-expression ',key-bindings)
33
       `(spocq.e:group ,solution-field ',key-bindings ,@args))))
34
 
35
 
36
 
37
 (defgeneric spocq.e:group (solution-field key-bindings &key end start)
38
   (:documentation "group a solution field as specified by the given group key specifications:
39
  SOLUTION-FIELD : a solution field stream
40
  KEY-BINDINGS : a list of (variable form) bindings, each form of which is applied iteratively to each
41
    solution to yield a cache key.
42
  The result is a field with the structure (key . (list solutions)).")
43
 
44
   (:method :before ((field agp) key-bindings  &key end start)
45
     (assert-argument-types process-group
46
       (start (or null (integer 0)))
47
       (end (or null (integer 0))))
48
     (incf-stat *algebra-operations*)
49
     (trace-algebra process-group field key-bindings
50
                    :start start :end end))
51
 
52
   (:method ((field solution-generator) key-bindings &rest args)
53
     (declare (dynamic-extent args))
54
     (apply #'spocq.e:stream-group field key-bindings args)))
55
 
56
 
57
 (defun spocq.e:stream-group (field-generator key-bindings &rest args &key end start)
58
   (let* ((base-dimensions (solution-generator-dimensions field-generator))
59
          (result-dimensions base-dimensions)
60
          (result-channel (make-channel :name (list 'spocq.a:|group| (task-id *query*))
61
                                        :dimensions result-dimensions
62
                                        :size (effective-channel-size :start start :end end)
63
                                        :page-length (effective-page-length :start start :end end))))
64
     (labels ((run-group-thread (result-channel field-generator key-bindings)
65
                (let ((base-dimensions (solution-generator-dimensions field-generator))
66
                      (base-channel (solution-generator-channel field-generator))
67
                      (base-expression (solution-generator-expression field-generator))
68
                      (*thread-operations* (cons (list* 'spocq.a:|group| base-dimensions
69
                                                        args)
70
                                                 *thread-operations*)))
71
                  (push 'spocq.a:|group| (channel-name base-channel))
72
                  (query-run-in-thread *query* base-expression)
73
                  (setf (channel-size result-channel) (min (channel-size base-channel)
74
                                                           (channel-size result-channel))
75
                        (channel-page-length result-channel) (min (channel-page-length base-channel)
76
                                                                  (channel-page-length result-channel)))
77
                  (apply #'process-group result-channel base-channel
78
                         base-dimensions
79
                         key-bindings
80
                         args)
81
                  'spocq.a:|group|)))
82
       (make-solution-generator :operator 'spocq.a:|group|
83
                                      ;; coerce elements to lists
84
                                      :dimensions result-dimensions
85
                                      :expression (list #'run-group-thread result-channel field-generator key-bindings)
86
                                      :channel result-channel
87
                                      :constituents (list field-generator)))))
88
 
89
 
90
 (defmethod process-group ((destination array-page-channel)
91
                           (base-source array-page-channel)
92
                           base-dimensions key-bindings &key (start 0) end)
93
   "Generate a stream of grouped solutions. The stream is segmented into (key . (solutions)) messages
94
  where the key is a list or atom which characterizes the group as per the given by-group specification."
95
 
96
   (declare (list base-dimensions key-bindings))
97
   (assert-argument-types process-group
98
     (base-dimensions list)
99
     (key-bindings list))
100
 
101
   (multiple-value-bind (cache-operator collector)
102
                        (compute-group-operators base-dimensions key-bindings)
103
     (declare (type (function (array fixnum hash-table) t) cache-operator)
104
              (type (function (array fixnum array fixnum) t) collector))
105
     (let* ((result-page-width (channel-page-width destination))
106
            (result-page-length (channel-page-length destination))
107
            (result-page nil)
108
            (result-index result-page-length)
109
            (cache (if (null (set-difference (bindings-value-forms key-bindings) base-dimensions))
110
                     (make-term-id-cache :single-thread t)
111
                     (make-solution-cache :single-thread t)))
112
            (group-pages ())
113
            (result-count 0)
114
            (solution-count 0))
115
       (declare (type fixnum result-page-width result-page-length result-index result-count)
116
                (type list group-pages))
117
       (assert (= (length base-dimensions) result-page-width) ()
118
               "Channel and operation dimensions do not match: ~a: ~a." destination base-dimensions)
119
       (labels ((base-processor (base-page)
120
                  (dotimes (base-index (array-dimension base-page 0))
121
                    (funcall cache-operator base-page base-index cache)))
122
                (collect-group (group-key base-page-list)
123
                  (when (> (incf result-count) start)
124
                    (loop for (base-page . base-index) in base-page-list
125
                          do (collect-solution base-page base-index))
126
                    (complete-group group-key)
127
                    (when (and end (>= result-count end)) (complete-solutions))))
128
                (collect-solution (base-page base-index)
129
                  (next-solution-location)
130
                  (funcall collector result-page result-index base-page base-index))
131
                (next-solution-location ()
132
                  ;; return a page (possible newly created) and the next free location in that page
133
                  (when (>= (incf result-index) result-page-length)
134
                    (when result-page (push result-page group-pages))
135
                    (setf result-page (new-field-page destination result-page-length result-page-width)
136
                          result-index 0)))
137
                (complete-group (group-key)
138
                  (when result-page
139
                    (let ((page-result-count (1+ result-index)))
140
                    (when (< page-result-count result-page-length)
141
                      (setf result-page (adjust-page result-page (list page-result-count result-page-width)))))
142
                    (put-page (cons group-key (nreverse (cons result-page group-pages)))))
143
                  (setf group-pages ()
144
                        result-page nil
145
                        result-index result-page-length))
146
                (complete-solutions ()
147
                  (complete-field destination)
148
                  (incf-stat *solutions-processed* solution-count)
149
                  (incf-stat *solutions-constructed* result-count)
150
                  (return-from process-group (values solution-count result-count)))
151
                (put-page (page)
152
                  (trace-data process-group destination base-dimensions (term-value-field page))
153
                  (put-field-page destination page)))
154
         (declare (dynamic-extent #'collect-group))
155
         (unless (and (plusp result-page-length) (or (null end) (> end start))) (complete-solutions))
156
         (do-pages (solutions base-source)
157
           (check-query-status *query*)
158
           (incf solution-count (array-dimension solutions 0))
159
           (base-processor solutions))
160
         (maphash #'collect-group cache)
161
         (complete-solutions)))))
162
 
163
 
164
 (defun compute-group-operators (base-dimensions key-bindings)
165
   (values (compute-extended-write-cache-op base-dimensions key-bindings)
166
           (compute-unary-collector base-dimensions base-dimensions)))
167
 
168
 
169
 ;;; (compute-extended-write-cache-op '(?::s ?::v ?::w) '((?::X (spocq.a::|coalesce| ?::w "a"))))