Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/distinct.lisp

KindCoveredAll%
expression202230 87.8
branch1420 70.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 (:documentation "This file defines the DISTINCT operator for the 'org.datagraph.spocq' RDF engine."
6
  (copyright
7
   "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved."))
8
 
9
 (defparameter *distinct-translations*
10
   '(((spocq.a:|distinct|
11
       (spocq.a:|select| (spocq.a:|bgp| (spocq.a:|triple| ?s (?is ?p variable-p) (?is ?o variable-p)))
12
                         (?s)))
13
      . (spocq.e::subjects :dimensions '(?s) :distinct t))
14
     ((spocq.a:|distinct|
15
       (spocq.a:|select| (spocq.a:|order| (?s)
16
                                          (spocq.a:|bgp| (spocq.a:|triple| ?s (?is ?p variable-p) (?is ?o variable-p))))
17
                         (?s)))
18
      . (spocq.a:|order| (spocq.e::subjects :dimensions '(?s) :distinct t)
19
                         (?s)))
20
     ((spocq.a:|distinct|
21
       (spocq.a:|select| (spocq.a:|bgp| (spocq.a:|triple| (?is ?s variable-p) ?p (?is ?o variable-p)))
22
                         (?p)))
23
      . (spocq.e::predicates :dimensions '(?p) :distinct t))
24
     ((spocq.a:|distinct|
25
       (spocq.a:|select| (spocq.a:|order| (?p)
26
                                          (spocq.a:|bgp| (spocq.a:|triple| (?is ?s variable-p) ?p (?is ?o variable-p))))
27
                         (?p)))
28
      . (spocq.a:|order| (spocq.e::predicates :dimensions '(?p) :distinct t)
29
                         (?p)))
30
     ((spocq.a:|distinct|
31
       (spocq.a:|select| (spocq.a:|bgp| (spocq.a:|triple| (?is ?s variable-p) (?is ?p variable-p) ?o))
32
                         (?o)))
33
      . (spocq.e::objects :dimensions '(?o) :distinct t))
34
     ((spocq.a:|distinct|
35
       (spocq.a:|select| (spocq.a:|order| (?o)
36
                                          (spocq.a:|bgp| (spocq.a:|triple| (?is ?s variable-p) (?is ?p variable-p) ?o)))
37
                         (?o)))
38
      . (spocq.a:|order| (spocq.e::objects :dimensions '(?o) :distinct t)
39
                         (?o)))))
40
 
41
 
42
 (defmacro spocq.a:|distinct| (&whole form solution-field &rest args &key count end offset start)
43
   "( ( solutionField ) solutionField )
44
 A DISTINCT form accepts a single solution field and yields an analogous field
45
 which removes the duplicate solutions."
46
 
47
   (declare (ignore count end offset start))
48
   (setf args (apply #'canonicalize-algebra-arguments args))
49
   (or (and (null (dataset-default-graphs *task*))
50
            (rule-based-translator form *distinct-translations*))
51
       (apply #'macroexpand-distinct solution-field args)))
52
 
53
 
54
 (defun macroexpand-distinct (solution-field &rest args)
55
   `(spocq.e:distinct ,solution-field ,@args))
56
 
57
 
58
 (defgeneric spocq.e:distinct (solution-field &rest args &key end start)
59
 
60
   (:method :before ((field t) &key end start)
61
     (assert-argument-types spocq>distinct
62
       (start (or null (integer 0)))
63
       (end (or null (integer 0))))
64
     (incf-stat *algebra-operations*)
65
     (trace-algebra spocq.e:distinct field :start start :end end))
66
 
67
   (:method ((field solution-generator) &rest args)
68
     (declare (dynamic-extent args))
69
     (apply 'distinct-generator field args)))
70
 
71
 
72
 (defun distinct-generator (field-generator &rest args &key end start)
73
   (let* ((base-dimensions (solution-generator-dimensions field-generator))
74
          (result-channel (make-channel :name (list 'spocq.a:|distinct| (task-id *query*))
75
                                        :dimensions base-dimensions
76
                                        :size (effective-channel-size :start start :end end)
77
                                        :page-length (effective-page-length :start start :end end))))
78
     (make-solution-generator :operator 'spocq.a:|distinct|
79
                              :dimensions base-dimensions
80
                              :expression (list #'run-distinct-thread result-channel field-generator
81
                                                args)
82
                              :channel result-channel
83
                              :constituents (list field-generator))))
84
 
85
 (defun run-distinct-thread (result-channel field-generator args)
86
   (let* ((base-dimensions (solution-generator-dimensions field-generator))
87
          (base-channel (solution-generator-channel field-generator))
88
          (base-expression (solution-generator-expression field-generator))
89
          (*thread-operations* (cons (list* 'spocq.a:|distinct| base-dimensions args)
90
                                     *thread-operations*)))
91
     (push 'spocq.a:|distinct| (channel-name base-channel))
92
     (query-run-in-thread *query* base-expression)
93
     (setf (channel-size result-channel) (min (channel-size base-channel)
94
                                                           (channel-size result-channel))
95
                        (channel-page-length result-channel) (min (channel-page-length base-channel)
96
                                                                  (channel-page-length result-channel)))
97
     (apply #'process-distinct result-channel base-channel
98
            base-dimensions
99
            args)
100
     'spocq.a:|distinct|))
101
 
102
 
103
 (defmethod process-distinct ((destination array-page-channel)
104
                              (base-source array-page-channel)
105
                              result-dimensions &key (start 0) end)
106
   "Generate a stream of unique solutions to a destination given a solution source.
107
  slice (start, end) constraints are applied on-the-fly to constrain the solutions emitted."
108
 
109
   (declare (list result-dimensions))
110
   (assert-argument-types process-distinct
111
     (result-dimensions list))
112
   
113
   (multiple-value-bind (cache-operator collector)
114
                        (compute-distinct-operators result-dimensions)
115
     (declare (type (function (array fixnum hash-table) t) cache-operator)
116
              (type (function (array fixnum array fixnum) t) collector))
117
     (let* ((result-page-width (channel-page-width destination))
118
            (result-page-length (channel-page-length destination))
119
            (result-page nil)
120
            (result-index result-page-length)
121
            (cache (make-term-id-cache :single-thread t))
122
            (result-count 0)
123
            (solution-count 0)
124
            #+trace-solution-count (next-count 0)
125
            (page-count 0))
126
       (assert (= (length result-dimensions) result-page-width) ()
127
               "Channel and operation dimensions do not match: ~a: ~a." destination result-dimensions)
128
       (labels ((base-processor (base-page)
129
                  (dotimes (base-index (array-dimension base-page 0))
130
                    (unless (funcall cache-operator base-page base-index cache)
131
                      (collect-solution base-page base-index))))
132
                (collect-solution (base-page base-index)
133
                  (when (> (incf result-count) start)
134
                    (next-solution-location)
135
                    (funcall collector result-page result-index base-page base-index)
136
                    (when (and end (>= result-count end)) (complete-solutions))))
137
                (next-solution-location ()
138
                  ;; return a page (possible newly created) and the next free location in that page
139
                  (when (>= (incf result-index) result-page-length)
140
                    (when result-page (put-page result-page))
141
                    (setf result-page (new-field-page destination result-page-length result-page-width)
142
                          result-index 0)))
143
                (complete-solutions ()
144
                  (when result-page
145
                    (let ((page-result-count (1+ result-index)))
146
                      (when (< page-result-count result-page-length)
147
                        (setf result-page
148
                              (adjust-page result-page (list page-result-count result-page-width)))))
149
                    (put-page result-page))
150
                  (complete-field destination)
151
                  (incf-stat *solutions-processed* solution-count)
152
                  (incf-stat *solutions-constructed* result-count)
153
                  (return-from process-distinct
154
                    (values page-count solution-count result-count)))
155
                (put-page (page)
156
                  (trace-data process-distinct destination result-dimensions (term-value-field page))
157
                  (put-field-page destination page)))
158
         (unless (and (plusp result-page-length) (or (null end) (> end start))) (complete-solutions))
159
         (do-pages (solution-page base-source)
160
           (check-query-status *query*)
161
           #+trace-solution-count
162
           (when (> solution-count next-count)
163
             (bt:with-lock-held (*page-cache-lock*)
164
               (format *trace-output* "~% distinct ~s/~s" solution-count (hash-table-count cache))
165
               (check-memory)
166
               (finish-output *trace-output*))
167
             (incf next-count 50))
168
           (when (and *solution-count-limit*
169
                      (> (hash-table-count cache) *solution-count-limit*))
170
             (log-warn "order: terminated @~a/~a solutions." solution-count (hash-table-count cache))
171
             (terminate-task *query*))
172
           (incf solution-count (array-dimension solution-page 0))
173
           (incf page-count)
174
           (base-processor solution-page))
175
         (complete-solutions)))))
176
 
177
 
178
 (defun compute-distinct-operators (dimensions)
179
   (values (compute-flag-cache-op dimensions)
180
           (compute-unary-collector dimensions dimensions)))
181
 
182
 
183
 
184
 #|
185
 ;;; negligible time difference
186
 (defun cache-w-list (count)
187
   (let ((cache (make-hash-table :test 'equal)))
188
     (dotimes (c count)
189
       (setf (gethash (list c c c) cache) c))
190
     (hash-table-count cache)))
191
 
192
 (defun cache-w-vector (count)
193
   (let ((cache (make-hash-table :test 'equal)))
194
     (dotimes (c count)
195
       (setf (gethash (vector c c c) cache) c))
196
     (hash-table-count cache)))
197
 
198
 (time (cache-w-list 1000000))
199
 (time (cache-w-vector 1000000))
200
 
201
   
202
 |#