Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/join.lisp

Kind        Covered  All   %
expression  354      1132  31.3
branch      29       126   23.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 (:documentation "sparql join operations"
6
   "The SPARQL join operation is sort of a natural join, but not quite[1,2]. In particular, an unbound
7
  dimension in a solution matches with all opposing solutions, rather than failing to match.
8
  That is, whith respect to unbound dimensions, the operator is a cross join. The literature[2,5] suggests
9
  predicting which form of join is required, but the examples - eg the bindings tests, demonstrate, that
10
  is not feasible and the correct match can depend on the individual solution.
11
 
12
  Note that, when the unbound=wildcard logic is applied to bgp matching, it would mean that a single
13
  matching statement in the bgp should produce a solution for the entire bgp - with everything else unbound.
14
 
15
  This implementation proscribes stable solution dimensions, that it the logic is 'null rejecting'[3,4] and
16
  thus always performs a natural join.
17
 
18
  One of the consequences is that the service07 test yields no solutions rather than producing the
19
  same result as the query expression in which the service form is wrapped with an optional.
20
 
21
  ---
22
  [1] http://www.w3.org/TR/sparql11-query/#defn_algCompatibleMapping
23
  [2] www.hpl.hp.com/techreports/2005/HPL-2005-170.pdf
24
  [3] www2007.org/papers/paper435.pdf
25
  [4] www.dcc.uchile.cl/~cgutierr/papers/tods09.pdf
26
  [5] https://pdfs.semanticscholar.org/129e/e5496d9b54fdfdc11214765f90d047fe57a1.pdf
27
 ")
28
 
29
 
30
 ;; Developer convenience, never loaded: #+(or) is false in every image, so the reader
 ;; skips the following form.  Evaluated by hand, it adds :trace-solution-count to
 ;; *features* and reloads the join, filter and distinct operator files, so that
 ;; #+trace-solution-count code in them is read in.
 ;; NOTE(review): these logical pathnames name .../datagraph/spocq/..., whereas this
 ;; file's report path is under .../datagraph/spocq-shard/ - confirm they still resolve.
 #+(or)
31
 (progn (push :trace-solution-count *features*)
32
        (load #p"LIBRARY:org;datagraph;spocq;src;algebra;operators;join.lisp")
33
        (load #p"LIBRARY:org;datagraph;spocq;src;algebra;operators;filter.lisp")
34
        (load #p"LIBRARY:org;datagraph;spocq;src;algebra;operators;distinct.lisp"))
35
 
36
 (defparameter *join-translations*
37
   '(((spocq.a:|join| ?p1 (spocq.a:|union| ?p2 ?p3))
38
      . (spocq.a:|union| (spocq.a:|join| ?p1 ?p2) (spocq.a:|join| ?p1 ?p3)))
39
     ((spocq.a:|join| (spocq.a:|union| ?p2 ?p3) ?p1)
40
      . (spocq.a:|union| (spocq.a:|join| ?p2 ?p1) (spocq.a:|join| ?p3 ?p1)))
41
     #+(or)
42
     ((spocq.a:|join| ?field (spocq.a:|table| spocq.a:|unit|))
43
      . ?field)
44
     #+(or)
45
     ((spocq.a:|join| (spocq.a:|table| spocq.a:|unit|) ?field)
46
      . ?field)))
47
 
48
 (defmacro spocq.a:|join| (&whole form field1 field2 &rest args
49
                                  &key count end offset start test)
50
   "( ( solutionField solutionField (function (solution) xsd:boolean)? ) solutionField )
51
 A JOIN form combines two solution fields and yields a new field in which
52
 each solution is either a merge of two compatible solutions from the constituent
53
 fields, in the case where a natural join is possible, or an element of the cross
54
 join of the two fields, in the case where no dimension agrees.
55
 
56
 Where a test predicate is given, all result solutions satisfy it."
57
 
58
   (declare (ignore count end offset start test)
59
            (dynamic-extent args))
60
   (or (rule-based-translator form *join-translations*)
61
       (apply #'macroexpand-join field1 field2 args)))
62
 
63
 
64
 ;;; join propagation : NYI
65
 
66
 (defparameter *join-merge-bgps* t)
67
 
68
 (defparameter *enable.service-sip* t)
69
 
70
 (defun sip-expand-service (service? sip-source)
71
   ;; if the second argument field is a service - whether directory or indirectly
72
   ;; return an expansion which combines the service as sip with the other join side
73
   (if *enable.service-sip*
74
       (sip-expand-service-operator (first service?) service? sip-source)
75
       service?))
76
 
77
 ;; Rewrite SERVICE? so that SIP-SOURCE feeds the service clause it contains, dispatching
 ;; on OPERATOR, the operator symbol of SERVICE? (as passed by SIP-EXPAND-SERVICE).
 ;; Every method returns either a rewritten form or SERVICE? itself; callers detect
 ;; "no rewrite" by comparing the result with SERVICE? using EQ.
 (defgeneric sip-expand-service-operator (operator service? sip-source)
78
   ;; a service clause becomes a servicejoin which carries SIP-SOURCE as its sip source
   (:method ((operator (eql 'spocq.a:|service|)) service? sip-source)
79
     ;; an intersection may already have been tested by the caller (MACROEXPAND-JOIN tests
     ;; dimensions), but re-test it here on the variables of the complete forms
80
     (if (or (null (intersection (expression-variables service?)
81
                                 (expression-variables sip-source)))
82
             (let* ((location (second service?))
83
                    (repository-id (and (not (variable-p location)) (iri-service-repository-id location))))
84
               (and repository-id (not (parse-view-repository-id repository-id)))))
85
         ;; decline to rewrite when the forms share no variables, or when the service
         ;; location names a local service repository which is not a view
86
         service?
87
         (destructuring-bind (op iri group-graph-pattern &rest args) service?
88
           (declare (ignore op))
89
           `(spocq.a::|servicejoin| ,iri ,group-graph-pattern ,sip-source ,@args))))
90
   ;; an existing servicejoin: extend its sip source by joining SIP-SOURCE onto it
   (:method ((operator (eql 'spocq.a::|servicejoin|)) service? sip-source)
91
     (destructuring-bind (op iri group-graph-pattern join-sip-source &rest args) service?
92
       (declare (ignore op))
93
       (if (null (intersection (expression-variables group-graph-pattern)
94
                               (expression-variables sip-source)))
95
           ;; decline to rewrite: the service pattern shares no variables with SIP-SOURCE
96
           service?
97
           `(spocq.a::|servicejoin| ,iri ,group-graph-pattern (spocq.a:|join| ,join-sip-source ,sip-source) ,@args))))
98
 
99
   ;; a filter: look for a service clause in the filtered field and keep the filter around it
   (:method ((operator (eql 'spocq.a:|filter|)) service? sip-source)
100
     (destructuring-bind (op filter-field . rest) service?
101
       (let ((sip-expansion? (sip-expand-service filter-field sip-source)))
102
         (if (eq sip-expansion? filter-field)
103
             service?
104
             `(,op ,sip-expansion? ,@rest)))))
105
 
106
   ;; a join: try the first constituent and, only if that declines, the second
   (:method ((operator (eql 'spocq.a:|join|)) service? sip-source)
107
     (destructuring-bind (op field1 field2 . rest) service?
108
       (let ((sip-expansion? (sip-expand-service field1 sip-source)))
109
         (if (eq sip-expansion? field1)
110
             (let ((sip-expansion? (sip-expand-service field2 sip-source)))
111
               (if (eq sip-expansion? field2)
112
                   service?
113
                   `(,op ,field1 ,sip-expansion? ,@rest)))
114
             `(,op ,sip-expansion? ,field2 ,@rest)))))
115
 
116
   ;; a leftjoin: only the first (required) constituent is considered; the optional
   ;; second constituent is never rewritten
   (:method ((operator (eql 'spocq.a:|leftjoin|)) service? sip-source)
117
     (destructuring-bind (op field1 field2 . rest) service?
118
       (let ((sip-expansion? (sip-expand-service field1 sip-source)))
119
         (if (eq sip-expansion? field1)
120
             service?
121
             `(,op ,sip-expansion? ,field2 ,@rest)))))
122
 
123
   ;; any other operator
   (:method ((operator t) service? sip-source)
124
     ;; if the first was not a service clause, but the second is, swap them
     ;; (expand SIP-SOURCE with SERVICE? as its sip source); when that declines, return
     ;; SERVICE? so the caller still sees "no rewrite"
125
     (if (service-form-p sip-source)
126
         (let ((service-expansion (sip-expand-service sip-source service?)))
127
           (if (eq service-expansion sip-source)
128
               service?
129
               service-expansion))
130
         service?)))
131
 
132
 (defparameter *push-values-clauses* nil)
133
 
134
 (defun reduce-values-clause (field1 field2 &optional slice)
135
   "accept two fields and examine them to determine whether they involve bindings whcich can be reduced
136
    This applies if both a bindings clauses, for which the join can be precomputed,
137
    of it one binding can be pushed in to abgp on the other field" 
138
   (flet ((extend-bgp (bgp bindings)
139
            (when (null (set-difference (expression-variables bindings) (expression-variables bgp)))
140
              (unless (find bindings bgp)
141
                (push bindings (rest bgp)))
142
              bgp)))
143
     (cond ((and (bindings-form-p field1(bindings-form-p field2))
144
                ;; pre-join constant bindings
145
                (destructuring-bind (solution-list1 dimensions1) (rest field1)
146
                  (declare (ignore solution-list1))
147
                  (destructuring-bind (solution-list2 dimensions2) (rest field2)
148
                    (declare (ignore solution-list2))
149
                    ;; cross-join iff no dimensionis either natural or dynamically bound
150
                    (unless (or (intersection dimensions1 dimensions2)
151
                                (query-values-argument *query* dimensions1)
152
                                (query-values-argument *query* dimensions2))
153
                      (cross-join-bindings field1 field2 slice)))))
154
           ((bindings-form-p field1)
155
            (let ((bgp-forms (expression-bgps field2)))
156
              (when (plusp (loop for bgp in bgp-forms
157
                             count (extend-bgp bgp field1)))
158
                (if slice
159
                    `(spocq.e:slice ,field2 ,@(rest slice))
160
                    field2))))
161
           ((bindings-form-p field2)
162
            (let ((bgp-forms (expression-bgps field1)))
163
              (when (plusp (loop for bgp in bgp-forms
164
                             count (extend-bgp bgp field2)))
165
                (if slice
166
                    `(spocq.e:slice ,field1 ,@(rest slice))
167
                    field1)))))))
168
 
169
 
170
 (defun macroexpand-join (field1 field2 &rest args)
171
   "Perform a join on the two solution fields with optional slice constraints.
172
    In order to support delayed propagation, the constituent field with the fewest dimensions is taken as
173
    a potential source by declaring its available dimensions to the other side's form. those
174
    BGPs, which contribute to that side and include one or more of those dimensions, are then
175
    compiled to accepts a field stream with initial values and the requirements are captured in
176
    the AGP instance.
177
    In order to exploit service propagation, a service as a second argument is combined into a service-join
178
    with whatever preceeded it. Alternatively, by configuration a non-service second argument with the service
179
    as the first is swapped and then retried.
180
    The respective service clause may not be immediately apparent.
181
    On one hand, filter and join argument are also examined, and found bgps are speculatively expanded to
182
    see if they yield serice operations."
183
   (destructuring-bind (&key end start test)
184
                       (apply #'canonicalize-algebra-arguments args)
185
     (let ((slice-p (or end start)))
186
       (labels ((standard-expansion ()
187
                  (let ((field1-dimensions (expression-dimensions field1))
188
                        (field2-dimensions (expression-dimensions field2)))
189
                    (cond ((null (intersection field1-dimensions field2-dimensions))
190
                           (simple-expansion))
191
                          ;; support propagation into service clauses
192
                          ((let ((sip-service? (sip-expand-service field1 field2)))
193
                             (unless (eq field1 sip-service?)
194
                               (when test
195
                                  (setf sip-service? `(spocq.a:|filter| ,sip-service? ,test)))
196
                               (when slice-p
197
                                 (setf sip-service? `(spocq.a:|slice| ,sip-service?
198
                                                              ,@(when start `(:start ,start))
199
                                                              ,@(when end `(:end ,end)))))
200
                               sip-service?)))
201
                          ((eq *join-propagation-mode* :propagate)
202
                           (if (< (length field1-dimensions) (length field2-dimensions))
203
                               (setf field2 `(locally (declare (spocq.e:base-dimensions ,@field1-dimensions))
204
                                               ,field2))
205
                               (setf field1 `(locally (declare (spocq.e:base-dimensions ,@field2-dimensions))
206
                                           ,field1)))
207
                           (simple-expansion))
208
                          (t
209
                           (simple-expansion)))))
210
                (simple-expansion ()
211
                  `(spocq.e:join ,field1 ,field2
212
                                 ,@(when test `(:test (quote ,test)))
213
                                 ,@(when end `(:end ,end))
214
                                 ,@(when start `(:start ,start)))))
215
         (cond ((and *push-values-clauses*
216
                     (reduce-values-clause field1 field2 (when slice-p `(spocq.a:|slice| :start ,start :end ,end)))))
217
               ((and *join-merge-bgps* (null test))
218
                (cond ((bgp-form-p field1)
219
                       (cond ((bgp-form-p field2)
220
                              `(spocq.a:|bgp| ,@(rest field1) ,@(rest field2)
221
                                        ,@(when slice-p `((spocq.a:|slice| :start ,start :end ,end)))))
222
                             ((and (filter-form-p field2(bgp-form-p (second field2)))
223
                              (destructuring-bind (filter-op (bgp-op . patterns) test) field2
224
                                (declare (ignore filter-op bgp-op))
225
                                `(spocq.a:|bgp| ,@(rest field1) ,@patterns
226
                                          ,@(when slice-p `((spocq.a:|slice| :start ,start :end ,end)))
227
                                          (spocq.a:|filter| ,test))))
228
                             (t
229
                              (standard-expansion))))
230
                      ((and (filter-form-p field1(bgp-form-p (second field1)))
231
                       (cond ((bgp-form-p field2)
232
                              (destructuring-bind (filter-op (bgp-op . patterns) test) field1
233
                                (declare (ignore filter-op bgp-op))
234
                                `(spocq.a:|bgp| ,@patterns ,@(rest field2)
235
                                          ,@(when slice-p `((spocq.a:|slice| :start ,start :end ,end)))
236
                                          (spocq.a:|filter| ,test))))
237
                             ((and (filter-form-p field2(bgp-form-p (second field2)))
238
                              (destructuring-bind (filter-op1 (bgp-op1 . patterns1) test1) field1
239
                                (declare (ignore filter-op1 bgp-op1))
240
                                (destructuring-bind (filter-op2 (bgp-op2 . patterns2) test2) field2
241
                                  (declare (ignore filter-op2 bgp-op2))
242
                                  `(spocq.a:|bgp| ,@patterns1 ,@patterns2
243
                                            ,@(when slice-p `((spocq.a:|slice| :start ,start :end ,end)))
244
                                            (spocq.a:|filter| ,test1)
245
                                            (spocq.a:|filter| ,test2)))))
246
                             (t
247
                              (standard-expansion))))
248
                      (t
249
                       (standard-expansion))))
250
               (t
251
                (standard-expansion)))))))
252
                  
253
 
254
 
255
 (defgeneric spocq.e:join (field1 field2 &rest args &key end start test)
256
   (:documentation "The JOIN combines solution fields to generate a field which contains
257
    a merged solution for each compatible pair.
258
    The initial evaluation process, which composes the bgps and operators as a generator tree
259
    with agp leaves, passes constituent agps upwards by registering them with respective generators
260
    and links the sources with destinations by correlating available with required dimensions.
261
    Where a match occurs, the source channel is spliced with an extra one which feeds data
262
    to the destination bgp match process.")
263
   
264
   (:method :before ((field1 t) (field2 t) &key end start test)
265
     (assert-argument-types spocq.e:join
266
       (start (or null (integer 0)))
267
       (end (or null (integer 0))))
268
     (incf-stat *algebra-operations*)
269
     (trace-algebra spocq.e:join field1 field2 
270
                    :start start :end end :test test))
271
 
272
   (:method ((field1 null-generator) (field2 solution-generator) &rest args)
273
     (declare (ignore args))
274
     (let* ((result-dimensions (union-dimensions (solution-generator-dimensions field1)
275
                                                 (solution-generator-dimensions field2)))
276
            (result-channel (make-null-channel :dimensions result-dimensions)))
277
       (process-null result-channel result-dimensions)
278
       (make-null-generator :operator 'spocq.a:|null|
279
                            :dimensions result-dimensions
280
                            :expression nil
281
                            :channel result-channel
282
                            :constituents ())))
283
 
284
   (:method ((field1 solution-generator) (field2 null-generator) &rest args)
285
     (declare (ignore args))
286
     (let* ((result-dimensions (union-dimensions (solution-generator-dimensions field1)
287
                                                 (solution-generator-dimensions field2)))
288
            (result-channel (make-null-channel :dimensions result-dimensions)))
289
       (process-null result-channel result-dimensions)
290
       (make-null-generator :operator 'spocq.a:|null|
291
                            :dimensions result-dimensions
292
                            :expression nil
293
                            :channel result-channel
294
                            :constituents ())))
295
   
296
   (:method ((field1 solution-generator) (field2 solution-generator) &rest args)
297
     (declare (dynamic-extent args))
298
     (apply 'join-generator field1 field2
299
            args)))
300
 
301
 
302
 (defparameter *join.verbose* nil)
303
 
304
 
305
 ;; Build the solution generator for a join of two constituent generators.
 ;; Before either constituent runs, examine the agps of one side - the right side when
 ;; the left has strictly fewer dimensions, otherwise the left side - for those whose
 ;; base channel exists and whose base dimensions are EQUAL (so list order matters) to
 ;; the other side's dimensions.  Each such base channel is pushed onto the other side's
 ;; CHANNEL-CHANNELS, and the concrete operator becomes the matching left- or
 ;; right-dominant join.  The returned generator's :expression is a RUN-JOIN-THREAD call
 ;; list which retains ARGS, so callers must not APPLY a stack-allocated list here.
 (defun join-generator (left-field-generator right-field-generator &rest args &key end start test)
306
   (declare (ignore test))
307
   ;; upon invocation use the generator argument to compute the combination operators
308
   ;; and invoke the respective binding operator with the respective solution destination
309
   (let* ((left-dimensions (solution-generator-dimensions left-field-generator))
310
          (right-dimensions (solution-generator-dimensions right-field-generator))
311
          (left-channel (solution-generator-channel left-field-generator))
312
          (right-channel (solution-generator-channel right-field-generator))
313
          (result-dimensions (union-dimensions left-dimensions right-dimensions))
314
          (result-channel (make-channel :name (cons 'spocq.a:|join| (task-id *query*))
315
                                        :dimensions result-dimensions
316
                                        :size (effective-channel-size :start start :end end)
317
                                        :page-length (effective-page-length :start start :end end)))
318
          (concrete-operator 'process-join))
319
     ;; examine dimensional dependencies to determine whether it is possible to
320
     ;; propagate solutions. if so, splice additional flow paths into the network.
321
     ;; nb. this must happen before constituent threads start to generate solutions, that is, before
322
     ;; the join operator per se is invoked, so this logic must be here rather than delayed
323
     ;; until internal in the operator itself.  !! an alternative would be to arrange spliced
324
     ;; data flow to occur on the channel read rather than write, but that would synchronize the
325
     ;; dependents with the primary path.
326
     (when *join.verbose*
327
       (print (list left-dimensions right-dimensions)))
     ;; ties (equal dimension counts) take the left-to-right branch
328
     (if (< (length left-dimensions) (length right-dimensions))
329
       ;; left is smaller: splice matching right-side agp base channels into the left channel
       (dolist (agp (solution-generator-patterns right-field-generator))
330
         (when *join.verbose*
331
           (print (list :right-to-left (and (agp-base-channel agp) (agp-base-dimensions agp))
332
                        left-dimensions)))
333
         (when (and (agp-base-channel agp)
334
                    (equal (agp-base-dimensions agp) left-dimensions))
335
           (when *join.verbose*
336
             (print (list :join.splice.right-to-left agp (agp-base-channel agp) left-channel
337
                          left-dimensions right-dimensions)))
338
           (trace-algebra join.splice.right-to-left agp (agp-base-channel agp) left-channel
339
                          left-dimensions right-dimensions)
340
           (setf concrete-operator 'process-left-dominant-join)
341
           (push (agp-base-channel agp) (channel-channels left-channel))))
342
       ;; otherwise: splice matching left-side agp base channels into the right channel
       (dolist (agp (solution-generator-patterns left-field-generator))
343
         (when *join.verbose*
344
           (print (list :left-to-right (and (agp-base-channel agp) (agp-base-dimensions agp))
345
                        right-dimensions)))
346
         (when (and (agp-base-channel agp)
347
                    (equal (agp-base-dimensions agp) right-dimensions))
348
           (when *join.verbose*
349
             (print (list :join.splice.left-to-right agp (agp-base-channel agp) right-channel
350
                          left-dimensions right-dimensions)))
351
           (trace-algebra join.splice.left-to-right agp (agp-base-channel agp) right-channel
352
                          left-dimensions right-dimensions)
353
           (setf concrete-operator 'process-right-dominant-join)
354
           (push (agp-base-channel agp) (channel-channels right-channel)))))
355
     
356
     ;; return a solution generator for the combination; its :expression is a
     ;; RUN-JOIN-THREAD call list which records the chosen concrete operator
357
     (make-solution-generator :operator 'spocq.a:|join|
358
                              :concrete-operator concrete-operator
359
                              :dimensions result-dimensions
360
                              :expression (list 'run-join-thread result-channel left-field-generator right-field-generator
361
                                                args :concrete-operator concrete-operator)
362
                              :channel result-channel
363
                              :constituents (list left-field-generator right-field-generator))))
364
 
365
 
366
 (defun run-join-thread (result-channel left-field-generator right-field-generator args
367
                                        &key (concrete-operator 'process-join))
368
   (let* ((left-dimensions (solution-generator-dimensions left-field-generator))
369
          (right-dimensions (solution-generator-dimensions right-field-generator))
370
          (left-channel (solution-generator-channel left-field-generator))
371
          (right-channel (solution-generator-channel right-field-generator))
372
          (left-expression (solution-generator-expression left-field-generator))
373
          (right-expression (solution-generator-expression right-field-generator))
374
          (*thread-operations* (cons (list* 'spocq.a:|join| (task-id *task*)
375
                                            left-dimensions right-dimensions args)
376
                                     *thread-operations*)))
377
     (when *join.verbose*
378
       (print (list :concrete-operator concrete-operator)))
379
     (query-run-in-thread *query* left-expression)
380
     (query-run-in-thread *query* right-expression)
381
     (setf (channel-size result-channel) (min (channel-size left-channel)
382
                                              (channel-size right-channel)
383
                                              (channel-size result-channel))
384
           (channel-page-length result-channel) (min (channel-page-length left-channel)
385
                                                     (channel-page-length right-channel)
386
                                                     (channel-page-length result-channel)))
387
     (apply concrete-operator result-channel left-channel right-channel
388
            left-dimensions
389
            right-dimensions
390
            :left-sort-dimensions (solution-generator-sort-dimensions left-field-generator)
391
            :right-sort-dimensions (solution-generator-sort-dimensions right-field-generator)
392
            args)
393
     'spocq.a:|join|))
394
 
395
 
396
 
397
 (defun compute-join-operators (left-dimensions right-dimensions test)
398
   (let ((result-dimensions (union-dimensions left-dimensions right-dimensions)))
399
     (if (intersection left-dimensions right-dimensions)
400
       (values :natural result-dimensions
401
               (compute-binary-cache-op left-dimensions right-dimensions)
402
               (compute-binary-cache-op right-dimensions left-dimensions)
403
               (when test
404
                 (compute-binary-predicate test left-dimensions right-dimensions))
405
               (compute-binary-collector result-dimensions
406
                                         left-dimensions right-dimensions))
407
       (values :cross result-dimensions nil nil
408
               (when test
409
                 (compute-binary-predicate test left-dimensions right-dimensions :handle-errors t))
410
               (compute-binary-collector result-dimensions
411
                                         left-dimensions right-dimensions)))))
412
 
413
 
414
 (defparameter *mp-join-p* nil)
415
 
416
 (defmethod process-join ((destination array-page-channel)
417
                          (left-source array-page-channel)
418
                          (right-source array-page-channel)
419
                          left-dimensions right-dimensions
420
                          &rest args &key (test nil t-s) offset count
421
                          (start offset) (end (when count (+ count (or start 0))))
422
                          (left-sort-dimensions ()) (right-sort-dimensions ()))
423
   "Generate a stream of join solutions to a destination given a solution source and dimension lists for
424
  the left and right solutions fields. Invoke the source function repeatedly to obtain a stream of
425
  (dimension . solution-page) pairs until the page is null. Use the dimensions to distinguish left from right.
426
  Each must be a unique list. Combine the respective source either as a natural or a cross join depending
427
  on whether they share any dimensions. In the former case, hash and cache solutions based on the shared
428
  dimensions."
429
 
430
   (assert-argument-types process-join
431
     (right-source (or channel function))
432
     (left-dimensions list)
433
     (right-dimensions list)
434
     )
435
   (unless start (when end (setf start 0)))
436
   (when end (setf end (max start end)))
437
   (when t-s
438
     (setf args (copy-list args))
439
     (remf args :test))
440
   ;; retrieve / generate the core operator
441
   (let* ((form (if (intersection left-dimensions right-dimensions)
442
                  'compute-natural-join-operator 'compute-cross-join-operator))
443
          (key (list form left-dimensions right-dimensions test start end))
444
          (function (get-aspect-cache key)))
445
     (unless function
446
       (setf function (funcall form left-dimensions right-dimensions test start end
447
                               :left-sort-dimensions left-sort-dimensions :right-sort-dimensions right-sort-dimensions)
448
             (get-aspect-cache key) function))
449
     ;; if parallel reduction is enabled, split the streams across multiple cores, otherwise
450
     ;; pass everything through just one
451
     (funcall function destination left-source right-source)
452
     #+(or)
453
     (if *mp-join-p*
454
       (if (and (eq form 'compute-natural-join-operator)
455
                (typep (algebra-operator-thread-count) '(integer 2))
456
                (not (or end (typep start '(integer 1))))
457
                (> (repository-statement-count *repository*) 10000))
458
         (process-mp-natural-join function destination left-source right-source left-dimensions right-dimensions)
459
         (funcall function destination left-source right-source))
460
       (funcall function destination left-source right-source))))
461
 
462
 
463
 #+trace-solution-count
464
 (progn
465
   (defun check-memory ()
466
     (let ((usage (sb-kernel::dynamic-usage)))
467
       (format *trace-output* "[usage: ~s]" usage)
468
       (finish-output *trace-output*)
469
       (when (> usage 2000000000)
470
         (when (> usage 2700000000)
471
           (format *trace-output* "[gc]")
472
           (sb-ext:gc))
473
         (room))))
474
   ;; (defun check-memory ())
475
   
476
   (defun log-memory ()
477
     (let ((dynamic (sb-kernel::dynamic-usage)))
478
       (format *trace-output* "[MEMORY: dynamic ~d]" dynamic)
479
       (finish-output *trace-output*)))
480
   
481
   (pushnew 'log-memory sb-ext:*after-gc-hooks*))
482
 
483
 
484
 (defun compute-cross-join-operator (left-dimensions right-dimensions test start end &key &allow-other-keys)
485
   (let* ((result-dimensions (union-dimensions left-dimensions right-dimensions))
486
          (lambda
487
            `(lambda (destination left-source right-source)
488
               (declare (optimize (speed 3) (safety 0)))
489
               (trace-algebra process-cross-join destination left-source right-source ',left-dimensions ',right-dimensions
490
                              :start ,start :end ,end :test ',test)
491
               
492
               (block process-cross-join
493
                 (let* ((result-page-width (channel-page-width destination))
494
                        (result-page-length (channel-page-length destination))
495
                        (result-page nil)
496
                        (result-index result-page-length)
497
                        (left-width ,(length left-dimensions))
498
                        (right-width ,(length right-dimensions))
499
                        (left-cache ())
500
                        (right-cache ())
501
                        (left-cache-end ())
502
                        (right-cache-end ())
503
                        (left-solution-count 0)
504
                        (right-solution-count 0)
505
                        (result-count 0)
506
                        (solution-count 0)
507
                        (left-complete nil)
508
                        (right-complete nil)
509
                        (source-dimensions (list ',left-dimensions ',right-dimensions))
510
                        #+trace-solution-count (next-count 0)
511
                        #+trace-solution-count (uuid (uuid:make-v1-uuid))
512
                        #+trace-solution-count (pns nil))
513
                   (declare (type fixnum result-index result-count solution-count))
514
                   (assert (= ,(length result-dimensions) result-page-width) ()
515
                           "Channel and operation dimensions do not match: ~a: ~a." destination ',result-dimensions)
516
                   (labels (,@(when test
517
                                (let ((lambda (compute-binary-predicate-lambda test left-dimensions right-dimensions :handle-errors nil)))
518
                                  `((test ,@(rest lambda)))))
519
                            ,(let ((lambda (compute-binary-collector-lambda result-dimensions left-dimensions right-dimensions)))
520
                               `(collect ,@(rest lambda)))
521
                            (collect-solution (left-page left-index right-page right-index)
522
                              (incf result-count)
523
                              (when (and *solution-count-limit* (> result-count *solution-count-limit*))
524
                                (spocq.e::cardinality-limit-error :operator 'cross-join
525
                                                                  :state `(:limit ,*solution-count-limit*
526
                                                                           :left-count ,left-solution-count
527
                                                                           :right-count ,right-solution-count
528
                                                                           :result-count ,result-count)
529
                                                                  :dimensions '(,left-dimensions ,right-dimensions)))
530
                              ,(if test
531
                                 `(when (and (test left-page left-index right-page right-index)
532
                                             ,@(when start `((> result-count ,start))))
533
                                    (next-solution-location)
534
                                    (collect result-page result-index left-page left-index right-page right-index)
535
                                    ,@(when end `((when (>= result-count ,end)
536
                                                    (trace-algebra process-join.end :test+bounded result-count)
537
                                                    (complete-solutions)))))
538
                                 `(,@(if start `(when (> result-count ,start)) '(progn))
539
                                   (next-solution-location)
540
                                   (collect result-page result-index left-page left-index right-page right-index)
541
                                   ,@(when end `((when (>= result-count ,end)
542
                                                   (trace-algebra process-join.end result-count)
543
                                                   (complete-solutions)))))))
544
                            (next-solution-location ()
545
                              ;; return a page (possible newly created) and the next free location in that page
546
                              (when (>= (incf result-index) result-page-length)
547
                                (when result-page (put-page result-page))
548
                                (setf result-page (new-field-page destination result-page-length result-page-width)
549
                                      result-index 0)))
550
                            (complete-solutions ()
551
                              (log-trace "join(x) ~a: complete: ~10s/~10s" source-dimensions solution-count result-count)
552
                              (when result-page
553
                                (let ((page-result-count (1+ result-index)))
554
                                  (when (< page-result-count result-page-length)
555
                                    (setf result-page
556
                                          (adjust-page result-page (list page-result-count result-page-width)))))
557
                                (put-page result-page))
558
                              (put-page nil)
559
                              (incf-stat *solutions-processed* solution-count)
560
                              (incf-stat *solutions-constructed* result-count)
561
                              (trace-algebra process-join ',left-dimensions ',right-dimensions solution-count result-count)
562
                              (return-from process-cross-join (values solution-count result-count)))
563
                            (put-page (page)
564
                              (trace-data process-cross-join.put-page destination ',result-dimensions page)
565
                              #+trace-solution-count
566
                              (when (shiftf pns nil)
567
                                (bt:with-lock-held (*page-cache-lock*)
568
                                  (format *trace-output* "~% ~a join(x)put ~10s/~10s -> ~s"
569
                                          uuid solution-count result-count (channel-solution-count destination))
570
                                  (finish-output *trace-output*)))
571
                              (if page
572
                                (put-field-page destination page)
573
                                (complete-field destination)))
574
                            (cache-left (page)
575
                              (let ((cell (cons (copy-page page) nil)))
576
                                (if left-cache
577
                                  (setf (rest left-cache-end) cell)
578
                                  (setf left-cache cell))
579
                                (setf left-cache-end cell)))
580
                            (cache-right (page)
581
                              (let ((cell (cons (copy-page page) nil)))
582
                                (if right-cache
583
                                  (setf (rest right-cache-end) cell)
584
                                  (setf right-cache cell))
585
                                (setf right-cache-end cell)))
586
                            (clear-left-cache ()
587
                              (setf left-cache nil
588
                                    left-cache-end nil))
589
                            (clear-right-cache ()
590
                              (setf right-cache nil
591
                                    right-cache-end nil)))
592
                     (unless (and (plusp result-page-length) (or (null ,end) (> ,end ,start))) (complete-solutions))
593
                     (log-trace "join(x) ~a" source-dimensions)
594
                     (loop (check-query-status *query*)
595
                            #+trace-solution-count
596
                           (when (> solution-count next-count)
597
                             (setf pns t)
598
                             (bt:with-lock-held (*page-cache-lock*)
599
                               (format *trace-output* "~% ~a join(x)... ~10s/~10s ~10s/~10s <- ~d/~d"
600
                                       uuid solution-count result-count
601
                                       left-solution-count
602
                                       right-solution-count
603
                                       (channel-solution-count left-source) (channel-solution-count right-source))
604
                               (finish-output *trace-output*))
605
                             (incf next-count 10000))
606
                           
607
                           (unless left-complete
608
                             (let ((left-page (get-field-page left-source)))
609
                               (cond (left-page
610
                                      (assert (= left-width (array-dimension left-page 1)) ()
611
                                              "cjg: Invalid left page width (expected ~a): ~a."
612
                                              left-width (array-dimension left-page 1))
613
                                      (incf solution-count (array-dimension left-page 0))
614
                                      (incf left-solution-count (array-dimension left-page 0))
615
                                      (unless right-complete (cache-left left-page))
616
                                      ,(if test
617
                                         '(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :skip nil))))
618
                                            (loop for right-page in right-cache
619
                                                  do (dotimes (left-index (array-dimension left-page 0))
620
                                                       (dotimes (right-index (array-dimension right-page 0))
621
                                                         (catch :skip
622
                                                           (collect-solution left-page left-index right-page right-index))))))
623
                                         '(loop for right-page in right-cache
624
                                                do (dotimes (left-index (array-dimension left-page 0))
625
                                                     (dotimes (right-index (array-dimension right-page 0))
626
                                                       (collect-solution left-page left-index right-page right-index)))))
627
                                      (release-field-page left-source left-page))
628
                                     ((zerop left-solution-count)
629
                                      ;; empty solutionfield entails empty result
630
                                      (log-trace "join(x) ~a: lft: null" source-dimensions)
631
                                      (complete-solutions))
632
                                     ((setf left-complete t)
633
                                     ;;  #+trace-solution-count
634
                                      (log-trace "join(x) ~a: lft: ~10@s/~10s ~10@s/~10s"
635
                                                 source-dimensions
636
                                                 solution-count result-count
637
                                                 left-solution-count right-solution-count)
638
                                      (clear-right-cache)))))
639
                           (unless right-complete
640
                             (let ((right-page (get-field-page right-source)))
641
                               (cond (right-page
642
                                      (assert (= right-width (array-dimension right-page 1)) ()
643
                                              "cjg: Invalid right page width (expected ~a): ~a."
644
                                              right-width (array-dimension right-page 1))
645
                                      (incf solution-count (array-dimension right-page 0))
646
                                      (incf right-solution-count (array-dimension right-page 0))
647
                                      (unless left-complete (cache-right right-page))
648
                                      ,(if test
649
                                         '(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :skip nil))))
650
                                            (loop for left-page in left-cache
651
                                                  do (dotimes (left-index (array-dimension left-page 0))
652
                                                       (dotimes (right-index (array-dimension right-page 0))
653
                                                         (catch :skip
654
                                                           (collect-solution left-page left-index right-page right-index))))))
655
                                         '(loop for left-page in left-cache
656
                                                do (dotimes (left-index (array-dimension left-page 0))
657
                                                     (dotimes (right-index (array-dimension right-page 0))
658
                                                       (collect-solution left-page left-index right-page right-index)))))
659
                                      (release-field-page right-source right-page))
660
                                     ((zerop right-solution-count)
661
                                      ;; empty solutionfield entails empty result
662
                                      (log-trace "join(x) ~a: rgt: null" source-dimensions)
663
                                      (complete-solutions))
664
                                     ((setf right-complete t)
665
                                      (log-trace "join(x) ~a: rgt: ~10@s/~10s ~10@s/~10s"
666
                                                 source-dimensions
667
                                                 solution-count result-count
668
                                                 left-solution-count right-solution-count)
669
                                      (clear-left-cache)))))
670
                           (when (and left-complete right-complete) (return)))
671
                     (complete-solutions)))))))
672
     (values (spocq-compile lambda) lambda)))
673
 
674
 (defparameter *CARDINALITY-LIMIT-ERROR* t)
675
 ;;; compute-natural-join-operator
 ;;; Build the SPARQL natural-join operator for LEFT-DIMENSIONS and RIGHT-DIMENSIONS.
 ;;; The generated LAMBDA takes (destination left-source right-source). On each loop iteration it
 ;;; reads one field page from each source that is not yet complete. Every incoming solution is
 ;;; passed to cache-left / cache-right together with the term-id caches, and the join of that
 ;;; solution with the solutions returned from the other side is written to DESTINATION. The
 ;;; generated function returns (values left-solution-count right-solution-count result-count).
 ;;; - TEST: optional filter, compiled with compute-binary-predicate-lambda-spread (:handle-errors nil);
 ;;;   a candidate pair is written only when it returns true.
 ;;; - START: when non-nil, a candidate is written only once result-count exceeds START.
 ;;; - END: when non-nil, the operator completes once result-count reaches END.
 ;;; - LEFT-SORT-DIMENSIONS, RIGHT-SORT-DIMENSIONS: referenced only by disabled #+(or) code.
 ;;; Returns two values: the compiled operator (spocq-compile) and the generated lambda form.
 ;;; NOTE(review): the cache ops come from compute-binary-cache-op-lambda-spread; this reads as a
 ;;; symmetric hash join (store in own cache, probe the opposite cache) -- confirm against that generator.
 (defun compute-natural-join-operator (left-dimensions right-dimensions test start end &key
676
                                                       (left-sort-dimensions ())
677
                                                       (right-sort-dimensions ()))
678
   (declare (ignorable left-sort-dimensions right-sort-dimensions))
679
   (let* ((result-dimensions (union-dimensions left-dimensions right-dimensions))
680
          (lambda
681
            `(lambda (destination left-source right-source)
682
               (declare (optimize (debug 3))) ; (speed 3) (safety 0)))
683
               (trace-algebra process-natural-join destination left-source right-source ',left-dimensions ',right-dimensions
684
                              :start ,start :end ,end :test ',test)
685
               (block process-natural-join
686
                 (let* ((result-page-width (channel-page-width destination))
687
                        (result-page-length (channel-page-length destination))
688
                        (result-page nil)
                        ;; starts at the page length so that the first next-solution-location call
                        ;; allocates a fresh result page and resets the index to 0
689
                        (result-index result-page-length)
690
                        (left-width ,(length left-dimensions))
691
                        (right-width ,(length right-dimensions))
692
                        (left-cache (make-term-id-cache :single-thread t))
693
                        (right-cache (make-term-id-cache :single-thread t))
694
                        (result-count 0)
695
                        (left-solution-count 0)
696
                        (right-solution-count 0)
697
                        (left-complete nil)
698
                        (right-complete nil)
699
                        (source-dimensions (list ',left-dimensions ',right-dimensions))
                        ;; number of pages (not solutions) received from each source; zero at
                        ;; end-of-input means that source delivered nothing
700
                        (left-count 0)
701
                        (right-count 0)
                        ;; set by complete-solutions; a second completion signals "already completed"
702
                        (completed nil)
703
                        #+trace-solution-count (next-count 0)
704
                        #+trace-solution-count (uuid (uuid:make-v1-uuid))
705
                        #+trace-solution-count (pns nil)
                        ;; scratch vectors handed to cache-left / cache-right (with a NIL own cache)
                        ;; once the opposing source is complete
706
                        (left-surrogate (make-array left-width :element-type 'fixnum))
707
                        (right-surrogate (make-array right-width :element-type 'fixnum))
708
                        #+(or) 
709
                        ((last-left ',(make-array (length left-dimensions) :initial-element +null-term-id+ :element-type 'fixnum))
710
                        (last-right ',(make-array (length right-dimensions) :initial-element +null-term-id+ :element-type 'fixnum))))
711
                   (declare (type fixnum result-index result-count left-solution-count right-solution-count)
712
                            (type hash-table left-cache right-cache))
713
                   (assert (= ,(length result-dimensions) result-page-width) ()
714
                           "Channel and operation dimensions do not match: ~a: ~a." destination ',result-dimensions)
715
                   (labels (,@(when test
716
                                (let ((lambda (compute-binary-predicate-lambda-spread test left-dimensions right-dimensions :handle-errors nil)))
717
                                  `((test ,@(rest lambda)))))
718
                            ,(let ((lambda (compute-binary-collector-lambda-spread result-dimensions left-dimensions right-dimensions)))
719
                               (trace-algebra :collector lambda)
720
                               `(collect ,@(rest lambda)))
721
                            (cache-left ,@(let ((lambda (compute-binary-cache-op-lambda-spread left-dimensions right-dimensions)))
722
                                            (trace-algebra :cache-op-lr lambda)
723
                                            (rest lambda)))
724
                            (cache-right ,@(let ((lambda (compute-binary-cache-op-lambda-spread right-dimensions left-dimensions)))
725
                                             (trace-algebra :cache-op-rl lambda)
726
                                             (rest lambda)))
727
                            #+(or) (
728
                            (compare-left-solution ,@(rest (compute-binary-vector-comparison-lambda left-dimensions left-dimensions
729
                                                                                             left-sort-dimensions left-sort-dimensions)))
730
                            (compare-right-solution ,@(rest (compute-binary-vector-comparison-lambda right-dimensions right-dimensions
731
                                                                                              right-sort-dimensions right-sort-dimensions))) )
                            ;; Count one candidate pair, enforce *solution-count-limit*, then -- subject to
                            ;; TEST, START and END -- write the pair into the next result slot.
                            ;; NOTE(review): result-count is incremented before TEST is evaluated, so with a
                            ;; TEST the START/END bounds (and the returned result-count) also count pairs the
                            ;; TEST rejected. Confirm this is intended when a filter is combined with a slice.
732
                            (collect-solution (left-solution right-solution)
733
                              (incf result-count)
734
                              (trace-algebra join.n.cs "~a ~a ~a"
735
                                             left-solution right-solution result-count)
                              ;; over the limit: always log the page counts and cache sizes (entries, keys),
                              ;; then either call the cardinality-limit error or complete early, as selected
                              ;; by *cardinality-limit-error*
736
                              (when (and *solution-count-limit* (> result-count *solution-count-limit*))
737
                                (log-warn "~&join limit exceeded (~a ~a ~a) ~a ~a~%"
738
                                          result-count left-count right-count
739
                                          (let ((entry-count 0)
740
                                                (key-count 0))
741
                                            (maphash #'(lambda (k v) k (incf entry-count (length v)) (incf key-count)) left-cache)
742
                                            (list entry-count key-count ))
743
                                          (let ((entry-count 0)
744
                                                (key-count 0))
745
                                            (maphash #'(lambda (k v) k (incf entry-count (length v)) (incf key-count)) right-cache)
746
                                            (list entry-count key-count )))
747
                                (cond (*cardinality-limit-error*
748
                                       (spocq.e::cardinality-limit-error :operator 'natural-join
749
                                                                         :state `(:limit ,*solution-count-limit*
750
                                                                                         :left-count ,left-solution-count
751
                                                                                         :right-count ,right-solution-count
752
                                                                                         :result-count ,result-count)
753
                                                                         :dimensions '(,left-dimensions ,right-dimensions)))
754
                                      (t
755
                                       (complete-solutions))))
756
                              ,(if test
757
                                 `(when (and (test left-solution right-solution)
758
                                             ,@(when start `((> result-count ,start))))
759
                                    (next-solution-location)
760
                                    (collect result-page result-index left-solution right-solution)
761
                                    ,@(when end `((when (>= result-count ,end)
762
                                                   (trace-algebra process-join.end :test+bounded result-count)
763
                                                   (complete-solutions)))))
764
                                 `(,@(if start `(when (> result-count ,start)) '(progn))
765
                                   (next-solution-location)
766
                                   (collect result-page result-index left-solution right-solution)
767
                                   ,@(when end `((when (>= result-count ,end)
768
                                                   (trace-algebra process-join.end :bounded result-count)
769
                                                   (complete-solutions))))))
770
                              #+(or) (
771
                                      (when (eq '< (compare-left-solution left-solution last-left))
772
                                        (warn "degression (@~d) left: ~s ~s" result-count left-solution last-left)
773
                                        (complete-solutions))
774
                                      (when (eq '< (compare-right-solution right-solution last-right))
775
                                        (warn "degression (@~d) right: ~s ~s" result-count right-solution last-right)
776
                                        (complete-solutions))
777
                                      ;; (print (setf last-left left-solution) *trace-output*)
778
                                      ;; (print (setf last-right right-solution) *trace-output*)
779
                                      (finish-output *trace-output*)))
780
                            (next-solution-location ()
781
                              ;; return a page (possible newly created) and the next free location in that page
782
                              (when (>= (incf result-index) result-page-length)
783
                                (when result-page (put-page result-page))
784
                                (setf result-page (new-field-page destination result-page-length result-page-width)
785
                                      result-index 0)))
                            ;; Finish exactly once: drop both caches, trim the partially filled result page
                            ;; to its used rows, emit it, complete DESTINATION, update stats, and return the
                            ;; (left right result) counts from the block.
786
                            (complete-solutions ()
787
                              (when (shiftf completed t)
788
                                (error "already completed"))
789
                              (clrhash right-cache)
790
                              (clrhash left-cache)
791
                              (trace-algebra join.n "~a: complete: (~10s ~10s)/~10s"
792
                                         source-dimensions left-solution-count right-solution-count result-count)
793
                              (when result-page
794
                                (let ((page-result-count (1+ result-index)))
795
                                  (when (< page-result-count result-page-length)
796
                                    (setf result-page
797
                                          (adjust-page result-page (list page-result-count result-page-width)))))
798
                                (put-page result-page))
799
                              (complete-field destination)
800
                              (incf-stat *solutions-processed* (+ left-solution-count right-solution-count))
801
                              (incf-stat *solutions-constructed* result-count)
802
                              (return-from process-natural-join (values left-solution-count right-solution-count result-count)))
803
                            (put-page (page)
804
                              (trace-data process-natural-join.put-page destination ',result-dimensions page)
805
                              #+trace-solution-count
806
                              (when (shiftf pns nil)
807
                                (bt:with-lock-held (*page-cache-lock*)
808
                                  (format *trace-output* "~% ~a join.nput (~10s ~10s)/~10s -> ~s"
809
                                          uuid left-solution-count right-solution-count result-count (channel-solution-count destination))
810
                                  (finish-output *trace-output*)))
811
                              (put-field-page destination page)))
812
                     ;; test initial boundary conditions
                     ;; (a zero page length, or an END not beyond START, means nothing can be produced)
                     ;; NOTE(review): (> ,end ,start) assumes START is a number whenever END is non-nil;
                     ;; a NIL START with a non-nil END would fail here -- confirm callers never do that.
813
                     (unless (and (plusp result-page-length) (or (null ,end) (> ,end ,start))) (complete-solutions))
814
                     (trace-algebra join.n "~a" source-dimensions)
                     ;; Main loop: each pass takes at most one page from each still-open source, so the
                     ;; two inputs are consumed in alternation until both report end-of-input.
815
                     (loop (check-query-status *query*)
816
                           #+trace-solution-count
817
                           (when (> (+ left-solution-count right-solution-count) next-count)
818
                             (setf pns t)
819
                             (bt:with-lock-held (*page-cache-lock*)
820
                               (format *trace-output* "~% ~a join.n... (~10s ~10s)/~10s/(~10s ~10s) <- ~d/~d"
821
                                       uuid left-solution-count right-solution-count result-count
822
                                       (hash-table-count left-cache)
823
                                       (hash-table-count right-cache)
824
                                       (channel-solution-count left-source) (channel-solution-count right-source))
825
                               (finish-output *trace-output*))
826
                             (incf next-count 1000000))
827
                           (unless left-complete
828
                             (let ((left-page (get-field-page left-source)))
829
                               (cond (left-page
830
                                      (assert (= left-width (array-dimension left-page 1)) ()
831
                                              "njg: Invalid left page width (expected ~a): ~a."
832
                                              left-width (array-dimension left-page 1))
833
                                      (incf left-count)
834
                                      (incf left-solution-count (array-dimension left-page 0))
835
                                      (trace-algebra join.n "lftpage: ~a" left-page)
836
                                      ;;(format *trace-output* "~&l.~a.~a.~a~%" left-count left-solution-count (hash-table-count left-cache))
                                      ;; For each left row: call cache-left -- with the left cache while the right
                                      ;; source is open, or with NIL and the left surrogate once it is complete --
                                      ;; and emit the pair with every right solution it returns.
                                      ;; NOTE(review): with a TEST, the handler-bind spans the whole page loop, so an
                                      ;; ERROR from cache-left, result-page output, or the limit check is also turned
                                      ;; into (throw :skip nil), not only errors from TEST. Only collect-solution runs
                                      ;; inside (catch :skip ...); this operator establishes no :skip tag around
                                      ;; cache-left. Confirm the handler should not be narrowed to the TEST call.
837
                                      ,(if test
838
                                         `(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :skip nil))))
839
                                            (dotimes (left-index (array-dimension left-page 0))
840
                                              (multiple-value-bind (right-solutions left-solution)
841
                                                                   (if right-complete
842
                                                                     (cache-left left-page left-index nil right-cache left-surrogate)
843
                                                                     (cache-left left-page left-index left-cache right-cache))
844
                                                (dolist (right-solution right-solutions)
845
                                                  (catch :skip (collect-solution left-solution right-solution))))))
846
                                         `(dotimes (left-index (array-dimension left-page 0))
847
                                            (multiple-value-bind (right-solutions left-solution)
848
                                                                 (if right-complete
849
                                                                   (cache-left left-page left-index nil right-cache left-surrogate)
850
                                                                   (cache-left left-page left-index left-cache right-cache))
851
                                              (dolist (right-solution right-solutions)
852
                                                (collect-solution left-solution right-solution)))))
853
                                      ;; (print (list :left-page ',left-dimensions (term-value-field left-page)))
854
                                      (release-field-page left-source left-page))
855
                                     ((zerop left-count)
856
                                      ;; empty solutionfield entails empty result
857
                                      (trace-algebra join.n "~a: lft: null" source-dimensions)
858
                                      (complete-solutions))
                                     ;; left source exhausted after at least one page: SETF returns T, so this
                                     ;; clause always runs. No further left rows will probe the right cache, and
                                     ;; later right rows are no longer cached (NIL own cache), so it is cleared.
859
                                     ((setf left-complete t)
860
                                      (trace-algebra join.n "~a: lft: ~10@s/~10s ~10@s/~10s"
861
                                                 source-dimensions
862
                                                 left-solution-count result-count
863
                                                 (hash-table-count left-cache) (hash-table-count right-cache))
864
                                      (clrhash right-cache)))))
865
                           (unless right-complete
866
                             (let ((right-page (get-field-page right-source)))
867
                               (cond (right-page
868
                                      (assert (= right-width (array-dimension right-page 1)) ()
869
                                              "njg: Invalid right page width (expected ~a): ~a, ~a."
870
                                              right-width (array-dimension right-page 1) (array-dimensions right-page))
871
                                      (incf right-count)
872
                                      (incf right-solution-count (array-dimension right-page 0))
873
                                      (trace-algebra join.n "rgtpage: ~a" right-page)
874
                                      ;;(format *trace-output* "~&r.~a.~a.~a~%" right-count right-solution-count (hash-table-count right-cache))
                                      ;; Mirror of the left branch: call cache-right (NIL own cache and the right
                                      ;; surrogate once the left source is complete) and emit each returned pair.
                                      ;; The same NOTE(review) about the broad handler-bind scope applies here.
875
                                      ,(if test
876
                                         `(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :skip nil))))
877
                                            (dotimes (right-index (array-dimension right-page 0))
878
                                              (multiple-value-bind (left-solutions right-solution)
879
                                                                   (if left-complete
880
                                                                     (cache-right right-page right-index nil left-cache right-surrogate)
881
                                                                     (cache-right right-page right-index right-cache left-cache))
882
                                                (dolist (left-solution left-solutions) 
883
                                                  (catch :skip (collect-solution left-solution right-solution))))))
884
                                         `(dotimes (right-index (array-dimension right-page 0))
885
                                            (multiple-value-bind (left-solutions right-solution)
886
                                                                 (if left-complete
887
                                                                   (cache-right right-page right-index nil left-cache right-surrogate)
888
                                                                   (cache-right right-page right-index right-cache left-cache))
889
                                              (dolist (left-solution left-solutions) 
890
                                                (collect-solution left-solution right-solution)))))
891
                                      ;; (print (list :right-page ',right-dimensions (term-value-field right-page)))
892
                                      (release-field-page right-source right-page))
893
                                     ((zerop right-count)
894
                                      ;; empty solutionfield entails empty result
895
                                      (trace-algebra join.n "~a: rgt: null" source-dimensions)
896
                                      (complete-solutions))
                                     ;; right source exhausted: the left cache will no longer be probed, so clear it
897
                                     ((setf right-complete t)
898
                                      (trace-algebra join.n "~a: rgt: ~10@s/~10s ~10@s/~10s"
899
                                                 source-dimensions
900
                                                 right-solution-count result-count
901
                                                 (hash-table-count left-cache) (hash-table-count right-cache))
902
                                      (clrhash left-cache)))))
903
                           (when (and left-complete right-complete) (return)))
904
                     (complete-solutions)))))))
905
     ;; first value: the compiled operator; second value: the generated lambda form
     (values (spocq-compile lambda) lambda)))
906
 
907
 (:documentation "Parallel join algorithm"
908
   "The algorithm comprises three distinct phases:
909
  - distribution into left and right substreams based on the key hash
910
  - combination of each substream based on (key x property-vector)
911
  - merging of the joined result fields
912
 
913
  The first phase uses the key to (redundantly) compute the hash key, the low n bits of which designate the partition.
914
  The second phase performs the same operations as a single-process join, but expects the streams to already contain the
915
  (key x property-vector) delegates. The third phase is just a stable-order merge of the streams, with the effect
916
  that the result field solutions are always in the same order.")
917
 
918
 ;;; Disabled (#+(or)) variant of PROCESS-MP-NATURAL-JOIN, retained for reference until it is
 ;;; rewritten for ring-buffer channels. As written it:
 ;;; - starts one worker thread per partition (ALGEBRA-OPERATOR-THREAD-COUNT of them), each calling
 ;;;   CORE-FUNCTION with that partition's result, left and right channels;
 ;;; - reads pages alternately from LEFT-SOURCE and RIGHT-SOURCE and routes every solution to the
 ;;;   partition chosen by (LOGAND (ASH hash -4) THREAD-ID-MASK) over its key hash;
 ;;; - buffers routed solutions into per-partition pages through the mediator closures and flushes
 ;;;   the last partial pages through the completor closures once both sources are exhausted;
 ;;; - runs an "mp-results" thread that polls the result channels in turn, forwards each page to
 ;;;   DESTINATION, and puts NIL to DESTINATION once every result channel has yielded NIL;
 ;;; - returns (values input-count result-count) summed from THREAD-SOLUTION-COUNTS.
 ;;; NOTE(review): defects visible in this text are flagged inline; re-check them before re-enabling.
 #+(or)                                  ; must be re-written for ring-buffer channels
919
 (defun process-mp-natural-join (core-function destination left-source right-source left-dimensions right-dimensions
920
                                               &key &allow-other-keys)
921
   "Parallel-process a natural join by distributing the source streams according to their key hash values
922
  and collating the result streams."
923
 
924
   (assert-argument-types process-mp-natural-join
925
     (left-source (or channel function))
926
     (left-source (or channel function)))  ; NOTE(review): repeats LEFT-SOURCE; RIGHT-SOURCE is presumably intended
927
     
928
   (multiple-value-bind (left-hash-generator right-hash-generator left-solution-collector right-solution-collector)
929
                        (compute-mp-join-aspects left-dimensions right-dimensions)
930
     (let* ((thread-count (algebra-operator-thread-count))
931
            (thread-id-mask (- thread-count 1))  ; NOTE(review): as a LOGAND mask this reaches every partition only if THREAD-COUNT is a power of two
932
            (left-channels (make-array thread-count))
933
            (right-channels (make-array thread-count))
934
            (result-channels (make-array thread-count))
935
            (threads (make-array thread-count))
936
            (left-complete nil)
937
            (right-complete nil)
938
            (result-thread nil)
939
            (thread-solution-counts (make-array thread-count))
940
            (left-solution-mediators (make-array thread-count))
941
            (right-solution-mediators (make-array thread-count))
942
            (left-solution-completors (make-array thread-count))
943
            (right-solution-completors (make-array thread-count))
944
            (left-page-width (length left-dimensions))
945
            (right-page-width (length right-dimensions))
946
            (result-page-length *field-page-length*)
947
            (solution-count 0)
948
            (page-count 0)
949
            #+(or) (test-lock (bt:make-lock )))
950
       
951
       ;; activate the partitions threads
952
       (dotimes (thread-id thread-count)
953
         (let* ((left-channel (setf (aref left-channels thread-id) (make-channel :dimensions left-dimensions)))
954
                (right-channel (setf (aref right-channels thread-id) (make-channel :dimensions right-dimensions)))
955
                (result-channel (setf (aref result-channels thread-id) (make-channel :dimensions (channel-dimensions destination)))))
956
           (setf (aref threads thread-id)
957
                 (bt:make-thread #'(lambda ()
958
                                     (funcall core-function result-channel left-channel right-channel))  ; return value is discarded
959
                                 :name (format nil "mp-join-~d" thread-id)
960
                                 :initial-bindings (acons '*query* *query*
961
                                                          (acons '*task* *task*
962
                                                                 (acons '*repository* *repository* ())))))
963
           (let ((page nil)
964
                 (index result-page-length))
965
             (flet ((left-solution-mediator (base-page base-index)
966
                      (when (>= (incf index) result-page-length)
967
                        (when page (put-field-page left-channel page))
968
                        (setf page (new-field-page left-channel result-page-length left-page-width)
969
                              index 0))
970
                      (funcall left-solution-collector page index base-page base-index))
971
                    (left-solution-completor ()
972
                      (when page
973
                        (when (< (incf index) result-page-length)
974
                          (setf page (adjust-page page (list index left-page-width))))
975
                        (channel-put left-channel page))
976
                      (complete-field left-channel)))
977
               (setf (aref left-solution-mediators thread-id) #'left-solution-mediator)
978
               (setf (aref left-solution-completors thread-id) #'left-solution-completor)))
979
           (let ((page nil)
980
                 (index result-page-length))
981
             (flet ((right-solution-mediator (base-page base-index)
982
                      (when (>= (Incf index) result-page-length)
983
                        (when page (put-field-page right-channel page))
984
                        (setf page (new-field-page right-channel result-page-length right-page-width)
985
                              index 0))
986
                      (funcall right-solution-collector page index base-page base-index))
987
                    (right-solution-completor ()
988
                      (when page
989
                        (when (< (incf index) result-page-length)
990
                          (setf page (adjust-page page (list index right-page-width))))
991
                        (channel-put right-channel page))
992
                      (complete-field right-channel)))
993
               (setf (aref right-solution-mediators thread-id) #'right-solution-mediator)
994
               (setf (aref right-solution-completors thread-id) #'right-solution-completor)))))
995
       
996
       ;; collate the results
997
       (setf result-thread
998
             (bt:make-thread #'(lambda ()
999
                                 (let ((result-channels-complete (make-array (length result-channels) :initial-element nil)))
1000
                                   (loop (when (every #'identity result-channels-complete) (return))
1001
                                         (loop for i from 0 for result-channel across result-channels
1002
                                               for channel-complete across result-channels-complete
1003
                                               unless channel-complete
1004
                                               do (let ((page (channel-get result-channel)))
1005
                                                    (if page
1006
                                                      (put-field-page destination page)
1007
                                                      (setf (aref result-channels-complete i) t)))))
1008
                                   (put-field-page destination nil)))
1009
                             :name "mp-results"))
1010
 
1011
       (flet ((distribute-left-solution (page index)
1012
                (let* ((hash (funcall left-hash-generator page index))
1013
                       (id (logand (ash hash -4) thread-id-mask)))
1014
                  (funcall (aref left-solution-mediators id) page index)))
1015
              (distribute-right-solution (page index)
1016
                (let* ((hash (funcall right-hash-generator page index))
1017
                       (id (logand (ash hash -4) thread-id-mask)))
1018
                  (funcall (aref right-solution-mediators id) page index))))
1019
         ;; distribute the solutions
1020
         (loop (check-query-status *query*)
1021
               (when (and *solution-count-limit* (> result-count *solution-count-limit*))  ; NOTE(review): RESULT-COUNT is not bound at this point; SOLUTION-COUNT is presumably intended
1022
                 (log-warn "join: terminated @~a solutions." solution-count)
1023
                 (terminate-task *query*)
1024
                 (return))
1025
               (unless left-complete
1026
                 (let ((left-page (get-field-page left-source)))
1027
                   (cond (left-page
1028
                          (assert (= left-page-width (array-dimension left-page 1)) ()
1029
                                  "Invalid left page width (expected ~a): ~a."
1030
                                  left-page-width (array-dimension left-page 1))
1031
                          (trace-data mp-distribute-left left-page (term-value-field left-page))
1032
                          (incf solution-count (array-dimension left-page 0))
1033
                          (incf page-count)
1034
                          (dotimes (index (array-dimension left-page 0))
1035
                            (distribute-left-solution left-page index))
1036
                          (release-page left-page))
1037
                         ((setf left-complete t)))))
1038
               (unless right-complete
1039
                 (let ((right-page (get-field-page right-source)))
1040
                   (cond (right-page
1041
                          (assert (= right-page-width (array-dimension right-page 1)) ()
1042
                                  "Invalid right page width (expected ~a): ~a."
1043
                                  right-page-width (array-dimension right-page 1))
1044
                          (trace-data mp-distribute-right right-page (term-value-field right-page))
1045
                          (incf solution-count (array-dimension right-page 0))
1046
                          (incf page-count)
1047
                          (dotimes (index (array-dimension right-page 0))
1048
                            (distribute-right-solution right-page index))
1049
                          (release-page right-page))
1050
                         ((setf right-complete t)))))
1051
               (when (and left-complete right-complete) (return))))
1052
 
1053
       (map nil #'funcall left-solution-completors)
1054
       (map nil #'funcall right-solution-completors)
1055
       (bt:join-thread result-thread)
1056
       (map nil #'bt:join-thread threads)
1057
       (let ((result-count 0) (left-count 0) (right-count 0))
1058
         (loop for counts across thread-solution-counts  ; NOTE(review): nothing in this function stores per-thread counts here, so the elements are never set
1059
               do (progn (incf result-count (aref counts 0))
1060
                         (incf left-count (aref counts 1))
1061
                         (incf right-count (aref counts 2))))
1062
         (incf-stat *solutions-processed* (+ left-count right-count))
1063
         (incf-stat *solutions-constructed* result-count)
1064
         (values (+ left-count right-count) result-count)))))
1065
 
1066
 ;;; Disabled (#+(or)) variant of PROCESS-MP-NATURAL-JOIN that declares LEFT-SOURCE and RIGHT-SOURCE
 ;;; to be functions, retained for reference. As written it:
 ;;; - starts one worker thread per partition (ALGEBRA-OPERATOR-THREAD-COUNT of them), each calling
 ;;;   CORE-FUNCTION with that partition's result, left and right channels (made without :DIMENSIONS);
 ;;; - reads pages alternately from the two sources, stopping early (with a warning and
 ;;;   TERMINATE-TASK) when *SOLUTION-COUNT-LIMIT* is set and SOLUTION-COUNT exceeds it;
 ;;; - routes each solution to the partition chosen by (LOGAND (ASH hash -4) THREAD-ID-MASK) over its
 ;;;   key hash, buffering it into per-partition pages that are sent with CHANNEL-PUT;
 ;;; - flushes the last partial pages through the completor closures once distribution ends;
 ;;; - runs an "mp-results" thread that forwards every partition's result pages to DESTINATION and
 ;;;   then puts NIL to DESTINATION;
 ;;; - prints its threads and closures when *JOIN.VERBOSE* is true, and returns
 ;;;   (values input-count result-count) summed from THREAD-SOLUTION-COUNTS.
 ;;; NOTE(review): defects visible in this text are flagged inline; re-check them before re-enabling.
 #+(or)
1067
 (defun process-mp-natural-join (core-function destination left-source right-source left-dimensions right-dimensions
1068
                                               &key &allow-other-keys)
1069
   "Parallel-process a natural join by distributing the source streams according to their key hash values
1070
  and collating the result streams."
1071
 
1072
   (declare (type function left-source right-source))
1073
   (multiple-value-bind (left-hash-generator right-hash-generator left-solution-collector right-solution-collector)
1074
                        (compute-mp-join-aspects left-dimensions right-dimensions)
1075
     (let* ((thread-count (algebra-operator-thread-count))
1076
            (thread-id-mask (- thread-count 1))  ; NOTE(review): as a LOGAND mask this reaches every partition only if THREAD-COUNT is a power of two
1077
            (left-channels (make-array thread-count))
1078
            (right-channels (make-array thread-count))
1079
            (result-channels (make-array thread-count))
1080
            (threads (make-array thread-count))
1081
            (left-complete nil)
1082
            (right-complete nil)
1083
            (result-thread nil)
1084
            (thread-solution-counts (make-array thread-count))
1085
            (left-solution-mediators (make-array thread-count))
1086
            (right-solution-mediators (make-array thread-count))
1087
            (left-solution-completors (make-array thread-count))
1088
            (right-solution-completors (make-array thread-count))
1089
            (left-page-width (length left-dimensions))
1090
            (right-page-width (length right-dimensions))
1091
            (result-page-length *field-page-length*)
1092
            (solution-count 0)
1093
            (page-count 0)
1094
            #+(or) (test-lock (bt:make-lock )))
1095
       
1096
       ;; activate the partitions threads
1097
       (dotimes (thread-id thread-count)
1098
         (let* ((left-channel (setf (aref left-channels thread-id) (make-channel)))
1099
                (right-channel (setf (aref right-channels thread-id) (make-channel)))
1100
                (result-channel (setf (aref result-channels thread-id) (make-channel))))
1101
           (setf (aref threads thread-id)
1102
                 (bt:make-thread #'(lambda ()
1103
                                     (funcall core-function result-channel left-channel right-channel))  ; return value is discarded
1104
                                 :name (format nil "mp-join-~d" thread-id)
1105
                                 :initial-bindings (acons '*query* *query*
1106
                                                          (acons '*task* *task*
1107
                                                                 (acons '*repository* *repository* ())))))
1108
           (let ((page nil)
1109
                 (index result-page-length))
1110
             (flet ((left-solution-mediator (base-page base-index)
1111
                      (when (>= (incf index) result-page-length)
1112
                        (when page (channel-put left-channel page))
1113
                        (setf page (new-field-page destination result-page-length result-page-width)  ; NOTE(review): RESULT-PAGE-WIDTH is not bound in this function; LEFT-PAGE-WIDTH (and LEFT-CHANNEL rather than DESTINATION) look intended
1114
                              index 0))
1115
                      (funcall left-solution-collector page index base-page base-index))
1116
                    (left-solution-completor ()
1117
                      (when page
1118
                        (when (< (incf index) result-page-length)
1119
                          (setf page (adjust-page page (list index left-page-width))))
1120
                        (channel-put left-channel page))
1121
                      (complete-field left-channel)))
1122
               (setf (aref left-solution-mediators thread-id) #'left-solution-mediator)
1123
               (setf (aref left-solution-completors thread-id) #'left-solution-completor)))
1124
           (let ((page nil)
1125
                 (index result-page-length))
1126
             (flet ((right-solution-mediator (base-page base-index)
1127
                      (when (>= (Incf index) result-page-length)
1128
                        (when page (channel-put right-channel page))
1129
                        (setf page (new-field-page destination result-page-length result-page-width)  ; NOTE(review): RESULT-PAGE-WIDTH is not bound in this function; RIGHT-PAGE-WIDTH (and RIGHT-CHANNEL rather than DESTINATION) look intended
1130
                              index 0))
1131
                      (funcall right-solution-collector page index base-page base-index))
1132
                    (right-solution-completor ()
1133
                      (when page
1134
                        (when (< (incf index) result-page-length)
1135
                          (setf page (adjust-page page (list index right-page-width))))
1136
                        (channel-put right-channel page))
1137
                      (complete-field right-channel)))
1138
               (setf (aref right-solution-mediators thread-id) #'right-solution-mediator)
1139
               (setf (aref right-solution-completors thread-id) #'right-solution-completor)))))
1140
       
1141
       ;; collate the results
1142
       (setf result-thread
1143
             (bt:make-thread #'(lambda ()
1144
                                 (let ((result-channels-complete (make-array (length result-channels) :initial-element nil)))
1145
                                   (loop (when (every #'identity result-channels-complete) (return))
1146
                                         (loop for i from 0 for result-channel across result-channels
1147
                                               for channel-complete across result-channels-complete
1148
                                               unless channel-complete
1149
                                               do (let ((page (channel-get result-channel)))
1150
                                                    (if page
1151
                                                      (put-field-page destination page)
1152
                                                      (setf (aref result-channels-complete i) t)))))
1153
                                   (put-field-page destination nil)))
1154
                             :name "mp-results"))
1155
       
1156
       (flet ((distribute-left-solution (page index)
1157
                (let* ((hash (funcall left-hash-generator page index))
1158
                       (id (logand (ash hash -4) thread-id-mask)))
1159
                  (funcall (aref left-solution-mediators id) page index)))
1160
              (distribute-right-solution (page index)
1161
                (let* ((hash (funcall right-hash-generator page index))
1162
                       (id (logand (ash hash -4) thread-id-mask)))
1163
                  (funcall (aref right-solution-mediators id) page index))))
1164
         ;; distribute the solutions
1165
         (loop (check-query-status *query*)
1166
               (when (and *solution-count-limit* (> solution-count *solution-count-limit*))
1167
                 (log-warn "join: terminated @~a solutions." solution-count)
1168
                 (terminate-task *query*)
1169
                 (return))
1170
               (unless left-complete
1171
                 (let ((left-page (get-field-page left-source)))
1172
                   (cond (left-page
1173
                          (assert (= left-page-width (array-dimension left-page 1)) ()
1174
                                  "Invalid left page width (expected ~a): ~a."
1175
                                  left-page-width (array-dimension left-page 1))
1176
                          (trace-data mp-distribute-left left-page (term-value-field left-page))
1177
                          (incf solution-count (array-dimension left-page 0))
1178
                          (incf page-count)
1179
                          (dotimes (index (array-dimension left-page 0))
1180
                            (distribute-left-solution left-page index)))  ; NOTE(review): the source page is not released after distribution
1181
                         ((setf left-complete t)))))
1182
               (unless right-complete
1183
                 (let ((right-page (get-field-page right-source)))
1184
                   (cond (right-page
1185
                          (assert (= right-page-width (array-dimension right-page 1)) ()
1186
                                  "Invalid right page width (expected ~a): ~a."
1187
                                  right-page-width (array-dimension right-page 1))
1188
                          (trace-data mp-distribute-right right-page (term-value-field right-page))
1189
                          (incf solution-count (array-dimension right-page 0))
1190
                          (incf page-count)
1191
                          (dotimes (index (array-dimension right-page 0))
1192
                            (distribute-right-solution right-page index)))  ; NOTE(review): the source page is not released after distribution
1193
                         ((setf right-complete t)))))
1194
               (when (and left-complete right-complete) (return))))
1195
       (when *join.verbose*
1196
         (print (list :threads threads
1197
                      :left-solution-mediators left-solution-mediators
1198
                      :right-solution-mediators right-solution-mediators
1199
                      :left-solution-completors left-solution-completors
1200
                      :right-solution-completors right-solution-completors
1201
                      :result-thread result-thread)))
1202
 
1203
       (map nil #'funcall left-solution-completors)
1204
       (map nil #'funcall right-solution-completors)
1205
       (bt:join-thread result-thread)
1206
       (map nil #'bt:join-thread threads)
1207
       (let ((result-count 0) (left-count 0) (right-count 0))
1208
         (loop for counts across thread-solution-counts  ; NOTE(review): nothing in this function stores per-thread counts here, so the elements are never set
1209
               do (progn (incf result-count (aref counts 0))
1210
                         (incf left-count (aref counts 1))
1211
                         (incf right-count (aref counts 2))))
1212
         (incf-stat *solutions-processed* (+ left-count right-count))
1213
         (incf-stat *solutions-constructed* result-count)
1214
         (values (+ left-count right-count) result-count)))))
1215
 
1216
     
1217
 (defun compute-mp-join-aspects (left-dimensions right-dimensions)
1218
   (let ((key-dimensions (intersect-dimensions left-dimensions right-dimensions)))
1219
     (values (compute-key-generator key-dimensions left-dimensions)
1220
             (compute-key-generator key-dimensions right-dimensions)
1221
             (compute-unary-collector left-dimensions left-dimensions)
1222
             (compute-unary-collector right-dimensions right-dimensions))))
1223
                 
1224
                   
1225
 ;;; PROCESS-LEFT-DOMINANT-JOIN
 ;;; Reads LEFT-SOURCE to exhaustion first, caching every left page, then streams RIGHT-SOURCE
 ;;; against that cache and writes joined solutions to DESTINATION in pages of
 ;;; (CHANNEL-PAGE-LENGTH DESTINATION) rows. The join TYPE comes from COMPUTE-JOIN-OPERATORS:
 ;;; - :natural - left solutions are entered into a term-id cache via LEFT-CACHE-OPERATOR, and each
 ;;;   right solution is matched against it via RIGHT-CACHE-OPERATOR;
 ;;; - :cross   - copied left pages are kept in arrival order and every left row is paired with
 ;;;   every right row.
 ;;; Results that pass the predicate are numbered. Those numbered <= START are counted but not
 ;;; written, and the join completes as soon as END results have been counted (when END is given).
 ;;; When *SOLUTION-COUNT-LIMIT* is set and exceeded, it logs a warning, calls TERMINATE-TASK and completes.
 ;;; Returns (values solution-count result-count): rows read from both sources, and results counted.
 ;;; NOTE(review): the docstring's "(dimension . solution-page) pairs" description does not match
 ;;; this code, which reads LEFT-SOURCE and RIGHT-SOURCE separately with GET-FIELD-PAGE.
 (defun process-left-dominant-join (destination left-source right-source left-dimensions right-dimensions
1226
                                                &key test (start 0) end
1227
                                                (left-sort-dimensions ()) (right-sort-dimensions ()))
1228
   "Generate a stream of join solutions to a destination given left and right solution sources and dimension lists for
1229
  the left and right solutions fields. Invoke the source function repeatedly to obtain a stream of
1230
  (dimension . solution-page) pairs until the result page is null. Use the dimensions to distinguish left from right.
1231
  Recognize the left side as dominant and cache its content before proceeding with the join.
1232
  Each must be a unique list. Combine the respective source either as a natural or a cross join depending
1233
  on whether they share any dimensions. In the former case, hash and cache solutions based on the shared
1234
  dimensions."
1235
 
1236
   (declare ;(optimize (speed 3) (safety 0))
1237
    (ignore left-sort-dimensions right-sort-dimensions)
1238
            )
1239
   (assert-argument-types process-join  ; NOTE(review): names PROCESS-JOIN rather than this function
1240
     (destination (or channel function))
1241
     (left-source (or channel function))
1242
     (right-source (or channel function))
1243
     (left-dimensions list)
1244
     (right-dimensions list)
1245
     (start (integer 0))
1246
     (end (or null (integer 0))))
1247
   (incf-stat *algebra-operations*)
1248
   (trace-algebra process-join left-source right-source left-dimensions right-dimensions
1249
                  :start start :end end :test test)
1250
 
1251
   (multiple-value-bind (type result-dimensions left-cache-operator right-cache-operator cross-predicate cross-collector)
1252
                        (compute-join-operators left-dimensions right-dimensions test)
1253
     (declare (type (function (array fixnum array fixnum array fixnum) t) cross-collector))  ; NOTE(review): assumes COMPUTE-JOIN-OPERATORS returns a function here even for :natural joins - confirm
1254
     (let* ((result-page-width (channel-page-width destination))
1255
            (result-page-length (channel-page-length destination))
1256
            (result-page nil)
1257
            (result-index result-page-length)
1258
            (left-width (length left-dimensions))
1259
            (right-width (length right-dimensions))
1260
            (left-cache (ecase type
1261
                          (:natural (make-term-id-cache :single-thread t))
1262
                          (:cross ())))
1263
            (left-cache-end nil)
1264
            (right-surrogate (make-array right-width :element-type 'fixnum))
1265
            (result-count 0)
1266
            (solution-count 0)
1267
            (page-count 0)
1268
            (natural-collector (compute-binary-collector-spread result-dimensions left-dimensions right-dimensions))
1269
            (natural-predicate (when test (compute-binary-predicate-spread result-dimensions left-dimensions right-dimensions
1270
                                                                           :handle-errors nil))))
1271
       (assert (= (length result-dimensions) result-page-width) ()
1272
               "Channel and operation dimensions do not match: ~a: ~a." destination result-dimensions)
1273
       (labels ((left-natural-processor (left-page)
1274
                  (assert (= left-width (array-dimension left-page 1)) ()
1275
                          "ldj: Invalid left page width (expected ~a/~a): ~a."
1276
                          left-width left-dimensions (array-dimension left-page 1))
1277
                  (dotimes (left-index (array-dimension left-page 0))
1278
                    (funcall left-cache-operator left-page left-index left-cache nil)))
1279
                (right-natural-processor (right-page)
1280
                  (assert (= right-width (array-dimension right-page 1)) ()
1281
                          "ldj: Invalid right page width (expected ~a/~a): ~a."
1282
                          right-width right-dimensions (array-dimension right-page 1))
1283
                  (if test
1284
                    (handler-bind ((error (lambda (c) (declare (ignore c)) (throw :skip nil))))  ; NOTE(review): active for the whole page; an error outside COLLECT-NATURAL-SOLUTION (e.g. in RIGHT-CACHE-OPERATOR) throws to :SKIP with no catch established here
1285
                      (dotimes (right-index (array-dimension right-page 0))
1286
                        (dolist (left-solution (funcall right-cache-operator right-page right-index nil left-cache right-surrogate))
1287
                          (catch :skip (collect-natural-solution left-solution right-surrogate)))))
1288
                    (dotimes (right-index (array-dimension right-page 0))
1289
                        (dolist (left-solution (funcall right-cache-operator right-page right-index nil left-cache right-surrogate))
1290
                          (collect-natural-solution left-solution right-surrogate)))))
1291
                    
1292
                (left-cross-processor (left-page)
1293
                  (assert (= left-width (array-dimension left-page 1)) ()
1294
                          "ldj: Invalid left page width (expected ~a/~a): ~a."
1295
                          left-width left-dimensions (array-dimension left-page 1))
1296
                  (let ((cell (cons left-page nil)))  ; append via the LEFT-CACHE-END tail pointer, keeping arrival order
1297
                    (if left-cache
1298
                      (setf (rest left-cache-end) cell)
1299
                      (setf left-cache cell))
1300
                    (setf left-cache-end cell)))
1301
                (right-cross-processor (right-page)
1302
                  (assert (= right-width (array-dimension right-page 1)) ()
1303
                          "ldj: Invalid right page width (expected ~a/~a): ~a."
1304
                          right-width right-dimensions (array-dimension right-page 1))
1305
                  (if test
1306
                    (handler-bind ((error (lambda (c) (declare (ignore c)) (throw :skip nil))))  ; any ERROR while collecting a pair skips that pair
1307
                      (loop for left-page in left-cache
1308
                            do (dotimes (left-index (array-dimension left-page 0))
1309
                                 (dotimes (right-index (array-dimension right-page 0))
1310
                                   (catch :skip (collect-cross-solution left-page left-index right-page right-index))))))
1311
                    (loop for left-page in left-cache
1312
                          do (dotimes (left-index (array-dimension left-page 0))
1313
                               (dotimes (right-index (array-dimension right-page 0))
1314
                                 (collect-cross-solution left-page left-index right-page right-index))))))
1315
                  
1316
                (collect-natural-solution (left-solution right-solution)
1317
                  (when (and (or (null natural-predicate)
1318
                                 (funcall natural-predicate left-solution right-solution))
1319
                             (> (incf result-count) start))
1320
                    (when (and *solution-count-limit* (> result-count *solution-count-limit*))
1321
                      (log-warn "join: terminated @~a solutions." result-count)
1322
                      (terminate-task *query*)
1323
                      (complete-solutions))
1324
                    (next-solution-location)
1325
                    (funcall natural-collector result-page result-index left-solution right-solution)
1326
                    (when (and end (>= result-count end)) (complete-solutions))))
1327
                (collect-cross-solution (left-page left-index right-page right-index)
1328
                  (when (and (or (null cross-predicate)
1329
                                 (funcall cross-predicate left-page left-index right-page right-index))
1330
                             (> (incf result-count) start))
1331
                    (when (and *solution-count-limit* (> result-count *solution-count-limit*))
1332
                      (log-warn "join: terminated @~a solutions." result-count)
1333
                      (terminate-task *query*)
1334
                      (complete-solutions))
1335
                    (next-solution-location)
1336
                    (funcall cross-collector result-page result-index left-page left-index right-page right-index)
1337
                    (when (and end (>= result-count end)) (complete-solutions))))
1338
                (next-solution-location ()
1339
                  ;; return a page (possible newly created) and the next free location in that page
1340
                  (when (>= (incf result-index) result-page-length)
1341
                    (when result-page (put-page result-page))
1342
                    (setf result-page (new-field-page destination result-page-length result-page-width)
1343
                          result-index 0)))
1344
                (complete-solutions ()  ; trim and emit the last partial page, signal completion, record stats, and exit the function
1345
                  (when result-page
1346
                    (let ((page-result-count (1+ result-index)))
1347
                      (when (< page-result-count result-page-length)
1348
                        (setf result-page
1349
                              (adjust-page result-page (list page-result-count result-page-width)))))
1350
                    (put-page result-page))
1351
                  (put-page nil)
1352
                  (incf-stat *solutions-processed* solution-count)
1353
                  (incf-stat *solutions-constructed* result-count)
1354
                  (return-from process-left-dominant-join
1355
                    (values solution-count result-count)))
1356
                (put-page (page)
1357
                  (trace-data process-join.put-data destination result-dimensions (term-value-field page))
1358
                  (if page
1359
                    (put-field-page destination page)
1360
                    (complete-field destination))))
1361
         (unless (and (plusp result-page-length) (or (null end) (> end start))) (complete-solutions))  ; nothing can be emitted: zero-length pages or an empty START/END window
1362
         ;; (setq *old-saved-pages* *saved-pages*)
1363
         ;; (setq *saved-pages* nil)
1364
         (loop for left-page = (get-field-page left-source)
1365
               until (null left-page)
1366
               do (progn (check-query-status *query*)
1367
                         (incf solution-count (array-dimension left-page 0))
1368
                         (incf page-count)
1369
                         (let ((cache-page (copy-page left-page)))  ; copy first: the page is released to its source but retained in the cache
1370
                           (release-field-page left-source left-page)
1371
                           (ecase type
1372
                             (:natural (left-natural-processor cache-page))
1373
                             (:cross (left-cross-processor cache-page))))))
1374
         (trace-data process-left-dominant-join.left-count
1375
                     (typecase left-cache (vector (length left-cache)) (hash-table (hash-table-count left-cache))))  ; NOTE(review): in the :cross case LEFT-CACHE is a list, which matches neither clause
1376
         (loop for right-page = (get-field-page right-source)
1377
               until (null right-page)
1378
               do (progn (check-query-status *query*)
1379
                         (incf solution-count (array-dimension right-page 0))
1380
                         (incf page-count)
1381
                         (ecase type
1382
                           (:natural (right-natural-processor right-page))
1383
                           (:cross (right-cross-processor right-page)))
1384
                         (release-field-page right-source right-page)))
1385
         #+(or)
1386
         (setq *saved-pages*
1387
               (cons (list :*query-maximum-threads* *query-maximum-threads*
1388
                           :*query-operator-thread-threshold* *query-operator-thread-threshold*)
1389
                     (reverse *saved-pages*)))
1390
         (complete-solutions)))))
1391
 
1392
 (defun process-right-dominant-join (destination left-source right-source left-dimensions right-dimensions
1393
                                   &rest args)
1394
   (declare (dynamic-extent args))
1395
   (apply #'process-left-dominant-join destination right-source left-source right-dimensions left-dimensions
1396
          args))
1397
 
1398
 
1399
 
1400
 
1401
 #|
1402
 (compute-cross-join-operator '(?::|inproc| ?::|name2| ?::|person2|)
1403
                              '(?::|article| ?::|name| ?::|person|)
1404
                              '(ORG.DATAGRAPH.SPOCQ.ALGEBRA:= ?::|name| ?::|name2|)
1405
                              0 0)
1406
 
1407
 
1408
 (compute-natural-join-operator '(?::|inproc| ?::|name2| ?::|person|)
1409
                                '(?::|article| ?::|name| ?::|person|)
1410
                              '(ORG.DATAGRAPH.SPOCQ.ALGEBRA:= ?::|name| ?::|name2|)
1411
                              nil nil)
1412
 
1413
 
1414
 #+(or) ;; for use as merge op
1415
 (defun compute-natural-join-operator (left-dimensions right-dimensions test start end &key &allow-other-keys)
1416
   "Given the respective dimensions and predicate and/or slice constraints, generate an operator
1417
  which accepts the destination and two sources and performs a merge join based on the identity of the
1418
  shared binding."
1419
 
1420
   (let* ((result-dimensions (union-dimensions left-dimensions right-dimensions))
1421
          (lambda
1422
            `(lambda (destination left-source right-source)
1423
               (declare (optimize (speed 3) (safety 0)))
1424
               (trace-algebra process-join left-source right-source ',left-dimensions ',right-dimensions
1425
                              :start ,start :end ,end :test ',test)
1426
               
1427
               (block process-join
1428
                 (let* ((result-page-width (channel-page-width destination))
1429
                        (result-page-length (channel-page-length destination))
1430
                        (result-page nil)
1431
                        (result-index result-page-length)
1432
                        (left-width ,(length left-dimensions))
1433
                        (right-width ,(length right-dimensions))
1434
                        (left-cache (make-term-id-cache :single-thread t))
1435
                        (right-cache (make-term-id-cache :single-thread t))
1436
                        (result-count 0)
1437
                        (left-solution-count 0)
1438
                        (right-solution-count 0)
1439
                        (left-complete nil)
1440
                        (right-complete nil)
1441
                        #+trace-solution-count (next-count 0)
1442
                        #+trace-solution-count (uuid (uuid:make-v1-uuid))
1443
                        #+trace-solution-count (pns nil)
1444
                        (left-surrogate (make-array left-width :element-type 'fixnum))
1445
                        (right-surrogate (make-array right-width :element-type 'fixnum)))
1446
                   (declare (type fixnum result-index result-count left-solution-count right-solution-count)
1447
                            (type hash-table left-cache right-cache))
1448
                   (assert (= ,(length result-dimensions) result-page-width) ()
1449
                           "Channel and operation dimensions do not match: ~a: ~a." destination ',result-dimensions)
1450
                   (labels (,@(when test
1451
                                (let ((lambda (compute-binary-predicate-lambda-spread test left-dimensions right-dimensions :handle-errors nil)))
1452
                                  `((test ,@(rest lambda)))))
1453
                            ,(let ((lambda (compute-binary-collector-lambda-spread result-dimensions left-dimensions right-dimensions)))
1454
                               `(collect ,@(rest lambda)))
1455
                            (cache-left ,@(rest (compute-binary-cache-op-lambda-spread left-dimensions right-dimensions)))
1456
                            (cache-right ,@(rest (compute-binary-cache-op-lambda-spread right-dimensions left-dimensions)))
1457
                            (collect-solution (left-solution right-solution)
1458
                              (incf result-count)
1459
                              ,(if test
1460
                                 `(when (and (test left-solution right-solution)
1461
                                             ,@(when start `((> result-count ,start))))
1462
                                    (next-solution-location)
1463
                                    (collect result-page result-index left-solution right-solution)
1464
                                    ,@(when end `((when (>= result-count ,end)
1465
                                                   (trace-algebra process-join.end :test+bounded result-count)
1466
                                                   (complete-solutions)))))
1467
                                 `(,@(if start `(when (> result-count ,start)) '(progn))
1468
                                   (next-solution-location)
1469
                                   (collect result-page result-index left-solution right-solution)
1470
                                   ,@(when end `((when (>= result-count ,end)
1471
                                                   (trace-algebra process-join.end :bounded result-count)
1472
                                                   (complete-solutions)))))))
1473
                            (next-solution-location ()
1474
                              ;; return a page (possible newly created) and the next free location in that page
1475
                              (when (>= (incf result-index) result-page-length)
1476
                                (when result-page (put-page result-page))
1477
                                (setf result-page (new-field-page destination result-page-length result-page-width)
1478
                                      result-index 0)))
1479
                            (complete-solutions ()
1480
                              (clrhash right-cache)
1481
                              (clrhash left-cache)
1482
                              #+trace-solution-count
1483
                              (bt:with-lock-held (*page-cache-lock*)
1484
                                (format *trace-output* "completed ***** ~a (~10s ~10s)/~s~%"
1485
                                        uuid left-solution-count right-solution-count result-count)
1486
                                (room)
1487
                                (finish-output *trace-output*))
1488
                              (when result-page
1489
                                (let ((page-result-count (1+ result-index)))
1490
                                  (when (< page-result-count result-page-length)
1491
                                    (setf result-page
1492
                                          (adjust-page result-page (list page-result-count result-page-width)))))
1493
                                (put-page result-page))
1494
                              (put-page nil)
1495
                              (incf-stat *solutions-processed* (+ left-solution-count right-solution-count))
1496
                              (incf-stat *solutions-constructed* result-count)
1497
                              (return-from process-join (values result-count left-solution-count right-solution-count)))
1498
                            (put-page (page)
1499
                              (trace-data process-join.put-page destination ',result-dimensions page)
1500
                              #+trace-solution-count
1501
                              (when (shiftf pns nil)
1502
                                (bt:with-lock-held (*page-cache-lock*)
1503
                                  (format *trace-output* "~% ~a join.nput (~10s ~10s)/~10s -> ~s"
1504
                                          uuid left-solution-count right-solution-count result-count
1505
                                          (channel-solution-count destination))
1506
                                  (check-memory)
1507
                                  (finish-output *trace-output*)))
1508
                              (put-field-page destination page)))
1509
                     (unless (and (plusp result-page-length) (or (null ,end) (> ,end ,start))) (complete-solutions))
1510
                     (bt:with-lock-held (*page-cache-lock*)
1511
                       (format *trace-output* "~% join.n ~s/~s/ ~a" 
1512
                               ',left-dimensions
1513
                               ',right-dimensions
1514
                               uuid))
1515
                     (loop (check-query-status *query*)
1516
                           #+trace-solution-count
1517
                           (when (> (+ left-solution-count right-solution-count) next-count)
1518
                             (setf pns t)
1519
                             (bt:with-lock-held (*page-cache-lock*)
1520
                               (format *trace-output* "~% ~a join.n... (~10s ~10s)/~10s ~10s/~10s <- ~d/~d"
1521
                                       uuid left-solution-count right-solution-count result-count
1522
                                       (hash-table-count left-cache) (hash-table-count right-cache)
1523
                                       (channel-solution-count left-source) (channel-solution-count right-source))
1524
                               (check-memory)
1525
                               (finish-output *trace-output*))
1526
                             (incf next-count 1000000))
1527
                           (when (and *solution-count-limit* (> result-count *solution-count-limit*))
1528
                             (log-warn "join: terminated @(~a ~a)/~a/~a/~a solutions."
1529
                                       left-solution-count right-solution-count (hash-table-count left-cache) (hash-table-count right-cache) result-count)
1530
                             (terminate-task *query*)
1531
                             (complete-solutions))
1532
                           (unless left-complete
1533
                             (let ((left-page (get-field-page left-source)))
1534
                               (cond (left-page
1535
                                      (assert (= left-width (array-dimension left-page 1)) ()
1536
                                              "Invalid left page width (expected ~a): ~a."
1537
                                              left-width (array-dimension left-page 1))
1538
                                      (incf left-solution-count (array-dimension left-page 0))
1539
                                      ,(if test
1540
                                         `(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :skip nil))))
1541
                                            (dotimes (left-index (array-dimension left-page 0))
1542
                                              (multiple-value-bind (right-solutions left-solution)
1543
                                                                   (if right-complete
1544
                                                                     (cache-left left-page left-index nil right-cache left-surrogate)
1545
                                                                     (cache-left left-page left-index left-cache right-cache))
1546
                                                (dolist (right-solution right-solutions)
1547
                                                  (catch :skip (collect-solution left-solution right-solution))))))
1548
                                         `(dotimes (left-index (array-dimension left-page 0))
1549
                                            (multiple-value-bind (right-solutions left-solution)
1550
                                                                 (if right-complete
1551
                                                                   (cache-left left-page left-index nil right-cache left-surrogate)
1552
                                                                   (cache-left left-page left-index left-cache right-cache))
1553
                                              (dolist (right-solution right-solutions)
1554
                                                (collect-solution left-solution right-solution)))))
1555
                                      (release-page left-page))
1556
                                     ((setf left-complete t)
1557
                                      #+trace-solution-count
1558
                                      (bt:with-lock-held (*page-cache-lock*)
1559
                                        (format *trace-output* "~% ~a join.nlft ~10s/~10s ~10s/~10s" uuid left-solution-count result-count
1560
                                                (hash-table-count left-cache)
1561
                                                (hash-table-count right-cache))
1562
                                        (finish-output *trace-output*))
1563
                                      (clrhash right-cache)))))
1564
                           (unless right-complete
1565
                             (let ((right-page (get-field-page right-source)))
1566
                               (cond (right-page
1567
                                      (assert (= right-width (array-dimension right-page 1)) ()
1568
                                              "Invalid right page width (expected ~a): ~a."
1569
                                              right-width (array-dimension right-page 1))
1570
                                      (incf right-solution-count (array-dimension right-page 0))
1571
                                      ,(if test
1572
                                         `(handler-bind ((error (lambda (c) (declare (ignore c)) (throw :skip nil))))
1573
                                            (dotimes (right-index (array-dimension right-page 0))
1574
                                              (multiple-value-bind (left-solutions right-solution)
1575
                                                                   (if left-complete
1576
                                                                     (cache-right right-page right-index nil left-cache right-surrogate)
1577
                                                                     (cache-right right-page right-index right-cache left-cache))
1578
                                                (dolist (left-solution left-solutions) 
1579
                                                  (catch :skip (collect-solution left-solution right-solution))))))
1580
                                         `(dotimes (right-index (array-dimension right-page 0))
1581
                                            (multiple-value-bind (left-solutions right-solution)
1582
                                                                 (if left-complete
1583
                                                                   (cache-right right-page right-index nil left-cache right-surrogate)
1584
                                                                   (cache-right right-page right-index right-cache left-cache))
1585
                                              (dolist (left-solution left-solutions) 
1586
                                                (collect-solution left-solution right-solution)))))
1587
                                      (release-page right-page))
1588
                                     ((setf right-complete t)
1589
                                      #+trace-solution-count
1590
                                      (bt:with-lock-held (*page-cache-lock*)
1591
                                        (format *trace-output* "~% ~a join.nrgt ~10s/~10s ~10s/~10s" uuid right-solution-count result-count
1592
                                                (hash-table-count left-cache)
1593
                                                (hash-table-count right-cache))
1594
                                        (finish-output *trace-output*))
1595
                                      (clrhash left-cache)))))
1596
                           (when (and left-complete right-complete) (return)))
1597
                     (complete-solutions)))))))
1598
     (values (spocq-compile lambda) lambda)))
1599
 
1600
 
1601
 
1602
 |#