Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/streaming-matrix-field.lisp
| Kind | Covered | All | % |
| expression | 0 | 634 | 0.0 |
| branch | 0 | 56 | 0.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
5
(:documentation "This file defines the field streams based on external array data
6
for the 'org.datagraph.spocq' RDF SPARQL engine."
9
"Copyright 2015 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved.")
12
"A field stream comprises a data structure which permits two threads to interact
13
on an unbounded sequence of query solutions, where the api treats the data as a stream.
14
One writer thread introduces solutions into the stream and one reader thread removes them.
15
The interaction is interlocked, such that the reading thread blocks if there is no solution
16
present and the writing thread blocks if no buffer space remains.
17
The initial implementation, the matrix-field structure, permitted unlimited queues
18
with buffers allocated and freed on demand.
19
This was replaced with the paged-matrix-buffer implementation, for which the queue length
20
is fixed when the stream is initialized, the buffers themselves are allocated on demand
21
until the full complement has been created, and they are all released as soon as the
22
last solution is read from a completed stream. Only in an abnormal termination case does
23
a termination method ensure that the buffers have been released.
26
;;; (setq *make-channel.class* 'paged-matrix)
28
;; Guard the file on the implementations named in the feature expression; presumably
;; require-features reports the message below when none of them is present.
;; NOTE(review): several definitions in this file reference sb-ext: and sb-sys:
;; symbols (e.g. sb-ext:finalize, sb-sys:system-area-pointer); confirm they are
;; conditionalized for the non-SBCL implementations listed here.
(require-features (or :digitool :sbcl :clozure :lispworks)
29
"This file must be conditionalized for ~a."
30
(lisp-implementation-type))
31
(defstruct (matrix-field (:include solution-field
32
(solutions (cffi:null-pointer) :type cffi:foreign-pointer))
33
(:constructor _make-matrix-field))
34
"A matrix-field comprises a dimension list and an external data matrix"
35
;; the type list specifies (optionally) the type of an individual solution dimension
37
;; data locates the base of the current foreign array. it is initialized to a null pointer,
38
;; which indicates that no operations have occurred. upon first use, it is
39
;; synchronized with the data pointer from the solution matrix and is updated as
40
;; that foreign array is paged. for a source field, when it is exhausted, this
41
;; is set to a null pointer and the row index to -1.
42
(data (cffi:null-pointer) :type cffi:foreign-pointer)
43
;; when non-negative, it is the channel over which successive pages would be written/read
44
;; for a page-streamed field.
45
(channel -1 :type fixnum)
46
(row-count-limit nil) ; when an integer, limits the page size and triggers streaming to the channel on overflow
49
;; Subtypes of matrix-field that add no slots of their own.
;; table-matrix-field (and therefore true-matrix-field) changes the defaults of
;; the inherited row-count, length and count slots to 1.
(defstruct (null-matrix-field
            (:include matrix-field)))

(defstruct (false-matrix-field
            (:include null-matrix-field)))

(defstruct (table-matrix-field
            (:include matrix-field
                      (row-count 1)
                      (length 1)
                      (count 1))))

(defstruct (true-matrix-field
            (:include table-matrix-field)))
54
(defparameter *finalize-matrix-field-solutions.verbose*
  nil
  "When true, finalize-matrix-field-solutions writes a trace of finalized and
orphaned solution matrices to *trace-output*.")

(defparameter *finalize-matrix-field-solutions.registry*
  (make-registry :test 'eql)
  "Registry consulted by finalize-matrix-field-solutions and
unfinalize-matrix-field-solutions to decide whether a solution matrix may
still be released. Keys are compared with EQL.")
58
(defmethod field-dimensions ((instance matrix-field))
  "Return the dimension list of a matrix field, as given by solution-field-dimensions."
  (solution-field-dimensions instance))
61
(defmethod field-width ((field matrix-field))
62
(let ((%data (matrix-field-data field)))
63
(if (cffi:null-pointer-p %data)
65
(rdfcache:matrix-column-count %data))))
67
(defmethod field-length ((field matrix-field))
68
(let ((%data (matrix-field-data field)))
69
(if (cffi:null-pointer-p %data)
71
(rdfcache:matrix-row-count %data))))
73
(defmethod field-page-length ((field matrix-field))
74
(let ((%data (matrix-field-data field)))
75
(if (cffi:null-pointer-p %data)
77
(rdfcache:matrix-row-count %data))))
79
(defun clone-matrix-field (field &key (data nil data-s) (dimensions nil dimensions-s)
80
(sort-dimensions nil sort-dimensions-s))
81
(let ((new (copy-matrix-field field)))
82
(when data-s (setf (matrix-field-data new) data))
83
(when dimensions-s (setf (matrix-field-dimensions new) dimensions))
84
(when sort-dimensions-s (setf (matrix-field-sort-dimensions new) sort-dimensions))
87
(defun finalize-matrix-field-solutions (field %matrix)
88
(declare (ignore %matrix))
91
(unless (cffi:null-pointer-p %matrix)
93
(let ((tag *thread-name*)
94
(key (cffi:pointer-address %matrix)))
95
(flet ((release-matrix ()
96
;; unless it has already been released, free the external object
97
(with-locked-registry (*finalize-matrix-field-solutions.registry*)
98
(cond ((eql tag (get-registry key *finalize-matrix-field-solutions.registry*))
99
;; if this function still controls
100
(rem-registry key *finalize-matrix-field-solutions.registry*)
101
(when *finalize-matrix-field-solutions.verbose*
102
(format *trace-output* " to finalize: ~s" %matrix)
103
(rdfcache:print-matrix %matrix *trace-output*)
104
(finish-output *trace-output*))
105
(rdfcache:matrix-free %matrix))
106
(*finalize-matrix-field-solutions.verbose*
107
(format *trace-output* " orphaned: ")
108
(rdfcache:print-matrix %matrix *trace-output*)
109
(finish-output *trace-output*))))))
110
(when (get-registry key *finalize-matrix-field-solutions.registry*)
111
(format *trace-output* " already finalized ~s: " %matrix)
112
(finish-output *trace-output*))
113
(setf (get-registry key *finalize-matrix-field-solutions.registry*) tag)
114
(sb-ext:finalize field #'release-matrix)))
117
(defun unfinalize-matrix-field-solutions (field %matrix)
118
(declare (ignore %matrix))
119
;; mark the finalization such that the matrix is not released until some other thread/field
122
(with-locked-registry (*finalize-matrix-field-solutions.registry*)
123
(when (eql *thread-name* (get-registry %matrix *finalize-matrix-field-solutions.registry*))
124
(setf (get-registry (cffi:pointer-address %matrix) *finalize-matrix-field-solutions.registry*) field)))
127
(defun matrix-field-column-count (field)
  "Return the column count of FIELD by delegating to solution-field-column-count."
  (solution-field-column-count field))
131
(defun make-matrix-field (&rest args &key data solutions (row-count 0 rc-s) (dimensions nil d-s) &allow-other-keys)
132
(assert (not (and solutions data)) ()
133
"both solutions and data may not be supplied")
135
(assert (and (typep solutions 'cffi:foreign-pointer)
136
(not (cffi:null-pointer-p solutions))) ()
137
"solutions must be a foreign array: ~s." solutions)
139
(assert (= (rdfcache:matrix-row-count solutions) row-count) ()
140
"row-count must agree with the solution dimensions: ~s ~s."
141
row-count (rdfcache:matrix-row-count solutions))
142
(setf row-count (rdfcache:matrix-row-count solutions)))
144
(assert (= (rdfcache:matrix-column-count solutions) (length dimensions)) ()
145
"arity must agree with solution dimensions: ~s ~s."
146
dimensions (rdfcache:matrix-column-count solutions))
147
(setf dimensions (loop for i below (rdfcache:matrix-row-count solutions)
148
collect (cons-variable)))))
150
(let ((data-row-count (solution-field-solutions-row-count data))
151
(data-column-count (solution-field-solutions-column-count data)))
153
(assert (= data-row-count row-count) ()
154
"row-count must agree with the data dimensions: ~s ~s."
155
row-count data-row-count)
156
(setf row-count data-row-count))
158
(assert (= data-column-count (length dimensions)) ()
159
"arity must agree with data dimensions: ~s ~s."
160
dimensions data-column-count)
161
(setf dimensions (loop for i below data-column-count
162
collect (cons-variable))))
163
(setf solutions (rdfcache:make-matrix data-row-count data-column-count))))
165
(setf solutions (rdfcache:make-matrix row-count (length dimensions)))))
166
(when data (remf args :data))
167
(let ((field (apply #'_make-matrix-field
168
:dimensions dimensions
172
(finalize-matrix-field-solutions field solutions)
174
(set-solution-field-solutions field data)
175
(setf (matrix-field-data field) (rdfcache:matrix-data-pointer solutions)))
178
(defun make-matrix (rows dims)
  "Shorthand constructor: build a matrix-field from a row count ROWS and a
dimension list DIMS. The external solution matrix is created by make-matrix-field."
  (make-matrix-field :dimensions dims
                     :row-count rows))
185
(defun make-null-matrix (dimensions)
  "Return a matrix-field over DIMENSIONS that has zero rows."
  (let ((no-rows 0))
    (make-matrix no-rows dimensions)))
188
(defun matrix-field-row-size (matrix)
  "Return the size of one row of MATRIX's external solution matrix:
its column count multiplied by +matrix-element-size+."
  (let* ((%solutions (matrix-field-solutions matrix))
         (column-count (rdfcache:matrix-column-count %solutions)))
    (* column-count +matrix-element-size+)))
191
(defmethod initialize-solution-field ((field null) &rest initargs)
  "When no field is supplied, create a fresh matrix-field and initialize it
with INITARGS, returning whatever that initialization returns."
  (declare (dynamic-extent initargs))
  (let ((new-field (make-matrix-field)))
    (apply #'initialize-solution-field new-field initargs)))
195
(defmethod initialize-solution-field ((field matrix-field) &key
196
(sort-dimensions '() sd-s)
198
(row-count (solution-field-solutions-row-count data))
199
(dimensions (solution-field-sort-dimensions field) d-s)
202
(setf (solution-field-dimensions field) dimensions))
204
(setf (solution-field-sort-dimensions field) sort-dimensions))
206
(set-solution-field-solutions field data)
207
(let ((matrix (matrix-field-solutions field)))
208
(assert (and matrix (not (cffi:null-pointer-p matrix))) ()
209
"invalid solution matrix: ~s." field)
210
(unless (and (= row-count (rdfcache:matrix-row-count matrix))
211
(= (length dimensions) (rdfcache:matrix-column-count matrix)))
212
(rdfcache:matrix-resize matrix row-count (length dimensions)))
213
(setf (matrix-field-data field) (rdfcache:matrix-data-pointer matrix)
214
(matrix-field-length field) row-count
215
(matrix-field-row-count field) row-count
216
(matrix-field-row-index field) 0)))
220
(defmethod solution-field-sort ((field matrix-field) order)
222
(solution-field-materialize field)
223
(let ((%matrix (matrix-field-solutions field)))
224
(cond ((or (cffi:null-pointer-p %matrix)
225
(<= (rdfcache:matrix-row-count %matrix) 0))
226
(setf (matrix-field-data field) (cffi:null-pointer)
227
(matrix-field-length field) 0
228
(matrix-field-row-count field) 0
229
(matrix-field-row-index field) 0))
231
(let* ((dimensions (solution-field-dimensions field))
232
(order-map (loop for dimension in order
233
collect (or (position dimension dimensions)
234
(error "sort dimension not present in field: ~s: ~a"
236
(rdfcache:matrix-sort-by %matrix order-map)
237
(setf (solution-field-sort-dimensions field) order))
238
(setf (matrix-field-data field) (rdfcache:matrix-data-pointer %matrix)
239
(matrix-field-length field) (rdfcache:matrix-row-count %matrix)
240
(matrix-field-row-count field) (rdfcache:matrix-row-count %matrix)
241
(matrix-field-row-index field) 0)
242
(when *matrix-trace-output*
243
(let ((*print-pretty* nil))
244
(format *matrix-trace-output* "sorted: ~a~%~a~%" order (rdfcache:matrix-to-list %matrix))))))))
245
(values field order))
248
(defmethod set-solution-field-solutions ((%matrix sb-sys:system-area-pointer) (solution-data array))
249
(unless (and (= (array-dimension solution-data 0) (rdfcache:matrix-row-count %matrix))
250
(= (array-dimension solution-data 1) (rdfcache:matrix-column-count %matrix)))
251
(rdfcache:matrix-resize %matrix (array-dimension solution-data 0) (array-dimension solution-data 1)))
252
(loop for row from 0 below (array-dimension solution-data 0)
253
do (loop for column from 0 below (array-dimension solution-data 1)
254
do (rdfcache:matrix-set %matrix row column (aref solution-data row column))))
257
(defmethod set-solution-field-solutions ((%matrix sb-sys:system-area-pointer) (solution-data list))
  "Convert list SOLUTION-DATA with term-number-field, then store the converted
data into the external matrix %MATRIX via the applicable method."
  (let ((converted (term-number-field solution-data)))
    (set-solution-field-solutions %matrix converted)))
261
(defmethod set-solution-field-solutions ((field matrix-field) (solution-data array))
262
(when (> (array-dimension solution-data 0) 0)
263
(assert (= (array-dimension solution-data 1) (length (solution-field-dimensions field))) ()
264
"Inconsistent solutions / dimensions: ~s ~s"
265
(array-dimensions solution-data)
266
(solution-field-dimensions field)))
267
(let ((%matrix (matrix-field-solutions field)))
268
(cond ((or (null %matrix) (cffi:null-pointer-p %matrix))
270
(setf (matrix-field-solutions field)
271
(rdfcache:make-matrix (array-dimension solution-data 0) (array-dimension solution-data 1))))
272
(finalize-matrix-field-solutions field %matrix))
273
((not (and (= (array-dimension solution-data 0) (rdfcache:matrix-row-count %matrix))
274
(= (array-dimension solution-data 1) (rdfcache:matrix-column-count %matrix))))
275
(rdfcache:matrix-resize %matrix (array-dimension solution-data 0) (array-dimension solution-data 1))))
276
(loop for row from 0 below (array-dimension solution-data 0)
277
do (loop for column from 0 below (array-dimension solution-data 1)
278
do (rdfcache:matrix-set %matrix row column (aref solution-data row column))))
279
(setf (matrix-field-data field) (rdfcache:matrix-data-pointer %matrix)))
280
(setf (solution-field-row-index field)
281
(setf (solution-field-length field)
282
(setf (solution-field-row-count field) (array-dimension solution-data 0))))
285
(defmethod set-solution-field-solutions ((field matrix-field) (solution-data list))
  "Convert list SOLUTION-DATA with term-number-field, then install the converted
data as FIELD's solutions via the applicable method."
  (let ((converted (term-number-field solution-data)))
    (set-solution-field-solutions field converted)))
288
(defmethod set-solution-field-solutions ((field matrix-field) (%matrix sb-sys:system-area-pointer))
289
;; an external pointer is handled as a matrix
290
(unless (eq (matrix-field-solutions field) %matrix)
291
(setf (matrix-field-solutions field) %matrix)
292
(finalize-matrix-field-solutions field %matrix))
293
(setf (matrix-field-data field) (rdfcache:matrix-data-pointer %matrix))
294
(setf (solution-field-row-index field)
295
(setf (solution-field-length field)
296
(setf (solution-field-row-count field) (rdfcache:matrix-row-count %matrix))))
301
(defmethod solution-field-concatenate ((field1 matrix-field) (field2 matrix-field))
  "Return a new matrix-field whose solution matrix is the concatenation
(rdfcache:matrix-concatenate) of the solution matrices of FIELD1 and FIELD2.

The result carries FIELD1's dimensions when FIELD1 has any. Without an explicit
:dimensions argument, make-matrix-field synthesizes fresh variables for the new
field, so the operands' variables would be lost."
  (let ((solutions (rdfcache:matrix-concatenate (solution-field-solutions field1)
                                                (solution-field-solutions field2)))
        (dimensions (solution-field-dimensions field1)))
    (if dimensions
        ;; keep the operands' variables; make-matrix-field checks arity against them
        (make-matrix-field :solutions solutions :dimensions dimensions)
        (make-matrix-field :solutions solutions))))
308
(defgeneric solution-field-solutions-row-count (field)
309
(:method ((field list)) (length field))
310
(:method ((field array)) (array-dimension field 0))
311
(:method ((field vector)) (reduce #'+ field :key #'(lambda (page) (array-dimension page 0)) :initial-value 0))
312
#+sbcl (:method ((field sb-sys:system-area-pointer))
313
(if (cffi:null-pointer-p field)
315
(rdfcache:matrix-row-count field)))
316
(:method ((field solution-field))
317
(solution-field-row-count (solution-field-solutions field))))
319
(defmethod solution-field-column-count ((field matrix-field))
320
(let ((solutions (matrix-field-solutions field)))
321
(if (or (null solutions) (cffi:null-pointer-p solutions))
323
(rdfcache:matrix-column-count solutions))))
325
(defmethod release-field-data ((field matrix-field))
  "Release FIELD's external solution matrix, if present.
Afterwards both the solutions and data slots of FIELD are reset to null
pointers, so the field no longer refers to the released storage. A nil or null
matrix is left untouched, matching the (or (null m) (cffi:null-pointer-p m))
checks used by the other matrix-field methods."
  ;; NOTE(review): the matrix is released directly, without consulting
  ;; *finalize-matrix-field-solutions.registry*; confirm that a finalizer installed
  ;; by finalize-matrix-field-solutions cannot free the same matrix afterwards.
  (let ((%matrix (matrix-field-solutions field)))
    (unless (or (null %matrix) (cffi:null-pointer-p %matrix))
      (rdfcache:matrix-release %matrix)
      (setf (matrix-field-solutions field) (cffi:null-pointer)
            (matrix-field-data field) (cffi:null-pointer)))))
331
;; field-dimensions for matrix-field is defined once, earlier in this file,
;; as (solution-field-dimensions field); an identical redefinition is not needed here.
334
(defmethod complete-field-input ((matrix matrix-field))
  "Once input from a matrix field is complete, release its external solution data."
  (release-field-data matrix))
339
;;; paged matrix : combine the paging and the channel in one instance
341
(defclass paged-matrix (paged-buffer matrix-page-channel)
344
:accessor paged-buffer-current-read-page)
347
:accessor paged-buffer-current-read-row)
350
:accessor paged-buffer-current-read-length)
353
:accessor paged-buffer-current-write-page)
356
:accessor paged-buffer-current-write-row)
359
:accessor field-sort-dimensions))
360
(:documentation "Specialize paged-buffer to use off-heap foreign arrays
361
to store the solutions. These are allocated on-demand up to the queue
362
depth and freed upon normal completion. A termination function is defined
363
to release the pages in the event of abnormal completion."))
365
(defmethod field-dimensions ((matrix paged-matrix))
  "A paged matrix takes its dimensions from its channel."
  (channel-dimensions matrix))

(defmethod field-width ((matrix paged-matrix))
  "A paged matrix is as wide as its channel's pages."
  (channel-page-width matrix))

(defmethod field-page-length ((matrix paged-matrix))
  "A paged matrix's page length is its channel's page length."
  (channel-page-length matrix))
375
(defmethod field-current-page-length ((field paged-matrix))
376
(let ((current-page ()))
378
(rdfcache:matrix-row-count current-page)
381
(defmethod make-paged-buffer-page-ring ((instance paged-matrix))
382
"Create an initial page ring for a paged matrix for which the entries are null"
383
(loop with size = (channel-size instance)
387
(defmethod make-paged-buffer-page ((instance paged-matrix) length width)
  "Allocate a new external matrix with LENGTH rows and WIDTH columns to serve as a page."
  (let ((row-count length)
        (column-count width))
    (rdfcache:make-matrix row-count column-count)))
390
(defmethod initialize-instance :after ((instance paged-matrix) &key)
391
(let ((ring (paged-buffer-write-ring instance))
392
(count (channel-size instance)))
393
(sb-ext:finalize instance #'(lambda ()
394
(loop for i below count
396
for page = (first pages)
398
do (progn (rdfcache:matrix-release page)
399
(setf (first pages) nil)))))))
401
(defmethod shared-initialize :before ((instance paged-matrix) (slot-names t) &key)
  "Before (re)initializing a paged matrix, release the pages of any existing
write ring and clear the current read/write page and row positions."
  ;; a bound write-ring presumably means pages from an earlier initialization may exist
  (when (slot-boundp instance 'write-ring)
    (release-field-pages instance))
  (setf (paged-buffer-current-read-page instance) nil
        (paged-buffer-current-read-row instance) nil
        (paged-buffer-current-write-page instance) nil
        (paged-buffer-current-write-row instance) nil))
409
(defgeneric release-field-pages (field)
410
(:method ((field paged-matrix))
411
(sb-ext:cancel-finalization field)
412
#| should not do this, as the list is circular and the matrix was stable
413
(let ((current-read-page (paged-buffer-current-read-page field)))
414
(when current-read-page
415
(unless (find current-read-page (paged-buffer-read-ring field))
416
(rdfcache:matrix-release current-read-page))))|#
417
(loop for i below (channel-size field)
418
for pages on (paged-buffer-write-ring field)
419
for page = (first pages)
421
do (progn (rdfcache:matrix-release page)
422
(setf (first pages) nil)))))
424
(defmethod solution-field-materialize ((channel paged-matrix))
425
(loop with materialized-page = nil
426
for page = (get-field-page channel)
428
do (setf materialized-page
429
(if materialized-page
430
(rdfcache:matrix-concatenate materialized-page page)
432
finally (return (setf (channel-state channel) :input
433
(paged-buffer-current-read-row channel) 0
434
(paged-buffer-current-read-page channel) materialized-page))))
436
(defmethod complete-field ((channel paged-matrix))
437
(let* ((write-row (paged-buffer-current-write-row channel))
438
(write-count (when write-row (1+ write-row)))
439
(write-page (paged-buffer-current-write-page channel)))
440
(when (and write-count (< write-count (channel-page-length channel)))
441
(rdfcache:matrix-resize write-page
443
(channel-page-width channel)))
444
(put-field-page channel write-page)
448
(defmethod channel-complete-input ((channel paged-matrix))
449
(release-field-pages channel)
452
(defmethod complete-field-input ((matrix paged-matrix))
  "Completing input on a paged matrix completes input on it as a channel."
  (channel-complete-input matrix))
455
;;; backwards, for now
456
(defmethod complete-field-output ((matrix paged-matrix))
  "Completing output on a paged matrix is delegated to complete-field."
  (complete-field matrix))
459
(defmethod new-field-page ((channel paged-matrix) length width)
460
"ensure that a page is present. allocate and save one if necessary"
461
(cond ((call-next-method))
463
;; otherwise, make the page
464
(setf (first (paged-buffer-write-ring channel))
465
(make-paged-buffer-page channel length width)))))
467
(defmethod get-field-page ((channel paged-matrix))
  "Obtain the next page from the next method and make it the current read page.
The current read row is reset to -1 (presumably so that the first increment in
next-field-row yields row 0). Returns the page obtained."
  (let ((page (call-next-method)))
    (setf (paged-buffer-current-read-row channel) -1)
    (setf (paged-buffer-current-read-page channel) page)
    page))
472
(defmethod next-field-row ((channel paged-matrix))
473
"Return the next page and the next row index in the page to read.
475
(let ((current-page (paged-buffer-current-read-page channel))
476
(current-row (paged-buffer-current-read-row channel)))
479
(release-field-page channel current-page))
480
(if (setf current-page (get-field-page channel))
481
(values (rdfcache:matrix-data-pointer current-page)
482
(setf (paged-buffer-current-read-row channel) 0))
483
(cffi:null-pointer))))
485
(if (>= (incf current-row) (rdfcache:matrix-row-count current-page))
487
(values (rdfcache:matrix-data-pointer current-page)
488
(setf (paged-buffer-current-read-row channel) current-row)))
491
(defmethod new-field-row ((channel paged-matrix))
492
"Return the next page and the next row in the page to write."
493
(let ((current-page (paged-buffer-current-write-page channel))
494
(current-row (paged-buffer-current-write-row channel)))
497
(setf (paged-buffer-current-write-page channel)
498
(new-field-page channel
499
(channel-page-length channel)
500
(channel-page-width channel))))
501
(values (rdfcache:matrix-data-pointer current-page)
502
(setf (paged-buffer-current-write-row channel) 0))))
504
(if (>= (incf current-row) (rdfcache:matrix-row-count current-page))
506
(values (rdfcache:matrix-data-pointer current-page)
507
(setf (paged-buffer-current-write-row channel) current-row)))