Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/leftjoin.lisp

KindCoveredAll%
expression4152006 20.7
branch26186 14.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 (defparameter *leftjoin-translations*
6
   '(;; cannot distribute over the leftjoin as the respective unmatched base solutions
7
     ;; are then introduced in the combined result
8
     #+(or)
9
     ((spocq.a:|leftjoin| ?p1 (spocq.a:|union| ?p2 ?p3))
10
      . (spocq.a:|union| (spocq.a:|leftjoin| ?p1 ?p2) (spocq.a:|leftjoin| ?p1 ?p3)))
11
     ;; but must distribute over the union to ensure that the dimensionality into
12
     ;; the left join is constant
13
     ((spocq.a:|leftjoin| (spocq.a:|union| ?p2 ?p3) ?p1)
14
      . (spocq.a:|union| (spocq.a:|leftjoin| ?p2 ?p1) (spocq.a:|leftjoin| ?p3 ?p1)))))
15
 
16
 
17
 (defun rewrite-leftjoin (form)
18
   (destructuring-bind (left right . args) (rest form)
19
     (unless (find :test form)
20
       (case (first left)
21
         (spocq.a:|leftjoin|
22
                  (setf left (rewrite-leftjoin left))))
23
       (case (first left)
24
         (spocq.a:|union|
25
                  (destructuring-bind (first second . union-args) (rest left)
26
                    (declare (ignore union-args))    ; should not be any
27
                    (when (not (equal (expression-dimensions first) (expression-dimensions second)))
28
                      `(spocq.a:|union| (spocq.a:|leftjoin| ,first ,right) (spocq.a:|leftjoin| ,second ,right) ,@args))))
29
         (t                                  ; no change
30
          nil)))))
31
 
32
 (defmacro spocq.a:|leftjoin| (&whole form solution-field1 solution-field2 &rest args &key test count end offset start)
33
   "( ( solutionField solutionField (function (solution) xsd:boolean)? ) solutionField )
34
 A LEFTJOIN form combines two solution fields and yields a new field in which
35
 each solution is either a merge of two compatible solutions from the constituent
36
 fields or a solution from the first field, in the case where a natural join is possible,
37
 or an element of the cross join of the two fields, in the case where no dimension agrees.
38
 
39
 Where a test predicate is given, all result solutions satisfy it."
40
 
41
   (declare (ignore test count end offset start))
42
   (or (rewrite-leftjoin form)
43
       (apply #'macroexpand-leftjoin solution-field1 solution-field2 args)))
44
 
45
 (defmacro spocq.a::leftjoin* (field1 field2 &rest args)
46
   (if (consp (first args))
47
       `(spocq.a::leftjoin* (spocq.a:|leftjoin| ,field1 ,field2) ,@args)
48
       `(spocq.a:|leftjoin| ,field1 ,field2 ,@args)))
49
 
50
 (defun sip-expand-optional-service (optional-as-service? base-as-sip-source)
51
   ;; if the optional argument field is directly a service,
52
   ;; return an expansion which combines the service as sip with the other join side
53
   (if *enable.service-sip*
54
       (sip-expand-optional-service-operator (first optional-as-service?) optional-as-service? base-as-sip-source)
55
       optional-as-service?))
56
 
57
 (defgeneric sip-expand-optional-service-operator (operator optional-as-service? base-as-sip-source)
58
   (:method ((operator (eql 'spocq.a:|service|)) optional-as-service? base-as-sip-source)
59
     ;; intersection already tested, but just cor the complete forms
60
     (if (or (null (intersection (expression-variables optional-as-service?)
61
                                 (expression-variables base-as-sip-source)))
62
             (let ((location (second optional-as-service?)))
63
               (and (not (variable-p location)) (iri-service-repository-id location))))
64
         ;; decline to rewrite
65
         optional-as-service?
66
         ;; if they join, replace the service with a leftjoin service.
67
         (destructuring-bind (op iri group-graph-pattern &rest args) optional-as-service?
68
           (declare (ignore op))
69
           `(spocq.a::|serviceleftjoin| ,iri ,group-graph-pattern ,base-as-sip-source ,@args))))
70
   (:method  ((operator (eql 'spocq.a::|servicejoin|)) optional-as-service? base-as-sip-source)
71
     ;;; iff the optional clause is already a service join, replace it with
72
     ;;; a service leftjoin which incorporates the source in its sip source
73
     (destructuring-bind (op iri service-source service-sip-source &rest args) optional-as-service?
74
       (declare (ignore op))
75
       (if (null (intersection (expression-variables service-source)
76
                               (expression-variables base-as-sip-source)))
77
           ;; decline to rewrite
78
           optional-as-service?
79
           `(spocq.a::|serviceleftjoin| ,iri ,service-source
80
                      (spocq.a:|join| ,service-sip-source ,base-as-sip-source) ,@args))))
81
   (:method  ((operator (eql 'spocq.a::|serviceleftjoin|)) optional-as-service? base-as-sip-source)
82
     ;;; iff the optional clause is already a service leftjoin
83
     ;;; augment its sip source
84
     (destructuring-bind (op iri service-source service-sip-source &rest args) optional-as-service?
85
       (declare (ignore op))
86
       (if (null (intersection (expression-variables service-source)
87
                               (expression-variables base-as-sip-source)))
88
           ;; decline to rewrite
89
           optional-as-service?
90
           `(spocq.a::|serviceleftjoin| ,iri ,service-source
91
                      (spocq.a:|join| ,service-sip-source ,base-as-sip-source) ,@args))))
92
 
93
   (:method ((operator (eql 'spocq.a:|filter|)) optional-as-service? base-as-sip-source)
94
     (destructuring-bind (op filter-field . rest) optional-as-service?
95
       (let ((sip-expansion? (sip-expand-optional-service filter-field base-as-sip-source)))
96
         (if (eq sip-expansion? filter-field)
97
             optional-as-service?
98
             `(,op ,sip-expansion? ,@rest)))))
99
 
100
   (:method ((operator (eql 'spocq.a:|join|)) optional-as-service? base-as-sip-source)
101
     (destructuring-bind (op field1 field2 . rest) optional-as-service?
102
       (declare (ignore op)) 
103
       (let ((sip-expansion? (sip-expand-optional-service field1 base-as-sip-source)))
104
         (if (eq sip-expansion? field1)
105
             (let ((sip-expansion? (sip-expand-optional-service field2 base-as-sip-source)))
106
               (if (eq sip-expansion? field2)
107
                   optional-as-service?
108
                   `(spocq.a:|leftjoin| ,sip-expansion? ,field1 ,@rest)))
109
             `(spocq.a:|leftjoin| ,sip-expansion? ,field2 ,@rest)))))
110
 
111
   (:method ((operator (eql 'spocq.a:|leftjoin|)) optional-as-service? base-as-sip-source)
112
     (destructuring-bind (op field1 field2 . rest) optional-as-service?
113
       (let ((sip-expansion? (sip-expand-optional-service field1 base-as-sip-source)))
114
         (if (eq sip-expansion? field1)
115
             optional-as-service?
116
             `(,op ,sip-expansion? ,field2 ,@rest)))))
117
 
118
   (:method ((operator t) optional-as-service? base-as-sip-source)
119
     optional-as-service?))
120
 
121
 (defun macroexpand-leftjoin (field1 field2 &rest args)
122
   "Translate a leftjoin algebra expression into its execution form.
123
  This would include algebraic transformations, but none are defined.
124
  It does include optional clause combination, which reduces independent bgp clauses fo the form
125
     (leftjoin (leftjoin <field> (bgp (triple ...))) (bgp (triple ...)))
126
  into
127
     (leftjoin <field> (bgp (sum (triple ...) (triple ...))))
128
  When no further combination is possible, augment the optional clause with the base clause dimensions
129
  to enable propagation."
130
 
131
   (setf args (apply #'canonicalize-algebra-arguments args))
132
   (destructuring-bind (&key test &allow-other-keys) args
133
     (when test
134
       (setf (getf args :test) `(quote ,test))))
135
   (let ((base-optional-bgp nil)
136
         (optional-bgp nil)
137
         (base-dimensions (when (eq *leftjoin-propagation-mode* :propagate)
138
                            (expression-dimensions field1))))
139
     (labels ((triple-or-sum-form-p (statement)
140
                (or (triple-form-p statement) (sum-form-p statement)))
141
              (eligible-base-bgp (form)
142
                ;; allow a leftjoin with exactly one triple statement
143
                (when (leftjoin-form-p form)
144
                  (let ((bgp (third form)))
145
                    (when (and (bgp-form-p bgp)
146
                               (= 1 (count-if #'triple-form-p (rest bgp))))
147
                      bgp))))
148
              (eligible-optional-bgp (bgp)
149
                ;; allow a bgp with exactly one triple or sum statement
150
                (when (and (bgp-form-p bgp)
151
                           (= 1 (count-if #'triple-or-sum-form-p (rest bgp))))
152
                  bgp))
153
              (bgps-compatible-p (base optional)
154
                (let ((base-subject (second (find-if #'triple-form-p (rest base))))
155
                      (base-graph (second (find-if #'graph-form-p (rest base))))
156
                      (optional-subject (or (second (find-if #'triple-form-p (rest optional)))
157
                                            (second (find-if #'triple-form-p
158
                                                             (rest (find-if #'sum-form-p (rest optional)))))))
159
                      (optional-graph (second (find-if #'graph-form-p (rest optional)))))
160
                  (and (equal base-subject optional-subject)
161
                       (equal base-graph optional-graph))))
162
              (default-expansion ()
163
                (let ((sip-service? (sip-expand-optional-service field2 field1)))
164
                  (if (eq field2 sip-service?)
165
                      `(spocq.e:leftjoin
166
                        ,field1
167
                        (locally ,@(when base-dimensions
168
                                     `((declare (spocq.e:base-dimensions ,@base-dimensions))))
169
                          (spocq.e::with-join-scope ,(gensym "leftjoin-") ,field2))
170
                        ,@args)
171
                      sip-service?)))
172
              (merged-expansion ()
173
                (let* ((base-base-bgp (second field1))
174
                      (base-rest (nthcdr 3 field1))
175
                      (base-optional-triple (find-if #'triple-form-p (rest base-optional-bgp)))
176
                      (base-optional-rest (remove base-optional-triple (rest base-optional-bgp)))
177
                      (optional-sum (find-if #'sum-form-p (rest optional-bgp)))
178
                      (optional-triple (find-if #'triple-form-p (rest optional-bgp))))
179
                  `(spocq.a:|leftjoin| ,base-base-bgp
180
                                       (spocq.a:|bgp| (spocq.a:|sum| ,base-optional-triple
181
                                                                     ,@(if optional-sum
182
                                                                         (rest optional-sum)
183
                                                                         (list optional-triple)))
184
                                                      ,@base-optional-rest)
185
                                       ,@base-rest))))
186
       (ecase *leftjoin-expansion-mode*
187
         (:literal
188
          (default-expansion))
189
         (:logical
190
          (if (and (setf base-optional-bgp (eligible-base-bgp field1))
191
                   (setf optional-bgp (eligible-optional-bgp field2))
192
                   (bgps-compatible-p base-optional-bgp optional-bgp))
193
            (merged-expansion)
194
            (default-expansion)))))))
195
 
196
 (defgeneric spocq.e:leftjoin (solution-field1 solution-field2 &key end start test)
197
   (:documentation "Combine two fields as a left outer join based on a test predicate and a
198
     match predicate. see perez.2009 for a more explicit definition than in the spec.
199
     the second clause in the union is not exists any u2, not exists such that not u2.")  
200
   
201
   (:method :before ((base-field t) (offset-field t) &key end start test)
202
     (assert-argument-types spocq.e:leftjoin
203
       (start (or null (integer 0)))
204
       (end (or null (integer 0))))
205
     (incf-stat *algebra-operations*)
206
     (trace-algebra spocq.e:leftjoin base-field offset-field
207
                    :start start :end end :test test))
208
 
209
   (:method ((field1 null-generator) (field2 solution-generator) &rest args)
210
     (declare (ignore args))
211
     (let* ((result-dimensions (union-dimensions (solution-generator-dimensions field1)
212
                                                 (solution-generator-dimensions field2)))
213
            (result-channel (make-channel :name (list 'spocq.a:|null| (task-id *query*))
214
                                          :dimensions result-dimensions)))
215
       (process-null result-channel result-dimensions)
216
       (make-null-generator :operator 'spocq.a:|null|
217
                            :dimensions result-dimensions
218
                            :expression nil
219
                            :channel result-channel
220
                            :constituents ())))
221
 
222
   #+(or)                                ; requires that the solution matrix be widened
223
   (:method ((field1 solution-generator) (field2 null-generator) &rest args)
224
     (declare (ignore args))
225
     field1)
226
   
227
   (:method ((field1 solution-generator) (field2 solution-generator) &rest args)
228
     (declare (dynamic-extent args))
229
     (apply #'spocq.e:stream-leftjoin field1 field2
230
            args))
231
 
232
   (:method ((field1 null-generator) (field2 solution-generator) &rest args)
233
     (declare (ignore args))
234
     (let* ((result-dimensions (union-dimensions (solution-generator-dimensions field1)
235
                                                 (solution-generator-dimensions field2)))
236
            (result-channel (make-null-channel :dimensions result-dimensions)))
237
       (process-null result-channel result-dimensions)
238
       (make-null-generator :operator 'spocq.a:|null|
239
                            :dimensions result-dimensions
240
                            :expression nil
241
                            :channel result-channel
242
                            :constituents ()))))
243
 
244
 
245
 (defparameter *process-base-dominant-leftjoin.enable* t)
246
 (defparameter *process-mp-leftjoin.enable* t)
247
 
248
 
249
 (defun spocq.e:stream-leftjoin (base-field-generator optional-field-generator &rest args &key end start test)
250
   (declare (ignore test))
251
   ;; upon invocation use the generator argument to compute the combination operators
252
   ;; and invoke the respective binding operator with the rspective solution continuation
253
   (let* ((base-dimensions (solution-generator-dimensions base-field-generator))
254
          (optional-dimensions (solution-generator-dimensions optional-field-generator))
255
          (result-dimensions (union-dimensions base-dimensions optional-dimensions))
256
          ;; give priority to dimensions propagated through a chain of optional joins
257
          ;; but, if the combination changes, fall back to the combination with the base field
258
          (key-dimensions (or (intersect-dimensions (solution-generator-key-dimensions base-field-generator) optional-dimensions)
259
                              (intersect-dimensions base-dimensions optional-dimensions)
260
                              result-dimensions))
261
          (result-channel (make-channel :name (list 'spocq.a:|leftjoin| (task-id *query*))
262
                                        :dimensions result-dimensions
263
                                        :size (effective-channel-size :start start :end end)
264
                                        :page-length (effective-page-length :start start :end end)))
265
          (spliced-channel-p nil)
266
          (generator nil))
267
     (labels ((run-leftjoin-thread (result-channel base-field-generator optional-field-generator args)
268
                (let* ((base-channel (solution-generator-channel base-field-generator))
269
                       (optional-channel (solution-generator-channel optional-field-generator))
270
                       (base-expression (solution-generator-expression base-field-generator))
271
                       (optional-expression (solution-generator-expression optional-field-generator))
272
                       (*thread-operations* (cons (list* 'spocq.a:|leftjoin| base-dimensions optional-dimensions args)
273
                                                  *thread-operations*))
274
                       (join-operator *leftjoin-operator*)
275
                       (effective-channel-size (min (channel-size base-channel)
276
                                                    (channel-size optional-channel)
277
                                                    (channel-size result-channel))))
278
                  ;; splice in propagation destinations
279
                  (dolist (agp (solution-generator-patterns optional-field-generator))
280
                    (when (and (agp-base-channel agp)
281
                               (equal (agp-base-dimensions agp) base-dimensions))
282
                      (trace-algebra leftjoin.splice agp (agp-base-channel agp) base-channel
283
                                     base-dimensions optional-dimensions)
284
                      (setf spliced-channel-p t)
285
                      (push (agp-base-channel agp) (channel-channels base-channel))))
286
                  (unless join-operator
287
                    (setf join-operator
288
                          #+(or)  (cond ((and *process-base-dominant-leftjoin.enable* spliced-channel-p)
289
                                         'process-base-dominant-leftjoin)
290
                                        ((and *process-mp-leftjoin.enable*
291
                                              (typep (algebra-operator-thread-count) '(integer 2))
292
                                              (not (or count end offset start))
293
                                              (> (repository-statement-count *repository*) 10000))
294
                                         'process-mp-leftjoin)
295
                                        (t
296
                                         'process-leftjoin))
297
                          #+(or)
298
                          (if (= effective-channel-size *channel-sliced-size-limit*)
299
                            'process-balanced-leftjoin 
300
                            'process-leftjoin)
301
                          'process-leftjoin))
302
                  (log-debug "stream-leftjoin: ~s x ~s" join-operator effective-channel-size)
303
                  (push 'spocq.a:|leftjoin| (channel-name base-channel))
304
                  (push 'spocq.a:|leftjoin| (channel-name optional-channel))
305
                  (log-trace "stream-leftjoin: ~a: ~a ~a"
306
                             join-operator (channel-name base-channel) (channel-name optional-channel))
307
                  (setf (solution-generator-concrete-operator generator) join-operator)
308
                  (query-run-in-thread *query* base-expression)
309
                  (query-run-in-thread *query* optional-expression)
310
                  (setf (channel-size result-channel) effective-channel-size
311
                        (channel-page-length result-channel) (min (channel-page-length base-channel)
312
                                                                  (channel-page-length optional-channel)
313
                                                                  (channel-page-length result-channel)))
314
                  (apply join-operator result-channel base-channel optional-channel
315
                         base-dimensions
316
                         optional-dimensions
317
                         key-dimensions
318
                         args)
319
                  'spocq.a:|leftjoin|)))
320
       ;; return the binding function to the combination operator
321
       (setf generator
322
             (make-leftjoin-solution-generator :operator 'spocq.a:|leftjoin|
323
                                               :dimensions result-dimensions
324
                                               :key-dimensions key-dimensions
325
                                               :expression (list #'run-leftjoin-thread result-channel base-field-generator optional-field-generator
326
                                                                 args)
327
                                               :channel result-channel
328
                                               :constituents (list base-field-generator optional-field-generator))))))
329
 
330
 
331
 #+(or) ;; for profiling
332
 (defun call-predicate (predicate left-page left-index right-page right-index)
333
   (declare (type (function (array fixnum array fixnum) boolean) predicate))
334
   (catch :return-nil
335
     (funcall predicate left-page left-index right-page right-index)))
336
 
337
 (defmethod process-leftjoin ((destination array-page-channel)
338
                              (base-source array-page-channel)
339
                              (optional-source array-page-channel)
340
                              base-dimensions optional-dimensions key-dimensions
341
                              &key test (start 0) end
342
                              &aux
343
                              ;(trace-p (equal optional-dimensions '(?::action ?::data)))
344
                              ;(*data-trace-output* (if trace-p *trace-output* *data-trace-output*))
345
                              ;(*algebra-trace-output* (if trace-p *trace-output* *algebra-trace-output*))
346
                                            )
347
   "Generate a stream of left join solutions to a destination given a solution source and dimension lists for
348
  the base and optional solutions fields, and a predicate expression. Invoke the source function repeatedly to
349
  obtain a stream of (side . solution-page) pairs until the page is null. Use the dimensions to distinguish left from optional.
350
  Combine the respective source either as a natural or a cross join depending
351
  on whether they share any dimensions. In the former case, hash and cache solutions based on the shared
352
  dimensions."
353
 
354
   (declare (type (or channel (function ((or array null)) t)) destination)
355
            (type (or channel (function () (or array null))) base-source optional-source))
356
   (assert-argument-types process-leftjoin
357
     (base-dimensions list)
358
     (optional-dimensions list))
359
   
360
   (multiple-value-bind (type result-dimensions base-cache-operator optional-cache-operator result-cache-operator predicate collector)
361
                        (compute-leftjoin-operators base-dimensions optional-dimensions key-dimensions test)
362
     (declare (type (function (array fixnum array fixnum array fixnum) t) collector))
363
     (let* ((result-page-width (channel-page-width destination))
364
            (result-page-length (channel-page-length destination))
365
            (result-page nil)
366
            (result-index result-page-length)
367
            (result-cache (make-term-id-cache :single-thread t))
368
            (natural-cache (when (eq type :natural) (make-term-id-cache :single-thread t)))
369
            (cross-cache (when (eq type :cross) (make-array 32 :fill-pointer 0 :adjustable t)))
370
            (result-count 0)
371
            (solution-count 0)
372
            ;; for sp2b-q6@250k this reduced the time from 117 to 81 seconds on hetzner
373
            (*store->spocq-term-registry* (if test
374
                                            (copy-registry *store->spocq-term-registry* :single-thread t)
375
                                            *store->spocq-term-registry*)))
376
       (declare (type fixnum result-index result-count solution-count))
377
       (assert (= (length result-dimensions) result-page-width) ()
378
               "Channel and operation dimensions do not match: ~a: ~a." destination result-dimensions)
379
       (labels ((base-natural-processor (base-page)
380
                  (declare (type (function (array fixnum hash-table) list) base-cache-operator))
381
                  (handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
382
                    (dotimes (base-index (array-dimension base-page 0))
383
                      (trace-data process-leftjoin.optional-page.next base-index)
384
                      (let ((collected? nil))
385
                        (loop for (optional-page optional-index) in (funcall base-cache-operator base-page base-index natural-cache)
386
                              if (or (null predicate)
387
                                     (call-predicate predicate base-page base-index optional-page optional-index))
388
                              do (progn (setf collected? t)
389
                                   (trace-data process-leftjoin.optional-page.next.collect base-index optional-index)
390
                                   (collect-solution base-page base-index optional-page optional-index)))
391
                        (unless collected?
392
                          (collect-solution base-page base-index nil nil))))))
393
                (optional-natural-processor (optional-page)
394
                  (declare (type (function (array fixnum hash-table) t) optional-cache-operator))
395
                  (dotimes (optional-index (array-dimension optional-page 0))
396
                    (trace-data process-leftjoin.optional-page.next optional-index)
397
                    (funcall optional-cache-operator optional-page optional-index natural-cache)))
398
                (base-cross-processor (base-page)
399
                  (handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
400
                    (dotimes (base-index (array-dimension base-page 0))
401
                      (let ((collected? nil))
402
                        (loop for optional-page across cross-cache
403
                              do (dotimes (optional-index (array-dimension optional-page 0) t)
404
                                   (when (or (null predicate)
405
                                             (call-predicate predicate base-page base-index optional-page optional-index))
406
                                     (setf collected? t)
407
                                     (collect-solution base-page base-index optional-page optional-index))))
408
                        (unless collected?
409
                          (collect-solution base-page base-index nil nil))))))
410
                (optional-cross-processor (optional-page)
411
                  (vector-push-extend optional-page cross-cache))
412
                (call-predicate (predicate left-page left-index right-page right-index)
413
                  (declare (type (function (array fixnum array fixnum) boolean) predicate))
414
                  (catch :return-nil
415
                    (funcall predicate left-page left-index right-page right-index)))
416
                (collect-solution (base-page base-index optional-page optional-index)
417
                  (declare (type (function (array fixnum array fixnum hash-table) boolean) result-cache-operator))
418
                  (when (and (null (funcall result-cache-operator base-page base-index optional-page optional-index result-cache))
419
                             (> (incf result-count) start))
420
                    (next-solution-location)
421
                    (funcall collector result-page result-index base-page base-index optional-page optional-index)
422
                    (when (and end (>= result-count end)) (complete-solutions))))
423
                (next-solution-location ()
424
                  ;; return a page (possible newly created) and the next free location in that page
425
                  (when (>= (incf result-index) result-page-length)
426
                    (when result-page (put-page result-page))
427
                    (setf result-page (new-field-page destination result-page-length result-page-width)
428
                          result-index 0)))
429
                (complete-solutions ()
430
                  (when result-page
431
                    (let ((page-result-count (1+ result-index)))
432
                      (when (< page-result-count result-page-length)
433
                        (setf result-page
434
                              (adjust-page result-page (list page-result-count result-page-width)))))
435
                    (put-page result-page))
436
                  (put-page nil)
437
                  (incf-stat *solutions-processed* solution-count)
438
                  (incf-stat *solutions-constructed* result-count)
439
                  (trace-algebra process-leftjoin.complete destination solution-count result-count)
440
                  (return-from process-leftjoin (values solution-count result-count)))
441
                (put-page (page)
442
                  (trace-data process-leftjoin.put-page destination
443
                              (list base-dimensions optional-dimensions result-dimensions)
444
                              page (term-value-field page))
445
                  (if page
446
                    (put-field-page destination page)
447
                    (complete-field destination)))
448
                (cache-size ()
449
                  (if natural-cache
450
                    (hash-table-count natural-cache)
451
                    (length cross-cache))))
452
         (unless (and (plusp result-page-length) (or (null end) (> end start))) (complete-solutions))
453
         (loop for solutions = (get-field-page optional-source)
454
               until (null solutions)
455
               do (progn (check-query-status *query*)
456
                    (trace-data process-leftjoin.optional-page type solutions (term-value-field solutions))
457
                         (incf solution-count (array-dimension solutions 0))
458
                         (when (and *solution-count-limit*
459
                                    (> (cache-size) *solution-count-limit*))
460
                           (log-warn "leftjoin: terminated @~a optional solutions. (~a x ~a)"
461
                                     (cache-size) base-dimensions optional-dimensions)
462
                           (terminate-task *query*)
463
                           (complete-solutions))
464
                         (let ((cache-page (copy-page solutions)))
465
                           (ecase type
466
                             (:natural (optional-natural-processor cache-page))
467
                             (:cross (optional-cross-processor cache-page))))
468
                         (release-field-page optional-source solutions)))
469
         (loop for solutions = (get-field-page base-source)
470
               until (null solutions)
471
               do (progn (check-query-status *query*)
472
                         (incf solution-count (array-dimension solutions 0))
473
                         (when (and *solution-count-limit*
474
                                    (> result-count *solution-count-limit*))
475
                           (log-warn "leftjoin: terminated @~a solutions. (~a x ~a)"
476
                                     result-count base-dimensions optional-dimensions)
477
                           (terminate-task *query*)
478
                           (complete-solutions))
479
                         (ecase type
480
                           (:natural (base-natural-processor solutions))
481
                           (:cross (base-cross-processor solutions)))
482
                         (release-field-page base-source solutions)))
483
         (complete-solutions)))))
484
 
485
 ;;; support optional clauses with sip from base to optionals clause
486
 
487
 ;;; !!!HDT
488
 (defmethod process-sip-left-join ((destination array-page-channel)
489
                                   (base-source array-page-channel)
490
                                   (optional-source array-page-channel)
491
                                   base-dimensions optional-dimensions key-dimensions
492
                                   &key test (start 0) end
493
                                   )
494
   "In order to permit general field expressions - as opposed to restricting the optional clause to a single bgp,
495
 it is not possible to establish the bindings just by propagating a slution field into the compiled bgp interpreter.
496
 It must also allow, for example, that the optional clause is itself an optional expression or has filters.
497
 As the situations which involve bindings - bgps, filters, aggregations and service clauses all must recognize
498
 dynamic bindings, one way to accomplish this is to wrap the bse query in a delegate which accepts the sip values
499
 as a stream and reruns the target clause for ecah value
500
 - if it emits a terminated result stream for each propagate solution, the source would not need to join, but would
501
 just merge.
502
 - the query wrapper should be lighter weight than a full clone
503
 - it would be ideal of th eoperator could retrieve the first page of base solutions and then decide, based
504
   on its length, whether to sip or join. this wuld have to be buffered when too large, as the optional field
505
 must first be materialized fir joind processing, but generated dependently for sip processing.
506
   in order to do that, it would need to compile both alternative networks . one with and one without sip dimensions
507
   and then select which to invoke based on source cardinality.
508
   in order for this to be possible, the optional-generator supplied to spocq.e:stream-leftjoin must
509
   be able to switch modes nd either run free to emit values for a join or expect a stream of sip constraints
510
   and emit a stream of one solution stream in response to each input.
511
   process-leftjoin would need to decide how to handle the optional generator, either just accepting its results stream
512
   and joining or feeding it solutions and merging.
513
   "
514
   )
515
 
516
 
517
 (defun process-base-dominant-leftjoin (destination base-source optional-source base-dimensions optional-dimensions key-dimensions
518
                                                    &key test count offset
519
                                                    (start offset) (end (when count (+ count (or start 0)))))
520
   "Generate a stream of left join solutions to a destination given a solution source and dimension lists for
521
  the base and optional solutions fields, and a predicate expression. Invoke the source function repeatedly to
522
  obtain a stream of (side . solution-page) pairs until the page is null. Use the dimensions to distinguish left from optional.
523
  Combine the respective source either as a natural or a cross join depending
524
  on whether they share any dimensions. In the former case, hash and cache solutions based on the shared
525
  dimensions."
526
   
527
   (declare (type (or channel (function ((or array null)) t)) destination)
528
            (type (or channel (function () (or array null))) base-source optional-source))
529
   (assert-argument-types process-leftjoin
530
     (destination (or channel function))
531
     (base-source (or channel function))
532
     (optional-source (or channel function))
533
     (base-dimensions list)
534
     (optional-dimensions list)
535
     (start (or null (integer 0)))
536
     (end (or null (integer 0))))
537
   (incf-stat *algebra-operations*)
538
   (unless start (setf start 0))
539
   (when end (setf end (max start end)))
540
   (trace-algebra process-base-dominant-leftjoin base-source optional-source base-dimensions optional-dimensions key-dimensions
541
                  :start start :end end :test test)
542
   
543
   (multiple-value-bind (type result-dimensions base-cache-operator optional-cache-operator result-cache-operator predicate collector)
544
                        (compute-leftjoin-operators base-dimensions optional-dimensions key-dimensions test)
545
     (declare (type (function (array fixnum array fixnum array fixnum) t) collector))
546
     (let* ((result-page-width (channel-page-width destination))
547
            (result-page-length (channel-page-length destination))
548
            (result-page nil)
549
            (result-index result-page-length)
550
            (result-cache (make-term-id-cache :single-thread t))
551
            (natural-cache (when (eq type :natural) (make-term-id-cache :single-thread t)))
552
            (cross-cache (when (eq type :cross) (make-array 32 :fill-pointer 0 :adjustable t)))
553
            (base-cache (make-array 32 :fill-pointer 0 :adjustable t))
554
            (result-count 0)
555
            (solution-count 0)
556
            ;; for sp2b-q6@250k this reduced the time from 117 to 81 seconds on hetzner
557
            (*store->spocq-term-registry* (if test
558
                                            (copy-registry *store->spocq-term-registry* :single-thread t)
559
                                            *store->spocq-term-registry*)))
560
       (declare (type fixnum result-index result-count solution-count))
561
       (assert (= (length result-dimensions) result-page-width) ()
562
               "Channel and operation dimensions do not match: ~a: ~a." destination result-dimensions)
563
       (labels ((base-natural-processor (base-page)
564
                  (declare (type (function (array fixnum hash-table) list) base-cache-operator))
565
                  #+(or)(let ((*print-pretty* nil)) (print (list :base base-dimensions)))
566
                  (handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
567
                    (dotimes (base-index (array-dimension base-page 0))
568
                      #+(or)(print (list :b base-index
569
                                         (make-array (array-dimension base-page 1)
570
                                                     :displaced-to base-page :element-type 'fixnum
571
                                                     :displaced-index-offset (* base-index (array-dimension base-page 1)))))
572
                      (let ((collected? nil))
573
                        (loop for (optional-page optional-index) in (funcall base-cache-operator base-page base-index natural-cache)
574
                              if (or (null predicate)
575
                                     (call-predicate predicate base-page base-index optional-page optional-index))
576
                              do (progn (setf collected? t)
577
                                        #+(or)(print (list :o+ optional-index
578
                                                           (make-array (array-dimension optional-page 1)
579
                                                                       :displaced-to optional-page :element-type 'fixnum
580
                                                                       :displaced-index-offset (* optional-index (array-dimension optional-page 1)))))
581
                                        (collect-solution base-page base-index optional-page optional-index))
582
                              #+(or) (print (list :o- optional-index
583
                                                  (make-array (array-dimension optional-page 1)
584
                                                              :displaced-to optional-page :element-type 'fixnum
585
                                                              :displaced-index-offset (* optional-index (array-dimension optional-page 1))))))
586
                        (unless collected?
587
                          (collect-solution base-page base-index nil nil))))))
588
                (optional-natural-processor (optional-page)
589
                  (declare (type (function (array fixnum hash-table) t) optional-cache-operator))
590
                  #+(or)(let ((*print-pretty* nil)) (print (list :optional optional-dimensions)))
591
                  (dotimes (optional-index (array-dimension optional-page 0))
592
                    #+(or)(print (list :o optional-index
593
                                       (make-array (array-dimension optional-page 1)
594
                                                   :displaced-to optional-page :element-type 'fixnum
595
                                                   :displaced-index-offset (* optional-index (array-dimension optional-page 1)))))
596
                    (funcall optional-cache-operator optional-page optional-index natural-cache)))
597
                (base-cross-processor (base-page)
598
                  (handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
599
                    (dotimes (base-index (array-dimension base-page 0))
600
                      (let ((collected? nil))
601
                        (loop for optional-page across cross-cache
602
                              do (dotimes (optional-index (array-dimension optional-page 0) t)
603
                                   (when (or (null predicate)
604
                                             (call-predicate predicate base-page base-index optional-page optional-index))
605
                                     (setf collected? t)
606
                                     (collect-solution base-page base-index optional-page optional-index))))
607
                        (unless collected?
608
                          (collect-solution base-page base-index nil nil))))))
609
                (optional-cross-processor (optional-page)
610
                  (vector-push-extend optional-page cross-cache))
611
                (call-predicate (predicate left-page left-index right-page right-index)
612
                  (declare (type (function (array fixnum array fixnum) boolean) predicate))
613
                  (catch :return-nil
614
                    (funcall predicate left-page left-index right-page right-index)))
615
                (collect-solution (base-page base-index optional-page optional-index)
616
                  (declare (type (function (array fixnum array fixnum hash-table) boolean) result-cache-operator))
617
                  (when (and (null (funcall result-cache-operator base-page base-index optional-page optional-index result-cache))
618
                             (> (incf result-count) start))
619
                    (next-solution-location)
620
                    (funcall collector result-page result-index base-page base-index optional-page optional-index)
621
                    (when (and end (>= result-count end)) (complete-solutions))))
622
                (next-solution-location ()
623
                  ;; return a page (possible newly created) and the next free location in that page
624
                  (when (>= (incf result-index) result-page-length)
625
                    (when result-page (put-page result-page))
626
                    (setf result-page (new-field-page destination result-page-length result-page-width)
627
                          result-index 0)))
628
                (complete-solutions ()
629
                  (when result-page
630
                    (let ((page-result-count (1+ result-index)))
631
                      (when (< page-result-count result-page-length)
632
                        (setf result-page
633
                              (adjust-page result-page (list page-result-count result-page-width)))))
634
                    (put-page result-page))
635
                  (put-page nil)
636
                  (incf-stat *solutions-processed* solution-count)
637
                  (incf-stat *solutions-constructed* result-count)
638
                  (trace-algebra process-base-dominant-leftjoin base-dimensions optional-dimensions solution-count result-count)
639
                  (return-from process-base-dominant-leftjoin (values solution-count result-count)))
640
                (put-page (page)
641
                  (trace-data process-base-dominant-leftjoin.put-page destination result-dimensions page)
642
                  (put-field-page destination page)))
643
         (unless (and (plusp result-page-length) (or (null end) (> end start))) (complete-solutions))
644
         (loop for solutions = (get-field-page base-source)
645
               until (null solutions)
646
               do (progn (check-query-status *query*)
647
                         (let ((cache-page (copy-page solutions)))
648
                           (vector-push-extend cache-page base-cache))
649
                         (release-field-page base-source solutions)))
650
         (trace-data process-base-dominant-leftjoin.base-page-count (length base-cache))
651
         (loop for solutions = (get-field-page optional-source)
652
               until (null solutions)
653
               do (progn (check-query-status *query*)
654
                         (incf solution-count (array-dimension solutions 0))
655
                         (let ((cache-page (copy-page solutions)))
656
                           (ecase type
657
                             (:natural (optional-natural-processor cache-page))
658
                             (:cross (optional-cross-processor cache-page))))
659
                         (release-field-page optional-source solutions)))
660
         (trace-data process-base-dominant-leftjoin.post-optional-caches natural-cache cross-cache)
661
         (loop for solutions across base-cache
662
               do (progn (check-query-status *query*)
663
                         (incf solution-count (array-dimension solutions 0))
664
                         (ecase type
665
                           (:natural (base-natural-processor solutions))
666
                           (:cross (base-cross-processor solutions)))))
667
         (complete-field destination)))))
668
 
669
 
670
 ;;; (setq *leftjoin-operator* 'process-balanced-leftjoin)
671
 
672
 (defun process-balanced-leftjoin (destination base-source optional-source base-dimensions optional-dimensions key-dimensions
673
                                                    &key test count offset
674
                                                    (start offset) (end (when count (+ count (or start 0)))))
675
   "Generate a stream of left join solutions to a destination given a solution sources dimension lists for
676
  the base and optional solutions fields, and a predicate expression. Pull from the sources repeatedly to
677
  obtain a stream solutions pairs until either the optional side is empty, in which case unmatched base solutions
678
  are emitted un-extended, or the base side is empty, in which case the operation is complete.
679
  Combine the sources either as a natural or a cross join depending on whether they share any dimensions.
680
  In the natural case, hash and cache solutions based on the shared dimensions.
681
 
682
  This version runs where the slice includes a limit"
683
   
684
   (declare (type (or channel (function ((or array null)) t)) destination)
685
            (type (or channel (function () (or array null))) base-source optional-source))
686
   (assert-argument-types process-leftjoin
687
     (destination (or channel function))
688
     (base-source (or channel function))
689
     (optional-source (or channel function))
690
     (base-dimensions list)
691
     (optional-dimensions list)
692
     (start (or null (integer 0)))
693
     (end (or null (integer 0))))
694
   (incf-stat *algebra-operations*)
695
   (unless start (setf start 0))
696
   (when end (setf end (max start end)))
697
   (trace-algebra process-balanced-leftjoin base-source optional-source base-dimensions optional-dimensions key-dimensions
698
                  :start start :end end :test test)
699
   
700
   (multiple-value-bind (type result-dimensions
701
                         base-cache-write-operator base-cache-read-operator
702
                         optional-cache-write-operator optional-cache-read-operator
703
                         result-cache-operator predicate collector)
704
                        (compute-balanced-leftjoin-operators base-dimensions optional-dimensions key-dimensions test)
705
     (declare (type (function (array fixnum array fixnum array fixnum) t) collector))
706
     (let* ((result-page-width (channel-page-width destination))
707
            (result-page-length (channel-page-length destination))
708
            (result-page nil)
709
            (result-index result-page-length)
710
            (result-cache (make-term-id-cache :single-thread t))
711
            (base-page-width (length base-dimensions))
712
            (optional-page-width (length optional-dimensions))
713
            (base-natural-cache (when (eq type :natural) (make-term-id-cache :single-thread t)))
714
            (optional-natural-cache (when (eq type :natural) (make-term-id-cache :single-thread t)))
715
            (base-cross-cache (when (eq type :cross) (make-array 32 :fill-pointer 0 :adjustable t)))
716
            (optional-cross-cache (when (eq type :cross) (make-array 32 :fill-pointer 0 :adjustable t)))
717
            (base-collected? (when (eq type :cross) (make-array 32 :fill-pointer 0 :adjustable t)))
718
            (result-count 0)
719
            (solution-count 0)
720
            ;; for sp2b-q6@250k this reduced the time from 117 to 81 seconds on hetzner
721
            (*store->spocq-term-registry* (if test
722
                                            (copy-registry *store->spocq-term-registry* :single-thread t)
723
                                            *store->spocq-term-registry*)))
724
       (declare (type fixnum result-index result-count solution-count))
725
       (assert (= (length result-dimensions) result-page-width) ()
726
               "Channel and operation dimensions do not match: ~a: ~a." destination result-dimensions)
727
       (labels ((base-natural-processor (base-page)
728
                  ;(declare (type (function (array fixnum hash-table) list) optional-cache-read-operator base-cache-write-operator))
729
                  (assert (= base-page-width (array-dimension base-page 1)))
730
                  (handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
731
                    (dotimes (base-index (array-dimension base-page 0))
732
                      (let ((collected? nil))
733
                        (loop for (optional-page optional-index)
734
                              in (funcall optional-cache-read-operator base-page base-index optional-natural-cache)
735
                              if (or (null predicate)
736
                                     (call-predicate predicate base-page base-index optional-page optional-index))
737
                              do (progn (setf collected? t)
738
                                        (collect-solution base-page base-index optional-page optional-index)))
739
                        (let ((base-cache-entry (funcall base-cache-write-operator base-page base-index base-natural-cache)))
740
                          (when collected?
741
                            (setf (cddr (first base-cache-entry)) t)))))))
742
                (optional-natural-processor (optional-page)
743
                  ;(declare (type (function (array fixnum hash-table) t) base-cache-read-operator))
744
                  (assert (= optional-page-width (array-dimension optional-page 1)))
745
                  (dotimes (optional-index (array-dimension optional-page 0))
746
                    (loop for page-index-matched?
747
                          in (funcall base-cache-read-operator optional-page optional-index base-natural-cache)
748
                          for (base-page base-index . nil) = page-index-matched?
749
                          if (or (null predicate)
750
                                 (call-predicate predicate base-page base-index optional-page optional-index))
751
                          do (progn (setf (cddr page-index-matched?) t)
752
                                    (collect-solution base-page base-index optional-page optional-index)))
753
                    (funcall optional-cache-write-operator optional-page optional-index optional-natural-cache)))
754
                (base-natural-complete ()
755
                  (flet ((emit-if-unmatched (key base-page-indices)
756
                           (declare (ignore key))
757
                           (loop for (base-page base-index . matched?) in base-page-indices
758
                                 unless matched?
759
                                 do (collect-solution base-page base-index nil nil))))
760
                    (declare (dynamic-extent #'emit-if-unmatched))
761
                    (maphash #'emit-if-unmatched base-natural-cache)))
762
                (base-cross-processor (base-page)
763
                  (vector-push-extend base-page base-cross-cache)
764
                  (let ((page-collected? (make-array (array-dimension base-page 0) :initial-element nil)))
765
                    (vector-push-extend page-collected? base-collected?)
766
                    (handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
767
                      (dotimes (base-index (array-dimension base-page 0))
768
                        (let ((collected? nil))
769
                          (loop for optional-page across optional-cross-cache
770
                                do (dotimes (optional-index (array-dimension optional-page 0) t)
771
                                     (when (or (null predicate)
772
                                               (call-predicate predicate base-page base-index optional-page optional-index))
773
                                       (setf collected? t)
774
                                       (collect-solution base-page base-index optional-page optional-index))))
775
                          (when collected?
776
                            (setf (aref page-collected? base-index) t)))))))
777
                (optional-cross-processor (optional-page)
778
                  (vector-push-extend optional-page optional-cross-cache)
779
                  (handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
780
                    (dotimes (optional-index (array-dimension optional-page 0))
781
                      (loop for base-page across base-cross-cache
782
                            for page-collected? across base-collected?
783
                            do (dotimes (base-index (array-dimension optional-page 0) t)
784
                                 (when (or (null predicate)
785
                                           (call-predicate predicate base-page base-index optional-page optional-index))
786
                                   (collect-solution base-page base-index optional-page optional-index)
787
                                   (setf (aref page-collected? base-index) t)))))))
788
                (base-cross-complete ()
789
                  (loop for base-page across base-cross-cache
790
                        for page-collected? across base-collected?
791
                        do (loop for collected? across page-collected?
792
                                 for base-index from 0
793
                                 unless collected?
794
                                 do (collect-solution base-page base-index nil nil))))
795
                (call-predicate (predicate left-page left-index right-page right-index)
796
                  (declare (type (function (array fixnum array fixnum) boolean) predicate))
797
                  (catch :return-nil
798
                    (funcall predicate left-page left-index right-page right-index)))
799
                (collect-solution (base-page base-index optional-page optional-index)
800
                  (declare (type (function (array fixnum array fixnum hash-table) boolean) result-cache-operator))
801
                  (when (zerop result-count)
802
                    (trace-algebra process-leftjoin.first-solution base-dimensions solution-count))
803
                  (when (zerop result-count)
804
                    (trace-algebra process-leftjoin.first-solution base-dimensions solution-count))
805
                  (when (and (null (funcall result-cache-operator base-page base-index optional-page optional-index result-cache))
806
                             (> (incf result-count) start))
807
                    (next-solution-location)
808
                    (funcall collector result-page result-index base-page base-index optional-page optional-index)
809
                    (when (and end (>= result-count end)) (complete-solutions))))
810
                (next-solution-location ()
811
                  ;; return a page (possible newly created) and the next free location in that page
812
                  (when (>= (incf result-index) result-page-length)
813
                    (when result-page (put-page result-page))
814
                    (setf result-page (new-field-page destination result-page-length result-page-width)
815
                          result-index 0)))
816
                (complete-solutions ()
817
                  (when result-page
818
                    (let ((page-result-count (1+ result-index)))
819
                      (when (< page-result-count result-page-length)
820
                        (setf result-page
821
                              (adjust-page result-page (list page-result-count result-page-width)))))
822
                    (put-page result-page))
823
                  (complete-field destination)
824
                  (incf-stat *solutions-processed* solution-count)
825
                  (incf-stat *solutions-constructed* result-count)
826
                  (trace-algebra process-base-dominant-leftjoin base-dimensions optional-dimensions solution-count result-count)
827
                  (return-from process-balanced-leftjoin (values solution-count result-count)))
828
                (put-page (page)
829
                  (trace-data process-balanced-leftjoin.put-page destination result-dimensions page)
830
                  (put-field-page destination page)))
831
         (unless (and (plusp result-page-length) (or (null end) (> end start))) (complete-solutions))
832
         (labels ((base-page-step ()
833
                    (check-query-status *query*)
834
                    (let ((base-page (get-field-page base-source)))
835
                      (trace-data process-balanced-leftjoin.get-base-page base-source base-page)
836
                      (cond (base-page 
837
                             (incf solution-count (array-dimension base-page 0))
838
                             (let ((cache-page (copy-page base-page)))
839
                               (ecase type
840
                                 (:natural (base-natural-processor cache-page))
841
                                 (:cross (base-cross-processor cache-page))))
842
                             (release-field-page base-source base-page))
843
                            (t
844
                             (ecase type
845
                               (:natural (base-natural-complete))
846
                               (:cross (base-cross-complete)))
847
                             (complete-solutions)))))
848
                  (optional-page-step ()
849
                    (let ((optional-page (get-field-page optional-source)))
850
                      (trace-data process-balanced-leftjoin.get-optional-page optional-source optional-page)
851
                      (cond (optional-page
852
                             (incf solution-count (array-dimension optional-page 0))
853
                             (let ((cache-page (copy-page optional-page)))
854
                               (ecase type
855
                                 (:natural (optional-natural-processor cache-page))
856
                                 (:cross (optional-cross-processor cache-page))))
857
                             (release-field-page optional-source optional-page))
858
                            (t
859
                             (loop (base-page-step)))))))
860
         (loop (base-page-step)
861
               (optional-page-step)))))))
862
 
863
 #+(or) ;; with collected indicators
864
 (defun process-base-dominant-leftjoin (destination base-source optional-source base-dimensions optional-dimensions key-dimensions
865
                                            &key test count offset
866
                                            (start offset) (end (when count (+ count (or start 0)))))
867
   "Generate a stream of left join solutions to a destination given a solution source and dimension lists for
868
  the base and optional solutions fields, and a predicate expression. The base-dominant version feeds the
869
  base field into optional-side bgps to constraint their matches. In order that this not deadlock, the entire
870
  base stream must be read and cached in order to ensure that the optional stream is generated.
871
  The simple method is to just cache the page sequence and re-read it in order to process the join itself.
872
  Page-by-page does not work, as the dependent side could be something like a sliced, sorted subselect which delays
873
  the optional side until it is complete.
874
 
875
 Get from the base repeatedly to
876
  obtain a stream of (side . solution-page) pairs until the page is null. Use the dimensions to distinguish left from optional.
877
  Combine the respective source either as a natural or a cross join depending
878
  on whether they share any dimensions. In the former case, hash and cache solutions based on the shared
879
  dimensions."
880
 
881
   (declare (type (or channel (function ((or array null)) t)) destination)
882
            (type (or channel (function () (or array null))) base-source optional-source))
883
   (assert-argument-types process-leftjoin
884
     (destination (or channel function))
885
     (base-source (or channel function))
886
     (optional-source (or channel function))
887
     (base-dimensions list)
888
     (optional-dimensions list)
889
     (start (or null (integer 0)))
890
     (end (or null (integer 0))))
891
   (incf-stat *algebra-operations*)
892
   (unless start (setf start 0))
893
   (when end (setf end (max start end)))
894
   (trace-algebra process-base-dominant-leftjoin base-source optional-source base-dimensions optional-dimensions key-dimensions
895
                  :start start :end end :test test)
896
 
897
   (multiple-value-bind (type result-dimensions base-cache-operator optional-cache-operator result-cache-operator predicate collector)
898
                        (compute-base-dominant-leftjoin-operators base-dimensions optional-dimensions key-dimensions test)
899
     (declare (type (function (array fixnum array fixnum array fixnum) t) collector))
900
     (let* ((result-page-width (length result-dimensions))
901
            (result-page nil)
902
            (result-page-length *field-page-length*)
903
            (result-index result-page-length)
904
            (result-cache (make-term-id-cache :single-thread t))
905
            (natural-cache (when (eq type :natural) (make-term-id-cache :single-thread t)))
906
            (cross-cache (when (eq type :cross) (make-array 32 :fill-pointer 0 :adjustable t)))
907
            (result-count 0)
908
            (solution-count 0)
909
            ;; for sp2b-q6@250k this reduced the time from 117 to 81 seconds on hetzner
910
            (*store->spocq-term-registry* (if test
911
                                            (copy-registry *store->spocq-term-registry* :single-thread t)
912
                                            *store->spocq-term-registry*)))
913
       (declare (type fixnum result-index result-count solution-count))
914
       (unless (plusp result-page-length)
915
         (put-field-page destination nil)
916
         (return-from process-base-dominant-leftjoin (values 0 0)))
917
       (labels ((base-natural-processor (base-page)
918
                  (declare (type (function (array fixnum hash-table) t) base-cache-operator))
919
                  (dotimes (base-index (array-dimension base-page 0))
920
                    (funcall base-cache-operator base-page base-index natural-cache)))
921
                (base-uncollected-operator ()
922
                  (loop for location being each hash-value of natural-cache
923
                        for (base-page base-index . collected?) = location
924
                        unless collected?
925
                        do (collect-solution base-page base-index nil nil)))
926
                (optional-natural-processor (optional-page)
927
                  (declare (type (function (array fixnum hash-table) list) optional-cache-operator))
928
                  (handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
929
                    (dotimes (optional-index (array-dimension optional-page 0))
930
                      (loop for location in (funcall optional-cache-operator optional-page optional-index natural-cache)
931
                            for (optional-page optional-index) = location
932
                            if (or (null predicate)
933
                                   (call-predicate predicate optional-page optional-index optional-page optional-index))
934
                            do (setf (cddr location) t)
935
                            (collect-solution optional-page optional-index optional-page optional-index)))))
936
                (base-cross-processor (base-page)
937
                  (vector-push-extend base-page cross-cache))
938
                (optional-cross-processor (optional-page)
939
                  (handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
940
                    (dotimes (base-index (array-dimension base-page 0))
941
                      (let ((collected? nil))
942
                        (loop for optional-page across cross-cache
943
                              do (dotimes (optional-index (array-dimension optional-page 0) t)
944
                                   (when (or (null predicate)
945
                                             (call-predicate predicate base-page base-index optional-page optional-index))
946
                                     (setf collected? t)
947
                                     (collect-solution base-page base-index optional-page optional-index))))
948
                        (unless collected?
949
                          (collect-solution base-page base-index nil nil))))))
950
                (call-predicate (predicate left-page left-index right-page right-index)
951
                  (declare (type (function (array fixnum array fixnum) boolean) predicate))
952
                  (catch :return-nil
953
                    (funcall predicate left-page left-index right-page right-index)))
954
                (collect-solution (base-page base-index optional-page optional-index)
955
                  (declare (type (function (array fixnum array fixnum hash-table) boolean) result-cache-operator))
956
                  (when (and (null (funcall result-cache-operator base-page base-index optional-page optional-index result-cache))
957
                             (> (incf result-count) start))
958
                    (next-solution-location)
959
                    (funcall collector result-page result-index base-page base-index optional-page optional-index)
960
                    (when (and end (>= result-count end)) (complete-solutions))))
961
                (next-solution-location ()
962
                  ;; return a page (possible newly created) and the next free location in that page
963
                  (when (>= (incf result-index) result-page-length)
964
                    (when result-page (put-page result-page))
965
                    (setf result-page (new-field-page destination result-page-length result-page-width)
966
                          result-index 0)))
967
                (complete-solutions ()
968
                  (when result-page
969
                    (let ((page-result-count (1+ result-index)))
970
                      (when (< page-result-count result-page-length)
971
                        (setf result-page
972
                              (adjust-page result-page (list page-result-count result-page-width)))))
973
                    (put-page result-page))
974
                  (put-page nil)
975
                  (incf-stat *solutions-processed* solution-count)
976
                  (incf-stat *solutions-constructed* result-count)
977
                  (return-from process-base-dominant-leftjoin (values solution-count result-count)))
978
                (put-page (page)
979
                  (trace-algebra process-base-dominant-leftjoin destination result-dimensions page)
980
                  (funcall destination page)))
981
         (unless (plusp result-page-length) (complete-solutions))
982
         (loop for solutions = (get-field-page base-source)
983
               until (null solutions)
984
               do (progn (incf solution-count (array-dimension solutions 0))
985
                         (ecase type
986
                           (:natural (base-natural-processor solutions))
987
                           (:cross (base-cross-processor solutions)))
988
                         (check-query-status *query*)))
989
         (loop for solutions = (get-field-page optional-source)
990
               until (null solutions)
991
               do (progn (check-query-status *query*)
992
                         (incf solution-count (array-dimension solutions 0))
993
                         (ecase type
994
                           (:natural (optional-natural-processor solutions))
995
                           (:cross (optional-cross-processor solutions)))))
996
         (when (eq type :natural)
997
           (base-uncollected-operator))
998
         (complete-field destination)))))
999
 
1000
 (defun funcall-cache-operator (optional-cache-operator optional-page optional-index natural-cache)
1001
   (funcall optional-cache-operator optional-page optional-index natural-cache))
1002
 
1003
 (defun call-solution-processor (base-natural-processor solutions)
1004
   (funcall base-natural-processor solutions))
1005
 
1006
 (defun call-mp-destination (mp-destination page)
1007
   (funcall mp-destination page))
1008
 
1009
 (defun process-mp-leftjoin (destination base-source optional-source base-dimensions optional-dimensions key-dimensions
1010
                                          &key test)
1011
   "Generate a stream of left join solutions to a destination given a solution source and dimension lists for
1012
  the base and optional solutions fields, and a predicate expression. Invoke the source function repeatedly to
1013
  obtain a stream of (side . solution-page) pairs until the page is null. Use the dimensions to distinguish left from optional.
1014
  Combine the respective source either as a natural or a cross join depending
1015
  on whether they share any dimensions. In the former case, hash and cache solutions based on the shared
1016
  dimensions.
1017
   This version creates a thread complement equal to *algebra-thread-count* to perform the cobination phase."
1018
 
1019
   (declare (type (or channel (function ((or array null)) t)) destination)
1020
            (type (or channel (function () (or array null))) base-source optional-source))
1021
   (assert-argument-types process-leftjoin
1022
     (destination (or channel function))
1023
     (base-source (or channel function))
1024
     (optional-source (or channel function))
1025
     (base-dimensions list)
1026
     (optional-dimensions list)
1027
     )
1028
   (incf-stat *algebra-operations*)
1029
   (trace-algebra process-mp-leftjoin base-source optional-source base-dimensions optional-dimensions key-dimensions :test test)
1030
 
1031
   (multiple-value-bind (type result-dimensions base-cache-operator optional-cache-operator result-cache-operator predicate collector)
1032
                        (compute-leftjoin-operators base-dimensions optional-dimensions key-dimensions test)
1033
     (declare (type (function (array fixnum array fixnum array fixnum) t) collector))
1034
 
1035
     (let* ((result-page-width (channel-page-width destination))
1036
            (result-page-length (channel-page-length destination))
1037
            (result-cache (make-term-id-cache :single-thread nil))
1038
            (result-count 0)
1039
            (solution-count 0)
1040
            (natural-cache (when (eq type :natural) (make-term-id-cache :single-thread t)))
1041
            (cross-cache (when (eq type :cross) (make-array 32 :fill-pointer 0 :adjustable t)))
1042
            (base-source-lock (bt:make-lock "leftjoin-base-lock"))
1043
            (destination-lock (bt:make-lock "leftjoin-destination-lock"))
1044
            (base-complete nil))
1045
       (assert (= (length result-dimensions) result-page-width) ()
1046
               "Channel and operation dimensions do not match: ~a: ~a." destination result-dimensions)
1047
       (labels ((optional-natural-processor (optional-page)
1048
                  (declare (type (function (array fixnum hash-table) t) optional-cache-operator))
1049
                  (dotimes (optional-index (array-dimension optional-page 0))
1050
                    (funcall-cache-operator optional-cache-operator optional-page optional-index natural-cache)))
1051
                (optional-cross-processor (optional-page)
1052
                  (vector-push-extend optional-page cross-cache))
1053
                (cache-size ()
1054
                  (if natural-cache
1055
                    (hash-table-count natural-cache)
1056
                    (length cross-cache))))
1057
         (loop for solutions = (get-field-page optional-source)
1058
               until (null solutions)
1059
               do (progn (check-query-status *query*)
1060
                         (incf solution-count (array-dimension solutions 0))
1061
                         (when (and *solution-count-limit*
1062
                                    (> (cache-size) *solution-count-limit*))
1063
                           (log-warn "leftjoin: terminated @~a optional solutions. (~a x ~a)"
1064
                                     (cache-size) base-dimensions optional-dimensions)
1065
                           (terminate-task *query*))
1066
                         (ecase type
1067
                           (:natural (optional-natural-processor solutions))
1068
                           (:cross (optional-cross-processor solutions))))))
1069
       (labels ((mp-destination (page)
1070
                  (bt:with-lock-held (destination-lock)
1071
                    (incf result-count (array-dimension page 0))
1072
                    (put-field-page destination page)))
1073
                (mp-base-source ()
1074
                  (bt:with-lock-held (base-source-lock)
1075
                    (check-query-status *query*)
1076
                    (when (and *solution-count-limit*
1077
                               (> solution-count *solution-count-limit*))
1078
                      (log-warn "mp-leftjoin: terminated @~a solutions. (~a x ~a)"
1079
                                solution-count base-dimensions optional-dimensions)
1080
                      (terminate-task *query*))
1081
                    (unless base-complete
1082
                      (let ((page (get-field-page base-source)))
1083
                        (if page
1084
                          (incf solution-count (array-dimension page 0))
1085
                          (setf base-complete t))
1086
                        page))))
1087
                (process-base-solutions ()
1088
                  (let ((result-page nil)
1089
                        (result-index result-page-length)
1090
                        (*store->spocq-term-registry* (if test
1091
                                                        (copy-registry *store->spocq-term-registry* :single-thread t)
1092
                                                        *store->spocq-term-registry*)))
1093
                    (declare (type fixnum result-index result-count solution-count))
1094
                    (labels ((base-natural-processor (base-page)
1095
                               (declare (type (function (array fixnum hash-table) list) base-cache-operator))
1096
                               (handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
1097
                                 (dotimes (base-index (array-dimension base-page 0))
1098
                                   (let ((collected? nil))
1099
                                     (loop for (optional-page optional-index) in (funcall-cache-operator base-cache-operator base-page base-index natural-cache)
1100
                                           if (or (null predicate)
1101
                                                  (call-predicate predicate base-page base-index optional-page optional-index))
1102
                                           do (setf collected? t)
1103
                                           (collect-solution base-page base-index optional-page optional-index))
1104
                                     (unless collected?
1105
                                       (collect-solution base-page base-index nil nil))))))
1106
                             (base-cross-processor (base-page)
1107
                               (handler-bind ((error (lambda (c) (declare (ignore c)) (throw :return-nil nil))))
1108
                                 (dotimes (base-index (array-dimension base-page 0))
1109
                                   (let ((collected? nil))
1110
                                     (loop for optional-page across cross-cache
1111
                                           do (dotimes (optional-index (array-dimension optional-page 0) t)
1112
                                                (when (or (null predicate)
1113
                                                          (call-predicate predicate base-page base-index optional-page optional-index))
1114
                                                  (setf collected? t)
1115
                                                  (collect-solution base-page base-index optional-page optional-index))))
1116
                                     (unless collected?
1117
                                       (collect-solution base-page base-index nil nil))))))
1118
                             (call-predicate (predicate left-page left-index right-page right-index)
1119
                               (declare (type (function (array fixnum array fixnum) boolean) predicate))
1120
                               (catch :return-nil
1121
                                 (funcall predicate left-page left-index right-page right-index)))
1122
                             (collect-solution (base-page base-index optional-page optional-index)
1123
                               (declare (type (function (array fixnum array fixnum hash-table) boolean) result-cache-operator))
1124
                               (when (null (funcall result-cache-operator base-page base-index optional-page optional-index result-cache))
1125
                                 (next-solution-location)
1126
                                 (incf result-count)
1127
                                 (funcall collector result-page result-index base-page base-index optional-page optional-index)))
1128
                             (next-solution-location ()
1129
                               ;; return a page (possible newly created) and the next free location in that page
1130
                               (when (>= (incf result-index) result-page-length)
1131
                                 (when result-page (put-page result-page))
1132
                                 (setf result-page (new-field-page destination result-page-length result-page-width)
1133
                                       result-index 0)))
1134
                             (complete-solutions ()
1135
                               (when result-page
1136
                                 (let ((page-result-count (1+ result-index)))
1137
                                   (when (< page-result-count result-page-length)
1138
                                     (setf result-page
1139
                                           (adjust-page result-page (list page-result-count result-page-width)))))
1140
                                 (put-page result-page)))
1141
                             (put-page (page)
1142
                               (trace-algebra process-mp-leftjoin destination result-dimensions page)
1143
                               (call-mp-destination #'mp-destination page)))
1144
                      (loop for solutions = (mp-base-source)
1145
                            until (null solutions)
1146
                            do (ecase type
1147
                                 (:natural (call-solution-processor #'base-natural-processor solutions))
1148
                                 (:cross (call-solution-processor #'base-cross-processor solutions))))
1149
                      (complete-solutions)))))
1150
         
1151
         (when (plusp result-page-length)
1152
           (let ((threads (loop for i from 0 below (algebra-operator-thread-count)
1153
                                collect (bt:make-thread #'process-base-solutions 
1154
                                                        :name (format nil "leftjoin-~a" i)
1155
                                                        :initial-bindings (acons '*query* *query*
1156
                                                                                 (acons '*task* *task*
1157
                                                                                        (acons '*repository* *repository* ())))))))
1158
             (loop for thread in threads do (bt:join-thread thread)))
1159
           (incf-stat *solutions-processed* solution-count)
1160
           (incf-stat *solutions-constructed* result-count))
1161
         (complete-field destination)
1162
         (trace-algebra process-mp-leftjoin base-dimensions optional-dimensions solution-count result-count)
1163
         (values solution-count result-count)))))
1164
 
1165
 
1166
 (defun compute-leftjoin-operators (base-dimensions optional-dimensions key-dimensions test)
1167
   (let ((result-dimensions (union-dimensions base-dimensions optional-dimensions)))
1168
     (if (intersection base-dimensions optional-dimensions)
1169
       (values :natural result-dimensions
1170
               (compute-read-cache-op base-dimensions optional-dimensions :key-dimensions key-dimensions)
1171
               (compute-write-cache-op optional-dimensions base-dimensions :key-dimensions key-dimensions)
1172
               (compute-optional-write-cache-op base-dimensions optional-dimensions)
1173
               (when test
1174
                 (compute-binary-predicate test base-dimensions optional-dimensions :handle-errors nil))
1175
               (compute-optional-collector result-dimensions base-dimensions optional-dimensions))
1176
       (values :cross result-dimensions
1177
               nil
1178
               nil
1179
               (compute-optional-write-cache-op base-dimensions optional-dimensions)
1180
               (when test
1181
                 (compute-binary-predicate test base-dimensions optional-dimensions :handle-errors nil))
1182
               (compute-optional-collector result-dimensions base-dimensions optional-dimensions)))))
1183
 
1184
 (defun compute-base-dominant-leftjoin-operators (base-dimensions optional-dimensions key-dimensions test)
1185
   (let ((result-dimensions (union-dimensions base-dimensions optional-dimensions)))
1186
     (if (intersection base-dimensions optional-dimensions)
1187
       (values :natural result-dimensions
1188
               (compute-write-cache-op base-dimensions optional-dimensions :key-dimensions key-dimensions)
1189
               (compute-read-cache-op optional-dimensions base-dimensions :key-dimensions key-dimensions)
1190
               (compute-optional-write-cache-op base-dimensions optional-dimensions)
1191
               (when test
1192
                 (compute-binary-predicate test base-dimensions optional-dimensions :handle-errors nil))
1193
               (compute-optional-collector result-dimensions base-dimensions optional-dimensions))
1194
       (values :cross result-dimensions
1195
               nil
1196
               nil
1197
               (compute-optional-write-cache-op base-dimensions optional-dimensions)
1198
               (when test
1199
                 (compute-binary-predicate test base-dimensions optional-dimensions :handle-errors nil))
1200
               (compute-optional-collector result-dimensions base-dimensions optional-dimensions)))))
1201
 
1202
 (defun compute-balanced-leftjoin-operators (base-dimensions optional-dimensions key-dimensions test)
1203
   (let ((result-dimensions (union-dimensions base-dimensions optional-dimensions)))
1204
     (if (intersection base-dimensions optional-dimensions)
1205
       (values :natural result-dimensions
1206
               (compute-write-cache-op base-dimensions optional-dimensions :key-dimensions key-dimensions)
1207
               (compute-read-cache-op optional-dimensions base-dimensions :key-dimensions key-dimensions)
1208
               (compute-write-cache-op optional-dimensions base-dimensions :key-dimensions key-dimensions)
1209
               (compute-read-cache-op base-dimensions optional-dimensions :key-dimensions key-dimensions)
1210
               (compute-optional-write-cache-op base-dimensions optional-dimensions)
1211
               (when test
1212
                 (compute-binary-predicate test base-dimensions optional-dimensions :handle-errors nil))
1213
               (compute-optional-collector result-dimensions base-dimensions optional-dimensions))
1214
       (values :cross result-dimensions
1215
               nil
1216
               nil
1217
               nil
1218
               nil
1219
               (compute-optional-write-cache-op base-dimensions optional-dimensions)
1220
               (when test
1221
                 (compute-binary-predicate test base-dimensions optional-dimensions :handle-errors nil))
1222
               (compute-optional-collector result-dimensions base-dimensions optional-dimensions)))))
1223
 
1224
 
1225
 #|
1226
 
1227
 (run-test-query "PREFIX  foaf:   <http://xmlns.com/foaf/0.1/>
1228
 
1229
 SELECT ?mbox ?name
1230
    {
1231
      ?x foaf:mbox ?mbox .
1232
      OPTIONAL { ?x foaf:name  ?name } .
1233
    }
1234
 " :repository-id (lookup-repository-id :repository-name "optional-dawg-optional-001" :account-name "jhacker"))
1235
 
1236
 (run-test-query "PREFIX  foaf:   <http://xmlns.com/foaf/0.1/>
1237
 SELECT ?person ?nick ?page ?img ?name ?firstN
1238
 
1239
     ?person foaf:nick ?nick
1240
     OPTIONAL { ?person foaf:isPrimaryTopicOf ?page } 
1241
     OPTIONAL { 
1242
         ?person foaf:name ?name 
1243
         { ?person foaf:depiction ?img } UNION 
1244
         { ?person foaf:firstName ?firstN } 
1245
     } FILTER ( bound(?page) || bound(?img) || bound(?firstN) ) 
1246
 
1247
 " :repository-id (lookup-repository-id :repository-name "optional-dawg-optional-complex-1" :account-name "jhacker"))
1248
 
1249
 (run-test-query "SELECT ?v
1250
 {
1251
     ?x ?p ?v .
1252
 }
1253
 " :repository-id (lookup-repository-id :repository-name "distinct-no-distinct-3" :account-name "jhacker"))
1254
 
1255
 (run-test-query "SELECT ?v
1256
 {
1257
     ?x ?p ?v .
1258
 }
1259
 " :repository-id (lookup-repository-id :repository-name "distinct-no-distinct-1" :account-name "jhacker"))
1260
 
1261
 (run-test-query "PREFIX : <http://example.org/>
1262
 PREFIX foaf: <http://xmlns.com/foaf/0.1/>
1263
 
1264
 SELECT *
1265
 WHERE { ?X foaf:knows* ?Y } 
1266
 ORDER BY ?X ?Y
1267
 
1268
 "
1269
                 :repository-id (lookup-repository-id :repository-name "property-path-pp14" :account-name "jhacker"))
1270
 
1271
 ;;; chained leftjoin optimization
1272
 
1273
 (pprint-sse (parse-sparql "
1274
  select * where {
1275
   ?s rdf:type <http://ontology/a> .
1276
   ?s dc:creator <http://example.com/a> .
1277
   optional { ?s <http://ontology/slot/a> ?a }
1278
   optional { ?s <http://ontology/slot/b> ?b }
1279
  }
1280
 "))
1281
 
1282
 ;;; yields
1283
 
1284
 (select (leftjoin (leftjoin (bgp (triple ?:s
1285
                                           http://www.w3.org/1999/02/22-rdf-syntax-ns#:type
1286
                                           <http://ontology/a>)
1287
                                   (triple ?:s
1288
                                           <http://purl.org/dc/terms/creator>
1289
                                           <http://example.com/a>))
1290
                              (bgp (triple ?:s <http://ontology/slot/a> ?:a)))
1291
                    (bgp (triple ?:s <http://ontology/slot/b> ?:b)))
1292
   (?:s ?:a ?:b))
1293
 
1294
 
1295
 of which the forms
1296
 (leftjoin (leftjoin <x> (bgp (triple s0 p0 o0)))
1297
           (bgp (triple s1 p1 o1)))
1298
 =>
1299
 (leftjoin <x> (bgp (sum (triple s0 p0 o0)
1300
                         (triple s1 p1 o1))))
1301
 
1302
 
1303
 
1304
 (leftjoin (leftjoin <x> (bgp (triple s0 p0 o0)))
1305
           (bgp (sum (triple s1 p1 o1)
1306
                     (triple s2 p2 o2))))
1307
 =>
1308
 (leftjoin <x> (bgp (sum (triple s0 p0 o0)
1309
                         (triple s1 p1 o1)
1310
                         (triple s2 p2 o2))))
1311
 
1312
 |#