Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/join.lisp
| Kind | Covered | All | % |
| expression | 354 | 1132 | 31.3 |
| branch | 29 | 126 | 23.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
5
(:documentation "sparql join operations"
6
"The SPARQL join operation is sort of a natural join, but not quite[1,2]. In particular, an unbound
7
dimension in a solution matches with all opposing solutions, rather than failing to match.
8
That is, with respect to unbound dimensions, the operator is a cross join. The literature[2,5] suggests
9
predicting which form of join is required, but the examples - e.g. the bindings tests - demonstrate that this
10
is not feasible and the correct match can depend on the individual solution.
12
Note that, when the unbound=wildcard logic is applied to bgp matching, it would mean that a single
13
matching statement in the bgp should produce a solution for the entire bgp - with everything else unbound.
15
This implementation prescribes stable solution dimensions, that is, the logic is 'null rejecting'[3,4] and
16
thus always performs a natural join.
18
One of the consequences is that the service07 test yields no solutions rather than producing the
19
same result as the query expression in which the service form is wrapped with an optional.
22
[1] http://www.w3.org/TR/sparql11-query/#defn_algCompatibleMapping
23
[2] www.hpl.hp.com/techreports/2005/HPL-2005-170.pdf
24
[3] www2007.org/papers/paper435.pdf
25
[4] www.dcc.uchile.cl/~cgutierr/papers/tods09.pdf
26
[5] https://pdfs.semanticscholar.org/129e/e5496d9b54fdfdc11214765f90d047fe57a1.pdf
31
;;; Development scaffold: pushes :trace-solution-count onto *FEATURES* and then
;;; loads the join, filter and distinct operator sources, so that forms guarded
;;; by that feature (such as the #+trace-solution-count forms in this file) are read.
;;; NOTE(review): the lines immediately before this form are not visible here;
;;; presumably it is disabled by a reader conditional or block comment -- confirm.
(progn (push :trace-solution-count *features*)
32
(load #p"LIBRARY:org;datagraph;spocq;src;algebra;operators;join.lisp")
33
(load #p"LIBRARY:org;datagraph;spocq;src;algebra;operators;filter.lisp")
34
(load #p"LIBRARY:org;datagraph;spocq;src;algebra;operators;distinct.lisp"))
36
;;; Rewrite rules which the SPOCQ.A:|join| macro hands to RULE-BASED-TRANSLATOR
;;; together with the whole join form. The visible entries are dotted
;;; (pattern . replacement) pairs; the ?p1/?p2/?p3/?field symbols presumably act
;;; as pattern variables -- confirm against RULE-BASED-TRANSLATOR.
(defparameter *join-translations*
37
;; distribute a join over a union appearing as the second operand
'(((spocq.a:|join| ?p1 (spocq.a:|union| ?p2 ?p3))
38
. (spocq.a:|union| (spocq.a:|join| ?p1 ?p2) (spocq.a:|join| ?p1 ?p3)))
39
;; distribute a join over a union appearing as the first operand
((spocq.a:|join| (spocq.a:|union| ?p2 ?p3) ?p1)
40
. (spocq.a:|union| (spocq.a:|join| ?p2 ?p1) (spocq.a:|join| ?p3 ?p1)))
42
;; joins with the unit table on either side; the replacement halves of these
;; two rules are on lines not visible in this excerpt
((spocq.a:|join| ?field (spocq.a:|table| spocq.a:|unit|))
45
((spocq.a:|join| (spocq.a:|table| spocq.a:|unit|) ?field)
48
;;; Macro for the algebra-level JOIN form. The complete form (captured with
;;; &WHOLE) is first offered to RULE-BASED-TRANSLATOR with *JOIN-TRANSLATIONS*;
;;; only when that returns NIL is the expansion computed by MACROEXPAND-JOIN,
;;; applied to the two fields and the unparsed keyword list ARGS.
(defmacro spocq.a:|join| (&whole form field1 field2 &rest args
49
&key count end offset start test)
50
"( ( solutionField solutionField (function (solution) xsd:boolean)? ) solutionField )
51
A JOIN form combines two solution fields and yields a new field in which
52
each solution is either a merge of two compatible solutions from the constituent
53
fields, in the case where a natural join is possible, or an element of the cross
54
join of the two fields, in the case where no dimension agrees.
56
Where a test predicate is given, all result solutions satisfy it."
58
;; the keywords are named in the lambda list but not used individually here;
;; they reach MACROEXPAND-JOIN through ARGS
(declare (ignore count end offset start test)
59
(dynamic-extent args))
60
;; OR: a non-NIL rule-based rewrite takes precedence over the computed expansion
(or (rule-based-translator form *join-translations*)
61
(apply #'macroexpand-join field1 field2 args)))
64
;;; join propagation : NYI
66
;;; when true, and the join carries no test, MACROEXPAND-JOIN merges operands
;;; which are bgp forms (optionally wrapped in a filter) into a single bgp form
(defparameter *join-merge-bgps* t)
68
;;; when true, SIP-EXPAND-SERVICE delegates to SIP-EXPAND-SERVICE-OPERATOR
(defparameter *enable.service-sip* t)
70
;;; Entry point for service sip rewriting: when *ENABLE.SERVICE-SIP* is true,
;;; dispatch to SIP-EXPAND-SERVICE-OPERATOR on the head symbol of SERVICE?.
;;; NOTE(review): the alternative branch is on a line not visible in this
;;; excerpt; callers compare the result with EQ against their argument, so it
;;; presumably returns SERVICE? unchanged -- confirm.
(defun sip-expand-service (service? sip-source)
71
;; if the second argument field is a service - whether directly or indirectly
72
;; return an expansion which combines the service as sip with the other join side
73
(if *enable.service-sip*
74
(sip-expand-service-operator (first service?) service? sip-source)
77
;;; Rewrites SERVICE? -- an algebra form whose head symbol is OPERATOR -- so that
;;; SIP-SOURCE (the other join operand) is attached to a service clause found in it.
;;; The |filter|, |join| and |leftjoin| methods recurse through SIP-EXPAND-SERVICE
;;; and use EQ between the result and the sub-form they passed in to detect that
;;; nothing was rewritten.
;;; NOTE(review): several consequent branches are on lines not visible in this
;;; excerpt; presumably they return the form unchanged -- confirm.
(defgeneric sip-expand-service-operator (operator service? sip-source)
78
(:method ((operator (eql 'spocq.a:|service|)) service? sip-source)
79
;; intersection already tested, but just for the complete forms
80
;; the test holds when no variable is shared with SIP-SOURCE, or when the
;; location is not a variable and yields a repository id for which
;; PARSE-VIEW-REPOSITORY-ID returns false; its consequent is not visible here
(if (or (null (intersection (expression-variables service?)
81
(expression-variables sip-source)))
82
(let* ((location (second service?))
83
(repository-id (and (not (variable-p location)) (iri-service-repository-id location))))
84
(and repository-id (not (parse-view-repository-id repository-id)))))
85
;; decline to rewrite local service repositories unless they are
87
;; otherwise re-head the clause as |servicejoin| with SIP-SOURCE inserted
;; after the group graph pattern
(destructuring-bind (op iri group-graph-pattern &rest args) service?
89
`(spocq.a::|servicejoin| ,iri ,group-graph-pattern ,sip-source ,@args))))
90
;; an existing |servicejoin|: when its pattern shares variables with SIP-SOURCE,
;; the existing sip operand and SIP-SOURCE are combined with a |join|
(:method ((operator (eql 'spocq.a::|servicejoin|)) service? sip-source)
91
(destructuring-bind (op iri group-graph-pattern join-sip-source &rest args) service?
93
(if (null (intersection (expression-variables group-graph-pattern)
94
(expression-variables sip-source)))
97
`(spocq.a::|servicejoin| ,iri ,group-graph-pattern (spocq.a:|join| ,join-sip-source ,sip-source) ,@args))))
99
;; |filter|: try the filtered field; if it was rewritten, rebuild the filter
;; around the rewritten field with the remaining filter arguments
(:method ((operator (eql 'spocq.a:|filter|)) service? sip-source)
100
(destructuring-bind (op filter-field . rest) service?
101
(let ((sip-expansion? (sip-expand-service filter-field sip-source)))
102
(if (eq sip-expansion? filter-field)
104
`(,op ,sip-expansion? ,@rest)))))
106
;; |join|: try the first operand, and only if that is unchanged try the second;
;; rebuild the join around whichever operand was rewritten
(:method ((operator (eql 'spocq.a:|join|)) service? sip-source)
107
(destructuring-bind (op field1 field2 . rest) service?
108
(let ((sip-expansion? (sip-expand-service field1 sip-source)))
109
(if (eq sip-expansion? field1)
110
(let ((sip-expansion? (sip-expand-service field2 sip-source)))
111
(if (eq sip-expansion? field2)
113
`(,op ,field1 ,sip-expansion? ,@rest)))
114
`(,op ,sip-expansion? ,field2 ,@rest)))))
116
;; |leftjoin|: only the first operand is tried in the visible code
(:method ((operator (eql 'spocq.a:|leftjoin|)) service? sip-source)
117
(destructuring-bind (op field1 field2 . rest) service?
118
(let ((sip-expansion? (sip-expand-service field1 sip-source)))
119
(if (eq sip-expansion? field1)
121
`(,op ,sip-expansion? ,field2 ,@rest)))))
123
(:method ((operator t) service? sip-source)
124
;; if the first was not a service clause, but the second is, swap them
125
;; the roles are exchanged by calling SIP-EXPAND-SERVICE with the arguments reversed
(if (service-form-p sip-source)
126
(let ((service-expansion (sip-expand-service sip-source service?)))
127
(if (eq service-expansion sip-source)
132
;;; when true, MACROEXPAND-JOIN first tries REDUCE-VALUES-CLAUSE on its two
;;; operands; the initial value NIL leaves that step disabled
(defparameter *push-values-clauses* nil)
134
;;; Tries to simplify a join in which at least one operand is a bindings form
;;; (as recognised by BINDINGS-FORM-P). SLICE, when given, is a slice form whose
;;; tail is spliced into the SPOCQ.E:SLICE results below.
;;; NOTE(review): some lines of the second and third COND clauses are not visible
;;; in this excerpt, so the exact return forms of those clauses cannot be confirmed here.
(defun reduce-values-clause (field1 field2 &optional slice)
135
"accept two fields and examine them to determine whether they involve bindings whcich can be reduced
136
This applies if both a bindings clauses, for which the join can be precomputed,
137
of it one binding can be pushed in to abgp on the other field"
138
;; EXTEND-BGP: when every variable of BINDINGS also occurs in BGP, destructively
;; push BINDINGS onto the tail of the BGP form unless it is already an element;
;; the truth of its result is what the LOOP ... COUNT clauses below tally
(flet ((extend-bgp (bgp bindings)
139
(when (null (set-difference (expression-variables bindings) (expression-variables bgp)))
140
(unless (find bindings bgp)
141
(push bindings (rest bgp)))
143
(cond ((and (bindings-form-p field1) (bindings-form-p field2))
144
;; pre-join constant bindings
145
(destructuring-bind (solution-list1 dimensions1) (rest field1)
146
(declare (ignore solution-list1))
147
(destructuring-bind (solution-list2 dimensions2) (rest field2)
148
(declare (ignore solution-list2))
149
;; cross-join iff no dimension is either natural or dynamically bound
150
(unless (or (intersection dimensions1 dimensions2)
151
(query-values-argument *query* dimensions1)
152
(query-values-argument *query* dimensions2))
153
(cross-join-bindings field1 field2 slice)))))
154
;; only FIELD1 is a bindings form: offer it to every bgp found in FIELD2
((bindings-form-p field1)
155
(let ((bgp-forms (expression-bgps field2)))
156
(when (plusp (loop for bgp in bgp-forms
157
count (extend-bgp bgp field1)))
159
`(spocq.e:slice ,field2 ,@(rest slice))
161
;; only FIELD2 is a bindings form: the mirror image of the clause above
((bindings-form-p field2)
162
(let ((bgp-forms (expression-bgps field1)))
163
(when (plusp (loop for bgp in bgp-forms
164
count (extend-bgp bgp field2)))
166
`(spocq.e:slice ,field1 ,@(rest slice))
170
;;; Expansion worker called by the SPOCQ.A:|join| macro. ARGS is passed through
;;; CANONICALIZE-ALGEBRA-ARGUMENTS and destructured into END, START and TEST.
;;; The final COND picks, in order: a values-clause reduction, a merge of bgp
;;; operands, or the STANDARD-EXPANSION local function.
;;; NOTE(review): a number of lines inside the local functions are not visible in
;;; this excerpt (among them the header of SIMPLE-EXPANSION); comments below
;;; describe only what can be read here.
(defun macroexpand-join (field1 field2 &rest args)
171
"Perform a join on the two solution fields with optional slice constraints.
172
In order to support delayed propagation, the constituent field with the fewest dimensions is taken as
173
a potential source by declaring its available dimensions to the other side's form. those
174
BGPs, which contribute to that side and include one or more of those dimensions, are then
175
compiled to accepts a field stream with initial values and the requirements are captured in
177
In order to exploit service propagation, a service as a second argument is combined into a service-join
178
with whatever preceeded it. Alternatively, by configuration a non-service second argument with the service
179
as the first is swapped and then retried.
180
The respective service clause may not be immediately apparent.
181
On one hand, filter and join argument are also examined, and found bgps are speculatively expanded to
182
see if they yield serice operations."
183
(destructuring-bind (&key end start test)
184
(apply #'canonicalize-algebra-arguments args)
185
;; SLICE-P is true when either bound of a slice was supplied
(let ((slice-p (or end start)))
186
;; STANDARD-EXPANSION tests, in order: no shared dimensions, a possible
;; service sip rewrite, and the :propagate join propagation mode
(labels ((standard-expansion ()
187
(let ((field1-dimensions (expression-dimensions field1))
188
(field2-dimensions (expression-dimensions field2)))
189
(cond ((null (intersection field1-dimensions field2-dimensions))
191
;; support propagation into service clauses
192
;; the clause test is the LET form itself; when SIP-EXPAND-SERVICE returns
;; something other than FIELD1, the result may be wrapped in |filter| and
;; |slice| forms (the guards for the wrappers are on lines not visible here)
((let ((sip-service? (sip-expand-service field1 field2)))
193
(unless (eq field1 sip-service?)
195
(setf sip-service? `(spocq.a:|filter| ,sip-service? ,test)))
197
(setf sip-service? `(spocq.a:|slice| ,sip-service?
198
,@(when start `(:start ,start))
199
,@(when end `(:end ,end)))))
201
;; propagation mode: if FIELD1 has fewer dimensions, FIELD2 is wrapped in a
;; LOCALLY declaring FIELD1's dimensions as base dimensions; otherwise FIELD1
;; is wrapped declaring FIELD2's
((eq *join-propagation-mode* :propagate)
202
(if (< (length field1-dimensions) (length field2-dimensions))
203
(setf field2 `(locally (declare (spocq.e:base-dimensions ,@field1-dimensions))
205
(setf field1 `(locally (declare (spocq.e:base-dimensions ,@field2-dimensions))
209
(simple-expansion)))))
211
;; NOTE(review): presumably the body of the local SIMPLE-EXPANSION function,
;; whose header line is not visible: the plain SPOCQ.E:JOIN call carrying
;; whichever of :TEST, :END and :START are present
`(spocq.e:join ,field1 ,field2
212
,@(when test `(:test (quote ,test)))
213
,@(when end `(:end ,end))
214
,@(when start `(:start ,start)))))
215
;; first preference: a non-NIL result from REDUCE-VALUES-CLAUSE, tried only
;; when *PUSH-VALUES-CLAUSES* is true, is the expansion
(cond ((and *push-values-clauses*
216
(reduce-values-clause field1 field2 (when slice-p `(spocq.a:|slice| :start ,start :end ,end)))))
217
;; second preference: with *JOIN-MERGE-BGPS* and no join test, concatenate the
;; pattern lists of bgp operands; filters that wrapped an operand are re-emitted
;; as trailing |filter| elements of the merged bgp
((and *join-merge-bgps* (null test))
218
(cond ((bgp-form-p field1)
219
(cond ((bgp-form-p field2)
220
`(spocq.a:|bgp| ,@(rest field1) ,@(rest field2)
221
,@(when slice-p `((spocq.a:|slice| :start ,start :end ,end)))))
222
((and (filter-form-p field2) (bgp-form-p (second field2)))
223
;; TEST is rebound here to the filter's own test; the join's TEST is NIL in this branch
(destructuring-bind (filter-op (bgp-op . patterns) test) field2
224
(declare (ignore filter-op bgp-op))
225
`(spocq.a:|bgp| ,@(rest field1) ,@patterns
226
,@(when slice-p `((spocq.a:|slice| :start ,start :end ,end)))
227
(spocq.a:|filter| ,test))))
229
(standard-expansion))))
230
((and (filter-form-p field1) (bgp-form-p (second field1)))
231
(cond ((bgp-form-p field2)
232
(destructuring-bind (filter-op (bgp-op . patterns) test) field1
233
(declare (ignore filter-op bgp-op))
234
`(spocq.a:|bgp| ,@patterns ,@(rest field2)
235
,@(when slice-p `((spocq.a:|slice| :start ,start :end ,end)))
236
(spocq.a:|filter| ,test))))
237
;; both operands are filtered bgps: both pattern lists, then both filters
((and (filter-form-p field2) (bgp-form-p (second field2)))
238
(destructuring-bind (filter-op1 (bgp-op1 . patterns1) test1) field1
239
(declare (ignore filter-op1 bgp-op1))
240
(destructuring-bind (filter-op2 (bgp-op2 . patterns2) test2) field2
241
(declare (ignore filter-op2 bgp-op2))
242
`(spocq.a:|bgp| ,@patterns1 ,@patterns2
243
,@(when slice-p `((spocq.a:|slice| :start ,start :end ,end)))
244
(spocq.a:|filter| ,test1)
245
(spocq.a:|filter| ,test2)))))
247
(standard-expansion))))
249
(standard-expansion))))
251
(standard-expansion)))))))
255
;;; Run-time JOIN operator, specialised on the classes of the two field generators.
;;; NOTE(review): the closing lines of the last three methods are not visible in
;;; this excerpt.
(defgeneric spocq.e:join (field1 field2 &rest args &key end start test)
256
(:documentation "The JOIN combines solution fields to generate a field which contains
257
a merged solution for each compatible pair.
258
The initial evaluation process, which composes the bgps and operators as a generator tree
259
with agp leaves, passes constituent agps upwards by registering them with respective generators
260
and links the sources with destinations by correlating available with required dimensions.
261
Where a match occurs, the source channel is spliced with an extra one which feeds data
262
to the destination bgp match process.")
264
;; before every primary method: check that START and END are NIL or
;; non-negative integers, bump the *ALGEBRA-OPERATIONS* statistic, trace the call
(:method :before ((field1 t) (field2 t) &key end start test)
265
(assert-argument-types spocq.e:join
266
(start (or null (integer 0)))
267
(end (or null (integer 0))))
268
(incf-stat *algebra-operations*)
269
(trace-algebra spocq.e:join field1 field2
270
:start start :end end :test test))
272
;; a null generator on the left: PROCESS-NULL is run on a fresh null channel and
;; a null generator over the union of both sides' dimensions is built
(:method ((field1 null-generator) (field2 solution-generator) &rest args)
273
(declare (ignore args))
274
(let* ((result-dimensions (union-dimensions (solution-generator-dimensions field1)
275
(solution-generator-dimensions field2)))
276
(result-channel (make-null-channel :dimensions result-dimensions)))
277
(process-null result-channel result-dimensions)
278
(make-null-generator :operator 'spocq.a:|null|
279
:dimensions result-dimensions
281
:channel result-channel
284
;; a null generator on the right: same construction as the method above
(:method ((field1 solution-generator) (field2 null-generator) &rest args)
285
(declare (ignore args))
286
(let* ((result-dimensions (union-dimensions (solution-generator-dimensions field1)
287
(solution-generator-dimensions field2)))
288
(result-channel (make-null-channel :dimensions result-dimensions)))
289
(process-null result-channel result-dimensions)
290
(make-null-generator :operator 'spocq.a:|null|
291
:dimensions result-dimensions
293
:channel result-channel
296
;; general case: delegate to JOIN-GENERATOR (the remaining arguments of the
;; APPLY are on a line not visible here)
(:method ((field1 solution-generator) (field2 solution-generator) &rest args)
297
(declare (dynamic-extent args))
298
(apply 'join-generator field1 field2
302
;;; NOTE(review): no reference to this flag is visible in this excerpt;
;;; presumably it gates the PRINT diagnostics in JOIN-GENERATOR and
;;; RUN-JOIN-THREAD, whose guarding lines are not shown -- confirm.
(defparameter *join.verbose* nil)
305
;;; Builds the solution generator for a join of two solution generators. A result
;;; channel is created over the union of both sides' dimensions, and the returned
;;; generator's :EXPRESSION is a list headed by RUN-JOIN-THREAD which carries that
;;; channel, both constituent generators, ARGS and the chosen concrete operator.
;;; NOTE(review): the lines guarding the PRINT calls are not visible in this
;;; excerpt; presumably they test *JOIN.VERBOSE* -- confirm.
(defun join-generator (left-field-generator right-field-generator &rest args &key end start test)
306
(declare (ignore test))
307
;; upon invocation use the generator argument to compute the combination operators
308
;; and invoke the respective binding operator with the respective solution destination
309
(let* ((left-dimensions (solution-generator-dimensions left-field-generator))
310
(right-dimensions (solution-generator-dimensions right-field-generator))
311
(left-channel (solution-generator-channel left-field-generator))
312
(right-channel (solution-generator-channel right-field-generator))
313
(result-dimensions (union-dimensions left-dimensions right-dimensions))
314
(result-channel (make-channel :name (cons 'spocq.a:|join| (task-id *query*))
315
:dimensions result-dimensions
316
:size (effective-channel-size :start start :end end)
317
:page-length (effective-page-length :start start :end end)))
318
;; default operator; replaced below when a splice is made
(concrete-operator 'process-join))
319
;; examine dimensional dependencies to determine whether it is possible to
320
;; propagate solutions. if so, splice additional flow paths into the network.
321
;; nb. this must happen before constituent threads start to generate solutions, that is, before
322
;; the join operator per se is invoked, so this logic must be here rather than delayed
323
;; until internal in the operator itself. !! an alternative would be to arrange spliced
324
;; data flow to occur on the channel read rather than write, but that would synchronize the
325
;; dependents with the primary path.
327
(print (list left-dimensions right-dimensions)))
328
;; when the left side has fewer dimensions, scan the right side's patterns for
;; one with a base channel whose base dimensions are EQUAL to the left
;; dimensions; on a match select PROCESS-LEFT-DOMINANT-JOIN and push that base
;; channel onto the left channel's CHANNEL-CHANNELS. Otherwise do the mirror
;; image over the left side's patterns
(if (< (length left-dimensions) (length right-dimensions))
329
(dolist (agp (solution-generator-patterns right-field-generator))
331
(print (list :right-to-left (and (agp-base-channel agp) (agp-base-dimensions agp))
333
(when (and (agp-base-channel agp)
334
(equal (agp-base-dimensions agp) left-dimensions))
336
(print (list :join.splice.right-to-left agp (agp-base-channel agp) left-channel
337
left-dimensions right-dimensions)))
338
(trace-algebra join.splice.right-to-left agp (agp-base-channel agp) left-channel
339
left-dimensions right-dimensions)
340
(setf concrete-operator 'process-left-dominant-join)
341
(push (agp-base-channel agp) (channel-channels left-channel))))
342
(dolist (agp (solution-generator-patterns left-field-generator))
344
(print (list :left-to-right (and (agp-base-channel agp) (agp-base-dimensions agp))
346
(when (and (agp-base-channel agp)
347
(equal (agp-base-dimensions agp) right-dimensions))
349
(print (list :join.splice.left-to-right agp (agp-base-channel agp) right-channel
350
left-dimensions right-dimensions)))
351
(trace-algebra join.splice.left-to-right agp (agp-base-channel agp) right-channel
352
left-dimensions right-dimensions)
353
(setf concrete-operator 'process-right-dominant-join)
354
(push (agp-base-channel agp) (channel-channels right-channel)))))
356
;; return the binding function to the combination operator
357
(make-solution-generator :operator 'spocq.a:|join|
358
:concrete-operator concrete-operator
359
:dimensions result-dimensions
360
:expression (list 'run-join-thread result-channel left-field-generator right-field-generator
361
args :concrete-operator concrete-operator)
362
:channel result-channel
363
:constituents (list left-field-generator right-field-generator))))
366
;;; Runs a join whose generator was built by JOIN-GENERATOR: starts both
;;; constituent expressions, tightens the result channel's limits and applies
;;; CONCRETE-OPERATOR (default PROCESS-JOIN) to the result, left and right channels.
;;; NOTE(review): part of the final APPLY argument list and the line guarding the
;;; PRINT call are not visible in this excerpt.
(defun run-join-thread (result-channel left-field-generator right-field-generator args
367
&key (concrete-operator 'process-join))
368
(let* ((left-dimensions (solution-generator-dimensions left-field-generator))
369
(right-dimensions (solution-generator-dimensions right-field-generator))
370
(left-channel (solution-generator-channel left-field-generator))
371
(right-channel (solution-generator-channel right-field-generator))
372
(left-expression (solution-generator-expression left-field-generator))
373
(right-expression (solution-generator-expression right-field-generator))
374
;; rebind *THREAD-OPERATIONS* with an entry for this join consed onto the front
(*thread-operations* (cons (list* 'spocq.a:|join| (task-id *task*)
375
left-dimensions right-dimensions args)
376
*thread-operations*)))
378
(print (list :concrete-operator concrete-operator)))
379
;; hand both constituent expressions to QUERY-RUN-IN-THREAD before joining
(query-run-in-thread *query* left-expression)
380
(query-run-in-thread *query* right-expression)
381
;; cap the result channel's size and page length at the minimum of its own
;; value and those of the two source channels
(setf (channel-size result-channel) (min (channel-size left-channel)
382
(channel-size right-channel)
383
(channel-size result-channel))
384
(channel-page-length result-channel) (min (channel-page-length left-channel)
385
(channel-page-length right-channel)
386
(channel-page-length result-channel)))
387
;; the sort dimensions of both generators are passed along as keyword arguments
(apply concrete-operator result-channel left-channel right-channel
390
:left-sort-dimensions (solution-generator-sort-dimensions left-field-generator)
391
:right-sort-dimensions (solution-generator-sort-dimensions right-field-generator)
397
;;; Returns, as multiple values, the join kind (:NATURAL when the two dimension
;;; lists intersect, otherwise :CROSS), the union of the dimensions, two cache
;;; operators (NIL for both in the :CROSS case), and then a predicate and a
;;; collector built from the dimension lists.
;;; NOTE(review): one line in each branch is not visible in this excerpt, which is
;;; why the parentheses around the COMPUTE-BINARY-PREDICATE calls look unbalanced;
;;; presumably the hidden lines make the predicate conditional on TEST -- confirm.
(defun compute-join-operators (left-dimensions right-dimensions test)
398
(let ((result-dimensions (union-dimensions left-dimensions right-dimensions)))
399
(if (intersection left-dimensions right-dimensions)
400
(values :natural result-dimensions
401
;; one cache operator per direction: left keyed against right, then right against left
(compute-binary-cache-op left-dimensions right-dimensions)
402
(compute-binary-cache-op right-dimensions left-dimensions)
404
(compute-binary-predicate test left-dimensions right-dimensions))
405
(compute-binary-collector result-dimensions
406
left-dimensions right-dimensions))
407
(values :cross result-dimensions nil nil
409
;; only the cross-join predicate is built with :HANDLE-ERRORS T
(compute-binary-predicate test left-dimensions right-dimensions :handle-errors t))
410
(compute-binary-collector result-dimensions
411
left-dimensions right-dimensions)))))
414
;;; NOTE(review): no reference to this flag is visible in this excerpt;
;;; presumably it selects the PROCESS-MP-NATURAL-JOIN path in PROCESS-JOIN,
;;; whose guarding line is not shown -- confirm.
(defparameter *mp-join-p* nil)
416
;;; Joins the pages of LEFT-SOURCE and RIGHT-SOURCE into DESTINATION (all three
;;; are ARRAY-PAGE-CHANNELs).
;;; Slice arguments: START defaults to OFFSET, and END to COUNT plus START (or 0)
;;; when COUNT is given. If END is set without START, START becomes 0; END is then
;;; raised to at least START.
;;; Operator selection: the generator is COMPUTE-NATURAL-JOIN-OPERATOR when the
;;; two dimension lists intersect, otherwise COMPUTE-CROSS-JOIN-OPERATOR. The
;;; compiled function is looked up with GET-ASPECT-CACHE under a key made of the
;;; generator name, both dimension lists, TEST, START and END; the visible code
;;; also generates the function and stores it under that key (the line guarding
;;; this, presumably a cache-miss test, is not visible).
;;; Execution: the function is funcalled on DESTINATION and the two sources. One
;;; visible branch instead calls PROCESS-MP-NATURAL-JOIN for natural joins when
;;; ALGEBRA-OPERATOR-THREAD-COUNT is an integer >= 2, there is no END, START is
;;; not an integer >= 1, and the repository holds more than 10000 statements.
;;; NOTE(review): the end of the docstring and the conditions choosing between
;;; the plain FUNCALL and the multi-process branch are on lines not visible in
;;; this excerpt; presumably *MP-JOIN-P* is involved -- confirm.
(defmethod process-join ((destination array-page-channel)
417
(left-source array-page-channel)
418
(right-source array-page-channel)
419
left-dimensions right-dimensions
420
&rest args &key (test nil t-s) offset count
421
(start offset) (end (when count (+ count (or start 0))))
422
(left-sort-dimensions ()) (right-sort-dimensions ()))
423
"Generate a stream of join solutions to a destination given a solution source and dimension lists for
424
the left and right solutions fields. Invoke the source function repeatedly to obtain a stream of
425
(dimension . solution-page) pairs until the page is null. Use the dimensions to distinguish left from right.
426
Each must be a unique list. Combine the respective source either as a natural or a cross join depending
427
on whether they share any dimensions. In the former case, hash and cache solutions based on the shared
430
(assert-argument-types process-join
431
(right-source (or channel function))
432
(left-dimensions list)
433
(right-dimensions list)
435
(unless start (when end (setf start 0)))
436
(when end (setf end (max start end)))
438
(setf args (copy-list args))
440
;; retrieve / generate the core operator
441
(let* ((form (if (intersection left-dimensions right-dimensions)
442
'compute-natural-join-operator 'compute-cross-join-operator))
443
(key (list form left-dimensions right-dimensions test start end))
444
(function (get-aspect-cache key)))
446
(setf function (funcall form left-dimensions right-dimensions test start end
447
:left-sort-dimensions left-sort-dimensions :right-sort-dimensions right-sort-dimensions)
448
(get-aspect-cache key) function))
449
;; if parallel reduction is enabled, split the streams across multiple cores, otherwise
450
;; pass everything through just one
451
(funcall function destination left-source right-source)
454
(if (and (eq form 'compute-natural-join-operator)
455
(typep (algebra-operator-thread-count) '(integer 2))
456
(not (or end (typep start '(integer 1))))
457
(> (repository-statement-count *repository*) 10000))
458
(process-mp-natural-join function destination left-source right-source left-dimensions right-dimensions)
459
(funcall function destination left-source right-source))
460
(funcall function destination left-source right-source))))
463
#+trace-solution-count
465
;;; Diagnostic helper: writes the value of SB-KERNEL::DYNAMIC-USAGE to
;;; *TRACE-OUTPUT* and then compares it with two thresholds.
;;; NOTE(review): the actions taken above each threshold, apart from the gc
;;; marker output, are on lines not visible in this excerpt; the
;;; #+trace-solution-count conditional above presumably applies to this
;;; definition -- confirm.
(defun check-memory ()
466
(let ((usage (sb-kernel::dynamic-usage)))
467
(format *trace-output* "[usage: ~s]" usage)
468
;; flush so the usage report appears immediately
(finish-output *trace-output*)
469
;; outer threshold 2000000000, inner threshold 2700000000
(when (> usage 2000000000)
470
(when (> usage 2700000000)
471
(format *trace-output* "[gc]")
474
;; (defun check-memory ())
477
(let ((dynamic (sb-kernel::dynamic-usage)))
478
(format *trace-output* "[MEMORY: dynamic ~d]" dynamic)
479
(finish-output *trace-output*)))
481
(pushnew 'log-memory sb-ext:*after-gc-hooks*))
484
(defun compute-cross-join-operator (left-dimensions right-dimensions test start end &key &allow-other-keys)
485
(let* ((result-dimensions (union-dimensions left-dimensions right-dimensions))
487
`(lambda (destination left-source right-source)
488
(declare (optimize (speed 3) (safety 0)))
489
(trace-algebra process-cross-join destination left-source right-source ',left-dimensions ',right-dimensions
490
:start ,start :end ,end :test ',test)
492
(block process-cross-join
493
(let* ((result-page-width (channel-page-width destination))
494
(result-page-length (channel-page-length destination))
496
(result-index result-page-length)
497
(left-width ,(length left-dimensions))
498
(right-width ,(length right-dimensions))
503
(left-solution-count 0)
504
(right-solution-count 0)
509
(source-dimensions (list ',left-dimensions ',right-dimensions))
510
#+trace-solution-count (next-count 0)
511
#+trace-solution-count (uuid (uuid:make-v1-uuid))
512
#+trace-solution-count (pns nil))
513
(declare (type fixnum result-index result-count solution-count))
514
(assert (= ,(length result-dimensions) result-page-width) ()
515
"Channel and operation dimensions do not match: ~a: ~a." destination ',result-dimensions)
516
(labels (,@(when test
517
(let ((lambda (compute-binary-predicate-lambda test left-dimensions right-dimensions :handle-errors nil)))
518
`((test ,@(rest lambda)))))
519
,(let ((lambda (compute-binary-collector-lambda result-dimensions left-dimensions right-dimensions)))
520
`(collect ,@(rest lambda)))
521
(collect-solution (left-page left-index right-page right-index)
523
(when (and *solution-count-limit* (> result-count *solution-count-limit*))
524
(spocq.e::cardinality-limit-error :operator 'cross-join
525
:state `(:limit ,*solution-count-limit*
526
:left-count ,left-solution-count
527
:right-count ,right-solution-count
528
:result-count ,result-count)
529
:dimensions '(,left-dimensions ,right-dimensions)))
531
`(when (and (test left-page left-index right-page right-index)
532
,@(when start `((> result-count ,start))))
533
(next-solution-location)
534
(collect result-page result-index left-page left-index right-page right-index)
535
,@(when end `((when (>= result-count ,end)
536
(trace-algebra process-join.end :test+bounded result-count)
537
(complete-solutions)))))
538
`(,@(if start `(when (> result-count ,start)) '(progn))
539
(next-solution-location)
540
(collect result-page result-index left-page left-index right-page right-index)
541
,@(when end `((when (>= result-count ,end)
542
(trace-algebra process-join.end result-count)
543
(complete-solutions)))))))
544
(next-solution-location ()
545
;; return a page (possible newly created) and the next free location in that page
546
(when (>= (incf result-index) result-page-length)
547
(when result-page (put-page result-page))
548
(setf result-page (new-field-page destination result-page-length result-page-width)
550
(complete-solutions ()
551
(log-trace "join(x) ~a: complete: ~10s/~10s" source-dimensions solution-count result-count)
553
(let ((page-result-count (1+ result-index)))
554
(when (< page-result-count result-page-length)
556
(adjust-page result-page (list page-result-count result-page-width)))))
557
(put-page result-page))
559
(incf-stat *solutions-processed* solution-count)
560
(incf-stat *solutions-constructed* result-count)
561
(trace-algebra process-join ',left-dimensions ',right-dimensions solution-count result-count)
562
(return-from process-cross-join (values solution-count result-count)))
564
(trace-data process-cross-join.put-page destination ',result-dimensions page)
565
#+trace-solution-count
566
(when (shiftf pns nil)
567
(bt:with-lock-held (*page-cache-lock*)
568
(format *trace-output* "~% ~a join(x)put ~10s/~10s -> ~s"
569
uuid solution-count result-count (channel-solution-count destination))
570
(finish-output *trace-output*)))
572
(put-field-page destination page)
573
(complete-field destination)))
575
(let ((cell (cons (copy-page page) nil)))
577
(setf (rest left-cache-end) cell)
578
(setf left-cache cell))
579
(setf left-cache-end cell)))
581
(let ((cell (cons (copy-page page) nil)))
583
(setf (rest right-cache-end) cell)
584
(setf right-cache cell))
585
(setf right-cache-end cell)))
589
(clear-right-cache ()
590
(setf right-cache nil
591
right-cache-end nil)))
592
(unless (and (plusp result-page-length) (or (null ,end) (> ,end ,start))) (complete-solutions))
593
(log-trace "join(x) ~a" source-dimensions)
594
(loop (check-query-status *query*)
595
#+trace-solution-count
596
(when (> solution-count next-count)
598
(bt:with-lock-held (*page-cache-lock*)
599
(format *trace-output* "~% ~a join(x)... ~10s/~10s ~10s/~10s <- ~d/~d"
600
uuid solution-count result-count
603
(channel-solution-count left-source) (channel-solution-count right-source))
604
(finish-output *trace-output*))
605
(incf next-count 10000))
607
(unless left-complete
608
(let ((left-page (get-field-page left-source)))
610
(assert (= left-width (array-dimension left-page 1)) ()
611
"cjg: Invalid left page width (expected ~a): ~a."
612
left-width (array-dimension left-page 1))
613
(incf solution-count (array-dimension left-page 0))
614
(incf left-solution-count (array-dimension left-page 0))
615
(unless right-complete (cache-left left-page))
617
'(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :skip nil))))
618
(loop for right-page in right-cache
619
do (dotimes (left-index (array-dimension left-page 0))
620
(dotimes (right-index (array-dimension right-page 0))
622
(collect-solution left-page left-index right-page right-index))))))
623
'(loop for right-page in right-cache
624
do (dotimes (left-index (array-dimension left-page 0))
625
(dotimes (right-index (array-dimension right-page 0))
626
(collect-solution left-page left-index right-page right-index)))))
627
(release-field-page left-source left-page))
628
((zerop left-solution-count)
629
;; empty solutionfield entails empty result
630
(log-trace "join(x) ~a: lft: null" source-dimensions)
631
(complete-solutions))
632
((setf left-complete t)
633
;; #+trace-solution-count
634
(log-trace "join(x) ~a: lft: ~10@s/~10s ~10@s/~10s"
636
solution-count result-count
637
left-solution-count right-solution-count)
638
(clear-right-cache)))))
639
(unless right-complete
640
(let ((right-page (get-field-page right-source)))
642
(assert (= right-width (array-dimension right-page 1)) ()
643
"cjg: Invalid right page width (expected ~a): ~a."
644
right-width (array-dimension right-page 1))
645
(incf solution-count (array-dimension right-page 0))
646
(incf right-solution-count (array-dimension right-page 0))
647
(unless left-complete (cache-right right-page))
649
'(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :skip nil))))
650
(loop for left-page in left-cache
651
do (dotimes (left-index (array-dimension left-page 0))
652
(dotimes (right-index (array-dimension right-page 0))
654
(collect-solution left-page left-index right-page right-index))))))
655
'(loop for left-page in left-cache
656
do (dotimes (left-index (array-dimension left-page 0))
657
(dotimes (right-index (array-dimension right-page 0))
658
(collect-solution left-page left-index right-page right-index)))))
659
(release-field-page right-source right-page))
660
((zerop right-solution-count)
661
;; empty solutionfield entails empty result
662
(log-trace "join(x) ~a: rgt: null" source-dimensions)
663
(complete-solutions))
664
((setf right-complete t)
665
(log-trace "join(x) ~a: rgt: ~10@s/~10s ~10@s/~10s"
667
solution-count result-count
668
left-solution-count right-solution-count)
669
(clear-left-cache)))))
670
(when (and left-complete right-complete) (return)))
671
(complete-solutions)))))))
672
(values (spocq-compile lambda) lambda)))
674
;;; when true, the COLLECT-SOLUTION function generated by
;;; COMPUTE-NATURAL-JOIN-OPERATOR calls SPOCQ.E::CARDINALITY-LIMIT-ERROR once the
;;; result count exceeds *SOLUTION-COUNT-LIMIT*.
;;; NOTE(review): the alternative clause is only partly visible; it appears to end
;;; the join early through COMPLETE-SOLUTIONS -- confirm.
(defparameter *CARDINALITY-LIMIT-ERROR* t)
675
(defun compute-natural-join-operator (left-dimensions right-dimensions test start end &key
676
(left-sort-dimensions ())
677
(right-sort-dimensions ()))
678
(declare (ignorable left-sort-dimensions right-sort-dimensions))
679
(let* ((result-dimensions (union-dimensions left-dimensions right-dimensions))
681
`(lambda (destination left-source right-source)
682
(declare (optimize (debug 3))) ; (speed 3) (safety 0)))
683
(trace-algebra process-natural-join destination left-source right-source ',left-dimensions ',right-dimensions
684
:start ,start :end ,end :test ',test)
685
(block process-natural-join
686
(let* ((result-page-width (channel-page-width destination))
687
(result-page-length (channel-page-length destination))
689
(result-index result-page-length)
690
(left-width ,(length left-dimensions))
691
(right-width ,(length right-dimensions))
692
(left-cache (make-term-id-cache :single-thread t))
693
(right-cache (make-term-id-cache :single-thread t))
695
(left-solution-count 0)
696
(right-solution-count 0)
699
(source-dimensions (list ',left-dimensions ',right-dimensions))
703
#+trace-solution-count (next-count 0)
704
#+trace-solution-count (uuid (uuid:make-v1-uuid))
705
#+trace-solution-count (pns nil)
706
(left-surrogate (make-array left-width :element-type 'fixnum))
707
(right-surrogate (make-array right-width :element-type 'fixnum))
709
((last-left ',(make-array (length left-dimensions) :initial-element +null-term-id+ :element-type 'fixnum))
710
(last-right ',(make-array (length right-dimensions) :initial-element +null-term-id+ :element-type 'fixnum))))
711
(declare (type fixnum result-index result-count left-solution-count right-solution-count)
712
(type hash-table left-cache right-cache))
713
(assert (= ,(length result-dimensions) result-page-width) ()
714
"Channel and operation dimensions do not match: ~a: ~a." destination ',result-dimensions)
715
(labels (,@(when test
716
(let ((lambda (compute-binary-predicate-lambda-spread test left-dimensions right-dimensions :handle-errors nil)))
717
`((test ,@(rest lambda)))))
718
,(let ((lambda (compute-binary-collector-lambda-spread result-dimensions left-dimensions right-dimensions)))
719
(trace-algebra :collector lambda)
720
`(collect ,@(rest lambda)))
721
(cache-left ,@(let ((lambda (compute-binary-cache-op-lambda-spread left-dimensions right-dimensions)))
722
(trace-algebra :cache-op-lr lambda)
724
(cache-right ,@(let ((lambda (compute-binary-cache-op-lambda-spread right-dimensions left-dimensions)))
725
(trace-algebra :cache-op-rl lambda)
728
(compare-left-solution ,@(rest (compute-binary-vector-comparison-lambda left-dimensions left-dimensions
729
left-sort-dimensions left-sort-dimensions)))
730
(compare-right-solution ,@(rest (compute-binary-vector-comparison-lambda right-dimensions right-dimensions
731
right-sort-dimensions right-sort-dimensions))) )
732
(collect-solution (left-solution right-solution)
734
(trace-algebra join.n.cs "~a ~a ~a"
735
left-solution right-solution result-count)
736
(when (and *solution-count-limit* (> result-count *solution-count-limit*))
737
(log-warn "~&join limit exceeded (~a ~a ~a) ~a ~a~%"
738
result-count left-count right-count
739
(let ((entry-count 0)
741
(maphash #'(lambda (k v) k (incf entry-count (length v)) (incf key-count)) left-cache)
742
(list entry-count key-count ))
743
(let ((entry-count 0)
745
(maphash #'(lambda (k v) k (incf entry-count (length v)) (incf key-count)) right-cache)
746
(list entry-count key-count )))
747
(cond (*cardinality-limit-error*
748
(spocq.e::cardinality-limit-error :operator 'natural-join
749
:state `(:limit ,*solution-count-limit*
750
:left-count ,left-solution-count
751
:right-count ,right-solution-count
752
:result-count ,result-count)
753
:dimensions '(,left-dimensions ,right-dimensions)))
755
(complete-solutions))))
757
`(when (and (test left-solution right-solution)
758
,@(when start `((> result-count ,start))))
759
(next-solution-location)
760
(collect result-page result-index left-solution right-solution)
761
,@(when end `((when (>= result-count ,end)
762
(trace-algebra process-join.end :test+bounded result-count)
763
(complete-solutions)))))
764
`(,@(if start `(when (> result-count ,start)) '(progn))
765
(next-solution-location)
766
(collect result-page result-index left-solution right-solution)
767
,@(when end `((when (>= result-count ,end)
768
(trace-algebra process-join.end :bounded result-count)
769
(complete-solutions))))))
771
(when (eq '< (compare-left-solution left-solution last-left))
772
(warn "degression (@~d) left: ~s ~s" result-count left-solution last-left)
773
(complete-solutions))
774
(when (eq '< (compare-right-solution right-solution last-right))
775
(warn "degression (@~d) right: ~s ~s" result-count right-solution last-right)
776
(complete-solutions))
777
;; (print (setf last-left left-solution) *trace-output*)
778
;; (print (setf last-right right-solution) *trace-output*)
779
(finish-output *trace-output*)))
780
(next-solution-location ()
781
;; return a page (possible newly created) and the next free location in that page
782
(when (>= (incf result-index) result-page-length)
783
(when result-page (put-page result-page))
784
(setf result-page (new-field-page destination result-page-length result-page-width)
786
(complete-solutions ()
787
(when (shiftf completed t)
788
(error "already completed"))
789
(clrhash right-cache)
791
(trace-algebra join.n "~a: complete: (~10s ~10s)/~10s"
792
source-dimensions left-solution-count right-solution-count result-count)
794
(let ((page-result-count (1+ result-index)))
795
(when (< page-result-count result-page-length)
797
(adjust-page result-page (list page-result-count result-page-width)))))
798
(put-page result-page))
799
(complete-field destination)
800
(incf-stat *solutions-processed* (+ left-solution-count right-solution-count))
801
(incf-stat *solutions-constructed* result-count)
802
(return-from process-natural-join (values left-solution-count right-solution-count result-count)))
804
(trace-data process-natural-join.put-page destination ',result-dimensions page)
805
#+trace-solution-count
806
(when (shiftf pns nil)
807
(bt:with-lock-held (*page-cache-lock*)
808
(format *trace-output* "~% ~a join.nput (~10s ~10s)/~10s -> ~s"
809
uuid left-solution-count right-solution-count result-count (channel-solution-count destination))
810
(finish-output *trace-output*)))
811
(put-field-page destination page)))
812
;; test initial boundary conditions
813
(unless (and (plusp result-page-length) (or (null ,end) (> ,end ,start))) (complete-solutions))
814
(trace-algebra join.n "~a" source-dimensions)
815
(loop (check-query-status *query*)
816
#+trace-solution-count
817
(when (> (+ left-solution-count right-solution-count) next-count)
819
(bt:with-lock-held (*page-cache-lock*)
820
(format *trace-output* "~% ~a join.n... (~10s ~10s)/~10s/(~10s ~10s) <- ~d/~d"
821
uuid left-solution-count right-solution-count result-count
822
(hash-table-count left-cache)
823
(hash-table-count right-cache)
824
(channel-solution-count left-source) (channel-solution-count right-source))
825
(finish-output *trace-output*))
826
(incf next-count 1000000))
827
(unless left-complete
828
(let ((left-page (get-field-page left-source)))
830
(assert (= left-width (array-dimension left-page 1)) ()
831
"njg: Invalid left page width (expected ~a): ~a."
832
left-width (array-dimension left-page 1))
834
(incf left-solution-count (array-dimension left-page 0))
835
(trace-algebra join.n "lftpage: ~a" left-page)
836
;;(format *trace-output* "~&l.~a.~a.~a~%" left-count left-solution-count (hash-table-count left-cache))
838
`(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :skip nil))))
839
(dotimes (left-index (array-dimension left-page 0))
840
(multiple-value-bind (right-solutions left-solution)
842
(cache-left left-page left-index nil right-cache left-surrogate)
843
(cache-left left-page left-index left-cache right-cache))
844
(dolist (right-solution right-solutions)
845
(catch :skip (collect-solution left-solution right-solution))))))
846
`(dotimes (left-index (array-dimension left-page 0))
847
(multiple-value-bind (right-solutions left-solution)
849
(cache-left left-page left-index nil right-cache left-surrogate)
850
(cache-left left-page left-index left-cache right-cache))
851
(dolist (right-solution right-solutions)
852
(collect-solution left-solution right-solution)))))
853
;; (print (list :left-page ',left-dimensions (term-value-field left-page)))
854
(release-field-page left-source left-page))
856
;; empty solutionfield entails empty result
857
(trace-algebra join.n "~a: lft: null" source-dimensions)
858
(complete-solutions))
859
((setf left-complete t)
860
(trace-algebra join.n "~a: lft: ~10@s/~10s ~10@s/~10s"
862
left-solution-count result-count
863
(hash-table-count left-cache) (hash-table-count right-cache))
864
(clrhash right-cache)))))
865
(unless right-complete
866
(let ((right-page (get-field-page right-source)))
868
(assert (= right-width (array-dimension right-page 1)) ()
869
"njg: Invalid right page width (expected ~a): ~a, ~a."
870
right-width (array-dimension right-page 1) (array-dimensions right-page))
872
(incf right-solution-count (array-dimension right-page 0))
873
(trace-algebra join.n "rgtpage: ~a" right-page)
874
;;(format *trace-output* "~&r.~a.~a.~a~%" right-count right-solution-count (hash-table-count right-cache))
876
`(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :skip nil))))
877
(dotimes (right-index (array-dimension right-page 0))
878
(multiple-value-bind (left-solutions right-solution)
880
(cache-right right-page right-index nil left-cache right-surrogate)
881
(cache-right right-page right-index right-cache left-cache))
882
(dolist (left-solution left-solutions)
883
(catch :skip (collect-solution left-solution right-solution))))))
884
`(dotimes (right-index (array-dimension right-page 0))
885
(multiple-value-bind (left-solutions right-solution)
887
(cache-right right-page right-index nil left-cache right-surrogate)
888
(cache-right right-page right-index right-cache left-cache))
889
(dolist (left-solution left-solutions)
890
(collect-solution left-solution right-solution)))))
891
;; (print (list :right-page ',right-dimensions (term-value-field right-page)))
892
(release-field-page right-source right-page))
894
;; empty solutionfield entails empty result
895
(trace-algebra join.n "~a: rgt: null" source-dimensions)
896
(complete-solutions))
897
((setf right-complete t)
898
(trace-algebra join.n "~a: rgt: ~10@s/~10s ~10@s/~10s"
900
right-solution-count result-count
901
(hash-table-count left-cache) (hash-table-count right-cache))
902
(clrhash left-cache)))))
903
(when (and left-complete right-complete) (return)))
904
(complete-solutions)))))))
905
(values (spocq-compile lambda) lambda)))
907
(:documentation "Parallel join algorithm"
908
"The algorithm comprises three distinct phases:
909
- distribution into left and right substreams based on key hash
910
- combination of each substream based on (key x property-vector)
911
- merging the result joined fields
913
The first phase uses the key to (redundantly) compute the hash key, the low n bits of which designate the partition.
914
The second phase performs the same operations as a single-process join, but expects the streams to already contain the
915
(key x property-vector) delegates. The third phase is just a stable-order merge of the streams, with the effect
916
that the result field solutions are always in the same order.")
918
;;; process-mp-natural-join -- disabled variant.
;;; The #+(or) feature expression on the next line never succeeds, so the reader
;;; skips this whole definition; the inline note records that it must be
;;; re-written for ring-buffer channels.
;;;
;;; Visible structure:
;;;  - one worker thread per partition funcalls CORE-FUNCTION on that partition's
;;;    result, left and right channels;
;;;  - per-partition "mediator" closures place one source solution into the
;;;    partition's current page, and "completor" closures flush the last page and
;;;    complete the partition's input channel;
;;;  - a collating thread forwards pages from the result channels to DESTINATION;
;;;  - the calling thread reads pages from both sources and routes every solution
;;;    to a partition selected from its key hash.
;;; Returns (values (+ left-count right-count) result-count), summed over
;;; thread-solution-counts.
#+(or) ; must be re-written for ring-buffer channels
919
(defun process-mp-natural-join (core-function destination left-source right-source left-dimensions right-dimensions
920
&key &allow-other-keys)
921
"Parallel-process a natural join by distributing the source streams according to their key hash values
922
and collating the result streams."
924
;; NOTE(review): left-source is checked twice below; the second entry was
;; presumably meant to be right-source -- confirm.
(assert-argument-types process-mp-natural-join
925
(left-source (or channel function))
926
(left-source (or channel function)))
928
;; hash generators pick the partition; collectors are called by the mediators
(multiple-value-bind (left-hash-generator right-hash-generator left-solution-collector right-solution-collector)
929
(compute-mp-join-aspects left-dimensions right-dimensions)
930
(let* ((thread-count (algebra-operator-thread-count))
931
;; combined with logand below to turn a hash into a partition index.
;; NOTE(review): count-1 only reaches every partition when thread-count is a
;; power of two -- confirm algebra-operator-thread-count guarantees that.
(thread-id-mask (- thread-count 1))
932
(left-channels (make-array thread-count))
933
(right-channels (make-array thread-count))
934
(result-channels (make-array thread-count))
935
(threads (make-array thread-count))
939
(thread-solution-counts (make-array thread-count))
940
(left-solution-mediators (make-array thread-count))
941
(right-solution-mediators (make-array thread-count))
942
(left-solution-completors (make-array thread-count))
943
(right-solution-completors (make-array thread-count))
944
(left-page-width (length left-dimensions))
945
(right-page-width (length right-dimensions))
946
(result-page-length *field-page-length*)
949
#+(or) (test-lock (bt:make-lock )))
951
;; start one worker thread per partition and build its mediator/completor closures
952
(dotimes (thread-id thread-count)
953
(let* ((left-channel (setf (aref left-channels thread-id) (make-channel :dimensions left-dimensions)))
954
(right-channel (setf (aref right-channels thread-id) (make-channel :dimensions right-dimensions)))
955
(result-channel (setf (aref result-channels thread-id) (make-channel :dimensions (channel-dimensions destination)))))
956
(setf (aref threads thread-id)
957
(bt:make-thread #'(lambda ()
958
(funcall core-function result-channel left-channel right-channel))
959
:name (format nil "mp-join-~d" thread-id)
960
;; the worker thread is given the caller's *query*, *task* and *repository* values
:initial-bindings (acons '*query* *query*
961
(acons '*task* *task*
962
(acons '*repository* *repository* ())))))
964
;; index starts at the page length, so the first mediator call allocates a page
(index result-page-length))
965
(flet ((left-solution-mediator (base-page base-index)
966
;; current page is full (or none exists yet): emit it and start a new one
(when (>= (incf index) result-page-length)
967
(when page (put-field-page left-channel page))
968
(setf page (new-field-page left-channel result-page-length left-page-width)
970
(funcall left-solution-collector page index base-page base-index))
971
(left-solution-completor ()
973
;; trim a partially filled final page to the number of rows used
(when (< (incf index) result-page-length)
974
(setf page (adjust-page page (list index left-page-width))))
975
(channel-put left-channel page))
976
(complete-field left-channel)))
977
(setf (aref left-solution-mediators thread-id) #'left-solution-mediator)
978
(setf (aref left-solution-completors thread-id) #'left-solution-completor)))
980
;; same arrangement for the right-hand stream of this partition
(index result-page-length))
981
(flet ((right-solution-mediator (base-page base-index)
982
(when (>= (Incf index) result-page-length)
983
(when page (put-field-page right-channel page))
984
(setf page (new-field-page right-channel result-page-length right-page-width)
986
(funcall right-solution-collector page index base-page base-index))
987
(right-solution-completor ()
989
(when (< (incf index) result-page-length)
990
(setf page (adjust-page page (list index right-page-width))))
991
(channel-put right-channel page))
992
(complete-field right-channel)))
993
(setf (aref right-solution-mediators thread-id) #'right-solution-mediator)
994
(setf (aref right-solution-completors thread-id) #'right-solution-completor)))))
996
;; collate the results
998
;; polls each result channel not yet marked complete and forwards its pages to
;; destination; a final nil page is put once every channel is marked complete
(bt:make-thread #'(lambda ()
999
(let ((result-channels-complete (make-array (length result-channels) :initial-element nil)))
1000
(loop (when (every #'identity result-channels-complete) (return))
1001
(loop for i from 0 for result-channel across result-channels
1002
for channel-complete across result-channels-complete
1003
unless channel-complete
1004
do (let ((page (channel-get result-channel)))
1006
(put-field-page destination page)
1007
(setf (aref result-channels-complete i) t)))))
1008
(put-field-page destination nil)))
1009
:name "mp-results"))
1011
(flet ((distribute-left-solution (page index)
1012
(let* ((hash (funcall left-hash-generator page index))
1013
;; drop the low four hash bits, then mask down to a partition index
(id (logand (ash hash -4) thread-id-mask)))
1014
(funcall (aref left-solution-mediators id) page index)))
1015
(distribute-right-solution (page index)
1016
(let* ((hash (funcall right-hash-generator page index))
1017
(id (logand (ash hash -4) thread-id-mask)))
1018
(funcall (aref right-solution-mediators id) page index))))
1019
;; distribute the solutions
1020
(loop (check-query-status *query*)
1021
;; NOTE(review): this tests result-count while the warning reports
;; solution-count; no binding of result-count is visible in an enclosing
;; scope here -- confirm which counter is intended.
(when (and *solution-count-limit* (> result-count *solution-count-limit*))
1022
(log-warn "join: terminated @~a solutions." solution-count)
1023
(terminate-task *query*)
1025
(unless left-complete
1026
(let ((left-page (get-field-page left-source)))
1028
(assert (= left-page-width (array-dimension left-page 1)) ()
1029
"Invalid left page width (expected ~a): ~a."
1030
left-page-width (array-dimension left-page 1))
1031
(trace-data mp-distribute-left left-page (term-value-field left-page))
1032
(incf solution-count (array-dimension left-page 0))
1034
;; route every row of the page to its partition
(dotimes (index (array-dimension left-page 0))
1035
(distribute-left-solution left-page index))
1036
(release-page left-page))
1037
;; presumably the branch taken when the source yields no further page -- confirm
((setf left-complete t)))))
1038
(unless right-complete
1039
(let ((right-page (get-field-page right-source)))
1041
(assert (= right-page-width (array-dimension right-page 1)) ()
1042
"Invalid right page width (expected ~a): ~a."
1043
right-page-width (array-dimension right-page 1))
1044
(trace-data mp-distribute-right right-page (term-value-field right-page))
1045
(incf solution-count (array-dimension right-page 0))
1047
(dotimes (index (array-dimension right-page 0))
1048
(distribute-right-solution right-page index))
1049
(release-page right-page))
1050
((setf right-complete t)))))
1051
(when (and left-complete right-complete) (return))))
1053
;; flush every partition's pending input page, then wait for collator and workers
(map nil #'funcall left-solution-completors)
1054
(map nil #'funcall right-solution-completors)
1055
(bt:join-thread result-thread)
1056
(map nil #'bt:join-thread threads)
1057
(let ((result-count 0) (left-count 0) (right-count 0))
1058
;; NOTE(review): each element is read as a vector of (result left right) counts,
;; but no visible code stores into thread-solution-counts -- confirm it is filled.
(loop for counts across thread-solution-counts
1059
do (progn (incf result-count (aref counts 0))
1060
(incf left-count (aref counts 1))
1061
(incf right-count (aref counts 2))))
1062
(incf-stat *solutions-processed* (+ left-count right-count))
1063
(incf-stat *solutions-constructed* result-count)
1064
(values (+ left-count right-count) result-count)))))
1067
;;; process-mp-natural-join -- active definition.
;;; Partitions both source streams by key hash across
;;; (algebra-operator-thread-count) worker threads, each of which funcalls
;;; CORE-FUNCTION on its own result/left/right channels, and forwards the pages
;;; the workers produce to DESTINATION from a separate collating thread.
;;;
;;; Arguments as used below:
;;;   core-function     funcalled with (result-channel left-channel right-channel)
;;;   destination       receives the collated pages via put-field-page
;;;   left-source,
;;;   right-source      read with get-field-page; declared to be of type function
;;;   left-dimensions,
;;;   right-dimensions  give the page widths and, through compute-mp-join-aspects,
;;;                     the hash generators and collectors
;;; Returns (values (+ left-count right-count) result-count), summed over
;;; thread-solution-counts, after adding the same totals to
;;; *solutions-processed* and *solutions-constructed*.
(defun process-mp-natural-join (core-function destination left-source right-source left-dimensions right-dimensions
1068
&key &allow-other-keys)
1069
"Parallel-process a natural join by distributing the source streams according to their key hash values
1070
and collating the result streams."
1072
(declare (type function left-source right-source))
1073
;; hash generators pick the partition; collectors are called by the mediators
(multiple-value-bind (left-hash-generator right-hash-generator left-solution-collector right-solution-collector)
1074
(compute-mp-join-aspects left-dimensions right-dimensions)
1075
(let* ((thread-count (algebra-operator-thread-count))
1076
;; combined with logand below to turn a hash into a partition index.
;; NOTE(review): count-1 only reaches every partition when thread-count is a
;; power of two -- confirm algebra-operator-thread-count guarantees that.
(thread-id-mask (- thread-count 1))
1077
(left-channels (make-array thread-count))
1078
(right-channels (make-array thread-count))
1079
(result-channels (make-array thread-count))
1080
(threads (make-array thread-count))
1082
(right-complete nil)
1084
(thread-solution-counts (make-array thread-count))
1085
(left-solution-mediators (make-array thread-count))
1086
(right-solution-mediators (make-array thread-count))
1087
(left-solution-completors (make-array thread-count))
1088
(right-solution-completors (make-array thread-count))
1089
(left-page-width (length left-dimensions))
1090
(right-page-width (length right-dimensions))
1091
(result-page-length *field-page-length*)
1094
#+(or) (test-lock (bt:make-lock )))
1096
;; start one worker thread per partition and build its mediator/completor closures
1097
(dotimes (thread-id thread-count)
1098
(let* ((left-channel (setf (aref left-channels thread-id) (make-channel)))
1099
(right-channel (setf (aref right-channels thread-id) (make-channel)))
1100
(result-channel (setf (aref result-channels thread-id) (make-channel))))
1101
(setf (aref threads thread-id)
1102
(bt:make-thread #'(lambda ()
1103
(funcall core-function result-channel left-channel right-channel))
1104
:name (format nil "mp-join-~d" thread-id)
1105
;; the worker thread is given the caller's *query*, *task* and *repository* values
:initial-bindings (acons '*query* *query*
1106
(acons '*task* *task*
1107
(acons '*repository* *repository* ())))))
1109
;; index starts at the page length, so the first mediator call allocates a page
(index result-page-length))
1110
(flet ((left-solution-mediator (base-page base-index)
1111
;; current page is full (or none exists yet): emit it and start a new one
(when (>= (incf index) result-page-length)
1112
(when page (channel-put left-channel page))
1113
;; NOTE(review): the new page is requested from destination with
;; result-page-width, while the completor trims it to left-page-width; no
;; binding of result-page-width is visible here -- confirm the intended width.
(setf page (new-field-page destination result-page-length result-page-width)
1115
(funcall left-solution-collector page index base-page base-index))
1116
(left-solution-completor ()
1118
;; trim a partially filled final page to the number of rows used
(when (< (incf index) result-page-length)
1119
(setf page (adjust-page page (list index left-page-width))))
1120
(channel-put left-channel page))
1121
(complete-field left-channel)))
1122
(setf (aref left-solution-mediators thread-id) #'left-solution-mediator)
1123
(setf (aref left-solution-completors thread-id) #'left-solution-completor)))
1125
;; same arrangement for the right-hand stream of this partition
(index result-page-length))
1126
(flet ((right-solution-mediator (base-page base-index)
1127
(when (>= (Incf index) result-page-length)
1128
(when page (channel-put right-channel page))
1129
;; NOTE(review): as on the left side, allocated with result-page-width but
;; trimmed with right-page-width -- confirm.
(setf page (new-field-page destination result-page-length result-page-width)
1131
(funcall right-solution-collector page index base-page base-index))
1132
(right-solution-completor ()
1134
(when (< (incf index) result-page-length)
1135
(setf page (adjust-page page (list index right-page-width))))
1136
(channel-put right-channel page))
1137
(complete-field right-channel)))
1138
(setf (aref right-solution-mediators thread-id) #'right-solution-mediator)
1139
(setf (aref right-solution-completors thread-id) #'right-solution-completor)))))
1141
;; collate the results
1143
;; polls each result channel not yet marked complete and forwards its pages to
;; destination; a final nil page is put once every channel is marked complete
(bt:make-thread #'(lambda ()
1144
(let ((result-channels-complete (make-array (length result-channels) :initial-element nil)))
1145
(loop (when (every #'identity result-channels-complete) (return))
1146
(loop for i from 0 for result-channel across result-channels
1147
for channel-complete across result-channels-complete
1148
unless channel-complete
1149
do (let ((page (channel-get result-channel)))
1151
(put-field-page destination page)
1152
(setf (aref result-channels-complete i) t)))))
1153
(put-field-page destination nil)))
1154
:name "mp-results"))
1156
(flet ((distribute-left-solution (page index)
1157
(let* ((hash (funcall left-hash-generator page index))
1158
;; drop the low four hash bits, then mask down to a partition index
(id (logand (ash hash -4) thread-id-mask)))
1159
(funcall (aref left-solution-mediators id) page index)))
1160
(distribute-right-solution (page index)
1161
(let* ((hash (funcall right-hash-generator page index))
1162
(id (logand (ash hash -4) thread-id-mask)))
1163
(funcall (aref right-solution-mediators id) page index))))
1164
;; distribute the solutions
1165
(loop (check-query-status *query*)
1166
;; the limit is applied to the number of source solutions read so far
(when (and *solution-count-limit* (> solution-count *solution-count-limit*))
1167
(log-warn "join: terminated @~a solutions." solution-count)
1168
(terminate-task *query*)
1170
(unless left-complete
1171
(let ((left-page (get-field-page left-source)))
1173
(assert (= left-page-width (array-dimension left-page 1)) ()
1174
"Invalid left page width (expected ~a): ~a."
1175
left-page-width (array-dimension left-page 1))
1176
(trace-data mp-distribute-left left-page (term-value-field left-page))
1177
(incf solution-count (array-dimension left-page 0))
1179
;; route every row of the page to its partition
(dotimes (index (array-dimension left-page 0))
1180
(distribute-left-solution left-page index)))
1181
;; presumably the branch taken when the source yields no further page -- confirm
((setf left-complete t)))))
1182
(unless right-complete
1183
(let ((right-page (get-field-page right-source)))
1185
(assert (= right-page-width (array-dimension right-page 1)) ()
1186
"Invalid right page width (expected ~a): ~a."
1187
right-page-width (array-dimension right-page 1))
1188
(trace-data mp-distribute-right right-page (term-value-field right-page))
1189
(incf solution-count (array-dimension right-page 0))
1191
(dotimes (index (array-dimension right-page 0))
1192
(distribute-right-solution right-page index)))
1193
((setf right-complete t)))))
1194
(when (and left-complete right-complete) (return))))
1195
;; optional debugging dump of the threads and closures created above
(when *join.verbose*
1196
(print (list :threads threads
1197
:left-solution-mediators left-solution-mediators
1198
:right-solution-mediators right-solution-mediators
1199
:left-solution-completors left-solution-completors
1200
:right-solution-completors right-solution-completors
1201
:result-thread result-thread)))
1203
;; flush every partition's pending input page, then wait for collator and workers
(map nil #'funcall left-solution-completors)
1204
(map nil #'funcall right-solution-completors)
1205
(bt:join-thread result-thread)
1206
(map nil #'bt:join-thread threads)
1207
(let ((result-count 0) (left-count 0) (right-count 0))
1208
;; NOTE(review): each element is read as a vector of (result left right) counts,
;; but no visible code stores into thread-solution-counts -- confirm it is filled.
(loop for counts across thread-solution-counts
1209
do (progn (incf result-count (aref counts 0))
1210
(incf left-count (aref counts 1))
1211
(incf right-count (aref counts 2))))
1212
(incf-stat *solutions-processed* (+ left-count right-count))
1213
(incf-stat *solutions-constructed* result-count)
1214
(values (+ left-count right-count) result-count)))))
1217
(defun compute-mp-join-aspects (left-dimensions right-dimensions)
  "Compute the helper functions used to partition a join across threads.
The key is the intersection of LEFT-DIMENSIONS and RIGHT-DIMENSIONS.
Return four values, in this order:
 - the key generator for the left dimensions,
 - the key generator for the right dimensions,
 - the unary collector from the left dimensions onto themselves,
 - the unary collector from the right dimensions onto themselves."
  ;; let* keeps the original evaluation order: key first, then left before right
  (let* ((shared-dimensions (intersect-dimensions left-dimensions right-dimensions))
         (left-keyer (compute-key-generator shared-dimensions left-dimensions))
         (right-keyer (compute-key-generator shared-dimensions right-dimensions))
         (left-copier (compute-unary-collector left-dimensions left-dimensions))
         (right-copier (compute-unary-collector right-dimensions right-dimensions)))
    (values left-keyer right-keyer left-copier right-copier)))
1225
;;; process-left-dominant-join
;;; Two-phase join in which the left source is consumed and cached in full before
;;; any right page is read.
;;;
;;; Keyword arguments as used below:
;;;   test    optional predicate expression; passed to compute-join-operators and
;;;           decides whether natural-predicate is computed
;;;   start   (default 0) number of leading accepted solutions to skip
;;;   end     when non-null, collection stops once result-count reaches it
;;;   left-sort-dimensions, right-sort-dimensions   accepted but declared ignored
;;;
;;; compute-join-operators supplies the join type; the visible dispatch clauses
;;; handle :natural (left solutions go into a term-id cache) and :cross (copied
;;; left pages are chained into a list).
;;; complete-solutions trims and emits the last result page, updates
;;; *solutions-processed* and *solutions-constructed*, and leaves the function
;;; with (values solution-count result-count).
(defun process-left-dominant-join (destination left-source right-source left-dimensions right-dimensions
1226
&key test (start 0) end
1227
(left-sort-dimensions ()) (right-sort-dimensions ()))
1228
"Generate a stream of join solutions to a destination given left and right solution sources and dimension lists for
1229
the left and right solutions fields. Invoke the source function repeatedly to obtain a stream of
1230
(dimension . solution-page) pairs until the result page is null. Use the dimensions to distinguish left from right.
1231
Recognize the left side as dominant and cache its content before proceeding with the join.
1232
Each must be a unique list. Combine the respective source either as a natural or a cross join depending
1233
on whether they share any dimensions. In the former case, hash and cache solutions based on the shared
1236
(declare ;(optimize (speed 3) (safety 0))
1237
(ignore left-sort-dimensions right-sort-dimensions)
1239
;; NOTE(review): the type checks and trace below are labelled process-join, not
;; process-left-dominant-join.
(assert-argument-types process-join
1240
(destination (or channel function))
1241
(left-source (or channel function))
1242
(right-source (or channel function))
1243
(left-dimensions list)
1244
(right-dimensions list)
1246
(end (or null (integer 0))))
1247
(incf-stat *algebra-operations*)
1248
(trace-algebra process-join left-source right-source left-dimensions right-dimensions
1249
:start start :end end :test test)
1251
(multiple-value-bind (type result-dimensions left-cache-operator right-cache-operator cross-predicate cross-collector)
1252
(compute-join-operators left-dimensions right-dimensions test)
1253
(declare (type (function (array fixnum array fixnum array fixnum) t) cross-collector))
1254
(let* ((result-page-width (channel-page-width destination))
1255
(result-page-length (channel-page-length destination))
1257
;; starts at the page length so the first collected solution allocates a page
(result-index result-page-length)
1258
(left-width (length left-dimensions))
1259
(right-width (length right-dimensions))
1260
(left-cache (ecase type
1261
(:natural (make-term-id-cache :single-thread t))
1263
(left-cache-end nil)
1264
(right-surrogate (make-array right-width :element-type 'fixnum))
1268
(natural-collector (compute-binary-collector-spread result-dimensions left-dimensions right-dimensions))
1269
(natural-predicate (when test (compute-binary-predicate-spread result-dimensions left-dimensions right-dimensions
1270
:handle-errors nil))))
1271
(assert (= (length result-dimensions) result-page-width) ()
1272
"Channel and operation dimensions do not match: ~a: ~a." destination result-dimensions)
1273
;; natural join, left side: enter every row of the page into left-cache
(labels ((left-natural-processor (left-page)
1274
(assert (= left-width (array-dimension left-page 1)) ()
1275
"ldj: Invalid left page width (expected ~a/~a): ~a."
1276
left-width left-dimensions (array-dimension left-page 1))
1277
(dotimes (left-index (array-dimension left-page 0))
1278
(funcall left-cache-operator left-page left-index left-cache nil)))
1279
;; natural join, right side: look up the cached left solutions for each right row.
;; Two loop bodies are visible: the first collects inside (catch :skip ...) with a
;; handler that throws :skip on any error, the second collects directly;
;; presumably chosen by whether a predicate is present -- confirm.
(right-natural-processor (right-page)
1280
(assert (= right-width (array-dimension right-page 1)) ()
1281
"ldj: Invalid right page width (expected ~a/~a): ~a."
1282
right-width right-dimensions (array-dimension right-page 1))
1284
(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :skip nil))))
1285
(dotimes (right-index (array-dimension right-page 0))
1286
(dolist (left-solution (funcall right-cache-operator right-page right-index nil left-cache right-surrogate))
1287
(catch :skip (collect-natural-solution left-solution right-surrogate)))))
1288
(dotimes (right-index (array-dimension right-page 0))
1289
(dolist (left-solution (funcall right-cache-operator right-page right-index nil left-cache right-surrogate))
1290
(collect-natural-solution left-solution right-surrogate)))))
1292
;; cross join, left side: append the page to the list held in left-cache,
;; tracking the last cell in left-cache-end
(left-cross-processor (left-page)
1293
(assert (= left-width (array-dimension left-page 1)) ()
1294
"ldj: Invalid left page width (expected ~a/~a): ~a."
1295
left-width left-dimensions (array-dimension left-page 1))
1296
(let ((cell (cons left-page nil)))
1298
(setf (rest left-cache-end) cell)
1299
(setf left-cache cell))
1300
(setf left-cache-end cell)))
1301
;; cross join, right side: pair every cached left row with every right row
(right-cross-processor (right-page)
1302
(assert (= right-width (array-dimension right-page 1)) ()
1303
"ldj: Invalid right page width (expected ~a/~a): ~a."
1304
right-width right-dimensions (array-dimension right-page 1))
1306
(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :skip nil))))
1307
(loop for left-page in left-cache
1308
do (dotimes (left-index (array-dimension left-page 0))
1309
(dotimes (right-index (array-dimension right-page 0))
1310
(catch :skip (collect-cross-solution left-page left-index right-page right-index))))))
1311
(loop for left-page in left-cache
1312
do (dotimes (left-index (array-dimension left-page 0))
1313
(dotimes (right-index (array-dimension right-page 0))
1314
(collect-cross-solution left-page left-index right-page right-index))))))
1316
;; accept the pair when no predicate is given or the predicate holds; result-count
;; is incremented for every accepted pair, and only pairs beyond start are written
(collect-natural-solution (left-solution right-solution)
1317
(when (and (or (null natural-predicate)
1318
(funcall natural-predicate left-solution right-solution))
1319
(> (incf result-count) start))
1320
;; over the configured limit: warn, terminate the task and finish
(when (and *solution-count-limit* (> result-count *solution-count-limit*))
1321
(log-warn "join: terminated @~a solutions." result-count)
1322
(terminate-task *query*)
1323
(complete-solutions))
1324
(next-solution-location)
1325
(funcall natural-collector result-page result-index left-solution right-solution)
1326
(when (and end (>= result-count end)) (complete-solutions))))
1327
;; same policy as collect-natural-solution, addressing rows by page and index
(collect-cross-solution (left-page left-index right-page right-index)
1328
(when (and (or (null cross-predicate)
1329
(funcall cross-predicate left-page left-index right-page right-index))
1330
(> (incf result-count) start))
1331
(when (and *solution-count-limit* (> result-count *solution-count-limit*))
1332
(log-warn "join: terminated @~a solutions." result-count)
1333
(terminate-task *query*)
1334
(complete-solutions))
1335
(next-solution-location)
1336
(funcall cross-collector result-page result-index left-page left-index right-page right-index)
1337
(when (and end (>= result-count end)) (complete-solutions))))
1338
(next-solution-location ()
1339
;; advance result-index; when the page is full, emit it and allocate a new one
1340
(when (>= (incf result-index) result-page-length)
1341
(when result-page (put-page result-page))
1342
(setf result-page (new-field-page destination result-page-length result-page-width)
1344
;; emits the final page, records statistics and returns from the whole function
(complete-solutions ()
1346
(let ((page-result-count (1+ result-index)))
1347
;; a partially filled last page is shrunk to the rows actually used
(when (< page-result-count result-page-length)
1349
(adjust-page result-page (list page-result-count result-page-width)))))
1350
(put-page result-page))
1352
(incf-stat *solutions-processed* solution-count)
1353
(incf-stat *solutions-constructed* result-count)
1354
(return-from process-left-dominant-join
1355
(values solution-count result-count)))
1357
(trace-data process-join.put-data destination result-dimensions (term-value-field page))
1359
(put-field-page destination page)
1360
(complete-field destination))))
1361
;; nothing to produce when pages have no rows or the requested slice is empty
(unless (and (plusp result-page-length) (or (null end) (> end start))) (complete-solutions))
1362
;; (setq *old-saved-pages* *saved-pages*)
1363
;; (setq *saved-pages* nil)
1364
;; phase 1: read the left source until it yields a null page; each page is copied,
;; the original is released, and the copy goes to the processor for the join type
(loop for left-page = (get-field-page left-source)
1365
until (null left-page)
1366
do (progn (check-query-status *query*)
1367
(incf solution-count (array-dimension left-page 0))
1369
(let ((cache-page (copy-page left-page)))
1370
(release-field-page left-source left-page)
1372
(:natural (left-natural-processor cache-page))
1373
(:cross (left-cross-processor cache-page))))))
1374
(trace-data process-left-dominant-join.left-count
1375
(typecase left-cache (vector (length left-cache)) (hash-table (hash-table-count left-cache))))
1376
;; phase 2: read the right source page by page and combine it with the cached left side
(loop for right-page = (get-field-page right-source)
1377
until (null right-page)
1378
do (progn (check-query-status *query*)
1379
(incf solution-count (array-dimension right-page 0))
1382
(:natural (right-natural-processor right-page))
1383
(:cross (right-cross-processor right-page)))
1384
(release-field-page right-source right-page)))
1387
(cons (list :*query-maximum-threads* *query-maximum-threads*
1388
:*query-operator-thread-threshold* *query-operator-thread-threshold*)
1389
(reverse *saved-pages*)))
1390
(complete-solutions)))))
1392
;;; process-right-dominant-join
;;; Counterpart of process-left-dominant-join: applies that function with the left
;;; and right sources, and the left and right dimension lists, exchanged.
;;; Presumably the remaining arguments are passed through in args, which is
;;; declared dynamic-extent -- confirm against the full lambda list.
(defun process-right-dominant-join (destination left-source right-source left-dimensions right-dimensions
1394
(declare (dynamic-extent args))
1395
(apply #'process-left-dominant-join destination right-source left-source right-dimensions left-dimensions
1402
(compute-cross-join-operator '(?::|inproc| ?::|name2| ?::|person2|)
1403
'(?::|article| ?::|name| ?::|person|)
1404
'(ORG.DATAGRAPH.SPOCQ.ALGEBRA:= ?::|name| ?::|name2|)
1408
(compute-natural-join-operator '(?::|inproc| ?::|name2| ?::|person|)
1409
'(?::|article| ?::|name| ?::|person|)
1410
'(ORG.DATAGRAPH.SPOCQ.ALGEBRA:= ?::|name| ?::|name2|)
1414
;;; compute-natural-join-operator (disabled variant)
;;;
;;; The feature expression (or) in front of the definition is never true, so
;;; the reader skips this form; the original note marks it as kept for use as
;;; a merge operator.
;;;
;;; Purpose
;;;   Given the left and right dimension lists, an optional test predicate and
;;;   optional start / end slice bounds, build the source of a function
;;;   (lambda (destination left-source right-source) ...) and return two
;;;   values: the result of spocq-compile applied to that form, and the form
;;;   itself. The binding of the variable named lambda is not on the lines
;;;   shown here.
;;;
;;; Shape of the generated function, as far as the visible lines show
;;;   - result-dimensions is the union-dimensions of the two inputs; the
;;;     function asserts that the destination channel page width equals the
;;;     number of result dimensions.
;;;   - One term-id cache is created per side (left-cache, right-cache), both
;;;     declared as hash tables, plus one fixnum surrogate array per side.
;;;   - Local functions introduced by labels:
;;;       test                   only when a test expression was supplied
;;;       collect                copies one left/right pair into the result page
;;;       cache-left, cache-right
;;;                              called once per row of an incoming page; the
;;;                              callers bind two values from them: the list of
;;;                              solutions from the opposite side to pair with,
;;;                              and the current solution
;;;       collect-solution       applies the test (when present) and the start
;;;                              bound, advances the output location, collects
;;;                              the pair, and calls complete-solutions once
;;;                              result-count reaches the end bound
;;;       next-solution-location advances result-index; when the index reaches
;;;                              result-page-length the current page is put and
;;;                              a new field page is allocated
;;;       complete-solutions     clears both caches, trims a partially filled
;;;                              last page with adjust-page, puts it, updates
;;;                              the solution statistics and returns from the
;;;                              block process-join with result-count and both
;;;                              input counts
;;;     A further local function that traces and forwards a page through
;;;     put-field-page is visible, but its name line is not.
;;;   - Main loop: each iteration checks the query status, enforces
;;;     *solution-count-limit* (log-warn, terminate-task, complete-solutions),
;;;     then - unless that side is already complete - fetches one page from
;;;     the left source and one from the right source, validates the page
;;;     width, counts the rows, and pairs every row with the solutions
;;;     returned by cache-left / cache-right.
;;;   - Two expansions exist for each row loop: one wraps the loop in
;;;     handler-bind so that any error throws to the tag :skip, with
;;;     catch :skip around each collect-solution call, and one calls
;;;     collect-solution directly. The condition choosing between them is not
;;;     visible - presumably whether a test was supplied.
;;;   - In the branch that marks the left input complete, right-cache is
;;;     cleared; in the branch that marks the right input complete, left-cache
;;;     is cleared. The loop returns when both are complete and
;;;     complete-solutions runs.
;;;   - Forms prefixed with the trace-solution-count feature only emit
;;;     progress output under *page-cache-lock*.
;;;
;;; NOTE(review): the guard before the main loop expands (> end start)
;;;   whenever end is non-null; with end supplied and start null this would
;;;   compare a number against NIL - confirm callers always pass both.
;;; NOTE(review): result-count, result-page and left-complete are used but
;;;   their bindings are not among the visible let* bindings - presumably on
;;;   lines missing from this excerpt.
;;; NOTE(review): the generated code is compiled with (speed 3) (safety 0).
#+(or) ;; for use as merge op
1415
(defun compute-natural-join-operator (left-dimensions right-dimensions test start end &key &allow-other-keys)
1416
"Given the respective dimensions and predicate and/or slice constraints, generate an operator
1417
which accepts the destination and two sources and performs a merge join based on the identity of the
1420
(let* ((result-dimensions (union-dimensions left-dimensions right-dimensions))
1422
`(lambda (destination left-source right-source)
1423
(declare (optimize (speed 3) (safety 0)))
1424
(trace-algebra process-join left-source right-source ',left-dimensions ',right-dimensions
1425
:start ,start :end ,end :test ',test)
1428
(let* ((result-page-width (channel-page-width destination))
1429
(result-page-length (channel-page-length destination))
1431
(result-index result-page-length)
1432
(left-width ,(length left-dimensions))
1433
(right-width ,(length right-dimensions))
1434
(left-cache (make-term-id-cache :single-thread t))
1435
(right-cache (make-term-id-cache :single-thread t))
1437
(left-solution-count 0)
1438
(right-solution-count 0)
1440
(right-complete nil)
1441
#+trace-solution-count (next-count 0)
1442
#+trace-solution-count (uuid (uuid:make-v1-uuid))
1443
#+trace-solution-count (pns nil)
1444
(left-surrogate (make-array left-width :element-type 'fixnum))
1445
(right-surrogate (make-array right-width :element-type 'fixnum)))
1446
(declare (type fixnum result-index result-count left-solution-count right-solution-count)
1447
(type hash-table left-cache right-cache))
1448
(assert (= ,(length result-dimensions) result-page-width) ()
1449
"Channel and operation dimensions do not match: ~a: ~a." destination ',result-dimensions)
1450
(labels (,@(when test
1451
(let ((lambda (compute-binary-predicate-lambda-spread test left-dimensions right-dimensions :handle-errors nil)))
1452
`((test ,@(rest lambda)))))
1453
,(let ((lambda (compute-binary-collector-lambda-spread result-dimensions left-dimensions right-dimensions)))
1454
`(collect ,@(rest lambda)))
1455
(cache-left ,@(rest (compute-binary-cache-op-lambda-spread left-dimensions right-dimensions)))
1456
(cache-right ,@(rest (compute-binary-cache-op-lambda-spread right-dimensions left-dimensions)))
1457
(collect-solution (left-solution right-solution)
1460
`(when (and (test left-solution right-solution)
1461
,@(when start `((> result-count ,start))))
1462
(next-solution-location)
1463
(collect result-page result-index left-solution right-solution)
1464
,@(when end `((when (>= result-count ,end)
1465
(trace-algebra process-join.end :test+bounded result-count)
1466
(complete-solutions)))))
1467
`(,@(if start `(when (> result-count ,start)) '(progn))
1468
(next-solution-location)
1469
(collect result-page result-index left-solution right-solution)
1470
,@(when end `((when (>= result-count ,end)
1471
(trace-algebra process-join.end :bounded result-count)
1472
(complete-solutions)))))))
1473
(next-solution-location ()
1474
;; return a page (possible newly created) and the next free location in that page
1475
(when (>= (incf result-index) result-page-length)
1476
(when result-page (put-page result-page))
1477
(setf result-page (new-field-page destination result-page-length result-page-width)
1479
(complete-solutions ()
1480
(clrhash right-cache)
1481
(clrhash left-cache)
1482
#+trace-solution-count
1483
(bt:with-lock-held (*page-cache-lock*)
1484
(format *trace-output* "completed ***** ~a (~10s ~10s)/~s~%"
1485
uuid left-solution-count right-solution-count result-count)
1487
(finish-output *trace-output*))
1489
(let ((page-result-count (1+ result-index)))
1490
(when (< page-result-count result-page-length)
1492
(adjust-page result-page (list page-result-count result-page-width)))))
1493
(put-page result-page))
1495
(incf-stat *solutions-processed* (+ left-solution-count right-solution-count))
1496
(incf-stat *solutions-constructed* result-count)
1497
(return-from process-join (values result-count left-solution-count right-solution-count)))
1499
(trace-data process-join.put-page destination ',result-dimensions page)
1500
#+trace-solution-count
1501
(when (shiftf pns nil)
1502
(bt:with-lock-held (*page-cache-lock*)
1503
(format *trace-output* "~% ~a join.nput (~10s ~10s)/~10s -> ~s"
1504
uuid left-solution-count right-solution-count result-count
1505
(channel-solution-count destination))
1507
(finish-output *trace-output*)))
1508
(put-field-page destination page)))
1509
(unless (and (plusp result-page-length) (or (null ,end) (> ,end ,start))) (complete-solutions))
1510
(bt:with-lock-held (*page-cache-lock*)
1511
(format *trace-output* "~% join.n ~s/~s/ ~a"
1515
(loop (check-query-status *query*)
1516
#+trace-solution-count
1517
(when (> (+ left-solution-count right-solution-count) next-count)
1519
(bt:with-lock-held (*page-cache-lock*)
1520
(format *trace-output* "~% ~a join.n... (~10s ~10s)/~10s ~10s/~10s <- ~d/~d"
1521
uuid left-solution-count right-solution-count result-count
1522
(hash-table-count left-cache) (hash-table-count right-cache)
1523
(channel-solution-count left-source) (channel-solution-count right-source))
1525
(finish-output *trace-output*))
1526
(incf next-count 1000000))
1527
(when (and *solution-count-limit* (> result-count *solution-count-limit*))
1528
(log-warn "join: terminated @(~a ~a)/~a/~a/~a solutions."
1529
left-solution-count right-solution-count (hash-table-count left-cache) (hash-table-count right-cache) result-count)
1530
(terminate-task *query*)
1531
(complete-solutions))
1532
(unless left-complete
1533
(let ((left-page (get-field-page left-source)))
1535
(assert (= left-width (array-dimension left-page 1)) ()
1536
"Invalid left page width (expected ~a): ~a."
1537
left-width (array-dimension left-page 1))
1538
(incf left-solution-count (array-dimension left-page 0))
1540
`(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :skip nil))))
1541
(dotimes (left-index (array-dimension left-page 0))
1542
(multiple-value-bind (right-solutions left-solution)
1544
(cache-left left-page left-index nil right-cache left-surrogate)
1545
(cache-left left-page left-index left-cache right-cache))
1546
(dolist (right-solution right-solutions)
1547
(catch :skip (collect-solution left-solution right-solution))))))
1548
`(dotimes (left-index (array-dimension left-page 0))
1549
(multiple-value-bind (right-solutions left-solution)
1551
(cache-left left-page left-index nil right-cache left-surrogate)
1552
(cache-left left-page left-index left-cache right-cache))
1553
(dolist (right-solution right-solutions)
1554
(collect-solution left-solution right-solution)))))
1555
(release-page left-page))
1556
((setf left-complete t)
1557
#+trace-solution-count
1558
(bt:with-lock-held (*page-cache-lock*)
1559
(format *trace-output* "~% ~a join.nlft ~10s/~10s ~10s/~10s" uuid left-solution-count result-count
1560
(hash-table-count left-cache)
1561
(hash-table-count right-cache))
1562
(finish-output *trace-output*))
1563
(clrhash right-cache)))))
1564
(unless right-complete
1565
(let ((right-page (get-field-page right-source)))
1567
(assert (= right-width (array-dimension right-page 1)) ()
1568
"Invalid right page width (expected ~a): ~a."
1569
right-width (array-dimension right-page 1))
1570
(incf right-solution-count (array-dimension right-page 0))
1572
`(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :skip nil))))
1573
(dotimes (right-index (array-dimension right-page 0))
1574
(multiple-value-bind (left-solutions right-solution)
1576
(cache-right right-page right-index nil left-cache right-surrogate)
1577
(cache-right right-page right-index right-cache left-cache))
1578
(dolist (left-solution left-solutions)
1579
(catch :skip (collect-solution left-solution right-solution))))))
1580
`(dotimes (right-index (array-dimension right-page 0))
1581
(multiple-value-bind (left-solutions right-solution)
1583
(cache-right right-page right-index nil left-cache right-surrogate)
1584
(cache-right right-page right-index right-cache left-cache))
1585
(dolist (left-solution left-solutions)
1586
(collect-solution left-solution right-solution)))))
1587
(release-page right-page))
1588
((setf right-complete t)
1589
#+trace-solution-count
1590
(bt:with-lock-held (*page-cache-lock*)
1591
(format *trace-output* "~% ~a join.nrgt ~10s/~10s ~10s/~10s" uuid right-solution-count result-count
1592
(hash-table-count left-cache)
1593
(hash-table-count right-cache))
1594
(finish-output *trace-output*))
1595
(clrhash left-cache)))))
1596
(when (and left-complete right-complete) (return)))
1597
(complete-solutions)))))))
1598
(values (spocq-compile lambda) lambda)))