Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/order.lisp
| Kind | Covered | All | % |
| expression | 245 | 533 | 46.0 |
| branch | 11 | 40 | 27.5 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
6
;; the sparql spec names it order-by, byt arq callis it order
8
(defmacro spocq.a:|order| (solution-field order-predicate-form &rest args &key start end)
9
"( ( solutionField (function (solution solution) xsd:boolean) ) solutionField )
10
A SORT form applies an ordering predicate to a given solution field
11
to produce a result filed in which each successive solution pair satisfies
13
(declare (ignore end start))
14
(apply #'macroexpand-order solution-field order-predicate-form args))
17
(defun macroexpand-order (solution-field order-predicate-form &rest args &key end limit offset start)
18
(declare (ignore end limit offset start))
19
(let ((variables (expression-variables order-predicate-form)))
20
(setf (variable-opacity variables) :transparent)
21
(setf args (apply #'canonicalize-algebra-arguments args))
22
`(spocq.e:order (spocq.e::with-reference-dimensions ,variables
23
(spocq.e::with-sort-order ,variables ,solution-field))
24
',order-predicate-form ,@args)))
25
;;; (expand-query "select * where {?s ?p ?o} order by ?o desc (?p)" :repository-id "test/test" :agent (system-agent))
27
(defgeneric spocq.e:order (field order-predicate-form &key start end)
28
(:method :before ((field t) (order-predicate-form t) &key start end)
29
(assert-argument-types spocq.e:order
30
(start (or null (integer 0)))
31
(end (or null (integer 0))))
32
(incf-stat *algebra-operations*)
33
(trace-algebra spocq.e:order field order-predicate-form
34
:start start :end end))
36
(:method ((field solution-generator) order-predicate-form &rest args)
37
(declare (dynamic-extent args))
38
(apply #'spocq.e:stream-order field order-predicate-form args)))
40
(defparameter *process-order.maximum-slice* 0)
42
(defun expression-sort-dimensions (order-predicate-form)
43
"Return a list of sort variables
44
the elements, in order
45
each is dimension-sorted
46
asc/desc are prepended"
47
(loop for sort-specification in order-predicate-form
48
collect (typecase sort-specification
49
(cons (if (member (first sort-specification) '(spocq.a:|asc| spocq.a:|desc|))
50
(cons (first sort-specification)
51
(expression-dimensions (second sort-specification)))
52
(expression-dimensions sort-specification)))
53
(t sort-specification))))
55
(defun spocq.e:stream-order (field-generator order-predicate-form &rest args &key (start 0) end)
56
(let* ((base-dimensions (solution-generator-dimensions field-generator))
57
(base-sort-dimensions (solution-generator-sort-dimensions field-generator))
58
(sort-dimensions (expression-sort-dimensions order-predicate-form))
59
(result-channel (make-channel :name (list 'spocq.a:|order| (task-id *query*))
60
:dimensions base-dimensions
61
:sort-dimensions sort-dimensions)))
62
;; (print (list :base base-dimensions :sort sort-dimensions :base-sort base-sort-dimensions))
63
(labels ((run-order-thread (result-channel field-generator order-predicate-form args)
64
(let* ((base-dimensions (solution-generator-dimensions field-generator))
65
(base-channel (solution-generator-channel field-generator))
66
(expression (solution-generator-expression field-generator))
67
(*thread-operations* (cons (list 'spocq.a:|order| base-dimensions order-predicate-form)
68
*thread-operations*)))
69
(push 'spocq.a:|order| (channel-name base-channel))
70
(query-run-in-thread *query* expression)
71
(setf (channel-size result-channel) (min (channel-size base-channel)
72
(channel-size result-channel))
73
(channel-page-length result-channel) (min (channel-page-length base-channel)
74
(channel-page-length result-channel)))
75
(if (and base-sort-dimensions
76
(equal base-sort-dimensions
78
(apply #'process-slice result-channel base-channel
81
(if (and end (< (- end start) *process-order.maximum-slice*))
82
(apply #'process-order-slice result-channel base-channel
86
(apply #'process-order result-channel base-channel
91
;; return the binding function to the combination operator
92
(make-solution-generator :operator 'spocq.a:|order|
93
:dimensions base-dimensions
94
:sort-dimensions sort-dimensions
95
:expression (list #'run-order-thread result-channel field-generator order-predicate-form
97
:channel result-channel
98
:constituents (list field-generator)))))
101
;;; list version @9000000 ran 20% faster
102
(defmethod process-order ((destination array-page-channel)
103
(base-source array-page-channel)
104
base-dimensions order-predicate-form
106
"Generate a tream of ordered solutions given a source and a order predicate form."
108
(declare (type list base-dimensions order-predicate-form))
109
(assert-argument-types process-group
110
(base-dimensions list)
111
(order-predicate-form cons))
112
(incf-stat *algebra-operations*)
113
(trace-algebra process-order destination base-source base-dimensions order-predicate-form)
115
;;;!!!HDT : sort predicate and/or key construction requires reference to HDT dictionary
116
(multiple-value-bind (collector key-operator predicate-operators)
117
(compute-order-operators base-dimensions order-predicate-form)
118
(declare (type (function (array fixnum array fixnum) t) collector)
119
(type (function (array fixnum) t) key-operator))
120
(let* ((result-page-width (channel-page-width destination))
121
(result-page-length (channel-page-length destination))
123
(result-index result-page-length)
124
(keyed-solutions '())
125
(*enable-sort-precedence* t)
128
(assert (= (length base-dimensions) result-page-width) ()
129
"Channel and operation dimensions do not match: ~a: ~a." destination base-dimensions)
130
(labels ((base-processor (base-page)
131
(let ((solution-count (array-dimension base-page 0)))
132
(loop for base-index below solution-count
133
do (push (cons (funcall key-operator base-page base-index)
134
(cons base-page base-index))
137
(test-predicates (entry-one entry-two)
138
(handler-case (loop for sub-key-one in (first entry-one)
139
for sub-key-two in (first entry-two)
140
for predicate in predicate-operators
141
do (ecase (call-predicate predicate sub-key-one sub-key-two)
144
(( 1 nil) (return nil))))
146
(call-predicate (predicate v1 v2)
147
(declare (type (function (t t) (or integer null)) predicate))
148
(funcall predicate v1 v2))
149
(collect-solution (base-page base-index)
150
(when (> (incf result-count) start)
151
(next-solution-location)
152
(funcall collector result-page result-index base-page base-index)
153
(when (and end (>= result-count end))
154
(complete-solutions))))
155
(next-solution-location ()
156
;; return a page (possible newly created) and the next free location in that page
157
(when (>= (incf result-index) result-page-length)
158
(when result-page (put-page result-page))
159
(setf result-page (new-field-page destination result-page-length result-page-width)
161
(complete-solutions ()
163
(let ((result-count (1+ result-index)))
164
(when (< result-count result-page-length)
166
(adjust-page result-page (list result-count result-page-width)))))
167
(put-page result-page))
169
(incf-stat *solutions-processed* solution-count)
170
(trace-algebra process-order.complete destination)
171
(return-from process-order (values solution-count result-count)))
173
(trace-data process-order destination base-dimensions (term-value-field page))
175
(put-field-page destination page)
176
(complete-field destination))))
177
(unless (and (plusp result-page-length) (or (null end) (> end start))) (complete-solutions))
178
(rlmdb:with-string-database (db)
179
(do-pages (solutions base-source)
180
(check-query-status *query*)
181
(incf solution-count (base-processor (copy-page solutions)))))
182
(setf keyed-solutions (sort keyed-solutions #'test-predicates))
183
(loop for (key . (page . index)) in keyed-solutions
184
do (collect-solution page index))
185
(complete-solutions)))))
187
(defmethod process-order-slice ((destination array-page-channel)
188
(base-source array-page-channel)
189
base-dimensions order-predicate-form
191
"Generate a sliced stream of ordered solutions given a source and a order predicate form."
193
(declare (type list base-dimensions order-predicate-form))
194
(assert-argument-types process-group
195
(base-dimensions list)
196
(order-predicate-form cons))
197
(incf-stat *algebra-operations*)
198
(trace-algebra process-order-slice destination base-source base-dimensions order-predicate-form)
200
(multiple-value-bind (collector key-operator predicate-operators)
201
(compute-order-operators base-dimensions order-predicate-form)
202
(declare (type (function (array fixnum array fixnum) t) collector)
203
(type (function (array fixnum) t) key-operator))
205
(result-page-width (channel-page-width destination))
206
(result-page-length (channel-page-length destination))
208
(result-index result-page-length)
209
(*enable-sort-precedence* t)
210
(cache (make-array size))
214
(assert (= (length base-dimensions) result-page-width) ()
215
"Channel and operation dimensions do not match: ~a: ~a." destination base-dimensions)
216
(labels ((base-processor (base-page)
217
(let ((page-solution-count (array-dimension base-page 0)))
218
(loop for base-index below page-solution-count
219
for key = (funcall key-operator base-page base-index)
220
do (cond ((zerop solution-count)
221
(setf (aref cache 0) (cons key (cons base-page base-index)))
222
(setf maximum-key key))
223
((test-predicates key maximum-key)
224
(insert-solution key (cons base-page base-index))
225
(setf maximum-key (first (aref cache (1- size)))))
226
((< solution-count size)
227
(setf (aref cache solution-count) (cons key (cons base-page base-index)))
228
(setf maximum-key key)))
229
do (incf solution-count))
230
page-solution-count))
231
(insert-solution (key entry)
232
(let ((successor (find-successor key -1 size)))
233
(assert successor () "process-order: no successor.")
234
(loop for i from (- size 2) to successor
235
do (setf (aref cache (1+ i)) (aref cache i)))
236
(setf (aref cache successor) (cons key entry))))
237
(find-successor (key predecessor successor)
238
;; insert the new entry relative to its key and bump the last entry
239
;; at the outset it is known to preceed the entry at the given position
240
(let ((delta (- successor predecessor)))
243
(let ((test-position (+ predecessor (ash delta -1))))
244
(if (test-predicates key (first (aref cache test-position)))
245
;; if it preceeds the probe, continue below it
246
(find-successor key predecessor test-position)
247
;; otherwise continue above it
248
(find-successor key test-position successor))))))
249
(test-predicates (entry-one entry-two)
250
(handler-case (loop for sub-key-one in (first entry-one)
251
for sub-key-two in (first entry-two)
252
for predicate in predicate-operators
253
do (ecase (call-predicate predicate sub-key-one sub-key-two)
256
(( 1 nil) (return nil))))
258
(call-predicate (predicate v1 v2)
259
(declare (type (function (t t) (or integer null)) predicate))
260
(funcall predicate v1 v2))
261
(collect-solution (base-page base-index)
262
(when (> (incf result-count) start)
263
(next-solution-location)
264
(funcall collector result-page result-index base-page base-index)
265
(when (and end (>= result-count end))
266
(complete-solutions))))
267
(next-solution-location ()
268
;; return a page (possible newly created) and the next free location in that page
269
(when (>= (incf result-index) result-page-length)
270
(when result-page (put-page result-page))
271
(setf result-page (new-field-page destination result-page-length result-page-width)
273
(complete-solutions ()
275
(let ((result-count (1+ result-index)))
276
(when (< result-count result-page-length)
278
(adjust-page result-page (list result-count result-page-width)))))
279
(put-page result-page))
281
(incf-stat *solutions-processed* solution-count)
282
(trace-algebra process-order.complete destination)
283
(return-from process-order-slice (values solution-count result-count)))
285
(trace-data process-order-slice destination base-dimensions (term-value-field page))
287
(put-field-page destination page)
288
(complete-field destination))))
292
(let ((current-key nil)
295
(current-page-original nil))
296
(do-pages (solutions base-source)
297
(check-query-status *query*)
298
(let ((solution-count (array-dimension solutions 0)))
299
(loop for base-index below solution-count
300
for key = (funcall key-operator solutions base-index)
301
if (or (null current-key) (test-predicates key current-key))
302
do (progn (setf current-key key
303
current-index base-index)
304
(unless (eq solutions current-page-original)
305
(setf current-page (copy-page solutions)
306
current-page-original solutions)))))
307
(incf solution-count (array-dimension solutions 0)))
308
(setf result-count 1)
309
(collect-solution current-page current-index)))
311
(do-pages (solutions base-source)
312
(check-query-status *query*)
313
(base-processor (copy-page solutions)))
315
(loop for (nil . (page . index)) across cache
316
do (collect-solution page index))))
317
(complete-solutions)))))
319
(defun compute-order-operators (base-dimensions order-predicate-form)
320
(let ((collector (values (compute-unary-collector base-dimensions base-dimensions))))
321
(multiple-value-bind (key-operator key-predicates)
322
(compute-sort-key-operator base-dimensions order-predicate-form)
330
(defun process-order (destination base-source base-dimensions order-predicate-form)
331
"Generate a tream of ordered solutions given a source and a order predicate form."
333
(declare (type (function ((or cons null)) t) continuation)
334
(type (function () (or array null)) base-source)
335
(type list base-dimensions order-predicate-form))
336
(assert-argument-types process-group
337
(continuation function)
338
(base-source function)
339
(base-dimensions list)
340
(order-predicate-form cons))
341
(incf-stat *algebra-operations*)
342
(trace-algebra process-order base-source base-dimensions order-predicate-form)
344
(multiple-value-bind (collector key-operator predicate-operators)
345
(compute-order-operators base-dimensions order-predicate-form)
346
(declare (type (function (array fixnum array fixnum) t) collector)
347
(type (function (array fixnum) t) key-operator))
348
(let* ((result-page-width (length base-dimensions))
350
(result-page-length *field-page-length*)
351
(result-index result-page-length)
352
(keyed-solutions (make-array result-page-length :fill-pointer 0 :adjustable t))
353
(*enable-sort-precedence* t)
355
(labels ((base-processor (base-page)
356
(let ((solution-count (array-dimension base-page 0)))
357
(loop for base-index below solution-count
358
do (vector-push-extend (cons (funcall key-operator base-page base-index)
359
(cons base-page base-index))
362
(test-predicates (entry-one entry-two) (print (list :test entry-one entry-two)) (print
363
(handler-case (loop for sub-key-one in (first entry-one)
364
for sub-key-two in (first entry-two)
365
for predicate in predicate-operators
366
unless (and sub-key-one sub-key-two)
368
if (call-predicate predicate sub-key-one sub-key-two)
370
else unless (spocq.e:= sub-key-one sub-key-two)
373
(call-predicate (predicate v1 v2)
374
(declare (type (function (t t) boolean) predicate))
375
(funcall predicate v1 v2))
376
(collect-solution (base-page base-index)
377
(next-solution-location)
378
(funcall collector result-page result-index base-page base-index))
379
(next-solution-location ()
380
;; return a page (possible newly created) and the next free location in that page
381
(when (>= (incf result-index) result-page-length)
382
(when result-page (funcall continuation result-page))
383
(setf result-page (new-field-page destination result-page-length result-page-width)
385
(complete-solutions ()
387
(let ((result-count (1+ result-index)))
388
(when (< result-count result-page-length)
390
(adjust-page result-page (list result-count result-page-width)))))
391
(put-page result-page))
393
(incf-stat *solutions-processed* solution-count)
394
(incf-stat *solutions-constructed* solution-count)
395
(return-from process-order))
397
(trace-data process-order destination base-dimensions (term-value-field page))
399
(put-field-page destination page)
400
(complete-field destination))))
401
(do-pages (solutions base-source)
402
(check-query-status *query*)
403
(when (and *solution-count-limit*
404
(> solution-count *solution-count-limit*))
405
(log-warn "order: terminated @~a solutions." solution-count)
406
(terminate-task *query*))
407
(incf solution-count (base-processor (copy-page solutions))))
408
(sort keyed-solutions #'test-predicates)
409
(loop for (nil . (page . index)) across keyed-solutions
410
do (collect-solution page index))
411
(complete-solutions)))))
413
#+(or) ;; tree-case replaced with vector-cache version
414
(defmethod process-order-slice ((destination array-page-channel)
415
(base-source array-page-channel)
416
base-dimensions order-predicate-form
418
"Generate a sliced stream of ordered solutions given a source and a order predicate form."
420
(declare (type list base-dimensions order-predicate-form))
421
(assert-argument-types process-group
422
(base-dimensions list)
423
(order-predicate-form cons))
424
(incf-stat *algebra-operations*)
425
(trace-algebra process-order-slice destination base-source base-dimensions order-predicate-form)
427
(multiple-value-bind (collector key-operator predicate-operators)
428
(compute-order-operators base-dimensions order-predicate-form)
429
(declare (type (function (array fixnum array fixnum) t) collector)
430
(type (function (array fixnum) t) key-operator))
432
(result-page-width (channel-page-width destination))
433
(result-page-length (channel-page-length destination))
435
(result-index result-page-length)
436
(*enable-sort-precedence* t)
441
(assert (= (length base-dimensions) result-page-width) ()
442
"Channel and operation dimensions do not match: ~a: ~a." destination base-dimensions)
443
(labels ((base-processor (base-page)
444
(let ((page-solution-count (array-dimension base-page 0)))
445
(loop for base-index below page-solution-count
446
for key = (funcall key-operator base-page base-index)
447
do (cond ((zerop solution-count)
448
(trees:insert (cons key (cons base-page base-index)) cache)
449
(setf maximum-key key))
450
((< solution-count size)
451
(trees:insert (cons key (cons base-page base-index)) cache)
452
(when (unless (test-predicates key maximum-key)
453
(setf maximum-key key))))
454
((test-predicates key maximum-key)
455
(trees:insert (cons key (cons base-page base-index)) cache)
456
(let ((entry (trees:maximum cache)))
457
(trees:delete (first entry) cache)
458
(setf maximum-key (first (trees:maximum cache))))))
459
do (incf solution-count))
460
page-solution-count))
461
(test-predicates (entry-one entry-two)
462
(handler-case (loop for sub-key-one in (first entry-one)
463
for sub-key-two in (first entry-two)
464
for predicate in predicate-operators
465
do (ecase (call-predicate predicate sub-key-one sub-key-two)
468
(( 1 nil) (return nil))))
470
(call-predicate (predicate v1 v2)
471
(declare (type (function (t t) (or integer null)) predicate))
472
(funcall predicate v1 v2))
473
(collect-solution (base-page base-index)
474
(when (> (incf result-count) start)
475
(next-solution-location)
476
(funcall collector result-page result-index base-page base-index)
477
(when (and end (>= result-count end))
478
(complete-solutions))))
479
(next-solution-location ()
480
;; return a page (possible newly created) and the next free location in that page
481
(when (>= (incf result-index) result-page-length)
482
(when result-page (put-page result-page))
483
(setf result-page (new-field-page destination result-page-length result-page-width)
485
(complete-solutions ()
487
(let ((result-count (1+ result-index)))
488
(when (< result-count result-page-length)
490
(adjust-page result-page (list result-count result-page-width)))))
491
(put-page result-page))
493
(incf-stat *solutions-processed* solution-count)
494
(trace-algebra process-order.complete destination)
495
(return-from process-order-slice (values solution-count result-count)))
497
(trace-data process-order-slice destination base-dimensions (term-value-field page))
499
(put-field-page destination page)
500
(complete-field destination))))
504
(let ((current-key nil)
507
(current-page-original nil))
508
(do-pages (solutions base-source)
509
(check-query-status *query*)
510
(let ((solution-count (array-dimension solutions 0)))
511
(loop for base-index below solution-count
512
for key = (funcall key-operator solutions base-index)
513
if (or (null current-key) (test-predicates key current-key))
514
do (progn (setf current-key key
515
current-index base-index)
516
(unless (eq solutions current-page-original)
517
(setf current-page (copy-page solutions)
518
current-page-original solutions)))))
519
(incf solution-count (array-dimension solutions 0)))
520
(setf result-count 1)
521
(collect-solution current-page current-index)))
523
(setf cache (trees:make-binary-tree :aa #'test-predicates :key #'first
524
;; no entries are equal - even with identical keys
525
:test (constantly nil)))
526
(do-pages (solutions base-source)
527
(check-query-status *query*)
528
(base-processor (copy-page solutions)))
530
(trees:dotree (node cache)
531
(destructuring-bind (key . (page . index)) node
532
(declare (ignore key))
533
(collect-solution page index)))))
534
(complete-solutions)))))
538
(defun find-successor (cache key test &optional (predecessor -1) (successor (length cache)))
539
;; insert the new entry relative to its key and bump the last entry
540
;; at the outset it is known to preceed the entry at the given position
541
(let ((delta (- successor predecessor)))
544
(let ((test-position (+ predecessor (ash delta -1))))
545
(if (funcall test key (aref cache test-position))
546
;; if it preceeds the probe, continue below it
547
(find-successor cache key test predecessor test-position)
548
;; otherwise continue above it
549
(find-successor cache key test test-position successor))))))
550
(loop for i from -1 below 6 collect `(:i ,i :successor ,(find-successor #(0 1 2 3 4) i #'< )))
551
(loop for i from -1 below 6 collect `(:i ,i :successor ,(find-successor #(1) i #'< )))