Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/union.lisp

Kind        Covered  All  %
expression  228      252  90.5
branch      7        12   58.3
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; first: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 (defmacro spocq.a:|union| (solution-field1 solution-field2 &rest args &key count end offset start)
6
   "( ( solutionField solutionField ) solutionField )
7
 A UNION form combines to solution fields to produce a result field which
8
 contains all solutions from the first field followed by those in the second."
9
 
10
   (declare (ignore count end offset start))
11
   (apply #'macroexpand-union solution-field1 solution-field2 args))
12
 
13
 
14
 (defun macroexpand-union (field1 field2 &rest args)
15
   "Compute the combination of the two solution fields with a predicate to be
16
  applied to each compatible, merged solution. Both field forms are evaluated.
17
  The result is those solutions from the first field which are either incompatible with all
18
  solutions in the second field, or fail the predicate when combined with compatible solutions."
19
   (setf args (apply #'canonicalize-algebra-arguments args))
20
   `(spocq.e:union (spocq.e::with-join-scope ,(gensym "union-") ,field1)
21
                   (spocq.e::with-join-scope ,(gensym "union-") ,field2)
22
                    ,@args))
23
 
24
 
25
 (defgeneric spocq.e:union (base-field added-field &key end start)
26
   (:documentation "Append the two solution fields. NB. this is not a set union, as
27
  the result cardinality is specified to be the sum of the argument fields' cardinalities.")
28
 
29
   (:method :before ((field1 t) (field2 t) &key end start)
30
     (assert-argument-types spocq.e:union
31
       (start (or null (integer 0)))
32
       (end (or null (integer 0))))
33
     (incf-stat *algebra-operations*)
34
     (trace-algebra spocq.e:union field1 field2
35
                    :start start :end end))
36
   
37
   (:method ((field1 solution-generator) (field2 solution-generator) &rest args)
38
     (declare (dynamic-extent args))
39
     (apply #'union-generator field1 field2
40
            args)))
41
 
42
 
43
 (defun run-union-thread (result-channel first-field-generator second-field-generator
44
                                         result-dimensions first-dimensions second-dimensions
45
                                         args)
46
   (let ((first-dimensions (solution-generator-dimensions first-field-generator))
47
         (second-dimensions (solution-generator-dimensions second-field-generator))
48
         (result-dimensions (union-dimensions first-dimensions second-dimensions))
49
         (first-channel (solution-generator-channel first-field-generator))
50
         (second-channel (solution-generator-channel second-field-generator))
51
         (first-expression (solution-generator-expression first-field-generator))
52
         (second-expression (solution-generator-expression second-field-generator))
53
         (*thread-operations* (cons (list* 'spocq.a:|slice| (task-id *task*)
54
                                           result-dimensions first-dimensions second-dimensions
55
                                           args)
56
                                    *thread-operations*)))
57
     (push '(spocq.a:|union| :first) (channel-name first-channel))
58
     (push '(spocq.a:|union| :second) (channel-name second-channel))
59
     (query-run-in-thread *query* first-expression)
60
     (query-run-in-thread *query* second-expression)
61
     (setf (channel-size result-channel) (min (channel-size first-channel)
62
                                              (channel-size second-channel)
63
                                              (channel-size result-channel))
64
           (channel-page-length result-channel) (min (channel-page-length first-channel)
65
                                                     (channel-page-length second-channel)
66
                                                     (channel-page-length result-channel)))
67
     (apply #'process-union result-channel first-channel second-channel
68
            result-dimensions
69
            first-dimensions
70
            second-dimensions
71
            args)
72
     'spocq.a:|union|))
73
 
74
 (defun union-generator (first-field-generator second-field-generator &rest args &key end start)
75
   "Combine two argument field generators into a new generator together with a new result channel and
76
  specifications for the result dimensions and the expression to be run to generate the result stream."
77
 
78
   (let* ((first-dimensions (solution-generator-dimensions first-field-generator))
79
          (second-dimensions (solution-generator-dimensions second-field-generator))
80
          (result-dimensions (union-dimensions first-dimensions second-dimensions))
81
          (result-channel (make-channel :name (list 'spocq.a:|union| (task-id *query*))
82
                                        :dimensions result-dimensions
83
                                        :size (effective-channel-size :start start :end end)
84
                                        :page-length (effective-page-length :start start :end end))))
85
     ;; return the binding function to the combination operator
86
     (make-solution-generator :operator 'spocq.a:|union|
87
                              :dimensions result-dimensions
88
                              :expression (list #'run-union-thread result-channel first-field-generator second-field-generator
89
                                                result-dimensions first-dimensions second-dimensions
90
                                                args)
91
                              :channel result-channel
92
                              :constituents (list first-field-generator second-field-generator))))
93
 
94
 
95
 (defmethod process-union ((destination array-page-channel)
96
                           (first-source array-page-channel)
97
                           (second-source array-page-channel)
98
                           result-dimensions first-dimensions second-dimensions &key (start 0) end)
99
   "Combine two source streams to produce solutions to a continuation.
100
  Apply optional slice (start,end) constraints to limit the result.
101
 
102
  nb. this must allow incompatible constituents by extending the result domain as required.
103
  in order for that result to be useful, the dominant operators cannot be a join/leftjoin
104
  as the constant field domain would cause any unbound variables to miss solutions."
105
 
106
   (declare (list result-dimensions first-dimensions second-dimensions))
107
   (assert-argument-types process-union
108
     (result-dimensions list)
109
     (first-dimensions list)
110
     (second-dimensions list))
111
 
112
   (let* ((result-page-width (channel-page-width destination))
113
          (result-page-length (channel-page-length destination))
114
          (result-page nil)
115
          (result-index result-page-length)
116
          (result-count 0)
117
          (first-count 0)
118
          (second-count 0)
119
          (first-collector (compute-unary-collector result-dimensions first-dimensions))
120
          (second-collector (compute-unary-collector result-dimensions second-dimensions)))
121
     (declare (type (function (array fixnum array fixnum) t) first-collector second-collector))
122
     (assert (= (length result-dimensions) result-page-width) ()
123
             "Channel and operation dimensions do not match: ~a: ~a." destination result-dimensions)
124
     (labels ((base-processor (collector base-page)
125
                ;; (print (list :page (term-value-field base-page)))
126
                (let ((solution-count (array-dimension base-page 0)))
127
                  (dotimes (base-index solution-count)
128
                    (collect-solution collector base-page base-index))))
129
              (collect-solution (collector base-page base-index)
130
                (declare (type (function (array fixnum array fixnum) t) collector))
131
                (when (> (incf result-count) start)
132
                  (next-solution-location)
133
                  (funcall collector result-page result-index base-page base-index)
134
                  (when (and end (>= result-count end)) (complete-solutions))))
135
              (next-solution-location ()
136
                ;; return a page (possible newly created) and the next free location in that page
137
                (when (>= (incf result-index) result-page-length)
138
                  (when result-page (put-page result-page))
139
                  (setf result-page (new-field-page destination result-page-length result-page-width)
140
                        result-index 0)))
141
              (complete-solutions ()
142
                (when result-page
143
                  (let ((page-result-count (1+ result-index)))
144
                    (when (< page-result-count result-page-length)
145
                      (setf result-page
146
                            (adjust-page result-page (list page-result-count result-page-width)))))
147
                  (put-page result-page))
148
                (complete-field destination)
149
                (let ((solution-count (+ first-count second-count)))
150
                  (incf-stat *solutions-processed* solution-count)
151
                  (incf-stat *solutions-constructed* result-count)
152
                  (trace-algebra process-union.complete destination)
153
                  (return-from process-union (values solution-count result-count
154
                                                     first-count
155
                                                     second-count))))
156
              (put-page (page)
157
                ;; (print (list :put  (term-value-field page)))
158
                (trace-data process-union destination result-dimensions (term-value-field page))
159
                (put-field-page destination page)))
160
       (unless (and (plusp result-page-length) (or (null end) (> end start))) (complete-solutions))
161
       (do-pages (solutions first-source)
162
         ;; (print (list :left solutions))
163
         (check-query-status *query*)
164
         (incf first-count (array-dimension solutions 0))
165
         (base-processor first-collector solutions))
166
       (do-pages (solutions second-source)
167
         ;; (print (list :right solutions))
168
         (check-query-status *query*)
169
         (incf second-count (array-dimension solutions 0))
170
         (base-processor second-collector solutions))
171
       (complete-solutions))))
172