Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/streaming.lisp

KindCoveredAll%
expression2791850 15.1
branch11116 9.5
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 ;;; (load #p"patches/streaming.lisp")
5
 
6
 #+(or)
7
 (progn
8
   (initialize-spocq)
9
   (load "classes")
10
   (load "utilities")
11
   (load "processing")
12
   (fmakunbound 'channel-put)
13
   (load (compile-file "streaming"))
14
   (load "test-query"))
15
 
16
 #+sbcl
17
 (require :sb-concurrency)
18
 
19
 (defgeneric page-length (page)
20
   (:method ((page array)) (first (array-dimensions page))))
21
   
22
 
23
 
24
 (defvar *streaming-trace-output* nil)
25
 (defmacro trace-streaming (operator &rest values)
26
   (let ((op (gensym "trace-")))
27
     `(flet ((,op (,op)
28
               (format ,op "~&~a[~a] : ~@{~a~^ ~}~%"
29
                       ',operator (bt:thread-name (bt:current-thread)) ,@values)))
30
        (declare (dynamic-extent #',op))
31
        (trace-when *streaming-trace-output* #',op))))
32
 
33
 
34
 #+(and mcl nil)
35
 (progn                                  ; just enough to allow it to compile
36
   (defpackage :sb-thread
37
     (:export :wait-on-semaphore :signal-semaphore :semaphore-count :make-semaphore))
38
   (defstruct semaphore
39
     (name "anonymous")
40
     (lock (bt:make-lock ))
41
     (count 0))
42
   (defun sb-thread:make-semaphore (&rest args)
43
     (apply #'make-semaphore args))
44
   (defun sb-thread:semaphore-count (semaphore)
45
     (semaphore-count semaphore))
46
   (defun sb-thread:wait-on-semaphore (semaphore)
47
     (loop (bt:with-lock-held ((semaphore-lock semaphore))
48
             (when (plusp (semaphore-count semaphore))
49
               (decf (semaphore-count semaphore))
50
               (return-from sb-thread:wait-on-semaphore t)))
51
           (ccl:process-wait "semaphore wait" #'(lambda (semaphore) (plusp (semaphore-count semaphore))) semaphore)))
52
   (defun sb-thread:signal-semaphore (semaphore)
53
     (bt:with-lock-held ((semaphore-lock semaphore))
54
       (incf (semaphore-count semaphore))))
55
   )
56
 #+sbcl
57
 (progn
58
   (defun wait-on-semaphore (s)
59
     (sb-thread:wait-on-semaphore s))
60
   (defun signal-semaphore (s)
61
     (sb-thread:signal-semaphore s))
62
   (defun try-semaphore (s)
63
     (sb-thread:try-semaphore s)))
64
 
65
 
66
 ;;; interface
67
 
68
 (defgeneric cl-user::format-channel-state (stream object &optional colon at &rest args)
69
   )
70
 
71
 (defgeneric channel-count (channel)
72
   (:documentation "Return the count of elements currently in the channel."))
73
 
74
 (defgeneric channel-create-page (channel &optional length width)
75
   (:documentation "Return a new page sized for the channel page with and length."))
76
 
77
 (defgeneric channel-get (channel &key wait)
78
   (:documentation "Return the next available element from the channel.
79
  If WAIT is true, then suspend the process until data is available.
80
  Returns two values, the datum and a boolean to indicate wheter data was available."))
81
 
82
 (defgeneric channel-get-all (channel) )
83
 
84
 (defgeneric channel-name (channel) )
85
 
86
 (defgeneric (setf channel-name) (name channel) )
87
 
88
 (defgeneric channel-page-count (channel)
89
   (:documentation "Return the count of elements passed through the channel."))
90
 
91
 (defgeneric (setf channel-page-length) (length channel)
92
   (:documentation "Set the number of solutions per page for pages in the channel."))
93
 
94
 (defgeneric channel-page-length (channel)
95
   (:documentation "Return the number of solutions per page for pages in the channel.")
96
   (:method ((channel function))
97
     1))
98
 
99
 (defgeneric channel-put (channel value)
100
    #+(or)
101
    (:method :after ((c t) (v t))
102
      (let ((*print-pretty* nil) (*print-array* nil))
103
        (format *trace-output* "~&channel-put: ~s: ~s" c v))))
104
 
105
 (defgeneric channel-reset (channel) )
106
 
107
 (defgeneric channel-size (channel) )
108
 
109
 (defgeneric (setf channel-size) (size channel)
110
   (:documentation "Set the channel depth. The size of the channel buffer limits the number
111
  of unconsumed pages before the producer is blocked."))
112
 
113
 (defgeneric channel-solution-count (channel)
114
   (:documentation "Return the count of solutions passed through the channel."))
115
 
116
 (defgeneric channel-get-wait-count (channel)
117
   (:documentation "Return the count of threads waiting on the channel."))
118
 
119
 (defgeneric channel-put-wait-count (channel)
120
   (:documentation "Return the count of threads waiting to write the channel."))
121
 
122
 (defgeneric complete-field (destination)
123
   (:documentation "Indicate that no further solutions will be added to a field channel.")
124
   (:method ((destination function))
125
     (funcall destination nil)))
126
 
127
 (defgeneric get-field-page (source)
128
   (:documentation "Get the next available page from the source. Supports various queues and
129
  functions as destination.")
130
 
131
   (:method ((source function))
132
     (funcall source)))
133
 
134
 
135
 (defgeneric map-channel (operator channel) )
136
 
137
 (defmethod new-field-page ((destination t) result-page-length result-page-width)
138
   (make-page result-page-length result-page-width))
139
 
140
 (defgeneric put-field-page (destination page)
141
   (:documentation "Pass the page through the channel. Supports various queues and
142
  functions as destination.")
143
 
144
   (:method ((destination function) page)
145
     (funcall destination page)))
146
 
147
 (defgeneric release-field-page (source page)
148
   (:method ((source t) page) (release-page page)))
149
 
150
 ;;; utilities
151
 
152
 (defun effective-page-length (&key start end)
153
   "Determine the field page length to use given query parameters. Iff a slice length is specified,
154
  limit the page to that. Otherwise use the default page length."
155
  
156
   (if end
157
     (max (min *field-page-length* (- end (or start 0))) *field-sliced-page-length*)
158
     *field-page-length*))
159
 
160
 
161
 (defun effective-channel-size (&key start end)
162
   "Determine the field channel depth to use given query parameters. Iff a slice length is specified,
163
  use a size specific to slices. Otherwise use the default size."
164
 
165
   (declare (ignore start))
166
   (if end
167
     *channel-sliced-size-limit*
168
     *channel-size-limit*))
169
 
170
 
171
 (defgeneric channel-object-solution-count (object)
172
   (:documentation "Returns the object's 'size' to add to the channel's flow count.")
173
   (:method ((object array)) (if (plusp (array-rank object)) (array-dimension object 0) 1))
174
   (:method ((object null)) 0)
175
   (:method ((object t)) 1))
176
 
177
 
178
 (defgeneric object-channel-copy (object)
179
   (:method ((object array)) (copy-page object))
180
   (:method ((object t)) object))
181
 
182
 
183
 ;;; classes
184
 
185
 (defclass channel ()
186
   ((de.setf.amqp.implementation::name
187
     :initform nil :initarg :name
188
     :accessor channel-name)
189
    (reader :initform nil :accessor channel-reader)
190
    (writer :initform nil :accessor channel-writer)
191
    (state :initform :io :type (member :io :input nil) :accessor channel-state)
192
    (channels
193
     :initform nil :initarg :channels
194
     :accessor channel-channels
195
     :documentation "A list of channels into which all input is broadcast.")
196
    (timeout
197
     :initform nil :accessor channel-timeout)
198
    (start-timestamp
199
     :initform 0 :accessor channel-start-timestamp
200
     :documentation "records the timestamp in microseconds")
201
    (start-thread-time
202
     :initform 0 :accessor channel-start-thread-time
203
     :documentation "records the thread-local start in nanoseconds")
204
    (end-thread-time
205
     :initform 0 :accessor channel-end-thread-time))
206
   (:documentation "the protocol class for field solution channels."))
207
 
208
 (defmethod print-object ((object channel) stream)
209
   (print-unreadable-object (object stream :identity t :type t)
210
     (format stream "[~/format-channel-state/] ~s"
211
             object
212
             (bound-slot-value object 'de.setf.amqp.implementation::name))))
213
 
214
 (defgeneric channel-p (object)
215
   (:method ((channel channel)) t)
216
   (:method ((object t)) nil))
217
 
218
 (defmethod cl-user::format-channel-state ((stream t) (channel channel) &optional colon at &rest args)
219
   (declare (ignore colon at args))
220
   (format stream "~a @~a" (channel-name channel) (channel-state channel)))
221
 
222
 (defmethod channel-reset ((channel channel))
223
   (setf (channel-reader channel) nil)
224
   (setf (channel-writer channel) nil)
225
   (setf (channel-state channel) :io))
226
 
227
 (defmethod put-field-page :before ((channel channel) (value t))
228
   ;; set it in the correct thread
229
   (when (zerop (channel-start-timestamp channel))
230
     (setf (channel-start-timestamp channel) (get-timeline-location)
231
           (channel-start-thread-time channel) (rdfcache:time-in-thread))))
232
 
233
 (defmethod complete-field :before ((channel channel))
234
   ;;; maybe never started
235
   (when (zerop (channel-start-timestamp channel))
236
     (setf (channel-start-timestamp channel) (get-timeline-location)
237
           (channel-start-thread-time channel) (rdfcache:time-in-thread)))
238
   (setf (channel-end-thread-time channel) (rdfcache:time-in-thread)))
239
 
240
 
241
 
242
 (defclass abstract-page-channel (channel)
243
   ((dimensions
244
     :initform (error "dimensions is required.") :initarg :dimensions
245
     :reader channel-dimensions)
246
    (sort-dimensions
247
     :initform (error "sort-dimensions is required.") :initarg :sort-dimensions
248
     :accessor channel-sort-dimensions)
249
    (size
250
     :initform *channel-size-limit* :initarg :size
251
     :reader channel-size :writer setf-channel-size)
252
    (page-width
253
     :initarg :page-width ; must be suppied explicitly or in terms of dimensions
254
     :accessor channel-page-width)
255
    (page-length
256
     :initform *field-page-length* :initarg :page-length
257
     :reader channel-page-length :writer setf-channel-page-length)
258
    (page-count
259
     :initform 0
260
     :accessor channel-page-count)
261
    (solution-count
262
     :initform 0
263
     :accessor channel-solution-count)))
264
 
265
 (defmethod initialize-instance ((instance abstract-page-channel)
266
                                 &rest initargs
267
                                 &key dimensions
268
                                 (sort-dimensions dimensions))
269
   (declare (dynamic-extent initargs))
270
   (apply #'call-next-method instance
271
          :sort-dimensions sort-dimensions
272
          initargs))
273
 
274
 (defmethod cl-user::format-channel-state ((stream t) (channel abstract-page-channel) &optional colon at &rest args)
275
   (declare (ignore colon at args))
276
   (format stream "~a (~a x ~a) @~a"
277
           (channel-name channel)
278
           (channel-page-length channel) (channel-page-width channel)
279
           (channel-state channel)))
280
 
281
 
282
 ;;; the  principle prtocol classes are array-page-channel and matrix-page-channel
283
 ;;; then specialize algebra operator methods with respect to the access method required
284
 ;;; for the respective page representation - heap array or external array.
285
 
286
 (defclass array-page-channel (abstract-page-channel)
287
   ()
288
   (:documentation "Mark the implementation as representing pages as arrays"))
289
 
290
 (defclass matrix-page-channel (abstract-page-channel)
291
   ()
292
   (:documentation "Mark the implementation as representing pages as pointers to foreign matrix values"))
293
 
294
 
295
 (defclass page-channel (abstract-page-channel)
296
   ((free-count
297
     :type fixnum
298
     :accessor channel-free-count)
299
    (bound-count
300
     :type fixnum
301
     :accessor channel-bound-count)
302
    (write-count
303
     :type fixnum
304
     :accessor channel-write-count)
305
    (read-count
306
     :type fixnum
307
     :accessor channel-read-count)))
308
 
309
 (defmethod initialize-instance :after ((instance page-channel) &key)
310
   (with-slots (size free-count bound-count write-count read-count
311
                page-width dimensions) instance
312
     (setf free-count size
313
           bound-count 0
314
           write-count 0
315
           read-count 0)
316
     (setf page-width (length dimensions))))
317
 
318
 (defmethod print-object ((object page-channel) stream)
319
   (print-unreadable-object (object stream :identity t :type t)
320
     (handler-case (format stream "[~/format-channel-state/: ~a] ~s"
321
                           object
322
                           (bound-slot-value object 'dimensions)
323
                           (bound-slot-value object 'de.setf.amqp.implementation::name))
324
       (error (c)
325
              (format stream "error: ~a" (type-of c))))))
326
 
327
 (defmethod cl-user::format-channel-state (stream (object page-channel) &optional colon at &rest args)
328
   (declare (ignore colon at args))
329
   (handler-case (format stream "[~a:~a/~a/~a/~a]x~:d.~:d~@[@~a~]"
330
                         (bound-slot-value object 'size)
331
                         (bound-slot-value object 'free-count)
332
                         (bound-slot-value object 'bound-count)
333
                         (bound-slot-value object 'write-count)
334
                         (bound-slot-value object 'read-count)
335
                         (bound-slot-value object 'page-count)
336
                         (bound-slot-value object 'solution-count)
337
                         (bound-slot-value object 'state))
338
     (error (c)
339
            (print-unreadable-object (object stream :type t :identity t)
340
              (format stream "error: ~a" (type-of c))))))
341
 
342
 
343
 (defmethod channel-reset ((channel page-channel))
344
   (call-next-method)
345
   (setf (channel-page-count channel) 0)
346
   (setf (channel-solution-count channel) 0)
347
   (setf (channel-bound-count channel) 0
348
         (channel-write-count channel) 0
349
         (channel-read-count channel) 0
350
         (channel-free-count channel) (channel-size channel))
351
   )
352
 
353
 
354
 (defmethod (setf channel-size) (size (channel abstract-page-channel))
355
   "record the new size, adjust the buffer lengths and reset the start/end indices"
356
   (unless (eql (channel-size channel) size)
357
     (setf-channel-size size channel)
358
     (channel-reset channel))
359
   size)
360
 
361
 (defmethod (setf channel-page-length) (length (channel abstract-page-channel))
362
   "record the new length and delete any pages present"
363
   (unless (eql (channel-page-length channel) length)
364
     (setf-channel-page-length length channel)
365
     (channel-reset channel))
366
   length)
367
 
368
 
369
 (defclass page-ring (page-channel array-page-channel)
370
   ((pages
371
     :type vector
372
     :accessor ring-pages)
373
    (free-count
374
     :type fixnum
375
     :accessor ring-free-count)
376
    (bound-count
377
     :type fixnum
378
     :accessor ring-bound-count)
379
    (write-count
380
     :type fixnum
381
     :accessor ring-write-count)
382
    (read-count
383
     :type fixnum
384
     :accessor ring-read-count)
385
    (size-mask
386
     :reader channel-size-mask))
387
   (:documentation "A page ring encapsulates a bounded set of field pages within an interface which permits
388
  operations which tread the page set as a resourced stream from a producing to a consuming thread.
389
  In the initial state, the inactive set is all null entries and the inactive buffer is empty.
390
  The operators (setf channel-size) and (setf channel-page-length) are used when linking channels together
391
  to coordinate the channel parameters for the query type: sliced and/or path queries use smaller pages and
392
  shallower channels than a complete projection."))
393
 
394
 
395
 (defmethod initialize-instance :after ((instance page-ring) &key &allow-other-keys)
396
   (with-slots (size-mask size) instance
397
     (assert (= 1 (logcount size)) ()
398
             "Invalid ring size (must be a power of two): ~s" size)
399
     (setf size-mask (1- size))))
400
 
401
 (defmethod shared-initialize :after ((instance page-ring) (slots t) &key &allow-other-keys)
402
   "(re)establish the state of a page ring:
403
  - create the inactive and active rin buffers;
404
  - clear the counts"
405
 
406
   (with-slots (pages size) instance
407
     (setf pages (make-array size :initial-element nil))))
408
 
409
 (defmethod cl-user::format-channel-state (stream (object page-ring) &optional colon at &rest args)
410
   (declare (ignore colon at args))
411
   (format stream "[~a:~a/~a/~a/~a]x~:d.~:d~@[@~a~]"
412
             (channel-size object)
413
             (ring-free-count object)
414
             (ring-bound-count object)
415
             (ring-write-count object)
416
             (ring-read-count object)
417
             (channel-page-count object)
418
             (channel-solution-count object)
419
             (channel-state object)))
420
 ;;; a state label like '[8:7/8/8/8]x0.0@NIL' would indicate a closed channel which had no pages
421
 
422
 
423
 #+sbcl
424
 (progn
425
   (defclass page-mailbox (page-channel array-page-channel)
426
     ((free-pages
427
       :reader channel-free-pages)
428
      (active-pages
429
       :reader channel-active-pages))
430
     (:documentation "A page-mailbox combines two mailboxes, each of which encapsulates its own
431
       queue and an availability semaphore. The mailboxes support multi-reader / multi-writer
432
       access."))
433
   
434
   (defmethod cl-user::format-channel-state (stream (object page-mailbox) &optional colon at &rest args)
435
     (declare (ignore colon at args))
436
     (format stream "[~a:~a/~a/]x~:d.~:d~@[@~a~]"
437
             (channel-size object)
438
             (channel-free-count object)
439
             (channel-count object)
440
             (channel-page-count object)
441
             (channel-solution-count object)
442
             (channel-state object)))
443
   
444
   (defmethod initialize-instance ((instance page-mailbox) &key name)
445
     (let ((*print-pretty* nil))
446
       (with-slots (active-pages free-pages) instance
447
         (setf free-pages (sb-concurrency:make-mailbox :name (format nil "~a.free" name)))
448
         (setf active-pages (sb-concurrency:make-mailbox :name (format nil "~a.active" name)))))
449
     (call-next-method))
450
   )
451
 
452
 #+sbcl
453
 (progn
454
   (defclass solution-channel (channel)
455
     ((dimensions :initarg :dimensions
456
                  :reader channel-dimensions)
457
      (mailbox :reader channel-mailbox))
458
     (:documentation "A solution-channel wraps a mailbox to add name and dimensions."))
459
   
460
   (defmethod cl-user::format-channel-state (stream (object solution-channel) &optional colon at &rest args)
461
     (declare (ignore colon at args))
462
     (format stream "[~a]x~:d~@[@~a~]"
463
             (channel-name object)
464
             (channel-count object)
465
             (channel-state object)))
466
   
467
   (defmethod initialize-instance ((instance solution-channel) &key name)
468
     (let ((*print-pretty* nil))
469
       (with-slots (mailbox) instance
470
         (setf mailbox (sb-concurrency:make-mailbox :name (format nil "~a.mailbox" name)))))
471
     (call-next-method))
472
   
473
   (defmethod complete-field ((channel solution-channel))
474
     (sb-concurrency:send-message (channel-mailbox channel) nil))
475
   (defmethod channel-count ((channel solution-channel))
476
     (sb-concurrency:mailbox-count (channel-mailbox channel)))
477
   (defmethod channel-put ((channel solution-channel) (solution sequence))
478
     (sb-concurrency:send-message (channel-mailbox channel) solution))
479
   (defmethod channel-get ((channel solution-channel) &key (wait *channel-get-wait*))
480
     (when (or wait (plusp (sb-concurrency:mailbox-count (channel-mailbox channel))))
481
       (values (sb-concurrency:receive-message (channel-mailbox channel)) t)))
482
   )
483
 
484
 ;;;
485
 
486
 (defclass page-queue (page-channel array-page-channel)
487
   ((free-pages
488
     :reader channel-free-pages)
489
    (free-semaphore
490
     :initform (sb-thread:make-semaphore :name "queue free semaphore" :count 0)
491
     :reader channel-free-semaphore)
492
    (active-pages
493
     :reader channel-active-pages)
494
    (active-semaphore
495
     :initform (sb-thread:make-semaphore :name "queue active semaphore" :count 0)
496
     :reader channel-active-semaphore))
497
   (:documentation "A page-channel combines two queues and respective semaphores for signaling
498
     availability. The queues support just single-reader / single-writer access."))
499
 
500
   
501
 (defmethod initialize-instance :after ((instance page-queue) &key name)
502
   (let ((*print-pretty* nil))
503
     (with-slots (active-pages free-pages) instance
504
       (setf free-pages (make-instance 'amqp.i::queue :name (format nil "~a.free" name)))
505
       (setf active-pages (make-instance 'amqp.i::queue :name (format nil "~a.active" name))))))
506
 
507
 
508
 ;;;
509
 
510
 (defmethod complete-field ((channel page-ring))
511
   (trace-streaming complete-field.before channel)
512
   (let ((write-count (ring-write-count channel)))
513
     (barrier (:write)
514
       ;; ensure space for the e-o-d marker
515
       (new-field-page channel nil nil)
516
       ;; mark for input only
517
       (setf (channel-state channel) :input)
518
       ;; indicate completion
519
       (setf (svref (ring-pages channel) (logand write-count (channel-size-mask channel)))
520
             nil))
521
     (setf (ring-write-count channel) (1+ write-count)))
522
   (dolist (next (channel-channels channel))
523
     (complete-field next))
524
   (trace-streaming complete-field.after channel))
525
 
526
 
527
 (defmethod channel-count ((channel page-ring))
528
   (- (ring-write-count channel) (ring-read-count channel)))
529
 
530
 (defmethod channel-get-wait-count ((channel page-ring))
531
   (+ (if (>= (ring-bound-count channel) (ring-free-count channel)) 1 0)
532
      (if (>= (ring-read-count channel) (ring-write-count channel)) 1 0)))
533
 
534
 (defmethod channel-get ((channel page-ring) &key (wait *channel-get-wait*))
535
   (when (or wait (plusp (channel-count channel)))
536
     (values (get-field-page channel) t)))
537
   
538
 (defmethod channel-get-all ((channel page-ring))
539
   (loop for page = (get-field-page channel)
540
         until (null page)
541
         collect page))
542
 
543
 (defmethod channel-put ((channel page-ring) value)
544
   ;; skip the new- operation
545
   (new-field-page channel (channel-page-length channel) (channel-page-width channel))
546
   (put-field-page channel value))
547
 
548
 
549
 (defmethod create-ring-page ((channel page-ring))
550
   (make-array (list (channel-page-length channel) (channel-page-width channel))
551
               :element-type 'fixnum :initial-element +NULL-TERM-ID+))
552
 
553
 
554
 
555
 (defmethod (setf channel-page-length) (length (channel page-ring))
556
   "record the new length and delete any pages present"
557
   (setf-channel-page-length length channel)
558
   (channel-reset channel)
559
   length)
560
 
561
 (defmethod channel-reset ((channel page-ring))
562
   (call-next-method)
563
   (if (= (length (ring-pages channel)) (channel-size channel))
564
       (fill (ring-pages channel) nil)
565
       (setf (ring-pages channel) (make-array (channel-size channel) :initial-element nil))))
566
 
567
 (defmethod (setf channel-size) (size (channel page-ring))
568
   "record the new size, adjust the buffer lengths and reset the start/end indices"
569
   (with-slots (size-mask) channel
570
     (assert (= 1 (logcount size)) ()
571
             "Invalid ring size (must be a power of two): ~s" size)
572
     (setf size-mask (1- size))
573
     (setf-channel-size size channel)
574
     (channel-reset channel)
575
     size))
576
 
577
 
578
 (defmethod new-field-page ((channel page-ring) result-page-length result-page-width)
579
   "Remove an entry from the inactive buffer. If none is available, wait until one is.
580
  While waiting, reduce the thread priority to idle."
581
   (declare (ignore result-page-length result-page-width))
582
   
583
   (trace-streaming new-field-page channel)
584
   (ecase (channel-state channel)
585
     (:io
586
      ;; when nothing is already available, then adjust the priority in case
587
      ;; the process is suspended.
588
      (let* ((timeout (channel-timeout channel))
589
             (deadline (when timeout (+ timeout (get-internal-run-time))))
590
             (bound-count (ring-bound-count channel)))
591
        (loop (when (dotimes (x 100 nil)
592
                      (when (> (ring-free-count channel) bound-count) (return t))
593
                      ;; #+sbcl (sb-ext:spin-loop-hint)
594
                      )
595
                (return))
596
              (when (and deadline (plusp bound-count) (>= (get-internal-run-time) deadline))
597
                (spocq.e::timeout-error :detail (format nil "new-field-page: ~a" channel)))
598
              (setf (thread-priority) *algebra-idle-priority*)
599
              ; (bt:thread-yield)
600
               (sleep 0.000001)
601
              (setf (thread-priority) *algebra-running-priority*))
602
        (let ((data (svref (ring-pages channel)
603
                           (logand bound-count (channel-size-mask channel)))))
604
          
605
          (setf (ring-bound-count channel) (1+ bound-count))
606
          (etypecase data
607
            (array (setf data (clear-page data)))
608
            ;; special, to allow group entries
609
            (cons (setf data (clear-page (second data))))
610
            (null (case (channel-state channel)
611
                    (:io
612
                     (setf data (create-ring-page channel)))
613
                    (t
614
                     nil))))
615
          (trace-streaming new-field-page.dequeued channel data)
616
          data)))
617
     ((:input nil)                       ; should be no further output
618
      (log-warn "new-field-page: invalid channel state: ~a(~s)." channel (channel-state channel))
619
      nil)))
620
 
621
 
622
 (defmethod put-field-page ((channel page-ring) data)
623
   "Add an entry to the active buffer. As each page originated from the inactive buffer, there can be no more of
624
  them than the inactive buffer count. Which means, the inactive buffer read semaphore suffices to preclude buffer
625
  over-run."
626
   (trace-streaming put-field-page channel data)
627
   (dolist (next (channel-channels channel))
628
     (let ((to (new-field-page next (channel-page-length channel) (channel-page-width channel))))
629
       (assert (= (array-dimension to 1) (array-dimension data 1)) ()
630
               "page width mismatch: ~a/~a : ~a/~a."
631
               (channel-dimensions channel) (array-dimension data 1)
632
               (channel-dimensions next) (array-dimension to 1))
633
       (unless (= (array-dimension to 0) (array-dimension data 0))
634
         (setf to (adjust-page to (array-dimensions data))))
635
       (copy-page data to)
636
       (put-field-page next to)))
637
   (ecase (channel-state channel)
638
     (:io 
639
      (assert data () "invalid channel data: ~a: ~a" channel data)
640
      (let ((write-count (ring-write-count channel)))
641
        (barrier (:write)
642
          (setf (svref (ring-pages channel) (logand write-count (channel-size-mask channel)))
643
                data))
644
        (setf (ring-write-count channel) (1+ write-count)))
645
      (incf (channel-solution-count channel) (channel-object-solution-count data))
646
      (incf (channel-page-count channel)))
647
     ((:input nil)
648
      (log-warn "put-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel) data)))
649
   (trace-streaming put-field-page.enqueued channel)
650
   data)
651
 
652
 
653
 (defmethod get-field-page ((channel page-ring))
654
   "Return the next active page. If none is available and the channel is still active, wait until one is.
655
  While waiting, reduce the thread priority to idle."
656
   (declare (optimize (debug 3)))
657
   (ecase (channel-state channel)
658
     ((:input :io)
659
      ;; when nothing is already available, then adjust the priority in case
660
      ;; the process is suspended.
661
      (let* ((timeout (channel-timeout channel))
662
             (deadline (when timeout (+ timeout (get-internal-run-time))))
663
             (read-count (ring-read-count channel)))
664
        (loop (when (dotimes (x 100 nil)
665
                      (when (> (ring-write-count channel) read-count) (return t))
666
                      ;; #+sbcl (sb-ext:spin-loop-hint)
667
                      )
668
                (return))
669
              (when (and deadline (plusp read-count) (>= (get-internal-run-time) deadline))
670
                (spocq.e::timeout-error :detail (format nil "get-field-page: ~a" channel)))
671
              (setf (thread-priority) *algebra-idle-priority*)
672
              ; (bt:thread-yield)
673
              (sleep 0.000001)
674
              (setf (thread-priority) *algebra-running-priority*))
675
        (let ((data (svref (ring-pages channel)
676
                           (logand read-count (channel-size-mask channel)))))
677
          (trace-streaming get-field-page.dequeued channel read-count data)
678
          (setf (ring-read-count channel) (1+ read-count))
679
          ;; the last element is nil, afterwhich nothing should be read
680
          (unless data
681
            (unless (eq (channel-state channel) :input)
682
              (log-warn "null data w/ :io: ~a ~a"
683
                        channel (map 'list #'type-of (ring-pages channel))))
684
            (setf (channel-state channel) nil))
685
          data)))
686
     ((nil)                        ; nothing present, nothing to expect
687
      (log-warn "get-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel))
688
      nil)))
689
 
690
 (defmethod release-field-page ((channel page-ring) page)
691
   (trace-streaming release-field-page channel page)
692
   (ecase (channel-state channel)
693
     ((:io :input) )
694
     ((nil)
695
      (log-warn "release-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel) page)
696
      (setf page nil)))
697
   (setf (svref (ring-pages channel)
698
                (logand (shiftf (ring-free-count channel) (1+ (ring-free-count channel)))
699
                        (channel-size-mask channel)))
700
         (when (= (array-dimension page 0) (channel-page-length channel))
701
           ;; save the page only if it is a 'standard' length
702
           page))
703
   (trace-streaming release-field-page.enqueued channel)
704
   page)
705
 
706
 (defmethod map-pages (op (source page-ring))
707
   (loop for page = (get-field-page source)
708
         until (null page)
709
         do (progn (funcall op page)
710
                   (release-field-page source page))))
711
 
712
 ;;; channels based on amqp queues
713
 
714
 (defclass fifo (page-channel amqp.i::queue array-page-channel)
715
   ((semaphore
716
     :initform (sb-thread:make-semaphore :name "queue semaphore" :count 0)
717
     :reader queue-semaphore)
718
    (channels
719
     :initform nil :initarg :channels
720
     :accessor channel-channels
721
     :documentation "A list of channels into which all input is broadcast.")))
722
 
723
 
724
 (defmethod print-object ((object fifo) stream  &aux (*print-pretty* nil))
725
   (print-unreadable-object (object stream :type t :identity t)
726
     (handler-case (format stream "[~/format-channel-state/: ~a] ~s x~s"
727
                           object
728
                           (bound-slot-value object 'dimensions)
729
                           (bound-slot-value object 'de.setf.amqp.implementation::name)
730
                           (when (slot-boundp object 'de.setf.amqp.implementation::header) (amqp.i::collection-size object)))
731
       (error (c)
732
              (format stream "error: ~a" (type-of c))))))
733
   
734
 
735
 (defmethod channel-count ((channel fifo))
736
   (sb-thread:semaphore-count (queue-semaphore channel)))
737
 
738
 (defmethod channel-get-wait-count ((channel fifo))
739
   (sb-thread::semaphore-waitcount (queue-semaphore channel)))
740
 
741
 (defmethod channel-get ((channel fifo) &key (wait *channel-get-wait*))
742
   (when (or wait (plusp (channel-count channel)))
743
     (values (get-field-page channel) t)))
744
 
745
 (defmethod channel-get-all ((channel fifo))
746
   (collect-list (collect)
747
     (loop (multiple-value-bind (value not-empty) (amqp.u:dequeue channel)
748
             (if not-empty
749
               (collect value)
750
               (return))))))
751
 
752
 (defmethod channel-put ((channel fifo) value)
753
   (put-field-page channel value))
754
 
755
 (defmethod complete-field ((channel fifo))
756
   (put-field-page channel nil)
757
   (dolist (next (channel-channels channel))
758
     (complete-field next))
759
   (setf (channel-state channel) :input))
760
 
761
 (defmethod new-field-page ((channel fifo) page-length page-width)
762
   (trace-streaming new-field-page channel page-length page-width)
763
   (assert (= page-width (channel-page-width channel)) ()
764
           "Invalid page width: ~s != ~s." (channel-page-width channel) page-width)
765
   (let ((data (make-page page-length page-width)))
766
     (incf (channel-bound-count channel))
767
     (trace-streaming new-field-page.dequeued channel data)))
768
 
769
 (defmethod put-field-page ((channel fifo) page)
770
   (trace-streaming put-field-page channel page)
771
   (let ((semaphore (queue-semaphore channel))
772
         (limit (channel-size channel)))
773
     (when limit
774
       (loop (when (< (sb-thread:semaphore-count semaphore) limit) (return))
775
             (setf (thread-priority) *algebra-idle-priority*)
776
             (bt:thread-yield)
777
             (setf (thread-priority) *algebra-running-priority*)))
778
     (dolist (next (channel-channels channel))
779
       (let ((to (new-field-page next (channel-page-length channel) (channel-page-width channel))))
780
         (unless (= (array-dimension to 0) (array-dimension page 0))
781
         (setf to (adjust-page to (array-dimensions page))))
782
         (copy-page page to)
783
         (put-field-page next to)))
784
     (amqp.u:enqueue page channel)
785
     (incf (channel-write-count channel))
786
     (when page
787
       (incf (channel-solution-count channel) (channel-object-solution-count page))
788
       (incf (channel-page-count channel)))
789
     (signal-semaphore semaphore)
790
     (trace-streaming put-field-page.enqueued channel)
791
     page))
792
 
793
 (defmethod get-field-page ((channel fifo))
794
   (trace-streaming get-field-page channel)
795
   (let ((page (loop (multiple-value-bind (value not-empty)
796
                                          (amqp.u:dequeue channel)
797
                       (when not-empty
798
                         (return value))
799
                       (setf (thread-priority) *algebra-idle-priority*)
800
                       (wait-on-semaphore (queue-semaphore channel))
801
                       (setf (thread-priority) *algebra-running-priority*)))))
802
     (trace-streaming get-field-page.dequeued channel page)
803
     page))
804
 
805
 
806
 (defmethod release-field-page ((channel fifo) page)
807
   (trace-streaming release-field-page channel page)
808
   (release-page page)
809
   (trace-streaming release-field-page.enqueued channel)
810
   page)
811
 
812
 #+sbcl
813
 (progn (defparameter *resource-streaming-pages* nil)
814
        (defmethod complete-field ((channel page-mailbox))
815
          (trace-streaming complete-field.before channel)
816
          ;; empty the free page queue
817
          (sb-concurrency:receive-pending-messages (channel-free-pages channel))
818
          ;; mark for input only
819
          (setf (channel-state channel) :input)
820
          ;; indicate completion
821
          (sb-concurrency:send-message (channel-active-pages channel) nil)
822
          (dolist (next (channel-channels channel))
823
            (complete-field next))
824
          (trace-streaming complete-field.after channel))
825
 
826
        (defmethod channel-count ((channel page-mailbox))
827
          "Return the count from the active page queue."
828
          (sb-concurrency:mailbox-count (channel-active-pages channel)))
829
 
830
        (defmethod channel-create-page ((channel page-channel) &optional
831
                                        (length (channel-page-length channel))
832
                                        (width (channel-page-width channel)))
833
          (make-array (list length width)
834
                      :element-type 'fixnum :initial-element +NULL-TERM-ID+))
835
 
836
        (defmethod channel-free-count ((channel page-mailbox))
837
          "Return the count from the free page queue."
838
          (sb-concurrency:mailbox-count (channel-free-pages channel)))
839
        
840
        (defmethod channel-get ((channel page-mailbox) &key (wait *channel-get-wait*))
841
          (if wait
842
            (sb-concurrency:receive-message (channel-active-pages channel))
843
            (sb-concurrency:receive-message-no-hang (channel-active-pages channel))))
844
 
845
        (defmethod channel-get-all ((channel page-mailbox))
846
          "Empty both mailbox queues. Return the content of the active page queue."
847
          (sb-concurrency:receive-pending-messages (channel-free-pages channel))
848
          (sb-concurrency:receive-pending-messages (channel-active-pages channel)))
849
 
850
        (defmethod channel-put ((channel page-mailbox) page)
851
          (let ((width (channel-page-width channel)))
852
            (assert (= width (array-dimension page 1)) ()
853
                    "channel-put: Invalid page width (expected ~a): ~a, ~a."
854
                    width (array-dimension page 1) (array-dimensions page)))
855
          (sb-concurrency:send-message (channel-active-pages channel) page))
856
 
857
        (defmethod (setf channel-page-length) (length (channel page-mailbox))
858
          "record the new length and delete any pages present"
859
          (setf-channel-page-length length channel)
860
          (channel-reset channel)
861
          length)
862
 
863
        (defmethod channel-reset ((channel page-mailbox))
864
          (call-next-method)
865
          (sb-concurrency:receive-pending-messages (channel-free-pages channel))
866
          (sb-concurrency:receive-pending-messages (channel-active-pages channel)))
867
 
868
        (defmethod (setf channel-size) (size (channel page-mailbox))
869
          "record the new size, adjust the buffer lengths and reset the start/end indices"
870
          (setf-channel-size size channel)
871
          (channel-reset channel)
872
          size)
873
 
874
        (defmethod channel-get-wait-count ((channel page-mailbox))
875
          "Return the count from the active page queue."
876
          (let* ((mailbox (channel-active-pages channel))
877
                 (semaphore (sb-concurrency::mailbox-semaphore mailbox)))
878
            (sb-thread::semaphore-waitcount semaphore)))
879
 
880
        (defmethod new-field-page ((channel page-mailbox) length width)
881
          "Attempt to dequeue a free page. If that does not succeed immediately,
882
           then iff the channel is still below the size limit, make a new page.
883
           Otherwise, wait until a page is released."
884
          (trace-streaming new-field-page channel)
885
          (assert (= width (channel-page-width channel)) ()
886
                    "new-field-page: Invalid page width requested (expected ~a): ~a."
887
                    width (channel-page-width channel))
888
          (ecase (channel-state channel)
889
            (:io
890
             (if *resource-streaming-pages*
891
               (let* ((mailbox (channel-free-pages channel))
892
                      (semaphore (sb-concurrency::mailbox-semaphore mailbox))
893
                      (queue     (sb-concurrency::mailbox-queue mailbox))
894
                      (data
895
                       ;; w/o the locking from receive-message(-no-hang) across semaphore/queue
896
                       ;; operations, as there is just one reader / one writer
897
                       (cond ((sb-thread:try-semaphore semaphore)
898
                              (sb-concurrency:dequeue queue))
899
                             ((< (sb-thread:semaphore-count
900
                                  (sb-concurrency::mailbox-semaphore (channel-active-pages channel)))
901
                                 (channel-size channel))
902
                              (channel-create-page channel))
903
                             (t
904
                              (sb-thread:wait-on-semaphore semaphore)
905
                              (sb-concurrency:dequeue queue)))))
906
                 (trace-streaming new-field-page.dequeued channel data)
907
                 (unless (and (eql (array-dimension data 0) length(eql (array-dimension data 1) width))
908
                   (setf data (adjust-array data (list length width))))
909
                 (let ((width (channel-page-width channel)))
910
                   (assert (= width (array-dimension data 1)) ()
911
                    "new-field-page: Invalid page width (expected ~a): ~a, ~a."
912
                    width (array-dimension data 1) (array-dimensions data)))
913
                 data)
914
               (channel-create-page channel length width)))
915
            (((:input nil)                       ; should be no further output
916
              (log-warn "new-field-page: invalid channel state: ~a(~s)." channel (channel-state channel))
917
              nil))))
918
 
919
        (defmethod put-field-page ((channel page-mailbox) page)
920
          "Add an entry to the active pages. As each page should have originated from the free pages, there can
921
           be no more of them than the inactive buffer count. Which means, the free pages semaphore suffices to
922
           preclude buffer over-run."
923
          (trace-streaming put-field-page channel page)
924
          (when page
925
            (let ((width (channel-page-width channel)))
926
              (assert (= width (array-dimension page 1)) ()
927
                      "put-field-page: Invalid page width (expected ~a): ~a, ~a."
928
                      width (array-dimension page 1) (array-dimensions page))))
929
          (ecase (channel-state channel)
930
            (:io
931
             (sb-concurrency:send-message (channel-active-pages channel) page)
932
             (incf (channel-solution-count channel) (channel-object-solution-count page))
933
             (incf (channel-page-count channel))
934
             (dolist (next (channel-channels channel))
935
               (let ((to (new-field-page next (channel-page-length channel) (channel-page-width channel))))
936
                 (assert (= (array-dimension to 1) (array-dimension page 1)) ()
937
                         "page width mismatch: ~a/~a : ~a/~a."
938
                         (channel-dimensions channel) (array-dimension page 1)
939
                         (channel-dimensions next) (array-dimension to 1))
940
                 (unless (= (array-dimension to 0) (array-dimension page 0))
941
                   (setf to (adjust-page to (array-dimensions page))))
942
                 (copy-page page to)
943
                 (put-field-page next to))))
944
            ((:input nil)
945
             (log-warn "put-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel) page)))
946
          (trace-streaming put-field-page.enqueued channel)
947
          page)
948
 
949
        (defmethod get-field-page ((channel page-mailbox))
950
          "Return the next active page. If none is available and the channel is still active, wait until one is.
951
           While waiting, reduce the thread priority to idle."
952
          (trace-streaming get-field-page channel)
953
          (ecase (channel-state channel)
954
            ((:input :io)
955
             (let* ((mailbox (channel-active-pages channel))
956
                    (semaphore (sb-concurrency::mailbox-semaphore mailbox))
957
                    (queue     (sb-concurrency::mailbox-queue mailbox))
958
                    (data
959
                     ;; w/o the locking from receive-message(-no-hang) across semaphore/queue
960
                     ;; operations, as there is just one reader / one writer
961
                     (cond ((sb-thread:try-semaphore semaphore)
962
                            (sb-concurrency:dequeue queue))
963
                           (t
964
                            (unwind-protect
965
                              (progn (setf (thread-priority) *algebra-idle-priority*)
966
                                     (sb-thread:wait-on-semaphore semaphore)
967
                                     (sb-concurrency:dequeue queue))
968
                              (setf (thread-priority) *algebra-running-priority*))))))
969
               (unless (or data (eq (channel-state channel) :input))
970
                 (log-warn "get-field-page: null data w/ :io: ~a ~a" channel mailbox)
971
                 (setf (channel-state channel) nil))
972
               (when data
973
                 (let ((width (channel-page-width channel)))
974
                   (unless (= width (array-dimension data 1))
975
                     (warn "get-field-page: Invalid page width (expected ~a): ~a, ~a."
976
                            width (array-dimension data 1) (array-dimensions data)))
977
                   (assert (= width (array-dimension data 1)) ()
978
                           "get-field-page: Invalid page width (expected ~a): ~a, ~a."
979
                           width (array-dimension data 1) (array-dimensions data))))
980
               (trace-streaming get-field-page.dequeued channel data)
981
               data))
982
            (((nil)                       ; should be no further output
983
              (log-warn "get-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel))
984
              nil))))
985
 
986
        (defmethod release-field-page ((channel page-mailbox) page)
987
          (trace-streaming release-field-page channel page)
988
          (ecase (channel-state channel)
989
            ((:io :input) )
990
            ((nil)
991
             (log-warn "release-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel) page)
992
             (setf page nil)))
993
          (when *resource-streaming-pages*
994
            (sb-concurrency:send-message (channel-free-pages channel)
995
                                         (when (= (array-dimension page 0) (channel-page-length channel))
996
                                           ;; save the page only if it is a 'standard' length
997
                                           page))
998
            (trace-streaming release-field-page.enqueued channel))
999
          page)
1000
 
1001
        (defmethod map-pages (op (source page-mailbox))
1002
          (loop for page = (get-field-page source)
1003
                until (null page)
1004
                do (progn (funcall op page)
1005
                          (release-field-page source page))))
1006
 
1007
        )
1008
 
1009
 #+sbcl
1010
 (progn
1011
   (defmethod channel-count ((channel sb-concurrency:mailbox))
1012
     (sb-concurrency:mailbox-count channel))
1013
 
1014
   (defmethod channel-get-wait-count ((channel sb-concurrency:mailbox))
1015
     "Return the count from the active page queue."
1016
     (let* ((semaphore (sb-concurrency::mailbox-semaphore channel)))
1017
       (sb-thread::semaphore-waitcount semaphore)))
1018
   
1019
   (defmethod channel-get-all ((channel sb-concurrency:mailbox))
1020
     (sb-concurrency:receive-pending-messages channel))
1021
 
1022
   (defmethod channel-get ((channel sb-concurrency:mailbox) &key (wait *channel-get-wait*)
1023
                           (timeout nil))
1024
     (if wait
1025
       (sb-concurrency:receive-message channel :timeout timeout)
1026
       (sb-concurrency:receive-message-no-hang channel)))
1027
   
1028
   (defmethod channel-name ((channel sb-concurrency:mailbox))
1029
     (sb-concurrency:mailbox-name channel))
1030
   
1031
   (defmethod (setf channel-name) (name (channel sb-concurrency:mailbox))
1032
     (setf (sb-concurrency:mailbox-name channel) name))
1033
 
1034
   (defmethod channel-put ((channel sb-concurrency:mailbox) data)
1035
     (sb-concurrency:send-message channel data))
1036
   
1037
   (defun make-pool (&key name) (sb-concurrency:make-mailbox :name name))
1038
 
1039
   (defmethod map-channel (operator (channel sb-concurrency:mailbox))
1040
     (map nil operator (sb-concurrency:list-mailbox-messages channel)))
1041
   
1042
   )
1043
          
1044
 ;;; paged-buffer
1045
 ;;; (load "patches/streaming.lisp")
1046
 
1047
 (defun queue-read-wait (queue lock)
1048
   (sb-thread:condition-wait queue lock))
1049
 
1050
 (defun queue-write-wait (queue lock)
1051
   (sb-thread:condition-wait queue lock))
1052
 
1053
 (defun queue-read-notify (queue)
1054
   (sb-thread:condition-notify queue))
1055
 
1056
 (defun queue-write-notify (queue)
1057
   (sb-thread:condition-notify queue))
1058
 
1059
 (defvar *trace-paged-buffer-output* nil)
1060
 
1061
 (defclass paged-buffer (abstract-page-channel)
1062
   ((waitqueue
1063
     :accessor paged-buffer-waitqueue)
1064
    (lock
1065
     :accessor paged-buffer-lock)
1066
    (active-page-count
1067
     :initform 0 :type (or fixnum null)
1068
     :accessor paged-buffer-active-page-count)
1069
    (read-ring
1070
     :accessor paged-buffer-read-ring)
1071
    (write-ring
1072
     :accessor paged-buffer-write-ring))
1073
   (:documentation
1074
    "Encapsulate a condition variable w/lock and the state required to implement a paged solution
1075
    stream with at least double-buffering.
1076
    A static sequence of pages is bound as a ring and the reader/writer pair cooperate to step
1077
    through the ring such that the reader tails the writer.
1078
    The new/get/put/release operations are implemented such that the reader blocks if it has caugt up
1079
    and the writer blocks if it would over-run.
1080
    A condition variable serves to coordinate notification when the respective blocked state is eliminated.
1081
    The initial state is that the rings coincide, which means the reader likely blocks as soon as it starts.
1082
    a write fills the next write page, advances the write ring when filled, but blocks if the next page
1083
    is not available and notifies if the available page unblocks the reader.
1084
    a read blocks until the page is avaialble, advances the read ring when emptied and notifies
1085
    if the advance unblocks the writer."))
1086
 
1087
 (defclass paged-array (paged-buffer array-page-channel)
1088
   ()
1089
   (:documentation "a paged buffer which carries array pages"))
1090
 
1091
 (defmacro trace-paged-buffer (&rest args)
1092
   `(when *trace-paged-buffer-output*
1093
      (bt:with-lock-held (*trace-lock*)
1094
        (terpri *trace-paged-buffer-output*)
1095
        (format *trace-paged-buffer-output* ,@args)
1096
        (finish-output *trace-paged-buffer-output*))))
1097
 
1098
 (defmethod initialize-instance ((instance paged-buffer) &rest initargs &key name
1099
                                 (dimensions () d-s) (size *channel-size-limit*) page-width)
1100
   (typecase name
1101
     ((or null string))
1102
     (t (setf name (write-to-string name))))
1103
   (cond (page-width
1104
          (assert (eql page-width (length dimensions)) ()
1105
                  "paged buffer dimensions do not match page width: ~a: ~a" dimensions page-width))
1106
         (d-s ; if null dimensions are explicitly supplied, the width is 0
1107
          (setf page-width (length dimensions)))
1108
         (t
1109
          (error "dimensions are required.")))
1110
   (setf (paged-buffer-waitqueue instance) (sb-thread:make-waitqueue))
1111
   (setf (paged-buffer-lock instance)  (sb-thread:make-mutex :name name))
1112
   (apply #'call-next-method instance
1113
          :size size
1114
          :page-width page-width
1115
          initargs))
1116
 
1117
 (defgeneric make-paged-buffer-page-ring (paged-bufer)
1118
   (:method ((instance paged-buffer))
1119
     (loop with size = (channel-size instance)
1120
       with length = (channel-page-length instance)
1121
       with width = (channel-page-width instance)
1122
       for i below size
1123
       collect (make-page  length width))))
1124
 
1125
 (defmethod shared-initialize ((instance paged-buffer) (slot-names t) &key)
1126
   (call-next-method)
1127
   (let ((pages (make-paged-buffer-page-ring instance)))
1128
     (setf (rest (last pages)) pages)
1129
     (setf (paged-buffer-write-ring instance) pages)
1130
     (setf (paged-buffer-read-ring instance) pages)
1131
     (setf (paged-buffer-active-page-count instance) 0)
1132
     (setf (channel-state instance) :io)))
1133
 
1134
 (defmethod cl-user::format-channel-state ((stream t) (channel paged-buffer) &optional colon at &rest args)
1135
   (declare (ignore colon at args))
1136
   (format stream "~a (~a x ~a) @~a ~a/~a" (channel-name channel)
1137
           (channel-page-length channel) (channel-page-width channel)
1138
           (channel-state channel)
1139
           (paged-buffer-active-page-count channel) (channel-size channel)))
1140
 
1141
 (defgeneric channel-complete-input (channel)
1142
   (:method ((channel paged-buffer))
1143
     (setf (channel-state channel) nil)))
1144
 
1145
 (defmethod channel-get ((channel paged-buffer) &key (wait t))
1146
   "get the next available read page.
1147
    iff none is available wait.
1148
    iff the count is null, the stream is complete."
1149
   (flet ((complete-input ()
1150
            ;;no further pages to be expected
1151
            (channel-complete-input channel)
1152
            (return-from channel-get nil)))
1153
     (ecase (channel-state channel)
1154
       ((:io :input)
1155
        ;; lock and optionall queu only when the nothing is present.
1156
        ;; otherwise, the writer can only increase the count
1157
        (unless (plusp (paged-buffer-active-page-count channel))
1158
          (sb-thread:with-mutex ((paged-buffer-lock channel))
1159
            (trace-paged-buffer " [<- active ~s]" (paged-buffer-active-page-count channel))
1160
            (unless (plusp (paged-buffer-active-page-count channel))
1161
              (when (eq (channel-state channel) :input)
1162
                (complete-input))
1163
              (unless wait (return-from channel-get nil))
1164
              (loop
1165
                (trace-paged-buffer " [<- wait]")
1166
                (queue-read-wait (paged-buffer-waitqueue channel) (paged-buffer-lock channel))
1167
                (trace-paged-buffer " [<- notified active ~s,~s]" (paged-buffer-active-page-count channel) (channel-state channel))
1168
                (if (plusp (paged-buffer-active-page-count channel))
1169
                    ;; if a page is ready, return it
1170
                    (return)
1171
                    (ecase  (channel-state channel)
1172
                      ;; otherwise either complete or note the anomaly and continue to wait
1173
                      (:io (log-warn "paged-buffer reader notified with no pagees active"))
1174
                      (:input (complete-input))))))))
1175
        ;; return the next page, or nil if the ring is now empty
1176
        (first (paged-buffer-read-ring channel)))
1177
       ((nil)))))
1178
 
1179
 (defmethod channel-put ((channel paged-buffer) page)
1180
   "indcate that a page has been written and is available to read.
1181
    iff the reader is waiting, notify it."
1182
   (incf (channel-page-count channel))
1183
   (incf (channel-solution-count channel) (channel-object-solution-count page))
1184
   (ecase (channel-state channel)
1185
     (:io
1186
      (sb-thread:with-mutex ((paged-buffer-lock channel))
1187
        (setf (first (paged-buffer-write-ring channel)) page)
1188
        (trace-paged-buffer " [-> active ~s]" (paged-buffer-active-page-count channel))
1189
        (cond ((>= (incf (paged-buffer-active-page-count channel)) (channel-size channel))
1190
               ;; transition to full requires wait
1191
               (loop
1192
                 (trace-paged-buffer " [-> wait]")
1193
                 (queue-write-wait (paged-buffer-waitqueue channel) (paged-buffer-lock channel))
1194
                 (trace-paged-buffer  " [-> notified active ~s]" (paged-buffer-active-page-count channel))
1195
                 (if (< (paged-buffer-active-page-count channel) (channel-size channel))
1196
                     (return)
1197
                     (log-warn "paged-buffer writer notified with all pages still active"))))
1198
              ((= (paged-buffer-active-page-count channel) 1)
1199
               (trace-paged-buffer  " [-> notifying]")
1200
               ;; 0 -> 1 transitions requires notification
1201
               (queue-write-notify (paged-buffer-waitqueue channel)))))
1202
      ;; advance to the next page for any subsequent new page request
1203
      (pop (paged-buffer-write-ring channel)))
1204
     ((:input nil)
1205
      (log-warn "output to complete channel: ~s" channel)))
1206
   ;; return the page
1207
   page)
1208
 
1209
 
1210
 (defmethod channel-get-wait-count ((channel paged-buffer))
1211
   (if (zerop (paged-buffer-active-page-count channel))
1212
       1 0))
1213
 
1214
 (defmethod channel-put-wait-count ((channel paged-buffer))
1215
   (if (>= (paged-buffer-active-page-count channel) (channel-size channel))
1216
       1 0))
1217
 
1218
 (defmethod channel-reset ((channel paged-buffer))
1219
   (reinitialize-instance channel))
1220
 
1221
 (defmethod complete-field ((channel paged-buffer))
1222
   (sb-thread:with-mutex ((paged-buffer-lock channel))
1223
     (trace-paged-buffer " [-> completing]")
1224
     (setf (channel-state channel) :input)
1225
     (when (zerop (paged-buffer-active-page-count channel))
1226
       (trace-paged-buffer " [-> notifying complete]")
1227
       ;; completion @ 0 requires notification
1228
       (queue-write-notify (paged-buffer-waitqueue channel)))))
1229
 
1230
 (defmethod get-field-page ((channel paged-buffer))
1231
   ;; (third (print (list :get channel (channel-get channel))))
1232
   (channel-get channel))
1233
 
1234
 (defmethod map-channel (operator (channel paged-buffer))
1235
   (loop for page = (get-field-page channel)
1236
         until (null page)
1237
         do (progn (funcall operator page)
1238
              (release-field-page channel page))))
1239
 
1240
 (defmethod map-pages (operator (channel paged-buffer))
1241
   (map-channel operator channel))
1242
 
1243
 (defmethod new-field-page ((channel paged-buffer) length width)
1244
   "immediately return the first page.
1245
    do not change the ring position, as the subsequent put may supply resized, for the last page, or
1246
    nil, once complete."
1247
   (assert (or (and (null width) (null length))
1248
               (and (eql width (channel-page-width channel))
1249
                    ;; must allow to shorten the last page
1250
                    ;; (eql length (channel-page-length channel))
1251
                    ))
1252
           ()
1253
           "invalid page dimensions: ~a: ~s x ~s" channel width length)
1254
   (first (paged-buffer-write-ring channel)))
1255
 
1256
 (defmethod put-field-page ((channel paged-buffer) page)
1257
   ;; (print (list :put channel page))
1258
   (channel-put channel page))
1259
 
1260
 (defmethod release-field-page ((channel paged-buffer) page)
1261
   "when processing of the read page is complete, advance the read read
1262
    and optionally notify"
1263
   ;; advance in the read ring
1264
   (pop (paged-buffer-read-ring channel))
1265
   (sb-thread:with-mutex ((paged-buffer-lock channel))
1266
     (when (zerop (decf (paged-buffer-active-page-count channel)))
1267
       (trace-paged-buffer " [<- notifying]")
1268
       (queue-read-notify (paged-buffer-waitqueue channel)))))
1269
 
1270
 
1271
 
1272
 ;;; page-queue
1273
 
1274
 (defmethod complete-field ((channel page-queue))
1275
   (trace-streaming complete-field.before channel)
1276
   ;; empty the free page queue
1277
   (amqp.i::collection-clear (channel-free-pages channel))
1278
   ;; mark for input only
1279
   (setf (channel-state channel) :input)
1280
   ;; indicate completion
1281
   (amqp.i::enqueue nil (channel-active-pages channel))
1282
   (signal-semaphore (channel-active-semaphore channel))
1283
   (dolist (next (channel-channels channel))
1284
     (complete-field next))
1285
   (trace-streaming complete-field.after channel))
1286
 
1287
 (defmethod channel-count ((channel page-queue))
1288
   "The count is the difference between the pages written and the pages read."
1289
   (- (channel-write-count channel) (channel-read-count channel)))
1290
 
1291
 (defmethod channel-create-page ((channel page-queue) &optional
1292
                                 (length (channel-page-length channel))
1293
                                 (width (channel-page-width channel)))
1294
   (make-array (list length width)
1295
               :element-type 'fixnum :initial-element +NULL-TERM-ID+))
1296
 
1297
 (defmethod channel-get ((channel page-queue) &key (wait *channel-get-wait*))
1298
   (when (or wait (plusp (channel-count channel)))
1299
     (values (get-field-page channel) t)))
1300
 
1301
 (defmethod channel-get-all ((channel page-queue))
1302
   "Empty both mailbox queues. Return the content of the active page queue."
1303
   (loop for q on (amqp.i::collection-content channel)
1304
         collect (shiftf (first q) nil)))
1305
 
1306
 (defmethod channel-put ((channel page-queue) value)
1307
   (amqp.i::enqueue value (channel-active-pages channel)))
1308
 
1309
 (defmethod channel-reset ((channel page-queue))
1310
   (call-next-method)
1311
   (amqp.i::collection-clear (channel-free-pages channel))
1312
   (amqp.i::collection-clear (channel-active-pages channel)))
1313
 
1314
 
1315
 (defmethod channel-get-wait-count ((channel page-queue))
1316
   "Return the count from the active page queue."
1317
   (sb-thread::semaphore-waitcount (channel-active-semaphore channel)))
1318
 
1319
 (defmethod new-field-page ((channel page-queue) length width)
1320
   "Attempt to dequeue a free page. If that does not succeed immediately,
1321
           then iff the channel is still below the size limit, make a new page.
1322
           Otherwise, wait until a page is released."
1323
   (declare (ignore length width))
1324
   
1325
   (trace-streaming new-field-page channel)
1326
   (ecase (channel-state channel)
1327
     (:io
1328
      (let* ((semaphore (channel-free-semaphore channel))
1329
             (queue     (channel-free-pages channel))
1330
             (data
1331
              ;; w/o the locking from receive-message(-no-hang) across semaphore/queue
1332
              ;; operations, as there is just one reader / one writer
1333
              (cond ((try-semaphore semaphore)
1334
                     (amqp.i::dequeue queue))
1335
                    ((< (channel-count channel) (channel-size channel))
1336
                     (channel-create-page channel))
1337
                    (t
1338
                     (unwind-protect
1339
                       (progn (setf (thread-priority) *algebra-idle-priority*)
1340
                              (wait-on-semaphore semaphore)
1341
                              (amqp.i::dequeue queue))
1342
                       (setf (thread-priority) *algebra-running-priority*))))))
1343
        (incf (channel-bound-count channel))
1344
        (trace-streaming new-field-page.dequeued channel data)
1345
        data))
1346
     ((:input nil)                       ; should be no further output
1347
      (log-warn "new-field-page: invalid channel state: ~a(~s)." channel (channel-state channel))
1348
      nil)))
1349
 
1350
 (defmethod put-field-page ((channel page-queue) page)
1351
   "Add an entry to the active pages. As each page should have originated from the free pages, there can
1352
           be no more of them than the inactive buffer count. Which means, the free pages semaphore suffices to
1353
           preclude buffer over-run."
1354
   (trace-streaming put-field-page channel page)
1355
   (ecase (channel-state channel)
1356
     (:io
1357
      (amqp.i::enqueue page (channel-active-pages channel))
1358
      (incf (channel-solution-count channel) (channel-object-solution-count page))
1359
      (incf (channel-page-count channel))
1360
      (incf (channel-write-count channel))
1361
      (signal-semaphore (channel-active-semaphore channel))
1362
      (dolist (next (channel-channels channel))
1363
        (let ((to (new-field-page next (channel-page-length channel) (channel-page-width channel))))
1364
          (unless (= (array-dimension to 0) (array-dimension page 0))
1365
            (setf to (adjust-page to (array-dimensions page))))
1366
          (copy-page page to)
1367
          (put-field-page next to))))
1368
     ((:input nil)
1369
      (log-warn "put-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel) page)))
1370
   (trace-streaming put-field-page.enqueued channel)
1371
   page)
1372
 
1373
 (defmethod get-field-page ((channel page-queue))
1374
   "Return the next active page. If none is available and the channel is still active, wait until one is.
1375
           While waiting, reduce the thread priority to idle."
1376
   (trace-streaming get-field-page channel)
1377
   (ecase (channel-state channel)
1378
     ((:input :io)
1379
      (let* ((semaphore (channel-active-semaphore channel))
1380
             (queue     (channel-active-pages channel))
1381
             (data
1382
              ;; w/o the locking from receive-message(-no-hang) across semaphore/queue
1383
              ;; operations, as there is just one reader / one writer
1384
              (cond ((try-semaphore semaphore)
1385
                     (amqp.i::dequeue queue))
1386
                    (t
1387
                     (unwind-protect
1388
                       (progn (setf (thread-priority) *algebra-idle-priority*)
1389
                              (wait-on-semaphore semaphore)
1390
                              (amqp.i::dequeue queue))
1391
                       (setf (thread-priority) *algebra-running-priority*))))))
1392
        (unless (or data (eq (channel-state channel) :input))
1393
          (log-warn "null data w/ :io: ~a ~a" channel queue)
1394
          (setf (channel-state channel) nil))
1395
        (incf (channel-read-count channel))
1396
        (trace-streaming get-field-page.dequeued channel data)
1397
        data))
1398
     ((nil)                      ; should be no further output
1399
      (log-warn "get-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel))
1400
      nil)))
1401
 
1402
 (defmethod release-field-page ((channel page-queue) page)
1403
   (trace-streaming release-field-page channel page)
1404
   (ecase (channel-state channel)
1405
     ((:io :input) )
1406
     ((nil)
1407
      (log-warn "release-field-page: invalid channel state: ~a(~s): ~a." channel (channel-state channel) page)
1408
      (setf page nil)))
1409
   (amqp.i::enqueue (when (= (array-dimension page 0) (channel-page-length channel))
1410
                      ;; save the page only if it is a 'standard' length
1411
                      page)
1412
                    (channel-free-pages channel))
1413
   (incf (channel-free-count channel))
1414
   (signal-semaphore (channel-free-semaphore channel))
1415
   (trace-streaming release-field-page.enqueued channel)
1416
   page)
1417
 
1418
 #+sbcl+queue
1419
 (progn (defmethod channel-put ((channel sb-concurrency:queue) value &key (limit *channel-size-limit*) (wait *channel-put-wait*))
1420
          (cond (limit
1421
                 (loop (cond ((< (sb-concurrency:queue-count channel) limit)
1422
                              (sb-concurrency:enqueue value channel)
1423
                              (return t))
1424
                             (wait
1425
                              (sleep wait))
1426
                             (t
1427
                              (return nil)))))
1428
                (t
1429
                 (sb-concurrency:enqueue value channel)
1430
                 t)))
1431
        (defmethod channel-get ((channel sb-concurrency:queue) &key (wait *channel-get-wait*))
1432
          (if wait
1433
            (loop (multiple-value-bind (value not-empty)
1434
                                       (sb-concurrency:dequeue channel)
1435
                    (if not-empty
1436
                      (return (values value not-empty))
1437
                      (bt:thread-yield))))
1438
            (sb-concurrency:dequeue channel)))
1439
        (defmethod channel-count ((channel sb-concurrency:queue))
1440
          (sb-concurrency:queue-count channel))
1441
        (defmethod channel-get-all ((channel sb-concurrency:queue))
1442
          (loop until (sb-concurrency:queue-empty-p channel)
1443
                collect (sb-concurrency:dequeue channel)))
1444
        (defmethod channel-name ((channel sb-concurrency:queue))
1445
          (sb-concurrency:queue-name channel))
1446
        (defmethod (setf channel-name) (name (channel sb-concurrency:queue))
1447
          (setf (sb-concurrency:queue-name channel) name)))
1448
 
1449
 #+mcl
1450
 (progn (defclass pool (channel amqp.i::locked-queue array-page-channel)
1451
          ((page-count
1452
            :initform 0
1453
            :accessor channel-page-count)))
1454
        (defun make-pool (&key name) (declare (ignore name)) (make-instance 'pool))
1455
        (defmethod channel-put ((channel pool) value &key (limit *channel-size-limit*) (wait *channel-put-wait*))
1456
          (dolist (next (channel-channels channel))
1457
            (channel-put next value))
1458
          (incf (channel-page-count channel))
1459
          (cond (limit
1460
                 (loop (cond ((< (amqp.i::collection-size channel) limit)
1461
                              (amqp.u:enqueue value channel)
1462
                              (return t))
1463
                             (wait
1464
                              (sleep wait))
1465
                             (t
1466
                              (return nil)))))
1467
                (t
1468
                 (amqp.u:enqueue value channel)
1469
                 t)))
1470
        (defmethod channel-get ((channel pool) &key (wait *channel-get-wait*))
1471
          (cond (wait
1472
                 (amqp.u:dequeue channel :if-empty :wait))
1473
                ((not (amqp.i::collection-empty-p channel))
1474
                 (amqp.u:dequeue channel))))
1475
        (defmethod channel-count ((channel pool))
1476
          (amqp.i::collection-size channel))
1477
        (defmethod channel-get-all ((channel pool))
1478
          (collect-list (collect)
1479
            (loop (multiple-value-bind (value not-empty) (amqp.u:dequeue channel)
1480
                    (if not-empty
1481
                      (collect value)
1482
                      (return)))))))
1483
 
1484
 
1485
 ;;;
1486
 
1487
 (defun make-channel (&rest args &key dimensions name size page-length page-width sort-dimensions)
1488
   (declare (ignorable dimensions name size page-length page-width sort-dimensions))
1489
   ;;(setf (getf args :size) 128)
1490
   (case *make-channel.class*
1491
     ;; all are standard classes
1492
     (t (apply #'make-instance *make-channel.class* args))))
1493
 
1494
 (defun make-null-channel (&rest args &key (dimensions (error "dimensions is required."))
1495
                                 (name (list 'spocq.a:|null| (task-id *query*)))
1496
                                 (size *channel-sliced-size-limit*)
1497
                                 (page-length *field-sliced-page-length*))
1498
   (apply #'make-channel :dimensions dimensions :name name :size size :page-length page-length
1499
          args))
1500
 
1501
 (defun make-unit-table-channel (&rest args &key (dimensions nil)
1502
                                 (name (list 'spocq.a:|table| (task-id *query*)))
1503
                                 (size *channel-sliced-size-limit*)
1504
                                 (page-length *field-sliced-page-length*))
1505
   (apply #'make-channel :dimensions dimensions :name name :size size :page-length page-length
1506
          args))
1507
 
1508
 (defun make-solution-channel (&rest args)
1509
   (apply #'make-instance 'solution-channel args))
1510
 
1511
 #+(or)                                  ; not used
1512
 (progn
1513
   
1514
   (defclass dimensioned-funcallable-object (c2mop:funcallable-standard-object)
1515
     ((dimensions
1516
       :initarg :dimensions :initform (error "dimensions is required.")
1517
       :reader function-dimensions)
1518
      #+mcl
1519
      (function
1520
       :initarg :function :initform (error "function is required.")
1521
       :accessor dimensioned-funcallable-object-function))
1522
     #-mcl (:metaclass c2mop:funcallable-standard-class))
1523
   
1524
   (defmethod initialize-instance ((instance dimensioned-funcallable-object) &key function)
1525
     (declare (ignorable function))
1526
     #-mcl (c2mop:set-funcallable-instance-function instance function)
1527
     (call-next-method))
1528
   
1529
   (defmacro dimlambda (parameter-list &rest body)
1530
     `(make-instance 'dimensioned-function
1531
        :dimensions ',parameter-list
1532
        :function (lambda ,parameter-list ,@body)))
1533
   
1534
   
1535
   (defmethod dimensioned-funcallable-object-function ((function function))
1536
     function)
1537
   )
1538
 
1539
 
1540
 #|
1541
 
1542
 (let ((c (make-channel :size 4 :dimensions '(a))))
1543
   (print c)
1544
   (dotimes (x 9)
1545
     (let ((p (new-field-page c 10 1)))
1546
       (put-field-page c p) 
1547
       (get-field-page c)
1548
       (release-field-page c p)))
1549
   (complete-field c)
1550
   (get-field-page c)
1551
   c)
1552
 
1553
 (setq *data-trace-output* *trace-output*)
1554
 
1555
 (sb-profile:profile complete-field put-field-page get-field-page new-field-page release-field-page
1556
                     create-ring-page
1557
                     wait-on-semaphore signal-semaphore try-semaphore)
1558
 (sb-profile:unprofile)
1559
 
1560
 (defun test-queue (&key (size 8) (count (* 8 size)) (verbose nil) (test nil)
1561
                         (dimensions '(a b)) (width (length dimensions)) (length 8)
1562
                         (class 'page-ring))
1563
   (let* ((q (make-instance class :dimensions dimensions :page-length length :size size))
1564
          (t-put nil)
1565
          (result 0)
1566
          (t-get nil))
1567
     (print q *trace-output*)
1568
     (finish-output *trace-output*)
1569
     (flet ((run-put ()
1570
              (dotimes (i count)
1571
                (let ((page (new-field-page q width length)))
1572
                  (trace-streaming tq.new q)
1573
                  (when test
1574
                    (fill (make-array (* (channel-page-width q) (channel-page-length q))
1575
                                      :element-type 'fixnum
1576
                                      :displaced-to page)
1577
                          i))
1578
                  (put-field-page q page))
1579
                (trace-streaming tq.put q)
1580
                (when verbose (write-char #\+ *trace-output*) (finish-output *trace-output*)))
1581
              (complete-field q))
1582
            (run-get ()
1583
              (dotimes (test-count count)
1584
                (let ((page (get-field-page q)))
1585
                  (trace-streaming tq.get q)
1586
                  (when (and test
1587
                             (position test-count
1588
                                       (make-array (* (channel-page-width q) (channel-page-length q))
1589
                                                   :element-type 'fixnum
1590
                                                   :displaced-to page)
1591
                                       :test-not #'=))
1592
                    (warn "missed: ~s != ~s." test-count page)
1593
                    (bt:destroy-thread t-put)
1594
                    (setf result :failed)
1595
                    (return))
1596
                  (release-field-page q page)
1597
                  (trace-streaming tq.release q)
1598
                  (when verbose (write-char #\- *trace-output*) (finish-output *trace-output*))
1599
                  (incf result)))
1600
              (unless (null (get-field-page q))
1601
                (warn "non null after ~d." count))
1602
              (bt:join-thread t-put)))
1603
       (setf t-put (bt:make-thread #'run-put))
1604
       (setf t-get (bt:make-thread #'run-get)))
1605
     (bt:join-thread t-get)
1606
     (values result q)))
1607
 
1608
 (test-queue :verbose t :size 4 :count 4)
1609
 (let ((*print-array* nil) (*print-pretty* nil)) (test-queue :count 8 :size 2 :verbose t))
1610
 (time (test-queue :count 100000 :class 'page-fifo))
1611
 (time (test-queue :count 100000 :class 'page-ring))
1612
 (time (test-queue :count 10000000 :class 'page-ring))
1613
 
1614
 * (time (test-queue :count 10000000 :class 'page-fifo)) ;; 59.758/103.722482 +12,661MB
1615
 
1616
 #<PAGE-FIFO "#<PAGE-FIFO {1047F401A3}>" [0] {1047F401A3}> 
1617
 Evaluation took:
1618
   58.989 seconds of real time
1619
   102.366397 seconds of total run time (54.079380 user, 48.287017 system)
1620
   [ Run times consist of 0.560 seconds GC time, and 101.807 seconds non-GC time. ]
1621
   173.53% CPU
1622
   3 lambdas converted
1623
   201,243,870,487 processor cycles
1624
   12,660,919,744 bytes consed
1625
   
1626
 10000000
1627
 #<PAGE-FIFO "#<PAGE-FIFO {1047F401A3}>" [0] {1047F401A3}>
1628
 * (sb-profile:report)
1629
 
1630
   seconds  |     gc     |     consed     |    calls   |  sec/call  |  name  
1631
 -----------------------------------------------------------------
1632
     62.878 |      0.560 | 10,504,214,736 | 10,000,001 |   0.000006 | GET-FIELD-PAGE
1633
     61.447 |      0.560 | 12,618,437,024 | 10,000,000 |   0.000006 | NEW-FIELD-PAGE
1634
     19.060 |      0.000 |     23,679,200 | 10,000,001 |   0.000002 | PUT-FIELD-PAGE
1635
     11.191 |      0.000 |    170,222,560 | 10,000,000 |   0.000001 | RELEASE-FIELD-PAGE
1636
      0.000 |      0.000 |              0 |          1 |   0.000000 | COMPLETE-FIELD
1637
 -----------------------------------------------------------------
1638
    154.576 |      1.120 | 23,316,553,520 | 40,000,003 |            | Total
1639
 
1640
 estimated total profiling overhead: 22.40 seconds
1641
 overhead estimation parameters:
1642
   1.6000001e-8s/call, 5.6e-7s total profiling, 1.2800001e-7s internal profiling
1643
 
1644
 These functions were not called:
1645
  CREATE-RING-PAGE SIGNAL-SEMAPHORE WAIT-ON-SEMAPHORE
1646
 
1647
 
1648
 ;; (time (test-queue :count 10000000 :class 'page-ring :size 4)) ;; 62.812/122.148 +12,661MB
1649
 
1650
 * (time (test-queue :count 10000000 :class 'page-ring :size 32))
1651
 
1652
 #<PAGE-RING [[0,32/32] 0 @IO: (A B)] NIL {104D12B2D3}> 
1653
 Evaluation took:
1654
   62.911 seconds of real time
1655
   122.343645 seconds of total run time (45.274829 user, 77.068816 system)
1656
   [ Run times consist of 0.036 seconds GC time, and 122.308 seconds non-GC time. ]
1657
   194.47% CPU
1658
   214,622,173,442 processor cycles
1659
   683,204,896 bytes consed
1660
   
1661
 10000000
1662
 #<PAGE-RING [[0,31/32] 10000000 @NIL: (A B)] NIL {104D12B2D3}>
1663
 * (sb-profile:report)
1664
 
1665
   seconds  |     gc     |     consed    |    calls   |  sec/call  |  name  
1666
 ----------------------------------------------------------------
1667
     35.864 |      0.036 |   254,472,816 | 10,000,001 |   0.000004 | NEW-FIELD-PAGE
1668
     35.321 |      0.036 |   431,765,360 | 20,000,002 |   0.000002 | WAIT-ON-SEMAPHORE
1669
     25.168 |      0.000 |    32,032,944 | 20,000,001 |   0.000001 | SIGNAL-SEMAPHORE
1670
     23.778 |      0.000 |   115,306,416 | 10,000,001 |   0.000002 | GET-FIELD-PAGE
1671
     21.471 |      0.000 |   124,254,784 | 10,000,000 |   0.000002 | RELEASE-FIELD-PAGE
1672
     20.905 |      0.000 |   122,140,192 | 10,000,001 |   0.000002 | PUT-FIELD-PAGE
1673
      0.000 |      0.000 |        32,768 |         32 |   0.000000 | CREATE-RING-PAGE
1674
      0.000 |      0.000 |             0 |          1 |   0.000000 | COMPLETE-FIELD
1675
 ----------------------------------------------------------------
1676
    162.507 |      0.072 | 1,080,005,280 | 80,000,039 |            | Total
1677
 
1678
 
1679
 ;; fifo : 122/38.4 runtime
1680
 ;; mailbox : 217/56 runtime
1681
 
1682
 ;; sbcl 1.0.46, linux, 78.99 seconds total runtime = 1265983.1 / second
1683
 * (sb-profile:report)
1684
 
1685
   seconds  |     gc     |   consed   |   calls   |  sec/call  |  name  
1686
 ------------------------------------------------------------
1687
      0.000 |      0.000 |  2,883,584 | 1,000,000 |   0.000000 | WAIT-ON-QUEUE
1688
      0.000 |      0.100 | 15,991,136 | 1,000,000 |   0.000000 | CHANNEL-PUT
1689
      0.000 |      0.100 |  6,268,032 | 1,000,000 |   0.000000 | CHANNEL-GET
1690
 ------------------------------------------------------------
1691
      0.000 |      0.200 | 25,142,752 | 3,000,000 |            | Total
1692
 
1693
 estimated total profiling overhead: 17.59 seconds
1694
 overhead estimation parameters:
1695
   8.000001e-9s/call, 5.8640003e-6s total profiling, 3.048e-6s internal profiling
1696
 
1697
     
1698
 
1699
 ;;; (run-test-query "select count(*) where {?s ?p ?o} " :repository-id "system/schema")
1700
 ;;; (time (run-test-query "select count(*) where {?s ?p ?o} " :repository-id "jan/cantor_merged-reasoned"))
1701
 ;;; (run-test-query "select * where {?s ?p ?o} limit 4" :repository-id "system/schema")
1702
 
1703
 ;;; w/ page rings (first are profiled)
1704
 * (time (run-test-query "select count(*) where {?s ?p ?o} " :repository-id "jan/cantor_merged-reasoned"))
1705
 
1706
 Evaluation took:
1707
   0.876 seconds of real time
1708
   1.132071 seconds of total run time (1.032065 user, 0.100006 system)
1709
   129.22% CPU
1710
   5 forms interpreted
1711
   385 lambdas converted
1712
   2,987,379,711 processor cycles
1713
   41,063,424 bytes consed
1714
   
1715
 ((9796925))
1716
 (?::COUNT1)
1717
 #<QUERY [00000000-0000-0000-0000-000000000000/NIL, query, jan/cantor_merged-reasoned] {100833C21B}>
1718
 * (time (run-test-query "select count(*) where {?s ?p ?o} " :repository-id "jan/cantor_merged-reasoned"))
1719
 
1720
 Evaluation took:
1721
   0.717 seconds of real time
1722
   0.972061 seconds of total run time (0.848053 user, 0.124008 system)
1723
   135.56% CPU
1724
   9 lambdas converted
1725
   2,446,119,256 processor cycles
1726
   5,593,344 bytes consed
1727
   
1728
 ((9796925))
1729
 (?::COUNT2)
1730
 #<QUERY [00000000-0000-0000-0000-000000000000/NIL, query, jan/cantor_merged-reasoned] {100A9BEDAB}>
1731
 * (sb-profile:report)
1732
 measuring PROFILE overhead..done
1733
 
1734
   seconds  |     gc     |   consed   |  calls  |  sec/call  |  name  
1735
 ----------------------------------------------------------
1736
      2.962 |      0.000 | 16,591,120 |  38,276 |   0.000077 | GET-FIELD-PAGE
1737
      0.094 |      0.000 |  2,297,664 |  38,276 |   0.000002 | PUT-FIELD-PAGE
1738
      0.042 |      0.000 |  7,241,504 |  38,276 |   0.000001 | NEW-FIELD-PAGE
1739
      0.000 |      0.000 |  1,925,600 |     130 |   0.000000 | MAKE-PAGE
1740
 ----------------------------------------------------------
1741
      3.099 |      0.000 | 28,055,888 | 114,958 |            | Total
1742
 
1743
 estimated total profiling overhead: 0.08 seconds
1744
 overhead estimation parameters:
1745
   1.6000001e-8s/call, 7.2800003e-7s total profiling, 2.5600002e-7s internal profiling
1746
 
1747
 
1748
 ;;; w/ page rings, unprfoiled
1749
 * (time (run-test-query "select count(*) where {?s ?p ?o} " :repository-id "jan/cantor_merged-reasoned"))
1750
 
1751
 Evaluation took:
1752
   0.683 seconds of real time
1753
   0.948060 seconds of total run time (0.936059 user, 0.012001 system)
1754
   138.80% CPU
1755
   3 lambdas converted
1756
   2,331,045,822 processor cycles
1757
   4,918,624 bytes consed
1758
   
1759
 ((9796925))
1760
 (?::COUNT3)
1761
 #<QUERY [00000000-0000-0000-0000-000000000000/NIL, query, jan/cantor_merged-reasoned] {100AF4CE8B}>
1762
 
1763
 
1764
 ;; w/ fifo queues
1765
 * (time (run-test-query "select count(*) where {?s ?p ?o} " :repository-id "jan/cantor_merged-reasoned"))
1766
 
1767
 Evaluation took:
1768
   1.545 seconds of real time
1769
   1.164073 seconds of total run time (1.076067 user, 0.088006 system)
1770
   75.34% CPU
1771
   5 forms interpreted
1772
   363 lambdas converted
1773
   5,268,970,064 processor cycles
1774
   187 page faults
1775
   34,411,520 bytes consed
1776
   
1777
 ((9796925))
1778
 (?::COUNT1)
1779
 #<QUERY [00000000-0000-0000-0000-000000000000/NIL, query, jan/cantor_merged-reasoned] {1008A3F299}>
1780
 * (time (run-test-query "select count(*) where {?s ?p ?o} " :repository-id "jan/cantor_merged-reasoned"))
1781
 
1782
 Evaluation took:
1783
   0.697 seconds of real time
1784
   0.964061 seconds of total run time (0.904057 user, 0.060004 system)
1785
   138.31% CPU
1786
   9 lambdas converted
1787
   2,378,037,240 processor cycles
1788
   4,932,112 bytes consed
1789
   
1790
 ((9796925))
1791
 (?::COUNT2)
1792
 #<QUERY [00000000-0000-0000-0000-000000000000/NIL, query, jan/cantor_merged-reasoned] {100AA7A6D9}>
1793
 
1794
 * (sb-profile:report)
1795
 
1796
 measuring PROFILE overhead..done
1797
 
1798
   seconds  |     gc     |   consed  |  calls |  sec/call  |  name  
1799
 --------------------------------------------------------
1800
      0.278 |      0.000 | 1,501,984 | 19,136 |   0.000015 | MAKE-PAGE
1801
 --------------------------------------------------------
1802
      0.278 |      0.000 | 1,501,984 | 19,136 |            | Total
1803
 
1804
 estimated total profiling overhead: 0.02 seconds
1805
 overhead estimation parameters:
1806
   8.000001e-9s/call, 7.84e-7s total profiling, 3.04e-7s internal profiling
1807
 
1808
 |#