Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/slice.lisp

Kind        Covered  All  %
expression  200      260  76.9
branch      8        22   36.4
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 
6
 (defmacro spocq.a:|slice| (solution-field &rest args &key count end offset start)
7
   "( ( solutionField ) solutionField )
8
 A SLICE form reduces a given solution field to produce a result field which
9
 includes only those solutions within the specified [offset end) interval.
10
 "
11
 
12
   (declare (ignore count end offset start))
13
   (apply #'macroexpand-slice solution-field args))
14
 
15
 
16
 (defun macroexpand-slice (solution-field &rest args)
17
   (setf args (apply #'canonicalize-algebra-arguments args))
18
   (if (or (getf args :end)
19
           (typep (getf args :start) '(integer 1)))
20
     (or (apply #'compute-expression-slice solution-field args)
21
         `(spocq.e:slice ,solution-field ,@args))
22
     solution-field))
23
 
24
 
25
 (defgeneric spocq.e:slice (solution-field &rest args &key end start)
26
   (:documentation "Given a solution field and a predicate, return a new field of those solutions
27
     which satisfy the predicate.")
28
 
29
   (:method :before ((solution-field t) &key end start)
30
     (assert-argument-types spocq.e:slice
31
       (start (or null (integer 0)))
32
       (end (or null (integer 0))))
33
     (incf-stat *algebra-operations*)
34
     (trace-algebra spocq.e:slice solution-field
35
                    :start start :end end))
36
 
37
   (:method ((solution-field solution-generator) &rest args)
38
     (declare (dynamic-extent args))
39
     (apply #'spocq.e:stream-slice solution-field args))
40
   )
41
 
42
 
43
 (defun spocq.e:stream-slice (field-generator &rest args &key end start)
44
   (if args
45
     (let* ((base-dimensions (solution-generator-dimensions field-generator))
46
            (result-channel (make-channel :name (list 'spocq.a:|slice| (task-id *query*))
47
                                        :dimensions base-dimensions
48
                                        :sort-dimensions base-dimensions
49
                                        :size (effective-channel-size :start start :end end)
50
                                        :page-length (effective-page-length :start start :end end))))
51
       (labels ((run-slice-thread (result-channel field-generator)
52
                  (let* ((base-dimensions (solution-generator-dimensions field-generator))
53
                         (base-channel (solution-generator-channel field-generator))
54
                         (base-expression (solution-generator-expression field-generator))
55
                         (*thread-operations* (cons (list* 'spocq.a:|slice| base-dimensions
56
                                                           args)
57
                                                    *thread-operations*)))
58
                    (push 'spocq.a:|slice| (channel-name base-channel))
59
                    (query-run-in-thread *query* base-expression)
60
                    (apply #'process-slice result-channel base-channel
61
                           base-dimensions
62
                           args)
63
                    'spocq.a:|slice|)))
64
         ;; return the binding function to the combination operator
65
         (make-solution-generator :operator 'spocq.a:|slice|
66
                                  :dimensions base-dimensions
67
                                  :expression (list #'run-slice-thread result-channel field-generator)
68
                                  :channel result-channel
69
                                  :constituents (list field-generator))))
70
     field-generator))
71
 
72
 
73
 (defmethod process-slice ((destination array-page-channel)
74
                           (base-source array-page-channel)
75
                           base-dimensions &key (start 0) end)
76
   "Generate a sliced stream to a destination given a solution source and slice (start, end) specifications."
77
 
78
   (declare (list base-dimensions))
79
   (assert-argument-types process-slice
80
     (base-dimensions list))
81
 
82
   (multiple-value-bind (collector)
83
                        (compute-slice-operators base-dimensions)
84
     (let* ((result-page-width (channel-page-width destination))
85
            (result-page-length (channel-page-length destination))
86
            (result-page nil)
87
            (result-index result-page-length)
88
            (result-count 0)
89
            (solution-count 0))
90
       (assert (= (length base-dimensions) result-page-width) ()
91
               "Channel and operation dimensions do not match: ~a: ~a." destination base-dimensions)
92
       (labels ((base-processor (base-page)
93
                  (let ((solution-count (array-dimension base-page 0)))
94
                    (cond ((> result-count start)
95
                           (complete-page)
96
                           (cond ((or (null end) (<= (+ result-count solution-count) end))
97
                                  (let ((result-page (new-field-page destination result-page-length result-page-width)))
98
                                    ;; iff they match, re-use, otherwise discard the resourced page
99
                                    (if (= (array-dimension result-page 0) (array-dimension base-page 0))
100
                                      (copy-page base-page result-page)
101
                                      (setf result-page (copy-page base-page)))
102
                                    (put-page result-page))
103
                                  (incf result-count solution-count)
104
                                  (when (and end (>= result-count end)) (complete-solutions)))
105
                                 (t
106
                                  (dotimes (base-index solution-count)
107
                                    (collect-solution base-page base-index)))))
108
                          ((> (+ result-count solution-count) start)
109
                           (loop for base-index from (- start (shiftf result-count start)) below solution-count
110
                                 do (collect-solution base-page base-index)))
111
                          (t
112
                           ;; skip the page
113
                           (incf result-count solution-count)))))
114
                (collect-solution (base-page base-index)
115
                  ;; start constraint is already satisfied
116
                  (incf result-count)
117
                  (next-solution-location)
118
                  (funcall collector result-page result-index base-page base-index)
119
                  (when (and end (>= result-count end)) (complete-solutions)))
120
                (next-solution-location ()
121
                  ;; return a page (possible newly created) and the next free location in that page
122
                  (when (>= (incf result-index) result-page-length)
123
                    (when result-page (put-page result-page))
124
                    (setf result-page (new-field-page destination result-page-length result-page-width)
125
                          result-index 0)))
126
                (complete-page ()
127
                  (when result-page
128
                    (let ((page-result-count (1+ result-index)))
129
                      (when (< page-result-count result-page-length)
130
                        (setf result-page
131
                              (adjust-page result-page (list page-result-count result-page-width)))))
132
                    (put-page result-page)
133
                    (setf result-index result-page-length
134
                          ;; clear result page for the case of the extra complete-page when iterating
135
                          result-page nil)))
136
                (complete-solutions ()
137
                  (complete-page)
138
                  (put-page nil)
139
                  (incf-stat *solutions-processed* solution-count)
140
                  (incf-stat *solutions-constructed* result-count)
141
                  (return-from process-slice (values solution-count result-count)))
142
                (put-page (page)
143
                  (trace-data process-slice.put-page destination base-dimensions page)
144
                  (if page
145
                    (put-field-page destination page)
146
                    (complete-field destination))))
147
         (unless (and (plusp result-page-length) (or (null end) (> end start))) (complete-solutions))
148
         (do-pages (solutions base-source)
149
           (incf solution-count (array-dimension solutions 0))
150
           (check-query-status *query*)
151
           (base-processor solutions))
152
         (complete-solutions)))))
153
 
154
 
155
 (defun compute-slice-operators (base-dimensions)
156
   (values (compute-unary-collector base-dimensions base-dimensions)))
157
 
158
 
159
 
160