Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/distinct.lisp
| Kind | Covered | All | % |
| expression | 202 | 230 | 87.8 |
| branch | 14 | 20 | 70.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
5
(:documentation "This file defines the DISTINCT operator for the 'org.datagraph.spocq' RDF engine."
7
"Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved."))
9
;;; *DISTINCT-TRANSLATIONS*
;;; Rewrite rules for the SPOCQ.A:|distinct| macro, which passes them to
;;; RULE-BASED-TRANSLATOR (only when the task has no default graphs).
;;; Each rule appears to be a dotted pair (pattern . replacement):
;;;  - the pattern is a DISTINCT over a SELECT of a single-triple BGP, either plain
;;;    or wrapped in an ORDER on the one free variable;
;;;  - the replacement calls a spocq.e:: operator (SUBJECTS, PREDICATES or OBJECTS)
;;;    with :dimensions naming that variable and :distinct t, wrapped in an ORDER
;;;    for the ordered variant.
;;; NOTE(review): (?is ?x variable-p) presumably requires ?x to match a variable
;;; term - confirm against RULE-BASED-TRANSLATOR's pattern syntax.
;;; NOTE(review): this listing omits several original lines (the numbering skips),
;;; including closing parentheses, so the form as shown is not balanced.
(defparameter *distinct-translations*
10
;; rules keyed on the subject position ?s => spocq.e::subjects
'(((spocq.a:|distinct|
11
(spocq.a:|select| (spocq.a:|bgp| (spocq.a:|triple| ?s (?is ?p variable-p) (?is ?o variable-p)))
13
. (spocq.e::subjects :dimensions '(?s) :distinct t))
15
(spocq.a:|select| (spocq.a:|order| (?s)
16
(spocq.a:|bgp| (spocq.a:|triple| ?s (?is ?p variable-p) (?is ?o variable-p))))
18
. (spocq.a:|order| (spocq.e::subjects :dimensions '(?s) :distinct t)
21
;; rules keyed on the predicate position ?p => spocq.e::predicates
(spocq.a:|select| (spocq.a:|bgp| (spocq.a:|triple| (?is ?s variable-p) ?p (?is ?o variable-p)))
23
. (spocq.e::predicates :dimensions '(?p) :distinct t))
25
(spocq.a:|select| (spocq.a:|order| (?p)
26
(spocq.a:|bgp| (spocq.a:|triple| (?is ?s variable-p) ?p (?is ?o variable-p))))
28
. (spocq.a:|order| (spocq.e::predicates :dimensions '(?p) :distinct t)
31
;; rules keyed on the object position ?o => spocq.e::objects
(spocq.a:|select| (spocq.a:|bgp| (spocq.a:|triple| (?is ?s variable-p) (?is ?p variable-p) ?o))
33
. (spocq.e::objects :dimensions '(?o) :distinct t))
35
(spocq.a:|select| (spocq.a:|order| (?o)
36
(spocq.a:|bgp| (spocq.a:|triple| (?is ?s variable-p) (?is ?p variable-p) ?o)))
38
. (spocq.a:|order| (spocq.e::objects :dimensions '(?o) :distinct t)
42
(defmacro spocq.a:|distinct| (&whole form solution-field &rest args &key count end offset start)
  "( ( solutionField ) solutionField )
 DISTINCT takes exactly one solution field and expands into a form producing the
 corresponding field from which duplicate solutions have been removed."
  ;; The keywords are destructured only to constrain the accepted arguments;
  ;; their values are passed on, canonicalized, through ARGS.
  (declare (ignore count end offset start))
  (let ((canonical-args (apply #'canonicalize-algebra-arguments args)))
    (flet ((generic-expansion ()
             (apply #'macroexpand-distinct solution-field canonical-args)))
      (cond ((dataset-default-graphs *task*)
             ;; rule-based translation is attempted only without default graphs
             (generic-expansion))
            ;; a non-nil translation of the whole form is itself the expansion
            ((rule-based-translator form *distinct-translations*))
            (t
             (generic-expansion))))))
54
(defun macroexpand-distinct (solution-field &rest args)
  "Build the call form for the DISTINCT operator: SPOCQ.E:DISTINCT applied to
 SOLUTION-FIELD followed by the remaining ARGS."
  (list* 'spocq.e:distinct solution-field args))
58
(defgeneric spocq.e:distinct (solution-field &rest args &key end start)
  ;; Remove duplicate solutions from SOLUTION-FIELD.  START and END, when given,
  ;; must be NIL or non-negative integers; the :BEFORE method checks them.

  (:method :before ((field t) &key end start)
    ;; Applies to every field: validate the slice bounds, increment
    ;; *ALGEBRA-OPERATIONS*, and record the call via TRACE-ALGEBRA.
    (assert-argument-types spocq>distinct
                           (start (or null (integer 0)))
                           (end (or null (integer 0))))
    (incf-stat *algebra-operations*)
    (trace-algebra spocq.e:distinct field :start start :end end))

  (:method ((field solution-generator) &rest args)
    ;; ARGS is intentionally NOT declared DYNAMIC-EXTENT.  It reaches
    ;; DISTINCT-GENERATOR through APPLY, and CLHS 3.4.1.3 permits the callee's
    ;; &rest list to share structure with APPLY's final argument.
    ;; DISTINCT-GENERATOR builds a generator whose :EXPRESSION (headed by
    ;; RUN-DISTINCT-THREAD, which takes an ARGS parameter) is presumably run
    ;; later.  A stack-allocated list could then be referenced after this method
    ;; has returned.
    (apply 'distinct-generator field args)))
72
;;; DISTINCT-GENERATOR
;;; Build the SOLUTION-GENERATOR for a DISTINCT operation over FIELD-GENERATOR.
;;; - The result channel is named (spocq.a:|distinct| <task id of *query*>) and
;;;   carries the base generator's dimensions.
;;; - Its size and page length are derived from the START/END slice via
;;;   EFFECTIVE-CHANNEL-SIZE and EFFECTIVE-PAGE-LENGTH.
;;; - The generator's :expression is a list headed by #'RUN-DISTINCT-THREAD,
;;;   followed by the result channel and FIELD-GENERATOR.
;;; - The base generator is recorded as its only constituent.
;;; NOTE(review): one original line inside the :expression list is missing from
;;; this listing; presumably it supplies ARGS (RUN-DISTINCT-THREAD takes an ARGS
;;; parameter) and closes the list - confirm against the source file.
(defun distinct-generator (field-generator &rest args &key end start)
73
(let* ((base-dimensions (solution-generator-dimensions field-generator))
74
(result-channel (make-channel :name (list 'spocq.a:|distinct| (task-id *query*))
75
:dimensions base-dimensions
76
:size (effective-channel-size :start start :end end)
77
:page-length (effective-page-length :start start :end end))))
78
(make-solution-generator :operator 'spocq.a:|distinct|
79
:dimensions base-dimensions
80
:expression (list #'run-distinct-thread result-channel field-generator
82
:channel result-channel
83
:constituents (list field-generator))))
85
;;; RUN-DISTINCT-THREAD
;;; Body of the DISTINCT generator's expression.
;;; - Extracts the base generator's dimensions, channel and expression.
;;; - Dynamically pushes (spocq.a:|distinct| base-dimensions . args) onto
;;;   *THREAD-OPERATIONS* and pushes spocq.a:|distinct| onto the base channel's name.
;;; - Starts the base expression with QUERY-RUN-IN-THREAD.
;;; - Caps the result channel's size and page length at the base channel's values.
;;; - Finally applies PROCESS-DISTINCT to the result and base channels.
;;; NOTE(review): MIN signals an error if either channel's size or page length
;;; is NIL - confirm that those slots are always numbers here.
(defun run-distinct-thread (result-channel field-generator args)
86
(let* ((base-dimensions (solution-generator-dimensions field-generator))
87
(base-channel (solution-generator-channel field-generator))
88
(base-expression (solution-generator-expression field-generator))
89
(*thread-operations* (cons (list* 'spocq.a:|distinct| base-dimensions args)
90
*thread-operations*)))
91
(push 'spocq.a:|distinct| (channel-name base-channel))
92
(query-run-in-thread *query* base-expression)
93
(setf (channel-size result-channel) (min (channel-size base-channel)
94
(channel-size result-channel))
95
(channel-page-length result-channel) (min (channel-page-length base-channel)
96
(channel-page-length result-channel)))
97
(apply #'process-distinct result-channel base-channel
;; NOTE(review): original lines 98-99 (the rest of this APPLY, presumably the
;; result dimensions and ARGS) are missing from this listing; the following
;; line may belong to a separate form - confirm against the source file.
100
'spocq.a:|distinct|))
103
;;; PROCESS-DISTINCT (array-page-channel -> array-page-channel)
;;; Read solution pages from BASE-SOURCE and copy into DESTINATION each row for
;;; which the cache operator from COMPUTE-DISTINCT-OPERATORS returns false.
;;; - The START/END slice is applied to the running count of distinct solutions.
;;; - On completion it completes DESTINATION and adds to *SOLUTIONS-PROCESSED*
;;;   and *SOLUTIONS-CONSTRUCTED*.
;;; - Returns (values page-count solution-count result-count).
;;; NOTE(review): RESULT-PAGE, RESULT-COUNT, SOLUTION-COUNT and PAGE-COUNT are used
;;; but their bindings are not visible, nor is the header of the local function
;;; that binds PAGE; those original lines are missing from this listing.
(defmethod process-distinct ((destination array-page-channel)
104
(base-source array-page-channel)
105
result-dimensions &key (start 0) end)
106
"Generate a stream of unique solutions to a destination given a solution source.
107
slice (start, end) constraints are applied on-the-fly to constrain the solutions emitted."
109
(declare (list result-dimensions))
110
(assert-argument-types process-distinct
111
(result-dimensions list))
113
;; CACHE-OPERATOR is called as (cache-operator base-page row cache);
;; COLLECTOR as (collector result-page result-row base-page base-row).
(multiple-value-bind (cache-operator collector)
114
(compute-distinct-operators result-dimensions)
115
(declare (type (function (array fixnum hash-table) t) cache-operator)
116
(type (function (array fixnum array fixnum) t) collector))
117
(let* ((result-page-width (channel-page-width destination))
118
(result-page-length (channel-page-length destination))
120
;; start at the page length so the first NEXT-SOLUTION-LOCATION allocates a page
(result-index result-page-length)
121
(cache (make-term-id-cache :single-thread t))
124
#+trace-solution-count (next-count 0)
126
(assert (= (length result-dimensions) result-page-width) ()
127
"Channel and operation dimensions do not match: ~a: ~a." destination result-dimensions)
128
;; BASE-PROCESSOR: collect every row of BASE-PAGE for which CACHE-OPERATOR
;; returns false (presumably: not seen before, and now recorded in CACHE).
(labels ((base-processor (base-page)
129
(dotimes (base-index (array-dimension base-page 0))
130
(unless (funcall cache-operator base-page base-index cache)
131
(collect-solution base-page base-index))))
132
;; COLLECT-SOLUTION: count the new distinct solution; skip it while the count
;; is <= START, otherwise copy it via COLLECTOR; complete once the count reaches END.
(collect-solution (base-page base-index)
133
(when (> (incf result-count) start)
134
(next-solution-location)
135
(funcall collector result-page result-index base-page base-index)
136
(when (and end (>= result-count end)) (complete-solutions))))
137
(next-solution-location ()
138
;; advance RESULT-INDEX; when it passes the end of the page, flush the current
;; page (if any) and allocate a new one from DESTINATION.
;; NOTE(review): the line that presumably resets RESULT-INDEX is missing here.
139
(when (>= (incf result-index) result-page-length)
140
(when result-page (put-page result-page))
141
(setf result-page (new-field-page destination result-page-length result-page-width)
143
;; COMPLETE-SOLUTIONS: trim a partly filled last page to the rows written,
;; flush it, complete DESTINATION, record statistics, and return
;; (values page-count solution-count result-count) from PROCESS-DISTINCT.
(complete-solutions ()
145
(let ((page-result-count (1+ result-index)))
146
(when (< page-result-count result-page-length)
148
(adjust-page result-page (list page-result-count result-page-width)))))
149
(put-page result-page))
150
(complete-field destination)
151
(incf-stat *solutions-processed* solution-count)
152
(incf-stat *solutions-constructed* result-count)
153
(return-from process-distinct
154
(values page-count solution-count result-count)))
156
;; body of the local page-output function (presumably PUT-PAGE, whose header
;; line is missing): trace the page's term values, then hand it to DESTINATION
(trace-data process-distinct destination result-dimensions (term-value-field page))
157
(put-field-page destination page)))
158
;; a zero page length or an empty slice (END not beyond START) completes at once
(unless (and (plusp result-page-length) (or (null end) (> end start))) (complete-solutions))
159
;; for each base page: check the query status, optionally trace progress,
;; enforce the cache-size limit, count the rows, and deduplicate them
(do-pages (solution-page base-source)
160
(check-query-status *query*)
161
#+trace-solution-count
162
(when (> solution-count next-count)
163
(bt:with-lock-held (*page-cache-lock*)
164
(format *trace-output* "~% distinct ~s/~s" solution-count (hash-table-count cache))
166
(finish-output *trace-output*))
167
(incf next-count 50))
168
;; terminate the task once CACHE holds more entries than *SOLUTION-COUNT-LIMIT*.
;; NOTE(review): the warning text says "order:" although this is the DISTINCT
;; operator - presumably copied from the ORDER operator; confirm and correct.
(when (and *solution-count-limit*
169
(> (hash-table-count cache) *solution-count-limit*))
170
(log-warn "order: terminated @~a/~a solutions." solution-count (hash-table-count cache))
171
(terminate-task *query*))
172
(incf solution-count (array-dimension solution-page 0))
174
(base-processor solution-page))
175
;; base source exhausted: flush the last page and return
(complete-solutions)))))
178
(defun compute-distinct-operators (dimensions)
  "Return two values for the solution DIMENSIONS:
 the duplicate-detection cache operator built by COMPUTE-FLAG-CACHE-OP, and
 the row collector built by COMPUTE-UNARY-COLLECTOR, mapping DIMENSIONS onto
 themselves."
  (let ((cache-operator (compute-flag-cache-op dimensions))
        (collector (compute-unary-collector dimensions dimensions)))
    (values cache-operator collector)))
185
;;; Micro-benchmark: hash-table keys built as lists vs. vectors under an EQUAL
;;; test.  Recorded result: negligible time difference.
;;; CACHE-W-LIST inserts (list c c c) -> c entries and returns the entry count.
;;; NOTE(review): the line binding C (presumably (dotimes (c count)) is missing
;;; from this listing - confirm against the source file.
186
(defun cache-w-list (count)
187
(let ((cache (make-hash-table :test 'equal)))
189
(setf (gethash (list c c c) cache) c))
190
(hash-table-count cache)))
192
;;; CACHE-W-VECTOR: like CACHE-W-LIST, but keyed by (vector c c c).
;;; NOTE: EQUAL compares conses structurally, but compares arrays other than
;;; strings and bit vectors with EQ.  Vector keys therefore never match a
;;; separately built vector with the same elements.  The count agrees with
;;; CACHE-W-LIST only because each C is (presumably) distinct.  Deduplicating by
;;; vector contents would need an EQUALP table, so the timing comparison is not
;;; like-for-like.
;;; NOTE(review): the line binding C is missing from this listing.
(defun cache-w-vector (count)
193
(let ((cache (make-hash-table :test 'equal)))
195
(setf (gethash (vector c c c) cache) c))
196
(hash-table-count cache)))
198
;;; Time each key variant with COUNT = 1000000.
;;; NOTE(review): unless these forms are conditionalized out in the source (the
;;; enclosing lines are missing from this listing), they run at load time.
(time (cache-w-list 1000000))
199
(time (cache-w-vector 1000000))