Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/leftjoin.lisp
| Kind | Covered | All | % |
| expression | 415 | 2006 | 20.7 |
| branch | 26 | 186 | 14.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
5
(defparameter *leftjoin-translations*
6
'(;; cannot distribute over the leftjoin as the respective unmatched base solutions
7
;; are then introduced in the combined result
9
((spocq.a:|leftjoin| ?p1 (spocq.a:|union| ?p2 ?p3))
10
. (spocq.a:|union| (spocq.a:|leftjoin| ?p1 ?p2) (spocq.a:|leftjoin| ?p1 ?p3)))
11
;; but must distribute over the union to ensure that the dimensionality into
12
;; the left join is constant
13
((spocq.a:|leftjoin| (spocq.a:|union| ?p2 ?p3) ?p1)
14
. (spocq.a:|union| (spocq.a:|leftjoin| ?p2 ?p1) (spocq.a:|leftjoin| ?p3 ?p1)))))
17
(defun rewrite-leftjoin (form)
18
(destructuring-bind (left right . args) (rest form)
19
(unless (find :test form)
22
(setf left (rewrite-leftjoin left))))
25
(destructuring-bind (first second . union-args) (rest left)
26
(declare (ignore union-args)) ; should not be any
27
(when (not (equal (expression-dimensions first) (expression-dimensions second)))
28
`(spocq.a:|union| (spocq.a:|leftjoin| ,first ,right) (spocq.a:|leftjoin| ,second ,right) ,@args))))
32
(defmacro spocq.a:|leftjoin| (&whole form solution-field1 solution-field2 &rest args &key test count end offset start)
33
"( ( solutionField solutionField (function (solution) xsd:boolean)? ) solutionField )
34
A LEFTJOIN form combines two solution fields and yields a new field in which
35
each solution is either a merge of two compatible solutions from the constituent
36
fields or a solution from the first field, in the case where a natural join is possible,
37
or an element of the cross join of the two fields, in the case where no dimension agrees.
39
Where a test predicate is given, all result solutions satisfy it."
41
(declare (ignore test count end offset start))
42
(or (rewrite-leftjoin form)
43
(apply #'macroexpand-leftjoin solution-field1 solution-field2 args)))
45
;; Variadic form of the binary leftjoin macro.
;; While the first element of ARGS is a cons, FIELD1 and FIELD2 are wrapped in a
;; binary leftjoin and the macro recurs with that form as the new first field,
;; so the nesting is left-associative:
;;   (leftjoin* a b c d) => (leftjoin* (leftjoin a b) c d)
;; Once the first element of ARGS is not a cons (or ARGS is empty), the remaining
;; ARGS are spliced unchanged into the final binary leftjoin form.
;; NOTE(review): the non-cons tail is presumably the keyword options of
;; spocq.a:|leftjoin| (e.g. :test) - confirm against callers.
(defmacro spocq.a::leftjoin* (field1 field2 &rest args)
46
(if (consp (first args))
47
`(spocq.a::leftjoin* (spocq.a:|leftjoin| ,field1 ,field2) ,@args)
48
`(spocq.a:|leftjoin| ,field1 ,field2 ,@args)))
50
;; Entry point for rewriting the optional side of a leftjoin when it is (or contains) a service form.
;; OPTIONAL-AS-SERVICE? is the optional field's algebra form; its first element (the operator symbol)
;; selects the method of sip-expand-optional-service-operator.
;; BASE-AS-SIP-SOURCE is the other (base) side of the join, passed through to that generic function.
;; Returns the form produced by the generic function, or OPTIONAL-AS-SERVICE? itself when
;; *enable.service-sip* is false. Callers in this file detect "no expansion" by comparing the
;; result to the argument with EQ.
(defun sip-expand-optional-service (optional-as-service? base-as-sip-source)
51
;; if the optional argument field is directly a service,
52
;; return an expansion which combines the service as sip with the other join side
53
(if *enable.service-sip*
54
(sip-expand-optional-service-operator (first optional-as-service?) optional-as-service? base-as-sip-source)
55
optional-as-service?))
57
(defgeneric sip-expand-optional-service-operator (operator optional-as-service? base-as-sip-source)
58
(:method ((operator (eql 'spocq.a:|service|)) optional-as-service? base-as-sip-source)
59
;; intersection already tested, but just for the complete forms
60
(if (or (null (intersection (expression-variables optional-as-service?)
61
(expression-variables base-as-sip-source)))
62
(let ((location (second optional-as-service?)))
63
(and (not (variable-p location)) (iri-service-repository-id location))))
66
;; if they join, replace the service with a leftjoin service.
67
(destructuring-bind (op iri group-graph-pattern &rest args) optional-as-service?
69
`(spocq.a::|serviceleftjoin| ,iri ,group-graph-pattern ,base-as-sip-source ,@args))))
70
(:method ((operator (eql 'spocq.a::|servicejoin|)) optional-as-service? base-as-sip-source)
71
;;; iff the optional clause is already a service join, replace it with
72
;;; a service leftjoin which incorporates the source in its sip source
73
(destructuring-bind (op iri service-source service-sip-source &rest args) optional-as-service?
75
(if (null (intersection (expression-variables service-source)
76
(expression-variables base-as-sip-source)))
79
`(spocq.a::|serviceleftjoin| ,iri ,service-source
80
(spocq.a:|join| ,service-sip-source ,base-as-sip-source) ,@args))))
81
(:method ((operator (eql 'spocq.a::|serviceleftjoin|)) optional-as-service? base-as-sip-source)
82
;;; iff the optional clause is already a service leftjoin
83
;;; augment its sip source
84
(destructuring-bind (op iri service-source service-sip-source &rest args) optional-as-service?
86
(if (null (intersection (expression-variables service-source)
87
(expression-variables base-as-sip-source)))
90
`(spocq.a::|serviceleftjoin| ,iri ,service-source
91
(spocq.a:|join| ,service-sip-source ,base-as-sip-source) ,@args))))
93
(:method ((operator (eql 'spocq.a:|filter|)) optional-as-service? base-as-sip-source)
94
(destructuring-bind (op filter-field . rest) optional-as-service?
95
(let ((sip-expansion? (sip-expand-optional-service filter-field base-as-sip-source)))
96
(if (eq sip-expansion? filter-field)
98
`(,op ,sip-expansion? ,@rest)))))
100
(:method ((operator (eql 'spocq.a:|join|)) optional-as-service? base-as-sip-source)
101
(destructuring-bind (op field1 field2 . rest) optional-as-service?
102
(declare (ignore op))
103
(let ((sip-expansion? (sip-expand-optional-service field1 base-as-sip-source)))
104
(if (eq sip-expansion? field1)
105
(let ((sip-expansion? (sip-expand-optional-service field2 base-as-sip-source)))
106
(if (eq sip-expansion? field2)
108
`(spocq.a:|leftjoin| ,sip-expansion? ,field1 ,@rest)))
109
`(spocq.a:|leftjoin| ,sip-expansion? ,field2 ,@rest)))))
111
(:method ((operator (eql 'spocq.a:|leftjoin|)) optional-as-service? base-as-sip-source)
112
(destructuring-bind (op field1 field2 . rest) optional-as-service?
113
(let ((sip-expansion? (sip-expand-optional-service field1 base-as-sip-source)))
114
(if (eq sip-expansion? field1)
116
`(,op ,sip-expansion? ,field2 ,@rest)))))
118
(:method ((operator t) optional-as-service? base-as-sip-source)
119
optional-as-service?))
121
(defun macroexpand-leftjoin (field1 field2 &rest args)
122
"Translate a leftjoin algebra expression into its execution form.
123
This would include algebraic transformations, but none are defined.
124
It does include optional clause combination, which reduces independent bgp clauses of the form
125
(leftjoin (leftjoin <field> (bgp (triple ...))) (bgp (triple ...)))
127
(leftjoin <field> (bgp (sum (triple ...) (triple ...))))
128
When no further combination is possible, augment the optional clause with the base clause dimensions
129
to enable propagation."
131
(setf args (apply #'canonicalize-algebra-arguments args))
132
(destructuring-bind (&key test &allow-other-keys) args
134
(setf (getf args :test) `(quote ,test))))
135
(let ((base-optional-bgp nil)
137
(base-dimensions (when (eq *leftjoin-propagation-mode* :propagate)
138
(expression-dimensions field1))))
139
(labels ((triple-or-sum-form-p (statement)
140
(or (triple-form-p statement) (sum-form-p statement)))
141
(eligible-base-bgp (form)
142
;; allow a leftjoin with exactly one triple statement
143
(when (leftjoin-form-p form)
144
(let ((bgp (third form)))
145
(when (and (bgp-form-p bgp)
146
(= 1 (count-if #'triple-form-p (rest bgp))))
148
(eligible-optional-bgp (bgp)
149
;; allow a bgp with exactly one triple or sum statement
150
(when (and (bgp-form-p bgp)
151
(= 1 (count-if #'triple-or-sum-form-p (rest bgp))))
153
(bgps-compatible-p (base optional)
154
(let ((base-subject (second (find-if #'triple-form-p (rest base))))
155
(base-graph (second (find-if #'graph-form-p (rest base))))
156
(optional-subject (or (second (find-if #'triple-form-p (rest optional)))
157
(second (find-if #'triple-form-p
158
(rest (find-if #'sum-form-p (rest optional)))))))
159
(optional-graph (second (find-if #'graph-form-p (rest optional)))))
160
(and (equal base-subject optional-subject)
161
(equal base-graph optional-graph))))
162
(default-expansion ()
163
(let ((sip-service? (sip-expand-optional-service field2 field1)))
164
(if (eq field2 sip-service?)
167
(locally ,@(when base-dimensions
168
`((declare (spocq.e:base-dimensions ,@base-dimensions))))
169
(spocq.e::with-join-scope ,(gensym "leftjoin-") ,field2))
173
(let* ((base-base-bgp (second field1))
174
(base-rest (nthcdr 3 field1))
175
(base-optional-triple (find-if #'triple-form-p (rest base-optional-bgp)))
176
(base-optional-rest (remove base-optional-triple (rest base-optional-bgp)))
177
(optional-sum (find-if #'sum-form-p (rest optional-bgp)))
178
(optional-triple (find-if #'triple-form-p (rest optional-bgp))))
179
`(spocq.a:|leftjoin| ,base-base-bgp
180
(spocq.a:|bgp| (spocq.a:|sum| ,base-optional-triple
183
(list optional-triple)))
184
,@base-optional-rest)
186
(ecase *leftjoin-expansion-mode*
190
(if (and (setf base-optional-bgp (eligible-base-bgp field1))
191
(setf optional-bgp (eligible-optional-bgp field2))
192
(bgps-compatible-p base-optional-bgp optional-bgp))
194
(default-expansion)))))))
196
(defgeneric spocq.e:leftjoin (solution-field1 solution-field2 &key end start test)
197
(:documentation "Combine two fields as a left outer join based on a test predicate and a
198
match predicate. see perez.2009 for a more explicit definition than in the spec.
199
the second clause in the union is not exists any u2, not exists such that not u2.")
201
(:method :before ((base-field t) (offset-field t) &key end start test)
202
(assert-argument-types spocq.e:leftjoin
203
(start (or null (integer 0)))
204
(end (or null (integer 0))))
205
(incf-stat *algebra-operations*)
206
(trace-algebra spocq.e:leftjoin base-field offset-field
207
:start start :end end :test test))
209
(:method ((field1 null-generator) (field2 solution-generator) &rest args)
210
(declare (ignore args))
211
(let* ((result-dimensions (union-dimensions (solution-generator-dimensions field1)
212
(solution-generator-dimensions field2)))
213
(result-channel (make-channel :name (list 'spocq.a:|null| (task-id *query*))
214
:dimensions result-dimensions)))
215
(process-null result-channel result-dimensions)
216
(make-null-generator :operator 'spocq.a:|null|
217
:dimensions result-dimensions
219
:channel result-channel
222
#+(or) ; requires that the solution matrix be widened
223
(:method ((field1 solution-generator) (field2 null-generator) &rest args)
224
(declare (ignore args))
227
(:method ((field1 solution-generator) (field2 solution-generator) &rest args)
228
(declare (dynamic-extent args))
229
(apply #'spocq.e:stream-leftjoin field1 field2
232
(:method ((field1 null-generator) (field2 solution-generator) &rest args)
233
(declare (ignore args))
234
(let* ((result-dimensions (union-dimensions (solution-generator-dimensions field1)
235
(solution-generator-dimensions field2)))
236
(result-channel (make-null-channel :dimensions result-dimensions)))
237
(process-null result-channel result-dimensions)
238
(make-null-generator :operator 'spocq.a:|null|
239
:dimensions result-dimensions
241
:channel result-channel
245
;; Switch for choosing process-base-dominant-leftjoin as the join operator.
;; In this file it is read only by the operator-selection COND inside
;; spocq.e:stream-leftjoin, and that COND is conditionalized out with #+(or).
(defparameter *process-base-dominant-leftjoin.enable* t)
246
;; Switch for choosing process-mp-leftjoin as the join operator.
;; Read only by the same conditionalized-out COND in spocq.e:stream-leftjoin.
(defparameter *process-mp-leftjoin.enable* t)
249
(defun spocq.e:stream-leftjoin (base-field-generator optional-field-generator &rest args &key end start test)
250
(declare (ignore test))
251
;; upon invocation use the generator argument to compute the combination operators
252
;; and invoke the respective binding operator with the respective solution continuation
253
(let* ((base-dimensions (solution-generator-dimensions base-field-generator))
254
(optional-dimensions (solution-generator-dimensions optional-field-generator))
255
(result-dimensions (union-dimensions base-dimensions optional-dimensions))
256
;; give priority to dimensions propagated through a chain of optional joins
257
;; but, if the combination changes, fall back to the combination with the base field
258
(key-dimensions (or (intersect-dimensions (solution-generator-key-dimensions base-field-generator) optional-dimensions)
259
(intersect-dimensions base-dimensions optional-dimensions)
261
(result-channel (make-channel :name (list 'spocq.a:|leftjoin| (task-id *query*))
262
:dimensions result-dimensions
263
:size (effective-channel-size :start start :end end)
264
:page-length (effective-page-length :start start :end end)))
265
(spliced-channel-p nil)
267
(labels ((run-leftjoin-thread (result-channel base-field-generator optional-field-generator args)
268
(let* ((base-channel (solution-generator-channel base-field-generator))
269
(optional-channel (solution-generator-channel optional-field-generator))
270
(base-expression (solution-generator-expression base-field-generator))
271
(optional-expression (solution-generator-expression optional-field-generator))
272
(*thread-operations* (cons (list* 'spocq.a:|leftjoin| base-dimensions optional-dimensions args)
273
*thread-operations*))
274
(join-operator *leftjoin-operator*)
275
(effective-channel-size (min (channel-size base-channel)
276
(channel-size optional-channel)
277
(channel-size result-channel))))
278
;; splice in propagation destinations
279
(dolist (agp (solution-generator-patterns optional-field-generator))
280
(when (and (agp-base-channel agp)
281
(equal (agp-base-dimensions agp) base-dimensions))
282
(trace-algebra leftjoin.splice agp (agp-base-channel agp) base-channel
283
base-dimensions optional-dimensions)
284
(setf spliced-channel-p t)
285
(push (agp-base-channel agp) (channel-channels base-channel))))
286
(unless join-operator
288
#+(or) (cond ((and *process-base-dominant-leftjoin.enable* spliced-channel-p)
289
'process-base-dominant-leftjoin)
290
((and *process-mp-leftjoin.enable*
291
(typep (algebra-operator-thread-count) '(integer 2))
292
(not (or count end offset start))
293
(> (repository-statement-count *repository*) 10000))
294
'process-mp-leftjoin)
298
(if (= effective-channel-size *channel-sliced-size-limit*)
299
'process-balanced-leftjoin
302
(log-debug "stream-leftjoin: ~s x ~s" join-operator effective-channel-size)
303
(push 'spocq.a:|leftjoin| (channel-name base-channel))
304
(push 'spocq.a:|leftjoin| (channel-name optional-channel))
305
(log-trace "stream-leftjoin: ~a: ~a ~a"
306
join-operator (channel-name base-channel) (channel-name optional-channel))
307
(setf (solution-generator-concrete-operator generator) join-operator)
308
(query-run-in-thread *query* base-expression)
309
(query-run-in-thread *query* optional-expression)
310
(setf (channel-size result-channel) effective-channel-size
311
(channel-page-length result-channel) (min (channel-page-length base-channel)
312
(channel-page-length optional-channel)
313
(channel-page-length result-channel)))
314
(apply join-operator result-channel base-channel optional-channel
319
'spocq.a:|leftjoin|)))
320
;; return the binding function to the combination operator
322
(make-leftjoin-solution-generator :operator 'spocq.a:|leftjoin|
323
:dimensions result-dimensions
324
:key-dimensions key-dimensions
325
:expression (list #'run-leftjoin-thread result-channel base-field-generator optional-field-generator
327
:channel result-channel
328
:constituents (list base-field-generator optional-field-generator))))))
331
#+(or) ;; for profiling
332
(defun call-predicate (predicate left-page left-index right-page right-index)
333
(declare (type (function (array fixnum array fixnum) boolean) predicate))
335
(funcall predicate left-page left-index right-page right-index)))
337
(defmethod process-leftjoin ((destination array-page-channel)
338
(base-source array-page-channel)
339
(optional-source array-page-channel)
340
base-dimensions optional-dimensions key-dimensions
341
&key test (start 0) end
343
;(trace-p (equal optional-dimensions '(?::action ?::data)))
344
;(*data-trace-output* (if trace-p *trace-output* *data-trace-output*))
345
;(*algebra-trace-output* (if trace-p *trace-output* *algebra-trace-output*))
347
"Generate a stream of left join solutions to a destination given a solution source and dimension lists for
348
the base and optional solutions fields, and a predicate expression. Invoke the source function repeatedly to
349
obtain a stream of (side . solution-page) pairs until the page is null. Use the dimensions to distinguish left from optional.
350
Combine the respective source either as a natural or a cross join depending
351
on whether they share any dimensions. In the former case, hash and cache solutions based on the shared
354
(declare (type (or channel (function ((or array null)) t)) destination)
355
(type (or channel (function () (or array null))) base-source optional-source))
356
(assert-argument-types process-leftjoin
357
(base-dimensions list)
358
(optional-dimensions list))
360
(multiple-value-bind (type result-dimensions base-cache-operator optional-cache-operator result-cache-operator predicate collector)
361
(compute-leftjoin-operators base-dimensions optional-dimensions key-dimensions test)
362
(declare (type (function (array fixnum array fixnum array fixnum) t) collector))
363
(let* ((result-page-width (channel-page-width destination))
364
(result-page-length (channel-page-length destination))
366
(result-index result-page-length)
367
(result-cache (make-term-id-cache :single-thread t))
368
(natural-cache (when (eq type :natural) (make-term-id-cache :single-thread t)))
369
(cross-cache (when (eq type :cross) (make-array 32 :fill-pointer 0 :adjustable t)))
372
;; for sp2b-q6@250k this reduced the time from 117 to 81 seconds on hetzner
373
(*store->spocq-term-registry* (if test
374
(copy-registry *store->spocq-term-registry* :single-thread t)
375
*store->spocq-term-registry*)))
376
(declare (type fixnum result-index result-count solution-count))
377
(assert (= (length result-dimensions) result-page-width) ()
378
"Channel and operation dimensions do not match: ~a: ~a." destination result-dimensions)
379
(labels ((base-natural-processor (base-page)
380
(declare (type (function (array fixnum hash-table) list) base-cache-operator))
381
(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
382
(dotimes (base-index (array-dimension base-page 0))
383
(trace-data process-leftjoin.optional-page.next base-index)
384
(let ((collected? nil))
385
(loop for (optional-page optional-index) in (funcall base-cache-operator base-page base-index natural-cache)
386
if (or (null predicate)
387
(call-predicate predicate base-page base-index optional-page optional-index))
388
do (progn (setf collected? t)
389
(trace-data process-leftjoin.optional-page.next.collect base-index optional-index)
390
(collect-solution base-page base-index optional-page optional-index)))
392
(collect-solution base-page base-index nil nil))))))
393
(optional-natural-processor (optional-page)
394
(declare (type (function (array fixnum hash-table) t) optional-cache-operator))
395
(dotimes (optional-index (array-dimension optional-page 0))
396
(trace-data process-leftjoin.optional-page.next optional-index)
397
(funcall optional-cache-operator optional-page optional-index natural-cache)))
398
(base-cross-processor (base-page)
399
(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
400
(dotimes (base-index (array-dimension base-page 0))
401
(let ((collected? nil))
402
(loop for optional-page across cross-cache
403
do (dotimes (optional-index (array-dimension optional-page 0) t)
404
(when (or (null predicate)
405
(call-predicate predicate base-page base-index optional-page optional-index))
407
(collect-solution base-page base-index optional-page optional-index))))
409
(collect-solution base-page base-index nil nil))))))
410
(optional-cross-processor (optional-page)
411
(vector-push-extend optional-page cross-cache))
412
(call-predicate (predicate left-page left-index right-page right-index)
413
(declare (type (function (array fixnum array fixnum) boolean) predicate))
415
(funcall predicate left-page left-index right-page right-index)))
416
(collect-solution (base-page base-index optional-page optional-index)
417
(declare (type (function (array fixnum array fixnum hash-table) boolean) result-cache-operator))
418
(when (and (null (funcall result-cache-operator base-page base-index optional-page optional-index result-cache))
419
(> (incf result-count) start))
420
(next-solution-location)
421
(funcall collector result-page result-index base-page base-index optional-page optional-index)
422
(when (and end (>= result-count end)) (complete-solutions))))
423
(next-solution-location ()
424
;; return a page (possibly newly created) and the next free location in that page
425
(when (>= (incf result-index) result-page-length)
426
(when result-page (put-page result-page))
427
(setf result-page (new-field-page destination result-page-length result-page-width)
429
(complete-solutions ()
431
(let ((page-result-count (1+ result-index)))
432
(when (< page-result-count result-page-length)
434
(adjust-page result-page (list page-result-count result-page-width)))))
435
(put-page result-page))
437
(incf-stat *solutions-processed* solution-count)
438
(incf-stat *solutions-constructed* result-count)
439
(trace-algebra process-leftjoin.complete destination solution-count result-count)
440
(return-from process-leftjoin (values solution-count result-count)))
442
(trace-data process-leftjoin.put-page destination
443
(list base-dimensions optional-dimensions result-dimensions)
444
page (term-value-field page))
446
(put-field-page destination page)
447
(complete-field destination)))
450
(hash-table-count natural-cache)
451
(length cross-cache))))
452
(unless (and (plusp result-page-length) (or (null end) (> end start))) (complete-solutions))
453
(loop for solutions = (get-field-page optional-source)
454
until (null solutions)
455
do (progn (check-query-status *query*)
456
(trace-data process-leftjoin.optional-page type solutions (term-value-field solutions))
457
(incf solution-count (array-dimension solutions 0))
458
(when (and *solution-count-limit*
459
(> (cache-size) *solution-count-limit*))
460
(log-warn "leftjoin: terminated @~a optional solutions. (~a x ~a)"
461
(cache-size) base-dimensions optional-dimensions)
462
(terminate-task *query*)
463
(complete-solutions))
464
(let ((cache-page (copy-page solutions)))
466
(:natural (optional-natural-processor cache-page))
467
(:cross (optional-cross-processor cache-page))))
468
(release-field-page optional-source solutions)))
469
(loop for solutions = (get-field-page base-source)
470
until (null solutions)
471
do (progn (check-query-status *query*)
472
(incf solution-count (array-dimension solutions 0))
473
(when (and *solution-count-limit*
474
(> result-count *solution-count-limit*))
475
(log-warn "leftjoin: terminated @~a solutions. (~a x ~a)"
476
result-count base-dimensions optional-dimensions)
477
(terminate-task *query*)
478
(complete-solutions))
480
(:natural (base-natural-processor solutions))
481
(:cross (base-cross-processor solutions)))
482
(release-field-page base-source solutions)))
483
(complete-solutions)))))
485
;;; support optional clauses with sip from base to optional clause
488
(defmethod process-sip-left-join ((destination array-page-channel)
489
(base-source array-page-channel)
490
(optional-source array-page-channel)
491
base-dimensions optional-dimensions key-dimensions
492
&key test (start 0) end
494
"In order to permit general field expressions - as opposed to restricting the optional clause to a single bgp,
495
it is not possible to establish the bindings just by propagating a solution field into the compiled bgp interpreter.
496
It must also allow, for example, that the optional clause is itself an optional expression or has filters.
497
As the situations which involve bindings - bgps, filters, aggregations and service clauses all must recognize
498
dynamic bindings, one way to accomplish this is to wrap the base query in a delegate which accepts the sip values
499
as a stream and reruns the target clause for each value
500
- if it emits a terminated result stream for each propagate solution, the source would not need to join, but would
502
- the query wrapper should be lighter weight than a full clone
503
- it would be ideal if the operator could retrieve the first page of base solutions and then decide, based
504
on its length, whether to sip or join. this would have to be buffered when too large, as the optional field
505
must first be materialized for joined processing, but generated dependently for sip processing.
506
in order to do that, it would need to compile both alternative networks . one with and one without sip dimensions
507
and then select which to invoke based on source cardinality.
508
in order for this to be possible, the optional-generator supplied to spocq.e:stream-leftjoin must
509
be able to switch modes and either run free to emit values for a join or expect a stream of sip constraints
510
and emit a stream of one solution stream in response to each input.
511
process-leftjoin would need to decide how to handle the optional generator, either just accepting its results stream
512
and joining or feeding it solutions and merging.
517
(defun process-base-dominant-leftjoin (destination base-source optional-source base-dimensions optional-dimensions key-dimensions
518
&key test count offset
519
(start offset) (end (when count (+ count (or start 0)))))
520
"Generate a stream of left join solutions to a destination given a solution source and dimension lists for
521
the base and optional solutions fields, and a predicate expression. Invoke the source function repeatedly to
522
obtain a stream of (side . solution-page) pairs until the page is null. Use the dimensions to distinguish left from optional.
523
Combine the respective source either as a natural or a cross join depending
524
on whether they share any dimensions. In the former case, hash and cache solutions based on the shared
527
(declare (type (or channel (function ((or array null)) t)) destination)
528
(type (or channel (function () (or array null))) base-source optional-source))
529
(assert-argument-types process-leftjoin
530
(destination (or channel function))
531
(base-source (or channel function))
532
(optional-source (or channel function))
533
(base-dimensions list)
534
(optional-dimensions list)
535
(start (or null (integer 0)))
536
(end (or null (integer 0))))
537
(incf-stat *algebra-operations*)
538
(unless start (setf start 0))
539
(when end (setf end (max start end)))
540
(trace-algebra process-base-dominant-leftjoin base-source optional-source base-dimensions optional-dimensions key-dimensions
541
:start start :end end :test test)
543
(multiple-value-bind (type result-dimensions base-cache-operator optional-cache-operator result-cache-operator predicate collector)
544
(compute-leftjoin-operators base-dimensions optional-dimensions key-dimensions test)
545
(declare (type (function (array fixnum array fixnum array fixnum) t) collector))
546
(let* ((result-page-width (channel-page-width destination))
547
(result-page-length (channel-page-length destination))
549
(result-index result-page-length)
550
(result-cache (make-term-id-cache :single-thread t))
551
(natural-cache (when (eq type :natural) (make-term-id-cache :single-thread t)))
552
(cross-cache (when (eq type :cross) (make-array 32 :fill-pointer 0 :adjustable t)))
553
(base-cache (make-array 32 :fill-pointer 0 :adjustable t))
556
;; for sp2b-q6@250k this reduced the time from 117 to 81 seconds on hetzner
557
(*store->spocq-term-registry* (if test
558
(copy-registry *store->spocq-term-registry* :single-thread t)
559
*store->spocq-term-registry*)))
560
(declare (type fixnum result-index result-count solution-count))
561
(assert (= (length result-dimensions) result-page-width) ()
562
"Channel and operation dimensions do not match: ~a: ~a." destination result-dimensions)
563
(labels ((base-natural-processor (base-page)
564
(declare (type (function (array fixnum hash-table) list) base-cache-operator))
565
#+(or)(let ((*print-pretty* nil)) (print (list :base base-dimensions)))
566
(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
567
(dotimes (base-index (array-dimension base-page 0))
568
#+(or)(print (list :b base-index
569
(make-array (array-dimension base-page 1)
570
:displaced-to base-page :element-type 'fixnum
571
:displaced-index-offset (* base-index (array-dimension base-page 1)))))
572
(let ((collected? nil))
573
(loop for (optional-page optional-index) in (funcall base-cache-operator base-page base-index natural-cache)
574
if (or (null predicate)
575
(call-predicate predicate base-page base-index optional-page optional-index))
576
do (progn (setf collected? t)
577
#+(or)(print (list :o+ optional-index
578
(make-array (array-dimension optional-page 1)
579
:displaced-to optional-page :element-type 'fixnum
580
:displaced-index-offset (* optional-index (array-dimension optional-page 1)))))
581
(collect-solution base-page base-index optional-page optional-index))
582
#+(or) (print (list :o- optional-index
583
(make-array (array-dimension optional-page 1)
584
:displaced-to optional-page :element-type 'fixnum
585
:displaced-index-offset (* optional-index (array-dimension optional-page 1))))))
587
(collect-solution base-page base-index nil nil))))))
588
(optional-natural-processor (optional-page)
589
(declare (type (function (array fixnum hash-table) t) optional-cache-operator))
590
#+(or)(let ((*print-pretty* nil)) (print (list :optional optional-dimensions)))
591
(dotimes (optional-index (array-dimension optional-page 0))
592
#+(or)(print (list :o optional-index
593
(make-array (array-dimension optional-page 1)
594
:displaced-to optional-page :element-type 'fixnum
595
:displaced-index-offset (* optional-index (array-dimension optional-page 1)))))
596
(funcall optional-cache-operator optional-page optional-index natural-cache)))
597
(base-cross-processor (base-page)
598
(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
599
(dotimes (base-index (array-dimension base-page 0))
600
(let ((collected? nil))
601
(loop for optional-page across cross-cache
602
do (dotimes (optional-index (array-dimension optional-page 0) t)
603
(when (or (null predicate)
604
(call-predicate predicate base-page base-index optional-page optional-index))
606
(collect-solution base-page base-index optional-page optional-index))))
608
(collect-solution base-page base-index nil nil))))))
609
(optional-cross-processor (optional-page)
610
(vector-push-extend optional-page cross-cache))
611
(call-predicate (predicate left-page left-index right-page right-index)
612
(declare (type (function (array fixnum array fixnum) boolean) predicate))
614
(funcall predicate left-page left-index right-page right-index)))
615
(collect-solution (base-page base-index optional-page optional-index)
616
(declare (type (function (array fixnum array fixnum hash-table) boolean) result-cache-operator))
617
(when (and (null (funcall result-cache-operator base-page base-index optional-page optional-index result-cache))
618
(> (incf result-count) start))
619
(next-solution-location)
620
(funcall collector result-page result-index base-page base-index optional-page optional-index)
621
(when (and end (>= result-count end)) (complete-solutions))))
622
(next-solution-location ()
623
;; return a page (possibly newly created) and the next free location in that page
624
(when (>= (incf result-index) result-page-length)
625
(when result-page (put-page result-page))
626
(setf result-page (new-field-page destination result-page-length result-page-width)
628
(complete-solutions ()
630
(let ((page-result-count (1+ result-index)))
631
(when (< page-result-count result-page-length)
633
(adjust-page result-page (list page-result-count result-page-width)))))
634
(put-page result-page))
636
(incf-stat *solutions-processed* solution-count)
637
(incf-stat *solutions-constructed* result-count)
638
(trace-algebra process-base-dominant-leftjoin base-dimensions optional-dimensions solution-count result-count)
639
(return-from process-base-dominant-leftjoin (values solution-count result-count)))
641
(trace-data process-base-dominant-leftjoin.put-page destination result-dimensions page)
642
(put-field-page destination page)))
643
(unless (and (plusp result-page-length) (or (null end) (> end start))) (complete-solutions))
644
(loop for solutions = (get-field-page base-source)
645
until (null solutions)
646
do (progn (check-query-status *query*)
647
(let ((cache-page (copy-page solutions)))
648
(vector-push-extend cache-page base-cache))
649
(release-field-page base-source solutions)))
650
(trace-data process-base-dominant-leftjoin.base-page-count (length base-cache))
651
(loop for solutions = (get-field-page optional-source)
652
until (null solutions)
653
do (progn (check-query-status *query*)
654
(incf solution-count (array-dimension solutions 0))
655
(let ((cache-page (copy-page solutions)))
657
(:natural (optional-natural-processor cache-page))
658
(:cross (optional-cross-processor cache-page))))
659
(release-field-page optional-source solutions)))
660
(trace-data process-base-dominant-leftjoin.post-optional-caches natural-cache cross-cache)
661
(loop for solutions across base-cache
662
do (progn (check-query-status *query*)
663
(incf solution-count (array-dimension solutions 0))
665
(:natural (base-natural-processor solutions))
666
(:cross (base-cross-processor solutions)))))
667
(complete-field destination)))))
670
;;; (setq *leftjoin-operator* 'process-balanced-leftjoin)
672
(defun process-balanced-leftjoin (destination base-source optional-source base-dimensions optional-dimensions key-dimensions
673
&key test count offset
674
(start offset) (end (when count (+ count (or start 0)))))
675
"Generate a stream of left join solutions to a destination given a solution sources dimension lists for
676
the base and optional solutions fields, and a predicate expression. Pull from the sources repeatedly to
677
obtain a stream solutions pairs until either the optional side is empty, in which case unmatched base solutions
678
are emitted un-extended, or the base side is empty, in which case the operation is complete.
679
Combine the sources either as a natural or a cross join depending on whether they share any dimensions.
680
In the natural case, hash and cache solutions based on the shared dimensions.
682
This version runs where the slice includes a limit"
684
(declare (type (or channel (function ((or array null)) t)) destination)
685
(type (or channel (function () (or array null))) base-source optional-source))
686
(assert-argument-types process-leftjoin
687
(destination (or channel function))
688
(base-source (or channel function))
689
(optional-source (or channel function))
690
(base-dimensions list)
691
(optional-dimensions list)
692
(start (or null (integer 0)))
693
(end (or null (integer 0))))
694
(incf-stat *algebra-operations*)
695
(unless start (setf start 0))
696
(when end (setf end (max start end)))
697
(trace-algebra process-balanced-leftjoin base-source optional-source base-dimensions optional-dimensions key-dimensions
698
:start start :end end :test test)
700
(multiple-value-bind (type result-dimensions
701
base-cache-write-operator base-cache-read-operator
702
optional-cache-write-operator optional-cache-read-operator
703
result-cache-operator predicate collector)
704
(compute-balanced-leftjoin-operators base-dimensions optional-dimensions key-dimensions test)
705
(declare (type (function (array fixnum array fixnum array fixnum) t) collector))
706
(let* ((result-page-width (channel-page-width destination))
707
(result-page-length (channel-page-length destination))
709
(result-index result-page-length)
710
(result-cache (make-term-id-cache :single-thread t))
711
(base-page-width (length base-dimensions))
712
(optional-page-width (length optional-dimensions))
713
(base-natural-cache (when (eq type :natural) (make-term-id-cache :single-thread t)))
714
(optional-natural-cache (when (eq type :natural) (make-term-id-cache :single-thread t)))
715
(base-cross-cache (when (eq type :cross) (make-array 32 :fill-pointer 0 :adjustable t)))
716
(optional-cross-cache (when (eq type :cross) (make-array 32 :fill-pointer 0 :adjustable t)))
717
(base-collected? (when (eq type :cross) (make-array 32 :fill-pointer 0 :adjustable t)))
720
;; for sp2b-q6@250k this reduced the time from 117 to 81 seconds on hetzner
721
(*store->spocq-term-registry* (if test
722
(copy-registry *store->spocq-term-registry* :single-thread t)
723
*store->spocq-term-registry*)))
724
(declare (type fixnum result-index result-count solution-count))
725
(assert (= (length result-dimensions) result-page-width) ()
726
"Channel and operation dimensions do not match: ~a: ~a." destination result-dimensions)
727
(labels ((base-natural-processor (base-page)
728
;(declare (type (function (array fixnum hash-table) list) optional-cache-read-operator base-cache-write-operator))
729
(assert (= base-page-width (array-dimension base-page 1)))
730
(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
731
(dotimes (base-index (array-dimension base-page 0))
732
(let ((collected? nil))
733
(loop for (optional-page optional-index)
734
in (funcall optional-cache-read-operator base-page base-index optional-natural-cache)
735
if (or (null predicate)
736
(call-predicate predicate base-page base-index optional-page optional-index))
737
do (progn (setf collected? t)
738
(collect-solution base-page base-index optional-page optional-index)))
739
(let ((base-cache-entry (funcall base-cache-write-operator base-page base-index base-natural-cache)))
741
(setf (cddr (first base-cache-entry)) t)))))))
742
(optional-natural-processor (optional-page)
  ;; Natural-join step for one page of optional solutions.
  ;; For each row: combine it with every base entry which the base-cache read
  ;; operator yields for that row (subject to the predicate, if any), flag each
  ;; combined base entry as matched, then record the row in the optional cache
  ;; so that base pages which arrive later can find it.
  (assert (= optional-page-width (array-dimension optional-page 1)))
  (dotimes (optional-index (array-dimension optional-page 0))
    (dolist (base-entry (funcall base-cache-read-operator optional-page optional-index base-natural-cache))
      ;; a cache entry has the form (base-page base-index . matched?)
      (let ((base-page (first base-entry))
            (base-index (second base-entry)))
        (when (or (null predicate)
                  (call-predicate predicate base-page base-index optional-page optional-index))
          ;; set the matched? tail of the entry in place
          (setf (cddr base-entry) t)
          (collect-solution base-page base-index optional-page optional-index))))
    (funcall optional-cache-write-operator optional-page optional-index optional-natural-cache)))
754
(base-natural-complete ()
755
(flet ((emit-if-unmatched (key base-page-indices)
756
(declare (ignore key))
757
(loop for (base-page base-index . matched?) in base-page-indices
759
do (collect-solution base-page base-index nil nil))))
760
(declare (dynamic-extent #'emit-if-unmatched))
761
(maphash #'emit-if-unmatched base-natural-cache)))
762
(base-cross-processor (base-page)
763
(vector-push-extend base-page base-cross-cache)
764
(let ((page-collected? (make-array (array-dimension base-page 0) :initial-element nil)))
765
(vector-push-extend page-collected? base-collected?)
766
(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
767
(dotimes (base-index (array-dimension base-page 0))
768
(let ((collected? nil))
769
(loop for optional-page across optional-cross-cache
770
do (dotimes (optional-index (array-dimension optional-page 0) t)
771
(when (or (null predicate)
772
(call-predicate predicate base-page base-index optional-page optional-index))
774
(collect-solution base-page base-index optional-page optional-index))))
776
(setf (aref page-collected? base-index) t)))))))
777
(optional-cross-processor (optional-page)
  ;; Cross-join step for one page of optional solutions.
  ;; Append the page to the optional cache, then pair each of its rows with every
  ;; row of every base page cached so far. A base row which yields a combined
  ;; solution is flagged in the per-page collected vector kept in parallel in
  ;; base-collected? (NOTE(review): presumably consulted by base-cross-complete
  ;; to suppress the un-extended solution -- confirm against the full source).
  (vector-push-extend optional-page optional-cross-cache)
  ;; any error signaled while filtering or collecting abandons the operation
  ;; by throwing nil to the :return-nil tag
  (handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
    (dotimes (optional-index (array-dimension optional-page 0))
      (loop for base-page across base-cross-cache
            for page-collected? across base-collected?
            ;; base-index ranges over the rows of the base page, for which
            ;; page-collected? was allocated with one element per row.
            ;; (The bound was previously taken from the optional page, which
            ;; skipped base rows or indexed past the end whenever the two
            ;; pages differed in length.)
            do (dotimes (base-index (array-dimension base-page 0) t)
                 (when (or (null predicate)
                           (call-predicate predicate base-page base-index optional-page optional-index))
                   (collect-solution base-page base-index optional-page optional-index)
                   (setf (aref page-collected? base-index) t)))))))
788
(base-cross-complete ()
789
(loop for base-page across base-cross-cache
790
for page-collected? across base-collected?
791
do (loop for collected? across page-collected?
792
for base-index from 0
794
do (collect-solution base-page base-index nil nil))))
795
(call-predicate (predicate left-page left-index right-page right-index)
796
(declare (type (function (array fixnum array fixnum) boolean) predicate))
798
(funcall predicate left-page left-index right-page right-index)))
799
(collect-solution (base-page base-index optional-page optional-index)
  ;; Add one (possibly un-extended, when optional-page is nil) solution to the result.
  ;; The solution is first passed through the result-cache operator; when that
  ;; returns non-nil the solution is dropped and not counted.
  ;; Otherwise result-count is incremented, and the solution is written to the
  ;; next result location only once the count exceeds START. Reaching END
  ;; invokes complete-solutions, which leaves the enclosing function.
  (declare (type (function (array fixnum array fixnum hash-table) boolean) result-cache-operator))
  ;; trace the arrival of the first solution (the form was previously duplicated,
  ;; which emitted the trace entry twice)
  (when (zerop result-count)
    (trace-algebra process-leftjoin.first-solution base-dimensions solution-count))
  (when (and (null (funcall result-cache-operator base-page base-index optional-page optional-index result-cache))
             (> (incf result-count) start))
    (next-solution-location)
    (funcall collector result-page result-index base-page base-index optional-page optional-index)
    (when (and end (>= result-count end)) (complete-solutions))))
810
(next-solution-location ()
811
;; return a page (possible newly created) and the next free location in that page
812
(when (>= (incf result-index) result-page-length)
813
(when result-page (put-page result-page))
814
(setf result-page (new-field-page destination result-page-length result-page-width)
816
(complete-solutions ()
818
(let ((page-result-count (1+ result-index)))
819
(when (< page-result-count result-page-length)
821
(adjust-page result-page (list page-result-count result-page-width)))))
822
(put-page result-page))
823
(complete-field destination)
824
(incf-stat *solutions-processed* solution-count)
825
(incf-stat *solutions-constructed* result-count)
826
(trace-algebra process-base-dominant-leftjoin base-dimensions optional-dimensions solution-count result-count)
827
(return-from process-balanced-leftjoin (values solution-count result-count)))
829
(trace-data process-balanced-leftjoin.put-page destination result-dimensions page)
830
(put-field-page destination page)))
831
(unless (and (plusp result-page-length) (or (null end) (> end start))) (complete-solutions))
832
(labels ((base-page-step ()
833
(check-query-status *query*)
834
(let ((base-page (get-field-page base-source)))
835
(trace-data process-balanced-leftjoin.get-base-page base-source base-page)
837
(incf solution-count (array-dimension base-page 0))
838
(let ((cache-page (copy-page base-page)))
840
(:natural (base-natural-processor cache-page))
841
(:cross (base-cross-processor cache-page))))
842
(release-field-page base-source base-page))
845
(:natural (base-natural-complete))
846
(:cross (base-cross-complete)))
847
(complete-solutions)))))
848
(optional-page-step ()
849
(let ((optional-page (get-field-page optional-source)))
850
(trace-data process-balanced-leftjoin.get-optional-page optional-source optional-page)
852
(incf solution-count (array-dimension optional-page 0))
853
(let ((cache-page (copy-page optional-page)))
855
(:natural (optional-natural-processor cache-page))
856
(:cross (optional-cross-processor cache-page))))
857
(release-field-page optional-source optional-page))
859
(loop (base-page-step)))))))
860
(loop (base-page-step)
861
(optional-page-step)))))))
863
#+(or) ;; with collected indicators
864
(defun process-base-dominant-leftjoin (destination base-source optional-source base-dimensions optional-dimensions key-dimensions
865
&key test count offset
866
(start offset) (end (when count (+ count (or start 0)))))
867
"Generate a stream of left join solutions to a destination given a solution source and dimension lists for
868
the base and optional solutions fields, and a predicate expression. The base-dominant version feeds the
869
base field into optional-side bgps to constraint their matches. In order that this not deadlock, the entire
870
base stream must be read and cached in order to ensure that the optional stream is generated.
871
The simple method is to just cache the page sequence and re-read it in order to process the join itself.
872
Page-by-page does not work, as the dependent side could be something like a sliced, sorted subselect which delays
873
the optional side until it is complete.
875
Get from the base repeatedly to
876
obtain a stream of (side . solution-page) pairs until the page is null. Use the dimensions to distinguish left from optional.
877
Combine the respective source either as a natural or a cross join depending
878
on whether they share any dimensions. In the former case, hash and cache solutions based on the shared
881
(declare (type (or channel (function ((or array null)) t)) destination)
882
(type (or channel (function () (or array null))) base-source optional-source))
883
(assert-argument-types process-leftjoin
884
(destination (or channel function))
885
(base-source (or channel function))
886
(optional-source (or channel function))
887
(base-dimensions list)
888
(optional-dimensions list)
889
(start (or null (integer 0)))
890
(end (or null (integer 0))))
891
(incf-stat *algebra-operations*)
892
(unless start (setf start 0))
893
(when end (setf end (max start end)))
894
(trace-algebra process-base-dominant-leftjoin base-source optional-source base-dimensions optional-dimensions key-dimensions
895
:start start :end end :test test)
897
(multiple-value-bind (type result-dimensions base-cache-operator optional-cache-operator result-cache-operator predicate collector)
898
(compute-base-dominant-leftjoin-operators base-dimensions optional-dimensions key-dimensions test)
899
(declare (type (function (array fixnum array fixnum array fixnum) t) collector))
900
(let* ((result-page-width (length result-dimensions))
902
(result-page-length *field-page-length*)
903
(result-index result-page-length)
904
(result-cache (make-term-id-cache :single-thread t))
905
(natural-cache (when (eq type :natural) (make-term-id-cache :single-thread t)))
906
(cross-cache (when (eq type :cross) (make-array 32 :fill-pointer 0 :adjustable t)))
909
;; for sp2b-q6@250k this reduced the time from 117 to 81 seconds on hetzner
910
(*store->spocq-term-registry* (if test
911
(copy-registry *store->spocq-term-registry* :single-thread t)
912
*store->spocq-term-registry*)))
913
(declare (type fixnum result-index result-count solution-count))
914
(unless (plusp result-page-length)
915
(put-field-page destination nil)
916
(return-from process-base-dominant-leftjoin (values 0 0)))
917
(labels ((base-natural-processor (base-page)
918
(declare (type (function (array fixnum hash-table) t) base-cache-operator))
919
(dotimes (base-index (array-dimension base-page 0))
920
(funcall base-cache-operator base-page base-index natural-cache)))
921
(base-uncollected-operator ()
922
(loop for location being each hash-value of natural-cache
923
for (base-page base-index . collected?) = location
925
do (collect-solution base-page base-index nil nil)))
926
(optional-natural-processor (optional-page)
  ;; Natural-join step for one page of optional solutions against the cached base.
  ;; For each row, the optional-cache operator yields the cached base locations,
  ;; each of the form (base-page base-index . collected?). Every location which
  ;; passes the predicate (if any) is flagged as collected and merged with the
  ;; optional row.
  (declare (type (function (array fixnum hash-table) list) optional-cache-operator))
  ;; any error signaled while filtering or collecting abandons the operation
  ;; by throwing nil to the :return-nil tag
  (handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
    (dotimes (optional-index (array-dimension optional-page 0))
      (loop for location in (funcall optional-cache-operator optional-page optional-index natural-cache)
            ;; bind the cached location to base-page/base-index; binding it to
            ;; optional-page/optional-index shadowed the arguments, so that the
            ;; optional solution was tested and merged with itself
            for (base-page base-index) = location
            if (or (null predicate)
                   (call-predicate predicate base-page base-index optional-page optional-index))
            do (setf (cddr location) t)
               (collect-solution base-page base-index optional-page optional-index)))))
936
(base-cross-processor (base-page)
937
(vector-push-extend base-page cross-cache))
938
(optional-cross-processor (optional-page)
939
(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
940
(dotimes (base-index (array-dimension base-page 0))
941
(let ((collected? nil))
942
(loop for optional-page across cross-cache
943
do (dotimes (optional-index (array-dimension optional-page 0) t)
944
(when (or (null predicate)
945
(call-predicate predicate base-page base-index optional-page optional-index))
947
(collect-solution base-page base-index optional-page optional-index))))
949
(collect-solution base-page base-index nil nil))))))
950
(call-predicate (predicate left-page left-index right-page right-index)
951
(declare (type (function (array fixnum array fixnum) boolean) predicate))
953
(funcall predicate left-page left-index right-page right-index)))
954
(collect-solution (base-page base-index optional-page optional-index)
955
(declare (type (function (array fixnum array fixnum hash-table) boolean) result-cache-operator))
956
(when (and (null (funcall result-cache-operator base-page base-index optional-page optional-index result-cache))
957
(> (incf result-count) start))
958
(next-solution-location)
959
(funcall collector result-page result-index base-page base-index optional-page optional-index)
960
(when (and end (>= result-count end)) (complete-solutions))))
961
(next-solution-location ()
962
;; return a page (possible newly created) and the next free location in that page
963
(when (>= (incf result-index) result-page-length)
964
(when result-page (put-page result-page))
965
(setf result-page (new-field-page destination result-page-length result-page-width)
967
(complete-solutions ()
969
(let ((page-result-count (1+ result-index)))
970
(when (< page-result-count result-page-length)
972
(adjust-page result-page (list page-result-count result-page-width)))))
973
(put-page result-page))
975
(incf-stat *solutions-processed* solution-count)
976
(incf-stat *solutions-constructed* result-count)
977
(return-from process-base-dominant-leftjoin (values solution-count result-count)))
979
(trace-algebra process-base-dominant-leftjoin destination result-dimensions page)
980
(funcall destination page)))
981
(unless (plusp result-page-length) (complete-solutions))
982
(loop for solutions = (get-field-page base-source)
983
until (null solutions)
984
do (progn (incf solution-count (array-dimension solutions 0))
986
(:natural (base-natural-processor solutions))
987
(:cross (base-cross-processor solutions)))
988
(check-query-status *query*)))
989
(loop for solutions = (get-field-page optional-source)
990
until (null solutions)
991
do (progn (check-query-status *query*)
992
(incf solution-count (array-dimension solutions 0))
994
(:natural (optional-natural-processor solutions))
995
(:cross (optional-cross-processor solutions)))))
996
(when (eq type :natural)
997
(base-uncollected-operator))
998
(complete-field destination)))))
1000
(defun funcall-cache-operator (optional-cache-operator
                               optional-page
                               optional-index
                               natural-cache)
  "Call OPTIONAL-CACHE-OPERATOR on OPTIONAL-PAGE, OPTIONAL-INDEX and NATURAL-CACHE
 and return whatever it returns. A named indirection around the call."
  (funcall optional-cache-operator
           optional-page
           optional-index
           natural-cache))
1003
(defun call-solution-processor (base-natural-processor
                                solutions)
  "Call BASE-NATURAL-PROCESSOR on the page SOLUTIONS and return whatever it returns.
 A named indirection around the call."
  (funcall base-natural-processor
           solutions))
1006
(defun call-mp-destination (mp-destination
                            page)
  "Call MP-DESTINATION on PAGE and return whatever it returns.
 A named indirection around the call."
  (funcall mp-destination
           page))
1009
(defun process-mp-leftjoin (destination base-source optional-source base-dimensions optional-dimensions key-dimensions
1011
"Generate a stream of left join solutions to a destination given a solution source and dimension lists for
1012
the base and optional solutions fields, and a predicate expression. Invoke the source function repeatedly to
1013
obtain a stream of (side . solution-page) pairs until the page is null. Use the dimensions to distinguish left from optional.
1014
Combine the respective source either as a natural or a cross join depending
1015
on whether they share any dimensions. In the former case, hash and cache solutions based on the shared
1017
This version creates a thread complement equal to *algebra-thread-count* to perform the cobination phase."
1019
(declare (type (or channel (function ((or array null)) t)) destination)
1020
(type (or channel (function () (or array null))) base-source optional-source))
1021
(assert-argument-types process-leftjoin
1022
(destination (or channel function))
1023
(base-source (or channel function))
1024
(optional-source (or channel function))
1025
(base-dimensions list)
1026
(optional-dimensions list)
1028
(incf-stat *algebra-operations*)
1029
(trace-algebra process-mp-leftjoin base-source optional-source base-dimensions optional-dimensions key-dimensions :test test)
1031
(multiple-value-bind (type result-dimensions base-cache-operator optional-cache-operator result-cache-operator predicate collector)
1032
(compute-leftjoin-operators base-dimensions optional-dimensions key-dimensions test)
1033
(declare (type (function (array fixnum array fixnum array fixnum) t) collector))
1035
(let* ((result-page-width (channel-page-width destination))
1036
(result-page-length (channel-page-length destination))
1037
(result-cache (make-term-id-cache :single-thread nil))
1040
(natural-cache (when (eq type :natural) (make-term-id-cache :single-thread t)))
1041
(cross-cache (when (eq type :cross) (make-array 32 :fill-pointer 0 :adjustable t)))
1042
(base-source-lock (bt:make-lock "leftjoin-base-lock"))
1043
(destination-lock (bt:make-lock "leftjoin-destination-lock"))
1044
(base-complete nil))
1045
(assert (= (length result-dimensions) result-page-width) ()
1046
"Channel and operation dimensions do not match: ~a: ~a." destination result-dimensions)
1047
(labels ((optional-natural-processor (optional-page)
1048
(declare (type (function (array fixnum hash-table) t) optional-cache-operator))
1049
(dotimes (optional-index (array-dimension optional-page 0))
1050
(funcall-cache-operator optional-cache-operator optional-page optional-index natural-cache)))
1051
(optional-cross-processor (optional-page)
1052
(vector-push-extend optional-page cross-cache))
1055
(hash-table-count natural-cache)
1056
(length cross-cache))))
1057
(loop for solutions = (get-field-page optional-source)
1058
until (null solutions)
1059
do (progn (check-query-status *query*)
1060
(incf solution-count (array-dimension solutions 0))
1061
(when (and *solution-count-limit*
1062
(> (cache-size) *solution-count-limit*))
1063
(log-warn "leftjoin: terminated @~a optional solutions. (~a x ~a)"
1064
(cache-size) base-dimensions optional-dimensions)
1065
(terminate-task *query*))
1067
(:natural (optional-natural-processor solutions))
1068
(:cross (optional-cross-processor solutions))))))
1069
(labels ((mp-destination (page)
1070
(bt:with-lock-held (destination-lock)
1071
(incf result-count (array-dimension page 0))
1072
(put-field-page destination page)))
1074
(bt:with-lock-held (base-source-lock)
1075
(check-query-status *query*)
1076
(when (and *solution-count-limit*
1077
(> solution-count *solution-count-limit*))
1078
(log-warn "mp-leftjoin: terminated @~a solutions. (~a x ~a)"
1079
solution-count base-dimensions optional-dimensions)
1080
(terminate-task *query*))
1081
(unless base-complete
1082
(let ((page (get-field-page base-source)))
1084
(incf solution-count (array-dimension page 0))
1085
(setf base-complete t))
1087
(process-base-solutions ()
1088
(let ((result-page nil)
1089
(result-index result-page-length)
1090
(*store->spocq-term-registry* (if test
1091
(copy-registry *store->spocq-term-registry* :single-thread t)
1092
*store->spocq-term-registry*)))
1093
(declare (type fixnum result-index result-count solution-count))
1094
(labels ((base-natural-processor (base-page)
1095
(declare (type (function (array fixnum hash-table) list) base-cache-operator))
1096
(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
1097
(dotimes (base-index (array-dimension base-page 0))
1098
(let ((collected? nil))
1099
(loop for (optional-page optional-index) in (funcall-cache-operator base-cache-operator base-page base-index natural-cache)
1100
if (or (null predicate)
1101
(call-predicate predicate base-page base-index optional-page optional-index))
1102
do (setf collected? t)
1103
(collect-solution base-page base-index optional-page optional-index))
1105
(collect-solution base-page base-index nil nil))))))
1106
(base-cross-processor (base-page)
1107
(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
1108
(dotimes (base-index (array-dimension base-page 0))
1109
(let ((collected? nil))
1110
(loop for optional-page across cross-cache
1111
do (dotimes (optional-index (array-dimension optional-page 0) t)
1112
(when (or (null predicate)
1113
(call-predicate predicate base-page base-index optional-page optional-index))
1115
(collect-solution base-page base-index optional-page optional-index))))
1117
(collect-solution base-page base-index nil nil))))))
1118
(call-predicate (predicate left-page left-index right-page right-index)
1119
(declare (type (function (array fixnum array fixnum) boolean) predicate))
1121
(funcall predicate left-page left-index right-page right-index)))
1122
(collect-solution (base-page base-index optional-page optional-index)
1123
(declare (type (function (array fixnum array fixnum hash-table) boolean) result-cache-operator))
1124
(when (null (funcall result-cache-operator base-page base-index optional-page optional-index result-cache))
1125
(next-solution-location)
1127
(funcall collector result-page result-index base-page base-index optional-page optional-index)))
1128
(next-solution-location ()
1129
;; return a page (possible newly created) and the next free location in that page
1130
(when (>= (incf result-index) result-page-length)
1131
(when result-page (put-page result-page))
1132
(setf result-page (new-field-page destination result-page-length result-page-width)
1134
(complete-solutions ()
1136
(let ((page-result-count (1+ result-index)))
1137
(when (< page-result-count result-page-length)
1139
(adjust-page result-page (list page-result-count result-page-width)))))
1140
(put-page result-page)))
1142
(trace-algebra process-mp-leftjoin destination result-dimensions page)
1143
(call-mp-destination #'mp-destination page)))
1144
(loop for solutions = (mp-base-source)
1145
until (null solutions)
1147
(:natural (call-solution-processor #'base-natural-processor solutions))
1148
(:cross (call-solution-processor #'base-cross-processor solutions))))
1149
(complete-solutions)))))
1151
(when (plusp result-page-length)
1152
(let ((threads (loop for i from 0 below (algebra-operator-thread-count)
1153
collect (bt:make-thread #'process-base-solutions
1154
:name (format nil "leftjoin-~a" i)
1155
:initial-bindings (acons '*query* *query*
1156
(acons '*task* *task*
1157
(acons '*repository* *repository* ())))))))
1158
(loop for thread in threads do (bt:join-thread thread)))
1159
(incf-stat *solutions-processed* solution-count)
1160
(incf-stat *solutions-constructed* result-count))
1161
(complete-field destination)
1162
(trace-algebra process-mp-leftjoin base-dimensions optional-dimensions solution-count result-count)
1163
(values solution-count result-count)))))
1166
;;; Compute the operators for a left join of a base and an optional field.
;;; Returns multiple values, the first two of which are
;;;  - the join type: :natural when the base and optional dimensions intersect,
;;;    otherwise :cross
;;;  - the result dimensions: the union-dimensions of the base and optional dimensions
;;; The remaining values are bound by process-mp-leftjoin as
;;;  base-cache-operator optional-cache-operator result-cache-operator predicate collector.
;;; In the :natural case the base operator is a read-cache operator and the
;;; optional operator is a write-cache operator, both given key-dimensions.
;;; NOTE(review): this listing lacks several lines of the definition (the form
;;; enclosing each predicate computation and the cache operators of the :cross
;;; branch) -- confirm the details against the complete source.
(defun compute-leftjoin-operators (base-dimensions optional-dimensions key-dimensions test)
1167
(let ((result-dimensions (union-dimensions base-dimensions optional-dimensions)))
1168
(if (intersection base-dimensions optional-dimensions)
1169
(values :natural result-dimensions
1170
(compute-read-cache-op base-dimensions optional-dimensions :key-dimensions key-dimensions)
1171
(compute-write-cache-op optional-dimensions base-dimensions :key-dimensions key-dimensions)
1172
(compute-optional-write-cache-op base-dimensions optional-dimensions)
1174
(compute-binary-predicate test base-dimensions optional-dimensions :handle-errors nil))
1175
(compute-optional-collector result-dimensions base-dimensions optional-dimensions))
1176
(values :cross result-dimensions
1179
(compute-optional-write-cache-op base-dimensions optional-dimensions)
1181
(compute-binary-predicate test base-dimensions optional-dimensions :handle-errors nil))
1182
(compute-optional-collector result-dimensions base-dimensions optional-dimensions)))))
1184
;;; Compute the operators for the base-dominant left join.
;;; Returns multiple values, the first two of which are
;;;  - the join type: :natural when the base and optional dimensions intersect,
;;;    otherwise :cross
;;;  - the result dimensions: the union-dimensions of the base and optional dimensions
;;; The remaining values are bound by process-base-dominant-leftjoin as
;;;  base-cache-operator optional-cache-operator result-cache-operator predicate collector.
;;; In the :natural case the base operator is a write-cache operator and the
;;; optional operator is a read-cache operator, both given key-dimensions.
;;; NOTE(review): this listing lacks several lines of the definition (the form
;;; enclosing each predicate computation and the cache operators of the :cross
;;; branch) -- confirm the details against the complete source.
(defun compute-base-dominant-leftjoin-operators (base-dimensions optional-dimensions key-dimensions test)
1185
(let ((result-dimensions (union-dimensions base-dimensions optional-dimensions)))
1186
(if (intersection base-dimensions optional-dimensions)
1187
(values :natural result-dimensions
1188
(compute-write-cache-op base-dimensions optional-dimensions :key-dimensions key-dimensions)
1189
(compute-read-cache-op optional-dimensions base-dimensions :key-dimensions key-dimensions)
1190
(compute-optional-write-cache-op base-dimensions optional-dimensions)
1192
(compute-binary-predicate test base-dimensions optional-dimensions :handle-errors nil))
1193
(compute-optional-collector result-dimensions base-dimensions optional-dimensions))
1194
(values :cross result-dimensions
1197
(compute-optional-write-cache-op base-dimensions optional-dimensions)
1199
(compute-binary-predicate test base-dimensions optional-dimensions :handle-errors nil))
1200
(compute-optional-collector result-dimensions base-dimensions optional-dimensions)))))
1202
;;; Compute the operators for the balanced left join, which caches both sides.
;;; Returns multiple values, the first two of which are
;;;  - the join type: :natural when the base and optional dimensions intersect,
;;;    otherwise :cross
;;;  - the result dimensions: the union-dimensions of the base and optional dimensions
;;; The remaining values are bound by process-balanced-leftjoin as
;;;  base-cache-write-operator base-cache-read-operator
;;;  optional-cache-write-operator optional-cache-read-operator
;;;  result-cache-operator predicate collector.
;;; In the :natural case each side gets a write-cache operator for its own
;;; solutions and a read-cache operator which is applied to a solution of the
;;; other side; all four are given key-dimensions.
;;; NOTE(review): this listing lacks several lines of the definition (the form
;;; enclosing each predicate computation and the cache operators of the :cross
;;; branch) -- confirm the details against the complete source.
(defun compute-balanced-leftjoin-operators (base-dimensions optional-dimensions key-dimensions test)
1203
(let ((result-dimensions (union-dimensions base-dimensions optional-dimensions)))
1204
(if (intersection base-dimensions optional-dimensions)
1205
(values :natural result-dimensions
1206
(compute-write-cache-op base-dimensions optional-dimensions :key-dimensions key-dimensions)
1207
(compute-read-cache-op optional-dimensions base-dimensions :key-dimensions key-dimensions)
1208
(compute-write-cache-op optional-dimensions base-dimensions :key-dimensions key-dimensions)
1209
(compute-read-cache-op base-dimensions optional-dimensions :key-dimensions key-dimensions)
1210
(compute-optional-write-cache-op base-dimensions optional-dimensions)
1212
(compute-binary-predicate test base-dimensions optional-dimensions :handle-errors nil))
1213
(compute-optional-collector result-dimensions base-dimensions optional-dimensions))
1214
(values :cross result-dimensions
1219
(compute-optional-write-cache-op base-dimensions optional-dimensions)
1221
(compute-binary-predicate test base-dimensions optional-dimensions :handle-errors nil))
1222
(compute-optional-collector result-dimensions base-dimensions optional-dimensions)))))
1227
(run-test-query "PREFIX foaf: <http://xmlns.com/foaf/0.1/>
1231
?x foaf:mbox ?mbox .
1232
OPTIONAL { ?x foaf:name ?name } .
1234
" :repository-id (lookup-repository-id :repository-name "optional-dawg-optional-001" :account-name "jhacker"))
1236
(run-test-query "PREFIX foaf: <http://xmlns.com/foaf/0.1/>
1237
SELECT ?person ?nick ?page ?img ?name ?firstN
1239
?person foaf:nick ?nick
1240
OPTIONAL { ?person foaf:isPrimaryTopicOf ?page }
1242
?person foaf:name ?name
1243
{ ?person foaf:depiction ?img } UNION
1244
{ ?person foaf:firstName ?firstN }
1245
} FILTER ( bound(?page) || bound(?img) || bound(?firstN) )
1247
" :repository-id (lookup-repository-id :repository-name "optional-dawg-optional-complex-1" :account-name "jhacker"))
1249
(run-test-query "SELECT ?v
1253
" :repository-id (lookup-repository-id :repository-name "distinct-no-distinct-3" :account-name "jhacker"))
1255
(run-test-query "SELECT ?v
1259
" :repository-id (lookup-repository-id :repository-name "distinct-no-distinct-1" :account-name "jhacker"))
1261
(run-test-query "PREFIX : <http://example.org/>
1262
PREFIX foaf: <http://xmlns.com/foaf/0.1/>
1265
WHERE { ?X foaf:knows* ?Y }
1269
:repository-id (lookup-repository-id :repository-name "property-path-pp14" :account-name "jhacker"))
1271
;;; chained leftjoin optimization
1273
(pprint-sse (parse-sparql "
1275
?s rdf:type <http://ontology/a> .
1276
?s dc:creator <http://example.com/a> .
1277
optional { ?s <http://ontology/slot/a> ?a }
1278
optional { ?s <http://ontology/slot/b> ?b }
1284
(select (leftjoin (leftjoin (bgp (triple ?:s
1285
http://www.w3.org/1999/02/22-rdf-syntax-ns#:type
1286
<http://ontology/a>)
1288
<http://purl.org/dc/terms/creator>
1289
<http://example.com/a>))
1290
(bgp (triple ?:s <http://ontology/slot/a> ?:a)))
1291
(bgp (triple ?:s <http://ontology/slot/b> ?:b)))
1296
(leftjoin (leftjoin <x> (bgp (triple s0 p0 o0)))
1297
(bgp (triple s1 p1 o1)))
1299
(leftjoin <x> (bgp (sum (triple s0 p0 o0)
1300
(triple s1 p1 o1))))
1304
(leftjoin (leftjoin <x> (bgp (triple s0 p0 o0)))
1305
(bgp (sum (triple s1 p1 o1)
1306
(triple s2 p2 o2))))
1308
(leftjoin <x> (bgp (sum (triple s0 p0 o0)
1310
(triple s2 p2 o2))))