Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/streaming.lisp
| Kind | Covered | All | % |
| expression | 279 | 1850 | 15.1 |
| branch | 11 | 116 | 9.5 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
4
;;; (load #p"patches/streaming.lisp")
12
(fmakunbound 'channel-put)
13
(load (compile-file "streaming"))
17
(require :sb-concurrency)
19
(defgeneric page-length (page)
  (:documentation "Return the length of PAGE, that is, the extent of its first dimension.")
  (:method ((page array))
    ;; take the head of the dimension list; a rank-zero array has no
    ;; dimensions, in which case the result is NIL
    (car (array-dimensions page))))
24
;;; Passed by the TRACE-STREAMING macro to TRACE-WHEN; initially NIL.
;;; NOTE(review): presumably a stream (or NIL to disable tracing) - confirm against TRACE-WHEN.
(defvar *streaming-trace-output* nil)
25
(defmacro trace-streaming (operator &rest values)
26
(let ((op (gensym "trace-")))
28
(format ,op "~&~a[~a] : ~@{~a~^ ~}~%"
29
',operator (bt:thread-name (bt:current-thread)) ,@values)))
30
(declare (dynamic-extent #',op))
31
(trace-when *streaming-trace-output* #',op))))
35
(progn ; just enough to allow it to compile
36
(defpackage :sb-thread
37
(:export :wait-on-semaphore :signal-semaphore :semaphore-count :make-semaphore))
40
(lock (bt:make-lock ))
42
;; Stand-in for SB-THREAD:MAKE-SEMAPHORE: passes all arguments through to MAKE-SEMAPHORE.
(defun sb-thread:make-semaphore (&rest args)
43
(apply #'make-semaphore args))
44
;; Stand-in for SB-THREAD:SEMAPHORE-COUNT: returns the value of SEMAPHORE-COUNT for the semaphore.
(defun sb-thread:semaphore-count (semaphore)
45
(semaphore-count semaphore))
46
;; Stand-in for SB-THREAD:WAIT-ON-SEMAPHORE: loops until the count can be decremented,
;; then returns T.
(defun sb-thread:wait-on-semaphore (semaphore)
47
;; test and decrement happen together while holding the semaphore's lock
(loop (bt:with-lock-held ((semaphore-lock semaphore))
48
(when (plusp (semaphore-count semaphore))
49
(decf (semaphore-count semaphore))
50
(return-from sb-thread:wait-on-semaphore t)))
51
;; the count was not positive: wait, outside the lock, until it is, then retry the loop
(ccl:process-wait "semaphore wait" #'(lambda (semaphore) (plusp (semaphore-count semaphore))) semaphore)))
52
;; Stand-in for SB-THREAD:SIGNAL-SEMAPHORE: increments the count while holding the semaphore's lock.
(defun sb-thread:signal-semaphore (semaphore)
53
(bt:with-lock-held ((semaphore-lock semaphore))
54
(incf (semaphore-count semaphore))))
58
;; Package-local shorthand: delegates to SB-THREAD:WAIT-ON-SEMAPHORE.
(defun wait-on-semaphore (s)
59
(sb-thread:wait-on-semaphore s))
60
;; Package-local shorthand: delegates to SB-THREAD:SIGNAL-SEMAPHORE.
(defun signal-semaphore (s)
61
(sb-thread:signal-semaphore s))
62
(defun try-semaphore (s)
63
(sb-thread:try-semaphore s)))
68
(defgeneric cl-user::format-channel-state (stream object &optional colon at &rest args)
71
;;; Protocol declaration; methods for solution-channel, page-ring, fifo and page-mailbox
;;; are defined later in this file.
(defgeneric channel-count (channel)
72
(:documentation "Return the count of elements currently in the channel."))
74
;;; Protocol declaration; the page-channel method later in this file defaults LENGTH and WIDTH
;;; to the channel's page length and page width.
(defgeneric channel-create-page (channel &optional length width)
75
(:documentation "Return a new page sized for the channel page width and length."))
77
;;; Protocol declaration; the methods in this file default WAIT to *CHANNEL-GET-WAIT*.
(defgeneric channel-get (channel &key wait)
78
(:documentation "Return the next available element from the channel.
79
If WAIT is true, then suspend the process until data is available.
80
Returns two values, the datum and a boolean to indicate whether data was available."))
82
;;; Protocol declaration only; methods for page-ring, fifo and page-mailbox appear later in this file.
(defgeneric channel-get-all (channel) )
84
;;; Reader for a channel's name; the channel class declares CHANNEL-NAME as a slot accessor.
(defgeneric channel-name (channel) )
86
;;; Writer counterpart of CHANNEL-NAME.
(defgeneric (setf channel-name) (name channel) )
88
;;; Protocol declaration; the fifo and page-ring PUT-FIELD-PAGE methods increment this count
;;; once per page put.
(defgeneric channel-page-count (channel)
89
(:documentation "Return the count of elements passed through the channel."))
91
;;; Protocol declaration; the methods in this file store the length via SETF-CHANNEL-PAGE-LENGTH
;;; and then call CHANNEL-RESET.
(defgeneric (setf channel-page-length) (length channel)
92
(:documentation "Set the number of solutions per page for pages in the channel."))
94
(defgeneric channel-page-length (channel)
95
(:documentation "Return the number of solutions per page for pages in the channel.")
96
(:method ((channel function))
99
(defgeneric channel-put (channel value)
101
(:method :after ((c t) (v t))
102
(let ((*print-pretty* nil) (*print-array* nil))
103
(format *trace-output* "~&channel-put: ~s: ~s" c v))))
105
;;; Protocol declaration only; methods for channel, page-channel, page-ring and page-mailbox
;;; appear later in this file.
(defgeneric channel-reset (channel) )
107
;;; Reader for the channel depth; the page channel classes declare CHANNEL-SIZE as a slot reader.
(defgeneric channel-size (channel) )
109
;;; Protocol declaration; the methods in this file store the size via SETF-CHANNEL-SIZE
;;; and then call CHANNEL-RESET.
(defgeneric (setf channel-size) (size channel)
110
(:documentation "Set the channel depth. The size of the channel buffer limits the number
111
of unconsumed pages before the producer is blocked."))
113
;;; Protocol declaration; PUT-FIELD-PAGE methods in this file add
;;; CHANNEL-OBJECT-SOLUTION-COUNT of each page to this count.
(defgeneric channel-solution-count (channel)
114
(:documentation "Return the count of solutions passed through the channel."))
116
;;; Protocol declaration; methods for page-ring, fifo and page-mailbox appear later in this file.
(defgeneric channel-get-wait-count (channel)
117
(:documentation "Return the count of threads waiting on the channel."))
119
;;; Protocol declaration only; no methods are defined in the visible portion of this file.
(defgeneric channel-put-wait-count (channel)
120
(:documentation "Return the count of threads waiting to write the channel."))
122
;;; Protocol entry point for signalling end-of-data to a destination.
(defgeneric complete-field (destination)
123
(:documentation "Indicate that no further solutions will be added to a field channel.")
124
;; a function destination is told about completion by being called with NIL
(:method ((destination function))
125
(funcall destination nil)))
127
(defgeneric get-field-page (source)
128
(:documentation "Get the next available page from the source. Supports various queues and
129
functions as destination.")
131
(:method ((source function))
135
;;; Protocol declaration only; no methods are defined in the visible portion of this file.
(defgeneric map-channel (operator channel) )
137
;;; Default method: for any destination, allocate a page with MAKE-PAGE using the
;;; requested length and width and return it.
(defmethod new-field-page ((destination t) result-page-length result-page-width)
138
(make-page result-page-length result-page-width))
140
;;; Protocol entry point for handing a page to a destination.
(defgeneric put-field-page (destination page)
141
(:documentation "Pass the page through the channel. Supports various queues and
142
functions as destination.")
144
;; a function destination is simply called with the page
(:method ((destination function) page)
145
(funcall destination page)))
147
;;; Protocol entry point for giving a page back once it has been consumed.
(defgeneric release-field-page (source page)
148
;; default for any source: hand the page to RELEASE-PAGE
(:method ((source t) page) (release-page page)))
152
(defun effective-page-length (&key start end)
153
"Determine the field page length to use given query parameters. Iff a slice length is specified,
154
limit the page to that. Otherwise use the default page length."
157
(max (min *field-page-length* (- end (or start 0))) *field-sliced-page-length*)
158
*field-page-length*))
161
(defun effective-channel-size (&key start end)
162
"Determine the field channel depth to use given query parameters. Iff a slice length is specified,
163
use a size specific to slices. Otherwise use the default size."
165
(declare (ignore start))
167
*channel-sliced-size-limit*
168
*channel-size-limit*))
171
(defgeneric channel-object-solution-count (object)
  (:documentation "Returns the object's 'size' to add to the channel's flow count.")
  ;; anything that is neither NIL nor an array counts as a single solution
  (:method ((object t)) 1)
  ;; NIL contributes nothing
  (:method ((object null)) 0)
  ;; an array contributes the extent of its first dimension;
  ;; a rank-zero array counts as one
  (:method ((object array))
    (if (zerop (array-rank object))
        1
        (array-dimension object 0))))
178
;;; Produce the value to pass on for OBJECT.
(defgeneric object-channel-copy (object)
179
;; arrays are duplicated with COPY-PAGE
(:method ((object array)) (copy-page object))
180
;; anything else is returned as is
(:method ((object t)) object))
186
((de.setf.amqp.implementation::name
187
:initform nil :initarg :name
188
:accessor channel-name)
189
(reader :initform nil :accessor channel-reader)
190
(writer :initform nil :accessor channel-writer)
191
(state :initform :io :type (member :io :input nil) :accessor channel-state)
193
:initform nil :initarg :channels
194
:accessor channel-channels
195
:documentation "A list of channels into which all input is broadcast.")
197
:initform nil :accessor channel-timeout)
199
:initform 0 :accessor channel-start-timestamp
200
:documentation "records the timestamp in microseconds")
202
:initform 0 :accessor channel-start-thread-time
203
:documentation "records the thread-local start in nanoseconds")
205
:initform 0 :accessor channel-end-thread-time))
206
(:documentation "the protocol class for field solution channels."))
208
(defmethod print-object ((object channel) stream)
209
(print-unreadable-object (object stream :identity t :type t)
210
(format stream "[~/format-channel-state/] ~s"
212
(bound-slot-value object 'de.setf.amqp.implementation::name))))
214
(defgeneric channel-p (object)
  (:documentation "Return T iff OBJECT is an instance of CHANNEL, otherwise NIL.")
  ;; general case first, then the more specific one; method order in the
  ;; definition does not affect dispatch
  (:method ((object t)) nil)
  (:method ((object channel)) t))
218
(defmethod cl-user::format-channel-state ((stream t) (channel channel) &optional colon at &rest args)
  "Write the channel's name and state to STREAM. The signature matches that of
 a function called through a ~/.../ format directive; the modifier arguments are ignored."
  (declare (ignore colon at args))
  (let ((name (channel-name channel))
        (state (channel-state channel)))
    (format stream "~a @~a" name state)))
222
(defmethod channel-reset ((channel channel))
  "Clear the channel's reader and writer and put it back into the :io state."
  (setf (channel-reader channel) nil
        (channel-writer channel) nil
        (channel-state channel) :io))
227
(defmethod put-field-page :before ((channel channel) (value t))
  "On the first put - while the start timestamp is still zero - record the
 timeline location and the thread time at which the channel started."
  ;; done here so that the values are taken in the thread which performs the put
  (with-accessors ((wall-start channel-start-timestamp)
                   (thread-start channel-start-thread-time))
      channel
    (when (zerop wall-start)
      (setf wall-start (get-timeline-location)
            thread-start (rdfcache:time-in-thread)))))
233
(defmethod complete-field :before ((channel channel))
  "Record the thread time at which the channel completed. If the start values
 were never recorded, record them first."
  (with-accessors ((wall-start channel-start-timestamp)
                   (thread-start channel-start-thread-time)
                   (thread-end channel-end-thread-time))
      channel
    ;; the channel may complete without ever having been started
    (when (zerop wall-start)
      (setf wall-start (get-timeline-location)
            thread-start (rdfcache:time-in-thread)))
    (setf thread-end (rdfcache:time-in-thread))))
242
(defclass abstract-page-channel (channel)
244
:initform (error "dimensions is required.") :initarg :dimensions
245
:reader channel-dimensions)
247
:initform (error "sort-dimensions is required.") :initarg :sort-dimensions
248
:accessor channel-sort-dimensions)
250
:initform *channel-size-limit* :initarg :size
251
:reader channel-size :writer setf-channel-size)
253
:initarg :page-width ; must be suppied explicitly or in terms of dimensions
254
:accessor channel-page-width)
256
:initform *field-page-length* :initarg :page-length
257
:reader channel-page-length :writer setf-channel-page-length)
260
:accessor channel-page-count)
263
:accessor channel-solution-count)))
265
(defmethod initialize-instance ((instance abstract-page-channel)
268
(sort-dimensions dimensions))
269
(declare (dynamic-extent initargs))
270
(apply #'call-next-method instance
271
:sort-dimensions sort-dimensions
274
(defmethod cl-user::format-channel-state ((stream t) (channel abstract-page-channel) &optional colon at &rest args)
  "Write the channel's name, its page length and width, and its state to STREAM.
 The format modifier arguments are ignored."
  (declare (ignore colon at args))
  (let* ((name (channel-name channel))
         (page-length (channel-page-length channel))
         (page-width (channel-page-width channel))
         (state (channel-state channel)))
    (format stream "~a (~a x ~a) @~a" name page-length page-width state)))
282
;;; the principal protocol classes are array-page-channel and matrix-page-channel
283
;;; then specialize algebra operator methods with respect to the access method required
284
;;; for the respective page representation - heap array or external array.
286
(defclass array-page-channel (abstract-page-channel)
288
(:documentation "Mark the implementation as representing pages as arrays"))
290
(defclass matrix-page-channel (abstract-page-channel)
292
(:documentation "Mark the implementation as representing pages as pointers to foreign matrix values"))
295
(defclass page-channel (abstract-page-channel)
298
:accessor channel-free-count)
301
:accessor channel-bound-count)
304
:accessor channel-write-count)
307
:accessor channel-read-count)))
309
(defmethod initialize-instance :after ((instance page-channel) &key)
310
(with-slots (size free-count bound-count write-count read-count
311
page-width dimensions) instance
312
(setf free-count size
316
(setf page-width (length dimensions))))
318
(defmethod print-object ((object page-channel) stream)
319
(print-unreadable-object (object stream :identity t :type t)
320
(handler-case (format stream "[~/format-channel-state/: ~a] ~s"
322
(bound-slot-value object 'dimensions)
323
(bound-slot-value object 'de.setf.amqp.implementation::name))
325
(format stream "error: ~a" (type-of c))))))
327
(defmethod cl-user::format-channel-state (stream (object page-channel) &optional colon at &rest args)
328
(declare (ignore colon at args))
329
(handler-case (format stream "[~a:~a/~a/~a/~a]x~:d.~:d~@[@~a~]"
330
(bound-slot-value object 'size)
331
(bound-slot-value object 'free-count)
332
(bound-slot-value object 'bound-count)
333
(bound-slot-value object 'write-count)
334
(bound-slot-value object 'read-count)
335
(bound-slot-value object 'page-count)
336
(bound-slot-value object 'solution-count)
337
(bound-slot-value object 'state))
339
(print-unreadable-object (object stream :type t :identity t)
340
(format stream "error: ~a" (type-of c))))))
343
(defmethod channel-reset ((channel page-channel))
345
(setf (channel-page-count channel) 0)
346
(setf (channel-solution-count channel) 0)
347
(setf (channel-bound-count channel) 0
348
(channel-write-count channel) 0
349
(channel-read-count channel) 0
350
(channel-free-count channel) (channel-size channel))
354
(defmethod (setf channel-size) (size (channel abstract-page-channel))
355
"record the new size, adjust the buffer lengths and reset the start/end indices"
356
(unless (eql (channel-size channel) size)
357
(setf-channel-size size channel)
358
(channel-reset channel))
361
(defmethod (setf channel-page-length) (length (channel abstract-page-channel))
362
"record the new length and delete any pages present"
363
(unless (eql (channel-page-length channel) length)
364
(setf-channel-page-length length channel)
365
(channel-reset channel))
369
(defclass page-ring (page-channel array-page-channel)
372
:accessor ring-pages)
375
:accessor ring-free-count)
378
:accessor ring-bound-count)
381
:accessor ring-write-count)
384
:accessor ring-read-count)
386
:reader channel-size-mask))
387
(:documentation "A page ring encapsulates a bounded set of field pages within an interface which permits
388
operations which tread the page set as a resourced stream from a producing to a consuming thread.
389
In the initial state, the inactive set is all null entries and the inactive buffer is empty.
390
The operators (setf channel-size) and (setf channel-page-length) are used when linking channels together
391
to coordinate the channel parameters for the query type: sliced and/or path queries use smaller pages and
392
shallower channels than a complete projection."))
395
(defmethod initialize-instance :after ((instance page-ring) &key &allow-other-keys)
  "Check that the ring size has exactly one bit set and derive the index mask
 (size - 1) from it."
  (with-slots ((mask size-mask) (extent size)) instance
    (assert (= (logcount extent) 1) ()
            "Invalid ring size (must be a power of two): ~s" extent)
    (setf mask (- extent 1))))
401
(defmethod shared-initialize :after ((instance page-ring) (slots t) &key &allow-other-keys)
402
"(re)establish the state of a page ring:
403
- create the inactive and active rin buffers;
406
(with-slots (pages size) instance
407
(setf pages (make-array size :initial-element nil))))
409
(defmethod cl-user::format-channel-state (stream (object page-ring) &optional colon at &rest args)
  "Write a compact label for the ring to STREAM: the size, the free, bound, write
 and read counters, the page and solution counts and, when it is non-null, the state."
  (declare (ignore colon at args))
  ;; collect the values in the order in which the control string consumes them
  (let ((fields (list (channel-size object)
                      (ring-free-count object)
                      (ring-bound-count object)
                      (ring-write-count object)
                      (ring-read-count object)
                      (channel-page-count object)
                      (channel-solution-count object)
                      (channel-state object))))
    (apply #'format stream "[~a:~a/~a/~a/~a]x~:d.~:d~@[@~a~]" fields)))
420
;;; a state label like '[8:7/8/8/8]x0.0@NIL' would indicate a closed channel which had no pages
425
(defclass page-mailbox (page-channel array-page-channel)
427
:reader channel-free-pages)
429
:reader channel-active-pages))
430
(:documentation "A page-mailbox combines two mailboxes, each of which encapsulates its own
431
queue and an availability semaphore. The mailboxes support multi-reader / multi-writer
434
(defmethod cl-user::format-channel-state (stream (object page-mailbox) &optional colon at &rest args)
  "Write a compact label for the mailbox channel to STREAM: the size, the free
 and active counts, the page and solution counts and, when it is non-null, the state."
  (declare (ignore colon at args))
  (let* ((size (channel-size object))
         (free (channel-free-count object))
         (active (channel-count object))
         (pages (channel-page-count object))
         (solutions (channel-solution-count object))
         (state (channel-state object)))
    (format stream "[~a:~a/~a/]x~:d.~:d~@[@~a~]"
            size free active pages solutions state)))
444
(defmethod initialize-instance ((instance page-mailbox) &key name)
445
(let ((*print-pretty* nil))
446
(with-slots (active-pages free-pages) instance
447
(setf free-pages (sb-concurrency:make-mailbox :name (format nil "~a.free" name)))
448
(setf active-pages (sb-concurrency:make-mailbox :name (format nil "~a.active" name)))))
454
;;; A channel which passes individual solutions through a single mailbox.
(defclass solution-channel (channel)
455
;; supplied with :dimensions at construction; read-only thereafter
((dimensions :initarg :dimensions
456
:reader channel-dimensions)
457
;; no initarg or initform: the class's initialize-instance method stores an
;; sb-concurrency mailbox here
(mailbox :reader channel-mailbox))
458
(:documentation "A solution-channel wraps a mailbox to add name and dimensions."))
460
(defmethod cl-user::format-channel-state (stream (object solution-channel) &optional colon at &rest args)
  "Write the channel's name, the number of queued messages and, when it is
 non-null, the state to STREAM."
  (declare (ignore colon at args))
  (let* ((name (channel-name object))
         (queued (channel-count object))
         (state (channel-state object)))
    (format stream "[~a]x~:d~@[@~a~]" name queued state)))
467
(defmethod initialize-instance ((instance solution-channel) &key name)
468
(let ((*print-pretty* nil))
469
(with-slots (mailbox) instance
470
(setf mailbox (sb-concurrency:make-mailbox :name (format nil "~a.mailbox" name)))))
473
(defmethod complete-field ((channel solution-channel))
  "Mark the end of the solutions by sending NIL through the channel's mailbox."
  (let ((mailbox (channel-mailbox channel)))
    (sb-concurrency:send-message mailbox nil)))
475
(defmethod channel-count ((channel solution-channel))
  "Return the number of messages currently held in the channel's mailbox."
  (let ((mailbox (channel-mailbox channel)))
    (sb-concurrency:mailbox-count mailbox)))
477
(defmethod channel-put ((channel solution-channel) (solution sequence))
  "Send the SOLUTION sequence as one message through the channel's mailbox."
  (let ((mailbox (channel-mailbox channel)))
    (sb-concurrency:send-message mailbox solution)))
479
(defmethod channel-get ((channel solution-channel) &key (wait *channel-get-wait*))
  "Return the next solution from the channel's mailbox as two values: the datum
 and a boolean which indicates whether a message was available.
 If WAIT is true, block until a message arrives. Otherwise return immediately;
 when nothing is queued, both values are NIL."
  (let ((mailbox (channel-mailbox channel)))
    (if wait
        (values (sb-concurrency:receive-message mailbox) t)
        ;; Poll in one step rather than testing MAILBOX-COUNT and then calling the
        ;; blocking RECEIVE-MESSAGE: if another reader drained the mailbox between
        ;; the test and the receive, a non-waiting get would block.
        ;; RECEIVE-MESSAGE-NO-HANG itself returns the (datum, available-p) pair.
        (sb-concurrency:receive-message-no-hang mailbox))))
486
(defclass page-queue (page-channel array-page-channel)
488
:reader channel-free-pages)
490
:initform (sb-thread:make-semaphore :name "queue free semaphore" :count 0)
491
:reader channel-free-semaphore)
493
:reader channel-active-pages)
495
:initform (sb-thread:make-semaphore :name "queue active semaphore" :count 0)
496
:reader channel-active-semaphore))
497
(:documentation "A page-channel combines two queues and respective semaphores for signaling
498
availability. The queues support just single-reader / single-writer access."))
501
(defmethod initialize-instance :after ((instance page-queue) &key name)
  "Create the free and the active page queues, each named after the channel
 NAME with a '.free' or '.active' suffix."
  ;; pretty printing is disabled while the queue names are generated
  (let ((*print-pretty* nil))
    (setf (slot-value instance 'free-pages)
          (make-instance 'amqp.i::queue :name (format nil "~a.free" name))
          (slot-value instance 'active-pages)
          (make-instance 'amqp.i::queue :name (format nil "~a.active" name)))))
510
(defmethod complete-field ((channel page-ring))
511
(trace-streaming complete-field.before channel)
512
(let ((write-count (ring-write-count channel)))
514
;; ensure space for the e-o-d marker
515
(new-field-page channel nil nil)
516
;; mark for input only
517
(setf (channel-state channel) :input)
518
;; indicate completion
519
(setf (svref (ring-pages channel) (logand write-count (channel-size-mask channel)))
521
(setf (ring-write-count channel) (1+ write-count)))
522
(dolist (next (channel-channels channel))
523
(complete-field next))
524
(trace-streaming complete-field.after channel))
527
(defmethod channel-count ((channel page-ring))
  "Return the difference between the ring's write count and its read count."
  (with-accessors ((written ring-write-count)
                   (consumed ring-read-count))
      channel
    (- written consumed)))
530
(defmethod channel-get-wait-count ((channel page-ring))
  "Return 0, 1 or 2: one for the bound count having reached the free count, plus
 one for the read count having reached the write count."
  (flet ((one-if (condition) (if condition 1 0)))
    (+ (one-if (>= (ring-bound-count channel) (ring-free-count channel)))
       (one-if (>= (ring-read-count channel) (ring-write-count channel))))))
534
(defmethod channel-get ((channel page-ring) &key (wait *channel-get-wait*))
  "Return the next page and T. When WAIT is false and the ring holds no unread
 page, return NIL instead without calling GET-FIELD-PAGE."
  (cond ((or wait (plusp (channel-count channel)))
         (values (get-field-page channel) t))
        (t
         nil)))
538
(defmethod channel-get-all ((channel page-ring))
539
(loop for page = (get-field-page channel)
543
(defmethod channel-put ((channel page-ring) value)
  "Put VALUE into the ring. NEW-FIELD-PAGE is called first on the caller's behalf
 and its result is discarded; VALUE is then passed to PUT-FIELD-PAGE."
  (let ((page-length (channel-page-length channel))
        (page-width (channel-page-width channel)))
    ;; stands in for the new- operation which the caller skipped
    (new-field-page channel page-length page-width))
  (put-field-page channel value))
549
(defmethod create-ring-page ((channel page-ring))
  "Return a fresh fixnum array with the channel's page length and page width,
 with every element set to +NULL-TERM-ID+."
  (let ((dimensions (list (channel-page-length channel)
                          (channel-page-width channel))))
    (make-array dimensions
                :element-type 'fixnum
                :initial-element +NULL-TERM-ID+)))
555
(defmethod (setf channel-page-length) (length (channel page-ring))
556
"record the new length and delete any pages present"
557
(setf-channel-page-length length channel)
558
(channel-reset channel)
561
(defmethod channel-reset ((channel page-ring))
563
(if (= (length (ring-pages channel)) (channel-size channel))
564
(fill (ring-pages channel) nil)
565
(setf (ring-pages channel) (make-array (channel-size channel) :initial-element nil))))
567
(defmethod (setf channel-size) (size (channel page-ring))
568
"record the new size, adjust the buffer lengths and reset the start/end indices"
569
(with-slots (size-mask) channel
570
(assert (= 1 (logcount size)) ()
571
"Invalid ring size (must be a power of two): ~s" size)
572
(setf size-mask (1- size))
573
(setf-channel-size size channel)
574
(channel-reset channel)
578
(defmethod new-field-page ((channel page-ring) result-page-length result-page-width)
579
"Remove an entry from the inactive buffer. If none is available, wait until one is.
580
While waiting, reduce the thread priority to idle."
581
(declare (ignore result-page-length result-page-width))
583
(trace-streaming new-field-page channel)
584
(ecase (channel-state channel)
586
;; when nothing is already available, then adjust the priority in case
587
;; the process is suspended.
588
(let* ((timeout (channel-timeout channel))
589
(deadline (when timeout (+ timeout (get-internal-run-time))))
590
(bound-count (ring-bound-count channel)))
591
(loop (when (dotimes (x 100 nil)
592
(when (> (ring-free-count channel) bound-count) (return t))
593
;; #+sbcl (sb-ext:spin-loop-hint)
596
(when (and deadline (plusp bound-count) (>= (get-internal-run-time) deadline))
597
(spocq.e::timeout-error :detail (format nil "new-field-page: ~a" channel)))
598
(setf (thread-priority) *algebra-idle-priority*)
601
(setf (thread-priority) *algebra-running-priority*))
602
(let ((data (svref (ring-pages channel)
603
(logand bound-count (channel-size-mask channel)))))
605
(setf (ring-bound-count channel) (1+ bound-count))
607
(array (setf data (clear-page data)))
608
;; special, to allow group entries
609
(cons (setf data (clear-page (second data))))
610
(null (case (channel-state channel)
612
(setf data (create-ring-page channel)))
615
(trace-streaming new-field-page.dequeued channel data)
617
((:input nil) ; should be no further output
618
(log-warn "new-field-page: invalid channel state: ~a(~s)." channel (channel-state channel))
622
(defmethod put-field-page ((channel page-ring) data)
623
"Add an entry to the active buffer. As each page originated from the inactive buffer, there can be no more of
624
them than the inactive buffer count. Which means, the inactive buffer read semaphore suffices to preclude buffer
626
(trace-streaming put-field-page channel data)
627
(dolist (next (channel-channels channel))
628
(let ((to (new-field-page next (channel-page-length channel) (channel-page-width channel))))
629
(assert (= (array-dimension to 1) (array-dimension data 1)) ()
630
"page width mismatch: ~a/~a : ~a/~a."
631
(channel-dimensions channel) (array-dimension data 1)
632
(channel-dimensions next) (array-dimension to 1))
633
(unless (= (array-dimension to 0) (array-dimension data 0))
634
(setf to (adjust-page to (array-dimensions data))))
636
(put-field-page next to)))
637
(ecase (channel-state channel)
639
(assert data () "invalid channel data: ~a: ~a" channel data)
640
(let ((write-count (ring-write-count channel)))
642
(setf (svref (ring-pages channel) (logand write-count (channel-size-mask channel)))
644
(setf (ring-write-count channel) (1+ write-count)))
645
(incf (channel-solution-count channel) (channel-object-solution-count data))
646
(incf (channel-page-count channel)))
648
(log-warn "put-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel) data)))
649
(trace-streaming put-field-page.enqueued channel)
653
(defmethod get-field-page ((channel page-ring))
654
"Return the next active page. If none is available and the channel is still active, wait until one is.
655
While waiting, reduce the thread priority to idle."
656
(declare (optimize (debug 3)))
657
(ecase (channel-state channel)
659
;; when nothing is already available, then adjust the priority in case
660
;; the process is suspended.
661
(let* ((timeout (channel-timeout channel))
662
(deadline (when timeout (+ timeout (get-internal-run-time))))
663
(read-count (ring-read-count channel)))
664
(loop (when (dotimes (x 100 nil)
665
(when (> (ring-write-count channel) read-count) (return t))
666
;; #+sbcl (sb-ext:spin-loop-hint)
669
(when (and deadline (plusp read-count) (>= (get-internal-run-time) deadline))
670
(spocq.e::timeout-error :detail (format nil "get-field-page: ~a" channel)))
671
(setf (thread-priority) *algebra-idle-priority*)
674
(setf (thread-priority) *algebra-running-priority*))
675
(let ((data (svref (ring-pages channel)
676
(logand read-count (channel-size-mask channel)))))
677
(trace-streaming get-field-page.dequeued channel read-count data)
678
(setf (ring-read-count channel) (1+ read-count))
679
;; the last element is nil, afterwhich nothing should be read
681
(unless (eq (channel-state channel) :input)
682
(log-warn "null data w/ :io: ~a ~a"
683
channel (map 'list #'type-of (ring-pages channel))))
684
(setf (channel-state channel) nil))
686
((nil) ; nothing present, nothing to expect
687
(log-warn "get-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel))
690
(defmethod release-field-page ((channel page-ring) page)
691
(trace-streaming release-field-page channel page)
692
(ecase (channel-state channel)
695
(log-warn "release-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel) page)
697
(setf (svref (ring-pages channel)
698
(logand (shiftf (ring-free-count channel) (1+ (ring-free-count channel)))
699
(channel-size-mask channel)))
700
(when (= (array-dimension page 0) (channel-page-length channel))
701
;; save the page only if it is a 'standard' length
703
(trace-streaming release-field-page.enqueued channel)
706
(defmethod map-pages (op (source page-ring))
707
(loop for page = (get-field-page source)
709
do (progn (funcall op page)
710
(release-field-page source page))))
712
;;; channels based on amqp queues
714
(defclass fifo (page-channel amqp.i::queue array-page-channel)
716
:initform (sb-thread:make-semaphore :name "queue semaphore" :count 0)
717
:reader queue-semaphore)
719
:initform nil :initarg :channels
720
:accessor channel-channels
721
:documentation "A list of channels into which all input is broadcast.")))
724
(defmethod print-object ((object fifo) stream &aux (*print-pretty* nil))
725
(print-unreadable-object (object stream :type t :identity t)
726
(handler-case (format stream "[~/format-channel-state/: ~a] ~s x~s"
728
(bound-slot-value object 'dimensions)
729
(bound-slot-value object 'de.setf.amqp.implementation::name)
730
(when (slot-boundp object 'de.setf.amqp.implementation::header) (amqp.i::collection-size object)))
732
(format stream "error: ~a" (type-of c))))))
735
(defmethod channel-count ((channel fifo))
  "Return the current count of the fifo's queue semaphore."
  (let ((semaphore (queue-semaphore channel)))
    (sb-thread:semaphore-count semaphore)))
738
(defmethod channel-get-wait-count ((channel fifo))
  "Return the wait count of the fifo's queue semaphore."
  (let ((semaphore (queue-semaphore channel)))
    (sb-thread::semaphore-waitcount semaphore)))
741
(defmethod channel-get ((channel fifo) &key (wait *channel-get-wait*))
  "Return the next page and T. When WAIT is false and the channel count is not
 positive, return NIL instead without calling GET-FIELD-PAGE."
  (if (or wait (plusp (channel-count channel)))
      (values (get-field-page channel) t)
      nil))
745
(defmethod channel-get-all ((channel fifo))
746
(collect-list (collect)
747
(loop (multiple-value-bind (value not-empty) (amqp.u:dequeue channel)
752
(defmethod channel-put ((channel fifo) value)
  "Put VALUE into the fifo; identical to PUT-FIELD-PAGE for this class."
  (let ((page value))
    (put-field-page channel page)))
755
(defmethod complete-field ((channel fifo))
  "Finish the fifo: put NIL as the final entry, complete each of the broadcast
 channels in turn, then set the state to :input."
  (put-field-page channel nil)
  (mapc #'complete-field (channel-channels channel))
  (setf (channel-state channel) :input))
761
(defmethod new-field-page ((channel fifo) page-length page-width)
  "Allocate a new page of PAGE-LENGTH by PAGE-WIDTH with MAKE-PAGE, increment the
 channel's bound count and return the page.
 Signals an assertion failure if PAGE-WIDTH differs from the channel's page width."
  (trace-streaming new-field-page channel page-length page-width)
  (assert (= page-width (channel-page-width channel)) ()
          "Invalid page width: ~s != ~s." (channel-page-width channel) page-width)
  (let ((data (make-page page-length page-width)))
    (incf (channel-bound-count channel))
    (trace-streaming new-field-page.dequeued channel data)
    ;; The trace form used to be the last form, so its value was returned in
    ;; place of the page. Callers pass the result to ARRAY-DIMENSION and
    ;; PUT-FIELD-PAGE, so return the page itself.
    data))
769
(defmethod put-field-page ((channel fifo) page)
770
(trace-streaming put-field-page channel page)
771
(let ((semaphore (queue-semaphore channel))
772
(limit (channel-size channel)))
774
(loop (when (< (sb-thread:semaphore-count semaphore) limit) (return))
775
(setf (thread-priority) *algebra-idle-priority*)
777
(setf (thread-priority) *algebra-running-priority*)))
778
(dolist (next (channel-channels channel))
779
(let ((to (new-field-page next (channel-page-length channel) (channel-page-width channel))))
780
(unless (= (array-dimension to 0) (array-dimension page 0))
781
(setf to (adjust-page to (array-dimensions page))))
783
(put-field-page next to)))
784
(amqp.u:enqueue page channel)
785
(incf (channel-write-count channel))
787
(incf (channel-solution-count channel) (channel-object-solution-count page))
788
(incf (channel-page-count channel)))
789
(signal-semaphore semaphore)
790
(trace-streaming put-field-page.enqueued channel)
793
(defmethod get-field-page ((channel fifo))
794
(trace-streaming get-field-page channel)
795
(let ((page (loop (multiple-value-bind (value not-empty)
796
(amqp.u:dequeue channel)
799
(setf (thread-priority) *algebra-idle-priority*)
800
(wait-on-semaphore (queue-semaphore channel))
801
(setf (thread-priority) *algebra-running-priority*)))))
802
(trace-streaming get-field-page.dequeued channel page)
806
(defmethod release-field-page ((channel fifo) page)
807
(trace-streaming release-field-page channel page)
809
(trace-streaming release-field-page.enqueued channel)
813
(progn (defparameter *resource-streaming-pages* nil)
814
(defmethod complete-field ((channel page-mailbox))
  "Finish the mailbox channel: discard the pending free pages, set the state to
 :input, send NIL through the active mailbox and complete each broadcast channel."
  (trace-streaming complete-field.before channel)
  (let ((free (channel-free-pages channel)))
    ;; drain the free page mailbox; the messages themselves are dropped
    (sb-concurrency:receive-pending-messages free))
  ;; from here on the channel is for input only
  (setf (channel-state channel) :input)
  ;; NIL is the completion marker
  (let ((active (channel-active-pages channel)))
    (sb-concurrency:send-message active nil))
  (mapc #'complete-field (channel-channels channel))
  (trace-streaming complete-field.after channel))
826
(defmethod channel-count ((channel page-mailbox))
  "Return the number of messages in the active page mailbox."
  (let ((active (channel-active-pages channel)))
    (sb-concurrency:mailbox-count active)))
830
(defmethod channel-create-page ((channel page-channel) &optional
                                (length (channel-page-length channel))
                                (width (channel-page-width channel)))
  "Return a fresh LENGTH by WIDTH fixnum array with every element set to
 +NULL-TERM-ID+. LENGTH and WIDTH default to the channel's page length and width."
  (let ((dimensions (list length width)))
    (make-array dimensions
                :initial-element +NULL-TERM-ID+
                :element-type 'fixnum)))
836
(defmethod channel-free-count ((channel page-mailbox))
  "Return the number of messages in the free page mailbox."
  (let ((free (channel-free-pages channel)))
    (sb-concurrency:mailbox-count free)))
840
(defmethod channel-get ((channel page-mailbox) &key (wait *channel-get-wait*))
842
(sb-concurrency:receive-message (channel-active-pages channel))
843
(sb-concurrency:receive-message-no-hang (channel-active-pages channel))))
845
(defmethod channel-get-all ((channel page-mailbox))
  "Drain both mailboxes. The pending free pages are discarded; the pending
 active pages are returned."
  (let ((free (channel-free-pages channel))
        (active (channel-active-pages channel)))
    (sb-concurrency:receive-pending-messages free)
    (sb-concurrency:receive-pending-messages active)))
850
(defmethod channel-put ((channel page-mailbox) page)
  "Send PAGE through the active page mailbox, after asserting that its second
 dimension equals the channel's page width."
  (let ((expected-width (channel-page-width channel)))
    (assert (= expected-width (array-dimension page 1)) ()
            "channel-put: Invalid page width (expected ~a): ~a, ~a."
            expected-width (array-dimension page 1) (array-dimensions page)))
  (let ((active (channel-active-pages channel)))
    (sb-concurrency:send-message active page)))
857
(defmethod (setf channel-page-length) (length (channel page-mailbox))
858
"record the new length and delete any pages present"
859
(setf-channel-page-length length channel)
860
(channel-reset channel)
863
(defmethod channel-reset ((channel page-mailbox))
865
(sb-concurrency:receive-pending-messages (channel-free-pages channel))
866
(sb-concurrency:receive-pending-messages (channel-active-pages channel)))
868
(defmethod (setf channel-size) (size (channel page-mailbox))
869
"record the new size, adjust the buffer lengths and reset the start/end indices"
870
(setf-channel-size size channel)
871
(channel-reset channel)
874
(defmethod channel-get-wait-count ((channel page-mailbox))
  "Return the wait count of the semaphore which belongs to the active page mailbox."
  (sb-thread::semaphore-waitcount
   (sb-concurrency::mailbox-semaphore (channel-active-pages channel))))
880
(defmethod new-field-page ((channel page-mailbox) length width)
881
"Attempt to dequeue a free page. If that does not succeed immediately,
882
then iff the channel is still below the size limit, make a new page.
883
Otherwise, wait until a page is released."
884
(trace-streaming new-field-page channel)
885
(assert (= width (channel-page-width channel)) ()
886
"new-field-page: Invalid page width requested (expected ~a): ~a."
887
width (channel-page-width channel))
888
(ecase (channel-state channel)
890
(if *resource-streaming-pages*
891
(let* ((mailbox (channel-free-pages channel))
892
(semaphore (sb-concurrency::mailbox-semaphore mailbox))
893
(queue (sb-concurrency::mailbox-queue mailbox))
895
;; w/o the locking from receive-message(-no-hang) across semaphore/queue
896
;; operations, as there is just one reader / one writer
897
(cond ((sb-thread:try-semaphore semaphore)
898
(sb-concurrency:dequeue queue))
899
((< (sb-thread:semaphore-count
900
(sb-concurrency::mailbox-semaphore (channel-active-pages channel)))
901
(channel-size channel))
902
(channel-create-page channel))
904
(sb-thread:wait-on-semaphore semaphore)
905
(sb-concurrency:dequeue queue)))))
906
(trace-streaming new-field-page.dequeued channel data)
907
(unless (and (eql (array-dimension data 0) length) (eql (array-dimension data 1) width))
908
(setf data (adjust-array data (list length width))))
909
(let ((width (channel-page-width channel)))
910
(assert (= width (array-dimension data 1)) ()
911
"new-field-page: Invalid page width (expected ~a): ~a, ~a."
912
width (array-dimension data 1) (array-dimensions data)))
914
(channel-create-page channel length width)))
915
(((:input nil) ; should be no further output
916
(log-warn "new-field-page: invalid channel state: ~a(~s)." channel (channel-state channel))
919
(defmethod put-field-page ((channel page-mailbox) page)
920
"Add an entry to the active pages. As each page should have originated from the free pages, there can
921
be no more of them than the inactive buffer count. Which means, the free pages semaphore suffices to
922
preclude buffer over-run."
923
(trace-streaming put-field-page channel page)
925
(let ((width (channel-page-width channel)))
926
(assert (= width (array-dimension page 1)) ()
927
"put-field-page: Invalid page width (expected ~a): ~a, ~a."
928
width (array-dimension page 1) (array-dimensions page))))
929
(ecase (channel-state channel)
931
(sb-concurrency:send-message (channel-active-pages channel) page)
932
(incf (channel-solution-count channel) (channel-object-solution-count page))
933
(incf (channel-page-count channel))
934
(dolist (next (channel-channels channel))
935
(let ((to (new-field-page next (channel-page-length channel) (channel-page-width channel))))
936
(assert (= (array-dimension to 1) (array-dimension page 1)) ()
937
"page width mismatch: ~a/~a : ~a/~a."
938
(channel-dimensions channel) (array-dimension page 1)
939
(channel-dimensions next) (array-dimension to 1))
940
(unless (= (array-dimension to 0) (array-dimension page 0))
941
(setf to (adjust-page to (array-dimensions page))))
943
(put-field-page next to))))
945
(log-warn "put-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel) page)))
946
(trace-streaming put-field-page.enqueued channel)
949
(defmethod get-field-page ((channel page-mailbox))
950
"Return the next active page. If none is available and the channel is still active, wait until one is.
951
While waiting, reduce the thread priority to idle."
952
(trace-streaming get-field-page channel)
953
(ecase (channel-state channel)
955
(let* ((mailbox (channel-active-pages channel))
956
(semaphore (sb-concurrency::mailbox-semaphore mailbox))
957
(queue (sb-concurrency::mailbox-queue mailbox))
959
;; w/o the locking from receive-message(-no-hang) across semaphore/queue
960
;; operations, as there is just one reader / one writer
961
(cond ((sb-thread:try-semaphore semaphore)
962
(sb-concurrency:dequeue queue))
965
(progn (setf (thread-priority) *algebra-idle-priority*)
966
(sb-thread:wait-on-semaphore semaphore)
967
(sb-concurrency:dequeue queue))
968
(setf (thread-priority) *algebra-running-priority*))))))
969
(unless (or data (eq (channel-state channel) :input))
970
(log-warn "get-field-page: null data w/ :io: ~a ~a" channel mailbox)
971
(setf (channel-state channel) nil))
973
(let ((width (channel-page-width channel)))
974
(unless (= width (array-dimension data 1))
975
(warn "get-field-page: Invalid page width (expected ~a): ~a, ~a."
976
width (array-dimension data 1) (array-dimensions data)))
977
(assert (= width (array-dimension data 1)) ()
978
"get-field-page: Invalid page width (expected ~a): ~a, ~a."
979
width (array-dimension data 1) (array-dimensions data))))
980
(trace-streaming get-field-page.dequeued channel data)
982
(((nil) ; should be no further output
983
(log-warn "get-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel))
986
(defmethod release-field-page ((channel page-mailbox) page)
987
(trace-streaming release-field-page channel page)
988
(ecase (channel-state channel)
991
(log-warn "release-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel) page)
993
(when *resource-streaming-pages*
994
(sb-concurrency:send-message (channel-free-pages channel)
995
(when (= (array-dimension page 0) (channel-page-length channel))
996
;; save the page only if it is a 'standard' length
998
(trace-streaming release-field-page.enqueued channel))
1001
(defmethod map-pages (op (source page-mailbox))
1002
(loop for page = (get-field-page source)
1004
do (progn (funcall op page)
1005
(release-field-page source page))))
1011
(defmethod channel-count ((channel sb-concurrency:mailbox))
  "Return the number of messages currently held in the mailbox CHANNEL."
  (sb-concurrency:mailbox-count channel))
1014
(defmethod channel-get-wait-count ((channel sb-concurrency:mailbox))
  "Return the wait count of the semaphore internal to the mailbox CHANNEL."
  ;; both accessors are unexported internals of sb-concurrency / sb-thread
  (sb-thread::semaphore-waitcount
   (sb-concurrency::mailbox-semaphore channel)))
1019
(defmethod channel-get-all ((channel sb-concurrency:mailbox))
  "Remove and return every message currently pending in the mailbox CHANNEL."
  (sb-concurrency:receive-pending-messages channel))
1022
(defmethod channel-get ((channel sb-concurrency:mailbox) &key (wait *channel-get-wait*)
1025
(sb-concurrency:receive-message channel :timeout timeout)
1026
(sb-concurrency:receive-message-no-hang channel)))
1028
(defmethod channel-name ((channel sb-concurrency:mailbox))
  "Return the name of the mailbox CHANNEL."
  (sb-concurrency:mailbox-name channel))
1031
(defmethod (setf channel-name) (name (channel sb-concurrency:mailbox))
  "Store NAME as the name of the mailbox CHANNEL and return NAME."
  (setf (sb-concurrency:mailbox-name channel) name))
1034
(defmethod channel-put ((channel sb-concurrency:mailbox) data)
  "Append DATA as a message to the mailbox CHANNEL."
  (sb-concurrency:send-message channel data))
1037
(defun make-pool (&key name)
  "Create a pool, here implemented as an sb-concurrency mailbox named NAME."
  (sb-concurrency:make-mailbox :name name))
1039
(defmethod map-channel (operator (channel sb-concurrency:mailbox))
  "Call OPERATOR on each message currently in the mailbox CHANNEL.
 The messages are listed, not removed. Returns NIL."
  (dolist (message (sb-concurrency:list-mailbox-messages channel))
    (funcall operator message)))
1045
;;; (load "patches/streaming.lisp")
1047
(defun queue-read-wait (queue lock)
  "Reader side: wait on the condition variable QUEUE with the mutex LOCK.
 Delegates directly to sb-thread:condition-wait."
  (sb-thread:condition-wait queue lock))
1050
(defun queue-write-wait (queue lock)
  "Writer side: wait on the condition variable QUEUE with the mutex LOCK.
 Delegates directly to sb-thread:condition-wait."
  (sb-thread:condition-wait queue lock))
1053
(defun queue-read-notify (queue)
  "Reader side: notify the condition variable QUEUE.
 Delegates directly to sb-thread:condition-notify."
  (sb-thread:condition-notify queue))
1056
(defun queue-write-notify (queue)
  "Writer side: notify the condition variable QUEUE.
 Delegates directly to sb-thread:condition-notify."
  (sb-thread:condition-notify queue))
1059
(defvar *trace-paged-buffer-output* nil
  "Destination stream for TRACE-PAGED-BUFFER output; when NIL, tracing is off.")
1061
(defclass paged-buffer (abstract-page-channel)
1063
:accessor paged-buffer-waitqueue)
1065
:accessor paged-buffer-lock)
1067
:initform 0 :type (or fixnum null)
1068
:accessor paged-buffer-active-page-count)
1070
:accessor paged-buffer-read-ring)
1072
:accessor paged-buffer-write-ring))
1074
"Encapsulate a condition variable w/lock and the state required to implement a paged solution
1075
stream with at least double-buffering.
1076
A static sequence of pages is bound as a ring and the reader/writer pair cooperate to step
1077
through the ring such that the reader tails the writer.
1078
The new/get/put/release operations are implemented such that the reader blocks if it has caught up
1079
and the writer blocks if it would over-run.
1080
A condition variable serves to coordinate notification when the respective blocked state is eliminated.
1081
The initial state is that the rings coincide, which means the reader likely blocks as soon as it starts.
1082
a write fills the next write page, advances the write ring when filled, but blocks if the next page
1083
is not available and notifies if the available page unblocks the reader.
1084
a read blocks until the page is available, advances the read ring when emptied and notifies
1085
if the advance unblocks the writer."))
1087
(defclass paged-array (paged-buffer array-page-channel)
1089
(:documentation "a paged buffer which carries array pages"))
1091
(defmacro trace-paged-buffer (&rest args)
  "Emit a trace line iff *TRACE-PAGED-BUFFER-OUTPUT* is non-null.
 ARGS are a format control string followed by its arguments. While holding
 *TRACE-LOCK*, the expansion starts a new line on the trace stream, formats
 ARGS to it and flushes it.
 The trace stream is read from the special variable exactly once, so that
 another thread which resets the variable while the line is being written
 cannot redirect part of the output (TERPRI or FORMAT with a NIL stream)."
  (let ((stream (gensym "TRACE-STREAM-")))
    `(let ((,stream *trace-paged-buffer-output*))
       (when ,stream
         (bt:with-lock-held (*trace-lock*)
           (terpri ,stream)
           (format ,stream ,@args)
           (finish-output ,stream))))))
1098
(defmethod initialize-instance ((instance paged-buffer) &rest initargs &key name
1099
(dimensions () d-s) (size *channel-size-limit*) page-width)
1102
(t (setf name (write-to-string name))))
1104
(assert (eql page-width (length dimensions)) ()
1105
"paged buffer dimensions do not match page width: ~a: ~a" dimensions page-width))
1106
(d-s ; if null dimensions are explicitly supplied, the width is 0
1107
(setf page-width (length dimensions)))
1109
(error "dimensions are required.")))
1110
(setf (paged-buffer-waitqueue instance) (sb-thread:make-waitqueue))
1111
(setf (paged-buffer-lock instance) (sb-thread:make-mutex :name name))
1112
(apply #'call-next-method instance
1114
:page-width page-width
1117
(defgeneric make-paged-buffer-page-ring (paged-bufer)
1118
(:method ((instance paged-buffer))
1119
(loop with size = (channel-size instance)
1120
with length = (channel-page-length instance)
1121
with width = (channel-page-width instance)
1123
collect (make-page length width))))
1125
(defmethod shared-initialize ((instance paged-buffer) (slot-names t) &key)
1127
(let ((pages (make-paged-buffer-page-ring instance)))
1128
(setf (rest (last pages)) pages)
1129
(setf (paged-buffer-write-ring instance) pages)
1130
(setf (paged-buffer-read-ring instance) pages)
1131
(setf (paged-buffer-active-page-count instance) 0)
1132
(setf (channel-state instance) :io)))
1134
(defmethod cl-user::format-channel-state ((stream t) (channel paged-buffer) &optional colon at &rest args)
  "Write a one-line summary of CHANNEL to STREAM: its name, page length x page
 width, state, and active page count over channel size.
 The COLON, AT and ARGS parameters are accepted but ignored."
  (declare (ignore colon at args))
  (let ((name (channel-name channel))
        (length (channel-page-length channel))
        (width (channel-page-width channel))
        (state (channel-state channel))
        (active (paged-buffer-active-page-count channel))
        (size (channel-size channel)))
    (format stream "~a (~a x ~a) @~a ~a/~a" name length width state active size)))
1141
(defgeneric channel-complete-input (channel)
  (:documentation "Record that no further input is to be expected from CHANNEL.")
  (:method ((channel paged-buffer))
    ;; a paged buffer notes completion by clearing its state
    (setf (channel-state channel) nil)))
1145
(defmethod channel-get ((channel paged-buffer) &key (wait t))
1146
"get the next available read page.
1147
iff none is available wait.
1148
iff the count is null, the stream is complete."
1149
(flet ((complete-input ()
1150
;;no further pages to be expected
1151
(channel-complete-input channel)
1152
(return-from channel-get nil)))
1153
(ecase (channel-state channel)
1155
;; lock and optionally queue only when nothing is present.
1156
;; otherwise, the writer can only increase the count
1157
(unless (plusp (paged-buffer-active-page-count channel))
1158
(sb-thread:with-mutex ((paged-buffer-lock channel))
1159
(trace-paged-buffer " [<- active ~s]" (paged-buffer-active-page-count channel))
1160
(unless (plusp (paged-buffer-active-page-count channel))
1161
(when (eq (channel-state channel) :input)
1163
(unless wait (return-from channel-get nil))
1165
(trace-paged-buffer " [<- wait]")
1166
(queue-read-wait (paged-buffer-waitqueue channel) (paged-buffer-lock channel))
1167
(trace-paged-buffer " [<- notified active ~s,~s]" (paged-buffer-active-page-count channel) (channel-state channel))
1168
(if (plusp (paged-buffer-active-page-count channel))
1169
;; if a page is ready, return it
1171
(ecase (channel-state channel)
1172
;; otherwise either complete or note the anomaly and continue to wait
1173
(:io (log-warn "paged-buffer reader notified with no pagees active"))
1174
(:input (complete-input))))))))
1175
;; return the next page, or nil if the ring is now empty
1176
(first (paged-buffer-read-ring channel)))
1179
(defmethod channel-put ((channel paged-buffer) page)
1180
"indicate that a page has been written and is available to read.
1181
iff the reader is waiting, notify it."
1182
(incf (channel-page-count channel))
1183
(incf (channel-solution-count channel) (channel-object-solution-count page))
1184
(ecase (channel-state channel)
1186
(sb-thread:with-mutex ((paged-buffer-lock channel))
1187
(setf (first (paged-buffer-write-ring channel)) page)
1188
(trace-paged-buffer " [-> active ~s]" (paged-buffer-active-page-count channel))
1189
(cond ((>= (incf (paged-buffer-active-page-count channel)) (channel-size channel))
1190
;; transition to full requires wait
1192
(trace-paged-buffer " [-> wait]")
1193
(queue-write-wait (paged-buffer-waitqueue channel) (paged-buffer-lock channel))
1194
(trace-paged-buffer " [-> notified active ~s]" (paged-buffer-active-page-count channel))
1195
(if (< (paged-buffer-active-page-count channel) (channel-size channel))
1197
(log-warn "paged-buffer writer notified with all pages still active"))))
1198
((= (paged-buffer-active-page-count channel) 1)
1199
(trace-paged-buffer " [-> notifying]")
1200
;; 0 -> 1 transitions requires notification
1201
(queue-write-notify (paged-buffer-waitqueue channel)))))
1202
;; advance to the next page for any subsequent new page request
1203
(pop (paged-buffer-write-ring channel)))
1205
(log-warn "output to complete channel: ~s" channel)))
1210
(defmethod channel-get-wait-count ((channel paged-buffer))
1211
(if (zerop (paged-buffer-active-page-count channel))
1214
(defmethod channel-put-wait-count ((channel paged-buffer))
1215
(if (>= (paged-buffer-active-page-count channel) (channel-size channel))
1218
(defmethod channel-reset ((channel paged-buffer))
  "Reset CHANNEL by reinitializing the instance without any initargs."
  (reinitialize-instance channel))
1221
(defmethod complete-field ((channel paged-buffer))
  "Mark CHANNEL as complete for output.
 Under the buffer lock the state is set to :input; if no page is active at
 that moment, the wait queue is notified."
  (let ((lock (paged-buffer-lock channel)))
    (sb-thread:with-mutex (lock)
      (trace-paged-buffer " [-> completing]")
      (setf (channel-state channel) :input)
      (let ((active (paged-buffer-active-page-count channel)))
        (when (zerop active)
          (trace-paged-buffer " [-> notifying complete]")
          ;; completion with zero active pages requires a notification
          (queue-write-notify (paged-buffer-waitqueue channel)))))))
1230
(defmethod get-field-page ((channel paged-buffer))
  "Return the result of CHANNEL-GET on CHANNEL with its default arguments."
  ;; debugging aid: (third (print (list :get channel (channel-get channel))))
  (channel-get channel))
1234
(defmethod map-channel (operator (channel paged-buffer))
1235
(loop for page = (get-field-page channel)
1237
do (progn (funcall operator page)
1238
(release-field-page channel page))))
1240
(defmethod map-pages (operator (channel paged-buffer))
  "Apply OPERATOR over the pages of CHANNEL; for a paged buffer this is the
 same operation as MAP-CHANNEL."
  (map-channel operator channel))
1243
(defmethod new-field-page ((channel paged-buffer) length width)
1244
"immediately return the first page.
1245
do not change the ring position, as the subsequent put may supply resized, for the last page, or
1246
nil, once complete."
1247
(assert (or (and (null width) (null length))
1248
(and (eql width (channel-page-width channel))
1249
;; must allow to shorten the last page
1250
;; (eql length (channel-page-length channel))
1253
"invalid page dimensions: ~a: ~s x ~s" channel width length)
1254
(first (paged-buffer-write-ring channel)))
1256
(defmethod put-field-page ((channel paged-buffer) page)
  "Hand PAGE to CHANNEL by delegating to CHANNEL-PUT."
  ;; debugging aid: (print (list :put channel page))
  (channel-put channel page))
1260
(defmethod release-field-page ((channel paged-buffer) page)
  "Signal that processing of the current read page is complete.
 The read ring is advanced by one position and then, under the buffer lock,
 the active page count is decremented; when the count reaches zero the wait
 queue is notified. PAGE itself is not consulted."
  ;; step the read ring forward before taking the lock
  (setf (paged-buffer-read-ring channel)
        (rest (paged-buffer-read-ring channel)))
  (sb-thread:with-mutex ((paged-buffer-lock channel))
    (let ((remaining (decf (paged-buffer-active-page-count channel))))
      (when (zerop remaining)
        (trace-paged-buffer " [<- notifying]")
        (queue-read-notify (paged-buffer-waitqueue channel))))))
1274
(defmethod complete-field ((channel page-queue))
  "Mark CHANNEL as complete for output and pass completion on.
 The free page collection is cleared, the state is set to :input, a NIL entry
 is enqueued on the active pages and the active semaphore is signalled.
 Finally COMPLETE-FIELD is applied to each channel in CHANNEL-CHANNELS."
  (trace-streaming complete-field.before channel)
  ;; discard whatever is in the free page queue
  (amqp.i::collection-clear (channel-free-pages channel))
  ;; from here on the channel is input-only
  (setf (channel-state channel) :input)
  ;; a NIL entry indicates completion
  (amqp.i::enqueue nil (channel-active-pages channel))
  (signal-semaphore (channel-active-semaphore channel))
  (loop for downstream in (channel-channels channel)
        do (complete-field downstream))
  (trace-streaming complete-field.after channel))
1287
(defmethod channel-count ((channel page-queue))
  "Return the number of pages written to CHANNEL less the number read from it."
  (let ((written (channel-write-count channel))
        (read (channel-read-count channel)))
    (- written read)))
1291
(defmethod channel-create-page ((channel page-queue) &optional
                                (length (channel-page-length channel))
                                (width (channel-page-width channel)))
  "Allocate a new LENGTH x WIDTH fixnum array with every element set to
 +NULL-TERM-ID+. LENGTH and WIDTH default to the page length and page width
 of CHANNEL."
  (let ((dimensions (list length width)))
    (make-array dimensions
                :element-type 'fixnum
                :initial-element +NULL-TERM-ID+)))
1297
(defmethod channel-get ((channel page-queue) &key (wait *channel-get-wait*))
  "Return the next field page of CHANNEL and T as a second value.
 When WAIT is false and the channel count is not positive, return NIL
 without calling GET-FIELD-PAGE."
  (if (or wait (plusp (channel-count channel)))
      (values (get-field-page channel) t)
      nil))
1301
(defmethod channel-get-all ((channel page-queue))
  "Return a fresh list of the elements found in the collection content of
 CHANNEL, replacing each element in that content with NIL as it is taken."
  (loop for cell on (amqp.i::collection-content channel)
        collect (prog1 (first cell)
                  ;; clear the slot in place once its value has been captured
                  (setf (first cell) nil))))
1306
(defmethod channel-put ((channel page-queue) value)
  "Enqueue VALUE on the active page collection of CHANNEL."
  (let ((active (channel-active-pages channel)))
    (amqp.i::enqueue value active)))
1309
(defmethod channel-reset ((channel page-queue))
1311
(amqp.i::collection-clear (channel-free-pages channel))
1312
(amqp.i::collection-clear (channel-active-pages channel)))
1315
(defmethod channel-get-wait-count ((channel page-queue))
  "Return the wait count of the active-page semaphore of CHANNEL."
  (let ((semaphore (channel-active-semaphore channel)))
    (sb-thread::semaphore-waitcount semaphore)))
1319
(defmethod new-field-page ((channel page-queue) length width)
1320
"Attempt to dequeue a free page. If that does not succeed immediately,
1321
then iff the channel is still below the size limit, make a new page.
1322
Otherwise, wait until a page is released."
1323
(declare (ignore length width))
1325
(trace-streaming new-field-page channel)
1326
(ecase (channel-state channel)
1328
(let* ((semaphore (channel-free-semaphore channel))
1329
(queue (channel-free-pages channel))
1331
;; w/o the locking from receive-message(-no-hang) across semaphore/queue
1332
;; operations, as there is just one reader / one writer
1333
(cond ((try-semaphore semaphore)
1334
(amqp.i::dequeue queue))
1335
((< (channel-count channel) (channel-size channel))
1336
(channel-create-page channel))
1339
(progn (setf (thread-priority) *algebra-idle-priority*)
1340
(wait-on-semaphore semaphore)
1341
(amqp.i::dequeue queue))
1342
(setf (thread-priority) *algebra-running-priority*))))))
1343
(incf (channel-bound-count channel))
1344
(trace-streaming new-field-page.dequeued channel data)
1346
((:input nil) ; should be no further output
1347
(log-warn "new-field-page: invalid channel state: ~a(~s)." channel (channel-state channel))
1350
(defmethod put-field-page ((channel page-queue) page)
1351
"Add an entry to the active pages. As each page should have originated from the free pages, there can
1352
be no more of them than the inactive buffer count. Which means, the free pages semaphore suffices to
1353
preclude buffer over-run."
1354
(trace-streaming put-field-page channel page)
1355
(ecase (channel-state channel)
1357
(amqp.i::enqueue page (channel-active-pages channel))
1358
(incf (channel-solution-count channel) (channel-object-solution-count page))
1359
(incf (channel-page-count channel))
1360
(incf (channel-write-count channel))
1361
(signal-semaphore (channel-active-semaphore channel))
1362
(dolist (next (channel-channels channel))
1363
(let ((to (new-field-page next (channel-page-length channel) (channel-page-width channel))))
1364
(unless (= (array-dimension to 0) (array-dimension page 0))
1365
(setf to (adjust-page to (array-dimensions page))))
1367
(put-field-page next to))))
1369
(log-warn "put-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel) page)))
1370
(trace-streaming put-field-page.enqueued channel)
1373
(defmethod get-field-page ((channel page-queue))
1374
"Return the next active page. If none is available and the channel is still active, wait until one is.
1375
While waiting, reduce the thread priority to idle."
1376
(trace-streaming get-field-page channel)
1377
(ecase (channel-state channel)
1379
(let* ((semaphore (channel-active-semaphore channel))
1380
(queue (channel-active-pages channel))
1382
;; w/o the locking from receive-message(-no-hang) across semaphore/queue
1383
;; operations, as there is just one reader / one writer
1384
(cond ((try-semaphore semaphore)
1385
(amqp.i::dequeue queue))
1388
(progn (setf (thread-priority) *algebra-idle-priority*)
1389
(wait-on-semaphore semaphore)
1390
(amqp.i::dequeue queue))
1391
(setf (thread-priority) *algebra-running-priority*))))))
1392
(unless (or data (eq (channel-state channel) :input))
1393
(log-warn "null data w/ :io: ~a ~a" channel queue)
1394
(setf (channel-state channel) nil))
1395
(incf (channel-read-count channel))
1396
(trace-streaming get-field-page.dequeued channel data)
1398
((nil) ; should be no further output
1399
(log-warn "get-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel))
1402
(defmethod release-field-page ((channel page-queue) page)
1403
(trace-streaming release-field-page channel page)
1404
(ecase (channel-state channel)
1407
(log-warn "release-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel) page)
1409
(amqp.i::enqueue (when (= (array-dimension page 0) (channel-page-length channel))
1410
;; save the page only if it is a 'standard' length
1412
(channel-free-pages channel))
1413
(incf (channel-free-count channel))
1414
(signal-semaphore (channel-free-semaphore channel))
1415
(trace-streaming release-field-page.enqueued channel)
1419
(progn (defmethod channel-put ((channel sb-concurrency:queue) value &key (limit *channel-size-limit*) (wait *channel-put-wait*))
1421
(loop (cond ((< (sb-concurrency:queue-count channel) limit)
1422
(sb-concurrency:enqueue value channel)
1429
(sb-concurrency:enqueue value channel)
1431
(defmethod channel-get ((channel sb-concurrency:queue) &key (wait *channel-get-wait*))
1433
(loop (multiple-value-bind (value not-empty)
1434
(sb-concurrency:dequeue channel)
1436
(return (values value not-empty))
1437
(bt:thread-yield))))
1438
(sb-concurrency:dequeue channel)))
1439
(defmethod channel-count ((channel sb-concurrency:queue))
1440
(sb-concurrency:queue-count channel))
1441
(defmethod channel-get-all ((channel sb-concurrency:queue))
1442
(loop until (sb-concurrency:queue-empty-p channel)
1443
collect (sb-concurrency:dequeue channel)))
1444
(defmethod channel-name ((channel sb-concurrency:queue))
1445
(sb-concurrency:queue-name channel))
1446
(defmethod (setf channel-name) (name (channel sb-concurrency:queue))
1447
(setf (sb-concurrency:queue-name channel) name)))
1450
(progn (defclass pool (channel amqp.i::locked-queue array-page-channel)
1453
:accessor channel-page-count)))
1454
(defun make-pool (&key name) (declare (ignore name)) (make-instance 'pool))
1455
(defmethod channel-put ((channel pool) value &key (limit *channel-size-limit*) (wait *channel-put-wait*))
1456
(dolist (next (channel-channels channel))
1457
(channel-put next value))
1458
(incf (channel-page-count channel))
1460
(loop (cond ((< (amqp.i::collection-size channel) limit)
1461
(amqp.u:enqueue value channel)
1468
(amqp.u:enqueue value channel)
1470
(defmethod channel-get ((channel pool) &key (wait *channel-get-wait*))
1472
(amqp.u:dequeue channel :if-empty :wait))
1473
((not (amqp.i::collection-empty-p channel))
1474
(amqp.u:dequeue channel))))
1475
(defmethod channel-count ((channel pool))
1476
(amqp.i::collection-size channel))
1477
(defmethod channel-get-all ((channel pool))
1478
(collect-list (collect)
1479
(loop (multiple-value-bind (value not-empty) (amqp.u:dequeue channel)
1487
(defun make-channel (&rest args &key dimensions name size page-length page-width sort-dimensions)
  "Instantiate a channel of the class designated by *MAKE-CHANNEL.CLASS*.
 All arguments are passed through unchanged to MAKE-INSTANCE; the keyword
 parameters are declared only to document the accepted initargs."
  (declare (ignorable dimensions name size page-length page-width sort-dimensions))
  ;;(setf (getf args :size) 128)
  ;; every supported class is a standard class, so no per-class dispatch is needed
  (apply #'make-instance *make-channel.class* args))
1494
(defun make-null-channel (&rest args &key (dimensions (error "dimensions is required."))
1495
(name (list 'spocq.a:|null| (task-id *query*)))
1496
(size *channel-sliced-size-limit*)
1497
(page-length *field-sliced-page-length*))
1498
(apply #'make-channel :dimensions dimensions :name name :size size :page-length page-length
1501
(defun make-unit-table-channel (&rest args &key (dimensions nil)
1502
(name (list 'spocq.a:|table| (task-id *query*)))
1503
(size *channel-sliced-size-limit*)
1504
(page-length *field-sliced-page-length*))
1505
(apply #'make-channel :dimensions dimensions :name name :size size :page-length page-length
1508
(defun make-solution-channel (&rest args)
  "Instantiate a SOLUTION-CHANNEL, passing ARGS through as its initargs."
  (apply #'make-instance 'solution-channel args))
1514
(defclass dimensioned-funcallable-object (c2mop:funcallable-standard-object)
1516
:initarg :dimensions :initform (error "dimensions is required.")
1517
:reader function-dimensions)
1520
:initarg :function :initform (error "function is required.")
1521
:accessor dimensioned-funcallable-object-function))
1522
#-mcl (:metaclass c2mop:funcallable-standard-class))
1524
(defmethod initialize-instance ((instance dimensioned-funcallable-object) &key function)
1525
(declare (ignorable function))
1526
#-mcl (c2mop:set-funcallable-instance-function instance function)
1529
(defmacro dimlambda (parameter-list &rest body)
  "Expand into a MAKE-INSTANCE of DIMENSIONED-FUNCTION whose :dimensions is the
 quoted PARAMETER-LIST and whose :function is a lambda with that parameter
 list and BODY."
  (list 'make-instance ''dimensioned-function
        :dimensions (list 'quote parameter-list)
        :function (list* 'lambda parameter-list body)))
1535
(defmethod dimensioned-funcallable-object-function ((function function))
1542
(let ((c (make-channel :size 4 :dimensions '(a))))
1545
(let ((p (new-field-page c 10 1)))
1546
(put-field-page c p)
1548
(release-field-page c p)))
1553
(setq *data-trace-output* *trace-output*)
1555
(sb-profile:profile complete-field put-field-page get-field-page new-field-page release-field-page
1557
wait-on-semaphore signal-semaphore try-semaphore)
1558
(sb-profile:unprofile)
1560
(defun test-queue (&key (size 8) (count (* 8 size)) (verbose nil) (test nil)
1561
(dimensions '(a b)) (width (length dimensions)) (length 8)
1563
(let* ((q (make-instance class :dimensions dimensions :page-length length :size size))
1567
(print q *trace-output*)
1568
(finish-output *trace-output*)
1571
(let ((page (new-field-page q width length)))
1572
(trace-streaming tq.new q)
1574
(fill (make-array (* (channel-page-width q) (channel-page-length q))
1575
:element-type 'fixnum
1578
(put-field-page q page))
1579
(trace-streaming tq.put q)
1580
(when verbose (write-char #\+ *trace-output*) (finish-output *trace-output*)))
1583
(dotimes (test-count count)
1584
(let ((page (get-field-page q)))
1585
(trace-streaming tq.get q)
1587
(position test-count
1588
(make-array (* (channel-page-width q) (channel-page-length q))
1589
:element-type 'fixnum
1592
(warn "missed: ~s != ~s." test-count page)
1593
(bt:destroy-thread t-put)
1594
(setf result :failed)
1596
(release-field-page q page)
1597
(trace-streaming tq.release q)
1598
(when verbose (write-char #\- *trace-output*) (finish-output *trace-output*))
1600
(unless (null (get-field-page q))
1601
(warn "non null after ~d." count))
1602
(bt:join-thread t-put)))
1603
(setf t-put (bt:make-thread #'run-put))
1604
(setf t-get (bt:make-thread #'run-get)))
1605
(bt:join-thread t-get)
1608
(test-queue :verbose t :size 4 :count 4)
1609
(let ((*print-array* nil) (*print-pretty* nil)) (test-queue :count 8 :size 2 :verbose t))
1610
(time (test-queue :count 100000 :class 'page-fifo))
1611
(time (test-queue :count 100000 :class 'page-ring))
1612
(time (test-queue :count 10000000 :class 'page-ring))
1614
* (time (test-queue :count 10000000 :class 'page-fifo)) ;; 59.758/103.722482 +12,661MB
1616
#<PAGE-FIFO "#<PAGE-FIFO {1047F401A3}>" [0] {1047F401A3}>
1618
58.989 seconds of real time
1619
102.366397 seconds of total run time (54.079380 user, 48.287017 system)
1620
[ Run times consist of 0.560 seconds GC time, and 101.807 seconds non-GC time. ]
1623
201,243,870,487 processor cycles
1624
12,660,919,744 bytes consed
1627
#<PAGE-FIFO "#<PAGE-FIFO {1047F401A3}>" [0] {1047F401A3}>
1628
* (sb-profile:report)
1630
seconds | gc | consed | calls | sec/call | name
1631
-----------------------------------------------------------------
1632
62.878 | 0.560 | 10,504,214,736 | 10,000,001 | 0.000006 | GET-FIELD-PAGE
1633
61.447 | 0.560 | 12,618,437,024 | 10,000,000 | 0.000006 | NEW-FIELD-PAGE
1634
19.060 | 0.000 | 23,679,200 | 10,000,001 | 0.000002 | PUT-FIELD-PAGE
1635
11.191 | 0.000 | 170,222,560 | 10,000,000 | 0.000001 | RELEASE-FIELD-PAGE
1636
0.000 | 0.000 | 0 | 1 | 0.000000 | COMPLETE-FIELD
1637
-----------------------------------------------------------------
1638
154.576 | 1.120 | 23,316,553,520 | 40,000,003 | | Total
1640
estimated total profiling overhead: 22.40 seconds
1641
overhead estimation parameters:
1642
1.6000001e-8s/call, 5.6e-7s total profiling, 1.2800001e-7s internal profiling
1644
These functions were not called:
1645
CREATE-RING-PAGE SIGNAL-SEMAPHORE WAIT-ON-SEMAPHORE
1648
;; (time (test-queue :count 10000000 :class 'page-ring :size 4)) ;; 62.812/122.148 +12,661MB
1650
* (time (test-queue :count 10000000 :class 'page-ring :size 32))
1652
#<PAGE-RING [[0,32/32] 0 @IO: (A B)] NIL {104D12B2D3}>
1654
62.911 seconds of real time
1655
122.343645 seconds of total run time (45.274829 user, 77.068816 system)
1656
[ Run times consist of 0.036 seconds GC time, and 122.308 seconds non-GC time. ]
1658
214,622,173,442 processor cycles
1659
683,204,896 bytes consed
1662
#<PAGE-RING [[0,31/32] 10000000 @NIL: (A B)] NIL {104D12B2D3}>
1663
* (sb-profile:report)
1665
seconds | gc | consed | calls | sec/call | name
1666
----------------------------------------------------------------
1667
35.864 | 0.036 | 254,472,816 | 10,000,001 | 0.000004 | NEW-FIELD-PAGE
1668
35.321 | 0.036 | 431,765,360 | 20,000,002 | 0.000002 | WAIT-ON-SEMAPHORE
1669
25.168 | 0.000 | 32,032,944 | 20,000,001 | 0.000001 | SIGNAL-SEMAPHORE
1670
23.778 | 0.000 | 115,306,416 | 10,000,001 | 0.000002 | GET-FIELD-PAGE
1671
21.471 | 0.000 | 124,254,784 | 10,000,000 | 0.000002 | RELEASE-FIELD-PAGE
1672
20.905 | 0.000 | 122,140,192 | 10,000,001 | 0.000002 | PUT-FIELD-PAGE
1673
0.000 | 0.000 | 32,768 | 32 | 0.000000 | CREATE-RING-PAGE
1674
0.000 | 0.000 | 0 | 1 | 0.000000 | COMPLETE-FIELD
1675
----------------------------------------------------------------
1676
162.507 | 0.072 | 1,080,005,280 | 80,000,039 | | Total
1679
;; fifo : 122/38.4 runtime
1680
;; mailbox : 217/56 runtime
1682
;; sbcl 1.0.46, linux, 78.99 seconds total runtime = 1265983.1 / second
1683
* (sb-profile:report)
1685
seconds | gc | consed | calls | sec/call | name
1686
------------------------------------------------------------
1687
0.000 | 0.000 | 2,883,584 | 1,000,000 | 0.000000 | WAIT-ON-QUEUE
1688
0.000 | 0.100 | 15,991,136 | 1,000,000 | 0.000000 | CHANNEL-PUT
1689
0.000 | 0.100 | 6,268,032 | 1,000,000 | 0.000000 | CHANNEL-GET
1690
------------------------------------------------------------
1691
0.000 | 0.200 | 25,142,752 | 3,000,000 | | Total
1693
estimated total profiling overhead: 17.59 seconds
1694
overhead estimation parameters:
1695
8.000001e-9s/call, 5.8640003e-6s total profiling, 3.048e-6s internal profiling
1699
;;; (run-test-query "select count(*) where {?s ?p ?o} " :repository-id "system/schema")
1700
;;; (time (run-test-query "select count(*) where {?s ?p ?o} " :repository-id "jan/cantor_merged-reasoned"))
1701
;;; (run-test-query "select * where {?s ?p ?o} limit 4" :repository-id "system/schema")
1703
;;; w/ page rings (first are profiled)
1704
* (time (run-test-query "select count(*) where {?s ?p ?o} " :repository-id "jan/cantor_merged-reasoned"))
1707
0.876 seconds of real time
1708
1.132071 seconds of total run time (1.032065 user, 0.100006 system)
1711
385 lambdas converted
1712
2,987,379,711 processor cycles
1713
41,063,424 bytes consed
1717
#<QUERY [00000000-0000-0000-0000-000000000000/NIL, query, jan/cantor_merged-reasoned] {100833C21B}>
1718
* (time (run-test-query "select count(*) where {?s ?p ?o} " :repository-id "jan/cantor_merged-reasoned"))
1721
0.717 seconds of real time
1722
0.972061 seconds of total run time (0.848053 user, 0.124008 system)
1725
2,446,119,256 processor cycles
1726
5,593,344 bytes consed
1730
#<QUERY [00000000-0000-0000-0000-000000000000/NIL, query, jan/cantor_merged-reasoned] {100A9BEDAB}>
1731
* (sb-profile:report)
1732
measuring PROFILE overhead..done
1734
seconds | gc | consed | calls | sec/call | name
1735
----------------------------------------------------------
1736
2.962 | 0.000 | 16,591,120 | 38,276 | 0.000077 | GET-FIELD-PAGE
1737
0.094 | 0.000 | 2,297,664 | 38,276 | 0.000002 | PUT-FIELD-PAGE
1738
0.042 | 0.000 | 7,241,504 | 38,276 | 0.000001 | NEW-FIELD-PAGE
1739
0.000 | 0.000 | 1,925,600 | 130 | 0.000000 | MAKE-PAGE
1740
----------------------------------------------------------
1741
3.099 | 0.000 | 28,055,888 | 114,958 | | Total
1743
estimated total profiling overhead: 0.08 seconds
1744
overhead estimation parameters:
1745
1.6000001e-8s/call, 7.2800003e-7s total profiling, 2.5600002e-7s internal profiling
1748
;;; w/ page rings, unprofiled
1749
* (time (run-test-query "select count(*) where {?s ?p ?o} " :repository-id "jan/cantor_merged-reasoned"))
1752
0.683 seconds of real time
1753
0.948060 seconds of total run time (0.936059 user, 0.012001 system)
1756
2,331,045,822 processor cycles
1757
4,918,624 bytes consed
1761
#<QUERY [00000000-0000-0000-0000-000000000000/NIL, query, jan/cantor_merged-reasoned] {100AF4CE8B}>
1765
* (time (run-test-query "select count(*) where {?s ?p ?o} " :repository-id "jan/cantor_merged-reasoned"))
1768
1.545 seconds of real time
1769
1.164073 seconds of total run time (1.076067 user, 0.088006 system)
1772
363 lambdas converted
1773
5,268,970,064 processor cycles
1775
34,411,520 bytes consed
1779
#<QUERY [00000000-0000-0000-0000-000000000000/NIL, query, jan/cantor_merged-reasoned] {1008A3F299}>
1780
* (time (run-test-query "select count(*) where {?s ?p ?o} " :repository-id "jan/cantor_merged-reasoned"))
1783
0.697 seconds of real time
1784
0.964061 seconds of total run time (0.904057 user, 0.060004 system)
1787
2,378,037,240 processor cycles
1788
4,932,112 bytes consed
1792
#<QUERY [00000000-0000-0000-0000-000000000000/NIL, query, jan/cantor_merged-reasoned] {100AA7A6D9}>
1794
* (sb-profile:report)
1796
measuring PROFILE overhead..done
1798
seconds | gc | consed | calls | sec/call | name
1799
--------------------------------------------------------
1800
0.278 | 0.000 | 1,501,984 | 19,136 | 0.000015 | MAKE-PAGE
1801
--------------------------------------------------------
1802
0.278 | 0.000 | 1,501,984 | 19,136 | | Total
1804
estimated total profiling overhead: 0.02 seconds
1805
overhead estimation parameters:
1806
8.000001e-9s/call, 7.84e-7s total profiling, 3.04e-7s internal profiling