Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/processing.lisp
| Kind | Covered | All | % |
| expression | 426 | 1101 | 38.7 |
| branch | 30 | 76 | 39.5 |
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
5
(:documentation "top-level query processing"
6
"The 'processor' comprises a set of message handling threads.
7
It manages its resource usage in terms of number of threads and the role(s) for which they are configured.
8
There are two kinds of thread, request threads and response threads.
9
Each is configured upon creation to serve a specific purpose:
11
* A request thread is intended to accept query requests. It creates, as its input, a stream/channel from which
12
to consume query messages from a given brokered domain. It prepares each parsed request, in particular,
13
by compiling it, and then sends it through the *service-channel* to a response thread for execution.
14
with a single queue to delegate BGP operations to the store.
15
* A response thread administers query execution by dispatching the constituent algebra operations to
16
individual threads and connecting the result solution stream to a response stream/channel to the
20
Each request and response comprises at least three required message properties,
26
and a body. Additional, optional properties are taken from the request header:
31
A query request's body is the query expression encoded either as SPARQL or as SSE/BERT, as per the
32
request MIME type. A query response message's body depends on the query operator
34
* SELECT : a solution field
35
* CONSTRUCT : a triple (or optionally quad?) sequence (graph)
36
* DESCRIBE : a triple sequence (graph)
39
Should a store request or a query request fail, the response is an error message.
41
The managed execution process involves constructing a graph of connected threads, each of which
42
processes one node in the algebra graph. Internal nodes accept arguments as streams and produce
43
a result stream. The leaves perform BGP matching and iterate over match results from the store to
44
emit a result stream. The response thread connects the root node output stream to an encoding
45
filter as per the response MIME type, which emits the result to the response stream/channel.
47
Each algebra node involves three processing phases:
48
- compilation : the sse expression is expanded/translated into one in terms of spocq.e runtime operations
49
and compiled from lisp into a native function.
50
- propagation network construction : each first-level spocq.e operator constructs an algebra graph
51
node which encapsulates the operation's parameters - e.g., the specific reduction function, projection
52
dimensions, slice specification, and the links to the argument source nodes. This is a 'static' graph,
53
which is intended to be reused, and requires store access for selectivity data only, in order to sort
54
BGP constituent patterns.
55
- reduction execution : a top-down control process delegates the respective operation for each node to
56
a thread, which applies the reduction function to source channels - or in the case of BGP leaves, to the
57
store, to emit the result field stream to the destination.
59
The second and third phases are split, rather than initiating the execution simultaneously with network
60
construction in order that
61
- the respective reduction thread is active prior to the argument thread(s), and
62
- the network can be reused by just flushing its channels of any unused results and restarting the
65
Some notes about optimization.
66
- arq[1] uses 'selectivity' criteria to order related bgps.
67
- arq/vc : static values by variable position
68
- arq/vcp : augmented with values for predicate values, for predicate classes
69
- arq/gsh : uses exact statistics for any literal term. could be extended to term combinations
70
- the others (pfj, pfn) constitute unclear variations of the above
72
[2] presents three aspects of optimization
73
- abstract symbolic which follows from operator simplification patterns (from perez)
74
- symbolic transformation based on certain/probable variables
75
- semantic optimization, which follows from tgd/egd-based inference
78
NB. There is code included to delegate processing for query components to remote processes, but that code
80
If the query request entails BGP queries, the execution publishes any BGP queries and yields a function as
81
the immediate local result. This function is bound globally to its BGP identifiers to be available for
82
a response thread to combine with response from the store. If, on the other hand, the query included no BGP,
83
the execution yields an immediate result - a solution field, a triple set, or a scalar (boolean or numeric),
84
which is published back to the client immediately.
86
Any 'incomplete' query functions remain bound to their pending BGP queries. As each response appears, its
87
query function is retrieved according to BGP identifier and applied to the store result. Each successive
88
application yields a result analogous to the initial one. As long as the result is incomplete, the
89
function remains registered. Any results are returned immediately. Once the function is complete, any
90
final results are sent as a completion response.
92
A store request's body is a single
93
BGP encoded as SSE/BERT. A store response's body is a solution field - a set of variable binding solutions.
95
working with streaming revisions
96
- the revision has a repeat interval - with or without an end.
97
this means after it completes, the min/max ordinals are adjusted and it runs again.
98
- completely? : no without initialization, just the query-run-in-thread suffices? (it is used for internal service forms as well)
99
- results are emitted to its destination queue as normal
100
- the query must indicate how to mark them temporally for combination with other streams
102
- algebra-thread-apply would be premature - it does initiate execution, but not completion,
103
which means the transaction parameters cannot be changed.
105
this should be captured as a function
106
(query-run-in-thread task result-or-generator)
107
(send-response-message (task-operation task) task stream encoding)))
108
for repeated intervals, update the bounds around it. actually need to mediate the response stream
109
to take the e-o-s marker as the cue to reiterate.
112
begin,end,repeat,window
119
[1] : http://www2008.org/papers/pdf/p595-stocker1.pdf
120
[2] : tcdt2010_cd.pdf")
123
;;; consolidated interface to execute a single query
125
(defun test-sparql (query-string &rest args &key (repository-id "system/system") (agent (system-agent))
127
(apply #'run-sparql query-string
128
:repository-id repository-id :agent agent
131
(defun count-system-quads (&key (account "system"))
132
(test-sparql "select count(*) from <urn:dydra:all> where {?s ?p ?o}"
133
:repository-id (concatenate 'string account "/system")))
135
(defun rqpl (repository)
137
(write-string "? " *terminal-io*)
138
(finish-output *terminal-io*)
139
(let ((query (loop with input = ()
140
for line = (read-line *terminal-io* nil nil)
141
if (zerop (length line))
142
return (format nil "~{~a~^~%~}" (reverse input))
143
else do (push line input))))
144
(unless (plusp (length query)) (return))
145
(handler-case (let ((*print-pretty* nil))
146
(multiple-value-bind (solutions variables) (run-sparql query :repository-id repository)
147
(format *terminal-io* "~{~a~^ ~}~%" variables)
148
(format *terminal-io* "~{~{~s~^ ~}~%~}" solutions)))
149
(error (c) (warn "error: ~%~a~%" c))))))
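;;; Illustrative sketch, not part of the original source: the line-gathering step
;;; which rqpl performs above, isolated as a function which reads from an arbitrary
;;; stream. The name read-query-text is hypothetical. Lines are collected until a
;;; blank line or end-of-file and are then joined with newlines; (length nil) is 0,
;;; so end-of-file terminates the loop the same way a blank line does.
(defun read-query-text (stream)
  (loop with input = ()
        for line = (read-line stream nil nil)
        if (zerop (length line))
          return (format nil "~{~a~^~%~}" (reverse input))
        else do (push line input)))

;;; Possible use, reading from a string rather than *terminal-io*:
;;; (with-input-from-string (s (format nil "select *~%where {?s ?p ?o}~%~%ignored"))
;;;   (read-query-text s))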
151
(defun run-sparql-internal (query &rest args)
152
"run sparql with auth controls, but suppress all extensions - inference, views, etc
153
which would be recognized during a request.
154
This, in particular to permit sparql request for extension metadata without
155
infinite recursion to retrieve it."
156
(let ((*library-path* ())
157
(*macroexpand-bgp-phases* *macroexpand-bgp-base-phases*))
158
(declare (dynamic-extent args))
159
(apply #'run-sparql query :library-path nil args)))
161
(defgeneric run-sparql (query-string &key repository-id graphs library-path dynamic-bindings id operation
162
revision-id revision-windows
163
accounting-handler error-handler solution-handler user-id
164
continuation agent response-content-type
167
(:method ((location pathname) &rest args )
168
(apply #'run-sparql (read-file location) args))
170
(:method ((query-string string) &rest args)
171
(with-configuration () ; intended for internal use
172
(multiple-value-bind (sse-expression options)
173
(parse-sparql query-string)
174
(apply #'run-sparql sse-expression
175
:request-content query-string
177
;; take the dataset from the environment or from the query text
178
(case *dataset-source*
181
(list* :dataset-graphs *dataset-graphs* options))))))))
183
(:method ((sse-expression cons) &rest args &key (id (make-task-id))
184
(operation (first sse-expression))
185
(response-content-type "application/sparql-query+sse")
187
(apply #'run-sparql (apply #'make-query
190
:sse-expression sse-expression
191
:request-exchange "test" :request-routing-key "key"
192
:store-routing-key "host.app.pid"
193
:trace-routing-key nil
194
:response-content-type response-content-type
195
(plist-difference args '(:accounting-handler :error-handler :solution-handler :continuation
196
;; should be adopted for the initargs
199
#+(or) ;; revision windows here
200
(:method ((query query) &key (accounting-handler nil ah-s) (error-handler nil eh-s) (continuation nil c-s)
201
(dynamic-bindings nil db-s) (solution-handler #'term-value-field) response-content-type
204
(let* ((post-result ())
206
(repository (task-repository query))
207
(windowed-revision (when revision-windows
208
(compute-repository-revision repository revision-windows))))
210
(progn (block :handle-termination
211
(add-task-thread query (bt:current-thread))
212
(restart-bind ((terminate-task (lambda (task-to-terminate &optional condition)
213
(when (eq query task-to-terminate)
214
(warn "query task terminated: ~a" task-to-terminate)
215
(return-from run-sparql (values nil nil query condition))))))
216
(with-task-environment (:task query)
217
(when db-s ;; modify a clone's bindings
218
(setf (task-dynamic-bindings query) dynamic-bindings))
219
(block :collect-result
220
(labels ((collect-result (element)
222
(let ((element-result (funcall solution-handler element)))
223
(when (consp element-result)
224
(setf post-result (if c-s
226
(funcall continuation element))
227
(append post-result element-result)))))
228
(return-from :collect-result)))
229
(report-errors (task.condition)
230
(destructuring-bind (task . condition) task.condition
231
(declare (ignore task))
232
(warn "query error: ~a: ~a" query condition)))
233
(report-accounting (task.properties)
234
(destructuring-bind (task . properties) task.properties
235
(declare (ignore task))
236
(when *accounting-io*
237
(let ((*print-pretty* nil)) (format *accounting-io* "~&~a~%" properties)))
238
(labels ((clear-list (list)
241
(cons (clear-list (rest list))
242
(setf (first list) nil (rest list) nil))
245
(clear-list properties))))
246
(run-task-for-revision (effective-revision)
247
(let ((effective-revision-id (spocq.i::repository-revision-id effective-revision)))
248
(setf (spocq.i::task-revision query) effective-revision)
249
(setf (spocq.i::task-revision-id query) effective-revision-id)
250
(log-debug "initiate-task.mqtt: run-task-for-revision: ~a" effective-revision-id))
251
(cond (response-content-type
252
(initiate-task query response-content-type))
254
(initiate-task query #'collect-result
255
:dimension-handler #'(lambda (d) (setf dimensions d))
256
:accounting-handler (if ah-s accounting-handler #'report-accounting)
257
:error-handler (if eh-s error-handler #'report-errors)))))
258
(for-each-revision (min-record max-record)
259
(log-debug "initiate-task.run-sparql: for-each-revision: ~a -- ~a" min-record max-record)
260
(unless max-record (setf max-record min-record))
261
(let* ((min-id (ORG.DATAGRAPH.RDF.LMDB.IMPLEMENTATION::REVISION-RECORD-UUID min-record))
262
(max-id (ORG.DATAGRAPH.RDF.LMDB.IMPLEMENTATION::REVISION-RECORD-UUID max-record))
263
(id (if (equalp min-id max-id) min-id (concatenate 'string min-id "-" max-id)))
264
(effective-revision (spocq.i::compute-repository-revision repository id)))
265
(run-task-for-revision effective-revision)
266
(spocq.i::reinitialize-task query))))
267
(if windowed-revision
268
(map-repository-revision-intervals #'for-each-revision windowed-revision)
269
(run-task-for-revision (spocq.i::task-revision query))))))
270
(when (fboundp 'process-provenance-information)
271
(funcall 'process-provenance-information query))))
272
(values post-result dimensions query t))
273
(return-query () (values nil dimensions query nil)))))
275
;; revision windows in initialize task
276
(:method ((query query) &key (accounting-handler nil ah-s) (error-handler nil eh-s) (continuation nil c-s)
277
(dynamic-bindings nil db-s) (solution-handler #'term-value-field) response-content-type
280
(let ((post-result ())
282
(windowed-revision (when revision-windows
283
(spocq.i::compute-repository-revision (task-repository query) revision-windows))))
285
(progn (block :handle-termination
286
(add-task-thread query (bt:current-thread))
287
(restart-bind ((terminate-task (lambda (task-to-terminate &optional condition)
288
(when (eq query task-to-terminate)
289
(warn "query task terminated: ~a" task-to-terminate)
290
(return-from run-sparql (values nil nil query condition))))))
291
(with-task-environment (:task query)
292
(when db-s ;; modify a clone's bindings
293
(setf (task-dynamic-bindings query) dynamic-bindings))
294
(block :collect-result
295
(labels ((collect-result (element)
297
(let ((element-result (funcall solution-handler element)))
298
(when (consp element-result)
299
(setf post-result (if c-s
301
(funcall continuation element))
302
(append post-result element-result)))))
303
(return-from :collect-result)))
304
(report-errors (task.condition)
305
(destructuring-bind (task . condition) task.condition
306
(declare (ignore task))
307
(warn "query error: ~a: ~a" query condition)))
308
(report-accounting (task.properties)
309
(destructuring-bind (task . properties) task.properties
310
(declare (ignore task))
311
(when *accounting-io*
312
(let ((*print-pretty* nil)) (format *accounting-io* "~&~a~%" properties)))
313
(labels ((clear-list (list)
316
(cons (clear-list (rest list))
317
(setf (first list) nil (rest list) nil))
320
(clear-list properties)))))
321
(when windowed-revision
322
(let ((effective-revision-id (spocq.i::repository-revision-id windowed-revision)))
323
(setf (spocq.i::task-revision query) windowed-revision)
324
(setf (spocq.i::task-revision-id query) effective-revision-id)
325
(log-debug "run-sparql: revision windows: ~a" windowed-revision)))
326
(cond (response-content-type
327
(initiate-task query response-content-type))
329
(initiate-task query #'collect-result
330
:dimension-handler #'(lambda (d) (setf dimensions d))
331
:accounting-handler (if ah-s accounting-handler #'report-accounting)
332
:error-handler (if eh-s error-handler #'report-errors)))))))
333
(when (fboundp 'process-provenance-information)
334
(funcall 'process-provenance-information query))))
335
(values post-result dimensions query t))
336
(return-query () (values nil dimensions query nil))))))
338
;;; (dydra:run-sparql "select count(*) where {?s ?p ?o}" :repository-id "james/test" :agent (dydra:system-agent))
340
;;; operational components
342
(defgeneric initiate-service-task (operation task)
343
(:documentation "Run a task as an asynchronous thread with the results to be emitted as a
344
response. Performs just registration and setup immediately and delegates other aspects
345
through a channel to an asynchronous thread which performs actual processing, response generation,
348
(:method ((operation symbol) (query t))
349
;; by default, delegate to the :query response method
350
(initiate-service-task :query query))
352
(:method ((operation (eql :query)) (query query))
353
"Compile, register and initiate processing for a query. If it included a trace key,
354
send the parsed query expression. Emit accounting notes as per progress."
356
(register-query query)
357
(with-task-environment (:task query)
358
;; any commit must happen in the response thread - prior to responding
359
(let ((*package* *algebra-package*)
360
(*readtable* *sse-readtable*))
361
(log-notice "task ~s~@[/~s~], repository ~s registered."
364
(repository-id (task-repository query)))
365
(log-debug "task ~s, form ~s"
367
(query-sse-expression query)))
368
(unless (task-initialization-function query)
369
(compile-query query)
370
(generate-accounting-note :abstract))
371
(log-debug "query compiled: ~s, ~s" (task-id query) (query-sse-expression query))
372
(channel-put *service-channel* query)))
374
(:method ((operation (eql :plan)) (query query))
375
"Expand a query into its plan and return that as the result."
377
(register-query query)
378
(with-task-environment (:task query)
379
(let ((*package* *algebra-package*)
380
(*readtable* *sse-readtable*))
381
(log-notice "plan ~s, repository ~s, form ~s"
383
(repository-id (task-repository query))
384
(query-sse-expression query)))
385
(unless (task-initialization-function query)
386
(compile-query query)
387
(generate-accounting-note :abstract))
388
(channel-put *service-channel* query))))
391
(defun initiate-internal-task (task modifier &rest args)
392
"stub to continue to the generic operator"
393
(declare (dynamic-extent args))
394
(apply #'initiate-task task modifier args))
396
(defgeneric run-precommit-test (query revision)
397
(:method ((task task) (revision repository-revision))
398
(flet ((accept-result (result)
399
(declare (ignore result))
400
;; if it yielded a result, then the test succeeded
401
(return-from run-precommit-test t)))
402
(initiate-task task #'accept-result)
403
;; if it returns without a result, then the test failed
406
(:method ((view view) revision)
407
(run-precommit-test (view-query view) revision))
409
(:method ((test-string string) revision)
410
(let ((http-method-scanner (load-time-value (cl-ppcre:create-scanner `(:sequence :start-anchor
411
(:alternation "get" "post" "put"))
412
:case-insensitive-mode t))))
413
(cond ((cl-ppcre:scan-to-strings http-method-scanner test-string)
414
(process-asynchronous-task test-string))
417
(with-configuration () ; intended for internal use
418
(multiple-value-bind (sse-expression options)
419
(parse-sparql test-string)
420
(run-precommit-test (apply #'make-query
421
:id (make-service-task-id)
422
:dynamic-bindings (query-dynamic-bindings *task*)
423
:repository (repository-revision-reference revision)
425
:revision-id (repository-revision-id revision)
426
:operation (first sse-expression)
427
:sse-expression sse-expression
428
:response-content-type nil
429
:user-id (task-user-id *task*)
430
:sparql-expression test-string
431
:agent (task-agent *task*)
435
(defgeneric process-asynchronous-task (request-string)
436
(:documentation "Define a stub for the eventual spocq.si implementation")
437
(:method ((request t))
438
"This place-holder always returns nil"
441
(defgeneric call-for-task-revisions (operator task revision)
442
(:method (operator (task task) (revision repository-revision))
443
"The base method just calls the function"
444
(funcall operator task revision)))
446
(defgeneric initiate-task (task continuation &key accounting-handler error-handler dimension-handler)
448
This serves as the top-level internal operator to process requests.
449
Given a task and either a continuation which accepts a result stream or a media type to specify encoding,
450
compile the query and either initiate the query to yield its generator function and initiate its processing
451
or, depending on media type, initiate and delegate directly to the encoding operation, or
452
just delegate for the media type to perform necessary intermediate steps and then emit the result.")
454
(:method :around (task continuation &rest args)
455
(declare (ignore args))
456
(with-task-environment (:task task)
457
(register-query task)
458
;; note the principal task in order to receive signals
459
(setf (task-thread task) (bt:current-thread))
460
(unwind-protect (multiple-value-prog1 (call-next-method)
461
(setf (task-thread task) nil))
462
(when (task-thread task)
463
(log-notice "initiate-task: ~s. incomplete" (task-id task)))
464
(finalize-task task))))
466
(:method ((task task) (continuation function) &key error-handler accounting-handler dimension-handler)
467
"Given a continuation as the disposition, invoke it for each solution"
468
(labels ((result-mediator (result)
470
(generate-accounting-note :complete) ; set the final task state
471
;; clear any in-process threads
472
(log-notice "initiate-task: ~s. complete [~/format-iso-time/ - ~/format-iso-time/) = ~d/~d seconds."
473
(task-id task) (task-start-time task) (task-time task)
474
(float (/ (- (get-internal-run-time) (task-start-run-time task))
475
internal-time-units-per-second))
476
(float (/ (- (get-internal-real-time) (task-start-real-time task))
477
internal-time-units-per-second)))
478
(unschedule-task task)
479
(when (and error-handler *error-condition-channel*)
480
(filter-notes error-handler (channel-get-all *error-condition-channel*)))
481
(when accounting-handler
482
(filter-notes accounting-handler (get-accounting-notes))))
483
(funcall continuation result))
484
(filter-notes (handler notes)
485
(loop for (note-task . properties) in notes
486
until (null note-task)
487
if (eq note-task task)
489
(locally (declare (type function handler))
490
(assert-argument-types initiate-task
492
(funcall handler (cons note-task properties)))))))
493
(initialize-task task)
494
(let* ((result-or-generator (task-result-generator task)))
495
(etypecase result-or-generator
497
(result-mediator result-or-generator))
498
(abstract-field-generator
499
(let ((channel (abstract-field-generator-channel result-or-generator)))
500
(when dimension-handler
501
(funcall dimension-handler (abstract-field-generator-dimensions result-or-generator)))
502
(flet ((initiate-task-for-revision (task revision)
503
(declare (ignore revision))
504
(query-run-in-thread task result-or-generator)
505
(do-pages (page channel)
506
(result-mediator page))))
507
(declare (dynamic-extent #'initiate-task-for-revision))
508
(call-for-task-revisions #'initiate-task-for-revision task (task-revision task)))))))
509
(result-mediator nil)))
511
(:method ((task task) (encoding mime:mime-type) &key (stream *standard-output*) &allow-other-keys)
512
"In general, for mime types, run the query and send the generated result as the response"
513
(initialize-task task)
514
(let ((result-or-generator (task-result-generator task))
515
(*thread-operations* (cons (list 'initiate-task (task-id task))
516
*thread-operations*)))
517
(flet ((initiate-task-for-revision (task revision)
518
(declare (ignore revision))
519
(query-run-in-thread task result-or-generator)
520
(send-response-message (task-operation task) task stream encoding)))
521
(declare (dynamic-extent #'initiate-task-for-revision))
522
(call-for-task-revisions #'initiate-task-for-revision task (task-revision task))))
525
(:method ((task query) (encoding mime::query) &key (stream *standard-output*) &allow-other-keys)
526
"if the response is strictly the query, then send the response directly, without processing"
527
(send-response-message (task-operation task) (query-sse-expression task) stream encoding)))
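;;; Illustrative sketch, not part of the original source: the note-filtering pattern
;;; from the filter-notes local function in initiate-task, as a stand-alone function.
;;; The name filter-task-notes and the explicit task argument are hypothetical, and
;;; the listing above elides some lines of the original, so this may omit checks
;;; which the real code performs. Notes are (task . properties) pairs; iteration
;;; stops at the first note with a null task, and the handler is called only for
;;; notes which belong to the given task.
(defun filter-task-notes (handler notes task)
  (loop for (note-task . properties) in notes
        until (null note-task)
        when (eq note-task task)
          do (funcall handler (cons note-task properties))))

;;; Possible use, collecting the properties of matching notes:
;;; (let ((seen ()))
;;;   (filter-task-notes (lambda (note) (push (cdr note) seen))
;;;                      '((a . 1) (b . 2) (a . 3)) 'a)
;;;   (reverse seen))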
531
(defun thread-task (thread)
532
(when (bt:thread-alive-p thread)
533
(sb-thread:symbol-value-in-thread '*task* thread nil)))
536
(defun thread-task (thread)
537
(log-error "no thread-task implementation")
541
(defun cancel-algebra-threads ()
542
(loop for thread in (bt:all-threads)
543
count (unless (or (eq thread *management-thread*) ;
544
(eq thread (bt:current-thread)))
545
(ignore-errors (cancel-thread thread :task nil)))))
550
;;; 5 : 76.59 65.29 92.98
551
;;; 6 : 66.48 55.6 74.04
552
;;; 7 : 66.96 66.48 72.39
553
;;; w/10 all results were materialized in a single thread
554
;;; 10: 75.95 59.98 65.12
557
;;; 10: 81.4 81.33 75.46 59.07
560
;;; 10: 77.72 67.31 96.93
562
(defgeneric get-task-operation (task)
563
(:method ((task task))
564
"ensure that the task is still active - otherwise, delayed process creation can leave a hanging process"
565
(when (task-active-p task)
566
;; must wait as the thread will be created before the operation
567
(channel-get (task-operations task) :wait t))))
569
(defgeneric put-task-operation (task operation)
570
(:method ((task task) operation)
571
"allow the task to be picked up by an idle thread, suspend to allow a thread
572
to queue to fetch an operation and if none is waiting create a new one.
573
queue operations per task in order to permit eventual prioritization"
574
(let ((operations-queue (task-operations task)))
575
(bt:with-lock-held (*algebra-task-lock*)
576
(channel-put *algebra-task-channel* task)
577
(unless (plusp (channel-get-wait-count *algebra-task-channel*))
578
;; (let ((*print-pretty* nil)) (format *trace-output* "~%starting thread for ~s." source))
579
(start-algebra-thread #'(lambda () (run-algebra-thread-loop)))
581
(channel-put operations-queue operation)
585
(defparameter *cache-channel-threads* nil)
587
(defgeneric query-run-in-thread (task task-activation)
588
(:documentation "Arrange to add an operation to a query to make it available for another thread to execute.
589
This may be the initial task operation or some individual algebra operation.")
591
(:method ((task task) (source null))
594
(:method ((task task) (source abstract-field-generator))
595
(put-task-operation task (abstract-field-generator-expression source)))
597
(:method ((task task) (expression cons))
598
(when *cache-channel-threads*
599
(loop for object in expression
600
when (typep object 'channel)
601
do (setf (channel-writer object) (bt:current-thread))
602
when (typep object 'solution-generator)
603
do (let ((channel (solution-generator-channel object)))
604
(when channel (setf (channel-reader channel) (bt:current-thread))))))
605
(put-task-operation task expression))
607
(:method ((task task) (task-activation function))
608
(put-task-operation task task-activation)))
611
(defun start-algebra-thread (function &optional (initial-bindings nil))
612
(make-processing-thread function
613
:name (format nil "algebra-dynamic-~d" (rdfcache:time-in-thread))
614
:initial-bindings initial-bindings))
617
(defun make-processing-thread (function &rest args &key initial-bindings name)
618
"create a new thread which ensures thread-safety for global bits
619
- *random-state*, for use in blank node generation
620
- *task*, to keep it local"
621
(declare (ignore name))
623
(apply #'bt:make-thread function
624
:initial-bindings (loop for (variable value)
625
in `((*print-array* ,*print-array*)
626
(*random-state* ,(make-random-state t))
627
(*thread-string-buffer* nil)
628
(*thread-byte-buffer* nil)
630
unless (assoc variable initial-bindings)
631
do (setf initial-bindings
632
(acons variable value initial-bindings))
633
finally (return initial-bindings))
635
(unless (bt:threadp thread)
636
(error "thread creation failed: ~s" function))
639
;;; (dotimes (x 3) (make-processing-thread #'(lambda () (print (random 10)))))
640
;;; (time (dotimes (x 100) (make-processing-thread #'(lambda () nil)))) == ca 45ms/call
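;;; Illustrative sketch, not part of the original source: the binding-merge step
;;; from make-processing-thread, as a stand-alone function. The name
;;; merge-default-bindings is hypothetical. Each default is a (variable value) list;
;;; it is added to the caller's alist only when the caller has not already supplied
;;; a binding for that variable, so explicit bindings take precedence.
(defun merge-default-bindings (defaults initial-bindings)
  (loop for (variable value) in defaults
        unless (assoc variable initial-bindings)
          do (setf initial-bindings
                   (acons variable value initial-bindings))
        finally (return initial-bindings)))

;;; Possible use:
;;; (merge-default-bindings '((*print-array* t) (*thread-string-buffer* nil))
;;;                         '((*print-array* . nil)))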
642
#+(and use-affinity sbcl)
644
(defparameter *affinity-masks* (make-array (sb-cpu-affinity:cpu-count)))
645
(dotimes (i (length *affinity-masks*))
646
(setf (aref *affinity-masks* i) ;; (sb-cpu-affinity::make-cpu-mask)))
647
(sb-alien:make-alien (sb-alien:unsigned 8) 1024))
648
(sb-cpu-affinity::%get-cpu-affinity-mask (aref *affinity-masks* i))
649
(sb-cpu-affinity::set-cpu-affinity i (aref *affinity-masks* i))))
652
;; (defparameter *run-algebra-thread.lock* (bt:make-lock "run-algebra-thread"))
654
(defun algebra-thread-apply (task expression-or-function &key (priority *algebra-running-priority*))
655
"Apply the task operation in an existing thread.
656
Modify the thread priority and bind the task environment around the call.
657
Emit an accounting note and trace messages for the operation completion and return the
658
operation's completion state"
660
(log-trace "reduce aspect: ~a: ~a"
661
(task-id task) expression-or-function)
662
(let ((active-priority *algebra-idle-priority*))
663
(with-task-environment (:task task)
664
(unwind-protect (progn
665
(setf active-priority (setf (thread-priority) priority))
666
(let ((return-state (multiple-value-list (etypecase expression-or-function
667
(function (funcall expression-or-function))
669
(cons (apply (first expression-or-function)
670
(rest expression-or-function)))))))
671
(generate-accounting-note (first return-state) :task task)
672
(log-trace "reduced aspect: ~a: ~a -> ~a"
673
(task-id task) expression-or-function return-state)
674
(apply #'values return-state)))
675
(unless (eql active-priority *algebra-idle-priority*)
676
(handler-case (setf (thread-priority) *algebra-idle-priority*)
677
(error (c) (log-warn "error resetting thread priority: ~s: ~a."
680
(defparameter *run-algebra-thread.init-thread* t)
681
(defparameter *run-algebra-thread.exit-thread* nil)
683
;;; 2019-05-04: observing that each thread creation required 30-50ms, a single-operation thread model
684
;;; was not realistic for short-lived requests.
685
;;; as a consequence, refactored run-algebra-thread(task) into run-algebra-thread-loop + get-task -> task-run-algebra-thread
687
(defun run-state-is-processing ()
688
(eq *run-state* :processing))
689
(defun run-state-is-terminate ()
690
(eq *run-state* :terminate))
692
(defun run-algebra-thread-loop ()
693
"run as the toplevel thread loop to request a task which needs processing,
694
attach to it and process a task for it."
695
(loop for task = (channel-get *algebra-task-channel*)
696
do (task-run-algebra-thread task)
697
until (run-state-is-terminate)))
700
(defun task-run-algebra-thread (task)
701
(declare (optimize (debug 2) (safety 3)))
702
(log-trace "algebra thread starting: ~s." task)
703
(assert-argument-type task-run-algebra-thread task task)
706
(format stream "~&rat.start: ~s: ~s~%" task (bt:thread-name (bt:current-thread)))
707
(write-char #\+ *trace-output*) (finish-output *trace-output*)))
708
(let ((*thread-operations* nil)
710
(when *run-algebra-thread.init-thread*
711
(rdfcache:init-thread) ) ; required for rdfcache channel setup
712
;; note that the thread accepts operations from the task's queue w/o setting up the respective
713
;; environment. given which, terminate task
714
(restart-bind ((cancel-thread (lambda (&key ((:task cancel-task) nil))
715
(trace-threads task-run-algebra-thread.cancel-thread :current-thread (bt:current-thread) :canceled)
716
(cond ((or (null cancel-task) (eq cancel-task task))
717
(log-info "task thread canceled: returning: ~a ~a" *task* (bt:current-thread ))
718
(return-from task-run-algebra-thread))
720
;; this happens on rare occasions.
721
;; it indicates that this thread has been associated with a task which is cancelling its threads,
722
;; but the association is no longer true.
723
;; this could happen where the cancellation is a spawned thread while the thread itself
724
;; has returned, disassociated itself from the task and commenced operation for another.
725
;; this agrees with the printed state, where the thread's task is active and the cancel-task is complete.
726
(log-warn "task thread canceled ... not: ~a/~a/~a ~a"
727
*task* task cancel-task (bt:current-thread )))))))
729
(flet ((handle-error (condition)
731
(spocq.e:compilation-error
732
(log-warn "task-run-algebra-thread: compilation error in query thread: ~a -> ~a." task condition)
733
(if *log-compilation-errors*
734
(write-tracelog "task-run-algebra-thread: compilation error in query thread: ~a -> ~a." task condition)))
736
(log-stacktrace "Error in query thread: ~a -> ~a." task condition)))
737
;; send an error response
738
(generate-error-note condition :task task)
739
(trace-threads task-run-algebra-thread.handle-error :task-terminated :current-thread (bt:current-thread) :task task)
740
(log-warn "task-run-algebra-thread.handle-error: terminating task: ~a: ~a: ~a" (bt:current-thread) task *thread-operations*)
741
(generate-accounting-note :terminate :task task)
742
(terminate-task task condition)
743
(return-from task-run-algebra-thread condition)))
745
(add-task-thread task (bt:current-thread))
746
(flet ((guarded-handler (c)
747
;;(print (list :task-run-algebra-thread :guarded-handler c) *error-output*)
748
(handler-case (handle-error c)
749
(serious-condition (c2)
750
(log-error "reentrant error processing query:~% ~a~% ~a."
752
(return-from task-run-algebra-thread)))
753
(declare (dynamic-extent #'guarded-handler))
755
(handler-bind ((storage-condition
757
(SB-DEBUG:PRINT-BACKTRACE :count 8)
758
(signal c)) ) ; unwind it to the handler-case
760
;; handle the error in the dynamic context in order to terminate other threads
761
;; then, last of all, this one, in order not to close the transaction until
762
;; no other thread needs it.
764
(let ((operation (get-task-operation task)))
765
;;(trace-when *trace-output* "~&rat.next ~s.~s: ~s" task (task-operation task) (bt:current-thread))
766
(when operation ;; allow that the task has been terminated
767
(setq *thread-operations* (list operation))
768
;; run exactly one operation per thread
769
(algebra-thread-apply task operation))))
770
(storage-condition (c) (guarded-handler c)))))
771
(log-info "algebra thread returning: ~s." task)
772
(remove-task-thread task (bt:current-thread))
775
(format stream "~&rat.complete: ~s: ~s @~d~%" task (bt:thread-name (bt:current-thread)) (incf unwind-count))
776
(print *thread-trace-output* *terminal-io*)
777
(finish-output *terminal-io*)))
778
(when *run-algebra-thread.exit-thread*
779
(rdfcache:exit-thread)) ; channel housekeeping
782
(defmacro process-query-request (&rest args)
783
(error "NYI: process-query-request . ~s" args))
786
(defun process-query-request (query *request-io* *store-io*)
787
"Mediate query requests against a store.
789
The request channel is active based on subscriptions
790
* to service a request domain - with respective access and priority
791
* to field store responses
792
The store queue is bound to a brokered queue for a triple store, which performs the BGP processing.
793
Each incoming message is dispatched as either a new request or a store response for an existing
796
To process a request, compile and execute it. The execution yields two values:
797
* a solution set, which may be empty, and
798
* a boolean to indicate completion.
799
If either the solution is not empty, or the request is complete, publish the result."
801
(funcall (task-operation query) query *request-io* *store-io*))
804
(defgeneric process-store-response (response-type message *request-io* *store-io*)
805
(:documentation "Process a store message. This allows for BGP solutions and for errors.")
807
(:method ((type (eql 'spocq:|bgpfield|)) message (*request-io* t) (*store-io* t))
808
"A reply message contains a solution field. Integrate it into an ongoing query, capture any
809
(possibly intermediate) results and publish them to any subscribers. If the query is then complete,
810
terminate the client streams. Otherwise leave the augmented query standing."
812
(handler-bind ((spocq.e:runtime-error
814
;; if it is a spocq error, send an error message to the client and terminate
816
(let ((query (error-query condition)))
817
(generate-error-note condition :task query)
818
(generate-accounting-note :terminate :task query)
819
(finalize-task query)
820
;; return t to indicate that the message was handled
821
(return-from process-store-response t)))))
822
(error "Not supported: ~s."
823
`(spocq:|bgpfield| ,message ,*request-io* ,*store-io*))))
825
(:method ((type (eql 'spocq:|error|)) message *request-io* *store-io*)
826
(spocq:|error| message *request-io* *store-io*)))
829
(defgeneric spocq:|query| (task request-io store-io)
830
(:method ((*query* query) request-io store-io)
831
"Compile, register and initiate processing for a query. If it includes trace information,
832
send the parsed query expression. Emit accounting notes as per progress."
834
(send-account-note *query* :abstract *accounting-io*)
835
(compile-query *query*)
836
(when (task-trace-routing-key *query*)
837
(send-trace-response *query* request-io :|query-form| (query-sse-expression *query*)))
838
(let ((*package* *ALGEBRA-PACKAGE*)
839
(*readtable* *sse-readtable*))
840
(log-notice "task ~s, repository ~s, form ~s"
842
(repository-id (task-repository *query*))
843
(query-sse-expression *query*)))
844
(register-query *query*)
845
;; initiate any bgp matching
846
(match-query *query*)
847
;; attempt immediate reduction - in case bgps were literal or repository in direct
848
(reduce-query *query*)
849
(task-state *query*)))
852
(defgeneric spocq:|bgpfield| (message request-io store-io)
853
(:method ((task store-reply) request-io store-io)
854
(let* ((bgp-id (task-bgp-id task))
855
(solution-field (task-solution-field task))
856
(*query* (find-query bgp-id)))
858
(when (task-trace-routing-key *query*)
859
(send-trace-response *query* request-io :|bgp-complete| (list bgp-id (length solution-field))))
860
(log-info "task ~s, bgp ~s, count ~s."
861
(task-id *query*) bgp-id (solution-field-row-count solution-field))
862
(setf (query-pattern-solutions *query* bgp-id) solution-field)
863
(if (every #'agp-completed-p (query-patterns *query*))
864
(reduce-query *query*)
865
(match-query *query*))
866
(task-state *query*))
868
(log-warn "No query found for store response: ~s" task)
872
(defgeneric spocq:|error| (message request-io store-io)
873
(:method ((task store-error-task) request-io store-io)
874
(let* ((bgp-id (task-bgp-id task))
875
(*query* (find-query bgp-id)))
877
(log-notice "query: ~s. terminated. (~a ~a)"
878
(task-id *query*) (task-condition task) (task-detail task))
879
(generate-error-note (task-condition task) :task *query*)
880
(generate-accounting-note :terminate :task *query*)
881
(finalize-task *query*))
883
(log-warn "No query found for store error: ~s" task))))))
886
(defgeneric spocq:|terminate| (message request-io store-io)
887
(:documentation "The interface operator for terminate defines a query-request method which
888
locates a designated query and unregisters it. This allows in-progress algebra computations
889
to complete, but prevents future operations. The operation is logged and makes an accounting note.")
891
(:method ((request terminate-request) request-io (store-io t))
892
(let* ((id (task-id request))
893
(query (find-query id)))
895
(terminate-task query))
897
(log-warn "No query found for terminate: ~s" request)))))
899
(:method ((query query) (request-io t) (store-io t))
900
(terminate-task query)))
902
(defparameter *repository-purge-threshold* 3)
904
(defun constrain-resources ()
905
(let* ((now (get-universal-time ))
906
(terminated-tasks ())
907
(deleted-repositories ())
908
(deleted-prototypes ()))
909
(flet ((constrain-query (query)
910
(when (task-quota-exceeded-p query)
911
(log-notice "Task over quota: ~a (~a ~a ~a)."
912
query (task-start-time query) (task-time query) (task-time-limit query))
913
(terminate-task query
914
(make-condition 'spocq.e::quota-error
916
:detail (list :solutions (task-solution-limit query)
917
:time (task-time-limit query))))
918
(push query terminated-tasks))))
919
(declare (dynamic-extent #'constrain-query))
920
;; first constrain active queries
921
(map-queries #'constrain-query)
922
;; then limit the prototypes collected
923
(when (> (hash-table-count *query-prototypes*) *query-prototypes-maximum-count*)
924
(setf deleted-prototypes
925
(loop for (prototype repository signature bindings)
926
in (subseq (sort (loop for key being each hash-key of *query-prototypes*
927
using (hash-value prototype)
928
collect (cons prototype key))
929
#'< :key #'(lambda (cell) (task-start-run-time (first cell))))
930
0 (floor (/ *query-prototypes-maximum-count* 2)))
931
do (progn (setf (find-query-prototype repository signature bindings) nil)
934
;; then limit the repositories saved
935
(when (zerop (logand now 15))
936
(cond ((> (hash-table-count *repositories*) *repository-purge-threshold*)
937
(setf deleted-repositories (release-repository-resources *repositories*)))
939
((> (sb-kernel::dynamic-usage) 2500000000)
940
(setf deleted-repositories (release-repository-resources *repositories*)))))
941
(when (or terminated-tasks deleted-repositories deleted-prototypes)
942
(log-info "terminated tasks: ~s, deleted repositories: ~s, deleted prototypes: ~s"
943
(length terminated-tasks)
944
(length deleted-repositories)
945
(length deleted-prototypes)))
946
(values terminated-tasks deleted-repositories deleted-prototypes))))
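;;; Illustrative sketch only, not part of the original source: the prototype-limiting
;;; step in constrain-resources, reduced to a plain hash table. It assumes that each
;;; hash value is a number standing in for task-start-run-time. The name
;;; evict-oldest-half is hypothetical.
(defun evict-oldest-half (table maximum-count)
  "If TABLE holds more than MAXIMUM-COUNT entries, remove the (floor MAXIMUM-COUNT 2)
entries with the smallest values and return their keys, oldest first."
  (when (> (hash-table-count table) maximum-count)
    ;; collect (start-time . key) pairs into a fresh list, so that sorting and
    ;; removal do not interfere with the iteration over the table
    (let ((entries (loop for key being each hash-key of table
                           using (hash-value start-time)
                         collect (cons start-time key))))
      (loop for (nil . key) in (subseq (sort entries #'< :key #'car)
                                       0 (floor maximum-count 2))
            do (remhash key table)
            collect key))))
;;; For example, with five entries "q1" .. "q5" whose values are 1 .. 5,
;;; (evict-oldest-half table 4) removes and returns the keys "q1" and "q2".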
948
(defmethod task-quota-exceeded-p (task)
949
(let ((time-limit (task-time-limit task))
950
(start-time (task-start-time task))
951
(task-time (task-time task)))
952
(cond ((eq (task-agent task) (system-agent))
954
((null time-limit) nil)
956
(> (- task-time start-time) time-limit))
958
(setf (task-start-time task) task-time)
962
(defparameter *saved-expression* nil)
963
(defparameter *saved-function* nil)
965
;;; (setq *query-evaluation-mode* :interpreted)
967
(defun compile-query (query)
968
(compile-query-for-repository query (task-repository query)))
970
(defgeneric compile-query-for-repository (query repository)
971
(:documentation "Compile the top-level query algebra expression.
972
This macro-expands each operator in the expression tree, which transforms the algebra operations
973
into the respective run-time function calls. In the process, where bgp expansion happens, it will
974
re-order, slice, and/or combine graph patterns according to the transaction-specific repository
975
statistics. this means
976
Uses abstract-query-expression to abstract the sse over its bgp forms. Use the bgp bindings to construct a
977
function (query-arguments-function) which issues asynchronous bgpmatch requests to the repository's store, and
978
a predicate (query-predicate-function) which tests whether the bgpmatch results are complete. Use the
979
abstract sse to construct a function over the bgp results and a response continuation to compute the
980
results and generate a query reply.
982
Note that a describe form generates no immediate response. It constructs an additional query for the indicated
983
subjects on-the-fly based on the initial response and evaluates it subsequent to the original query task.")
985
(:method ((query query) (repository repository))
986
(if (task-initialization-function query)
987
(task-initialization-function query)
988
(let* ((expression (query-sse-expression query))
990
;; the initialization function generates the propagation tree. this happens
991
;; in the scope of the task transaction, but must leave it to be continued
992
;; once other leaf operators run
993
(ecase *query-evaluation-mode*
996
;;(print ',expression)
997
(with-task-environment (:task query :normal-disposition :continue)
998
;; no longer necessary to prime bgp matches, as they now return standard
999
;; generators with initiation functions which the combination operators
1000
;; pass to run-in-thread...
1004
(with-task-environment (:task query :normal-disposition :continue)
1005
;; no longer necessary to prime bgp matches, as they now return standard
1006
;; generators with initiation functions which the combination operators
1007
;; pass to run-in-thread...
1008
(eval ',expression)))))))
1009
(with-task-environment (:task query :normal-disposition :continue)
1011
(log-debug "compile-query ~a: ~s" (task-id query) expression)
1012
(setf (query-patterns query) nil)
1013
(setf (task-operator-count query) (length (expression-algebra-operators expression :unique-p nil)))
1014
;; compile the query form and collect the agp instances as a side-effect
1015
(let ((initialization-function
1016
(if *compile-algebra-expression*
1017
(spocq-compile lambda-expression)
1018
#'(lambda (query) (eval (list lambda-expression query))))))
1019
(setf (task-initialization-function query) initialization-function)
1020
(generate-accounting-note :compile :task query)
1021
(values initialization-function
1022
lambda-expression))))))))
1027
(defgeneric match-query (agp-or-query)
1028
(:documentation "Compute or retrieve the values for abstracted bgp/agp patterns. This is specialized for
1029
query instances to iterate over all of its agps, and for agps to obtain the respective solution field
1030
either by local retrieval or by delegating the match to the remote store.")
1032
(:method ((query query))
1033
(let ((match-start (task-time query))
1034
(match-start-run-time (get-internal-run-time)))
1035
(log-debug "match-query: ~s: start." (task-id query))
1036
(send-account-note query :delegate *accounting-io*)
1038
(setf (query-patterns query) (compute-agp-order (query-patterns query)))
1039
(dolist (agp (query-patterns query))
1040
;; resolve each agp which is itself still un-resolved, but has completed predecessors
1041
(when (and (agp-initialized-p agp)
1042
(every #'agp-completed-p (agp-predecessors agp)))
1043
(agp-solutions agp)))
1044
(send-account-note query :bind *accounting-io*)
1045
(let ((match-end (task-time query)))
1046
(log-debug "query: ~s: matched: [~/format-iso-time/ - ~/format-iso-time/) = ~d/~d seconds."
1047
(task-id query) match-start match-end
1048
(float (/ (- (get-internal-run-time) match-start-run-time) internal-time-units-per-second))
1049
(- match-end match-start))))))
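;;; Illustrative sketch only, not part of the original source: one way to measure
;;; run-time (CPU) and real-time (wall-clock) intervals of the kind logged above,
;;; using only standard Common Lisp operators. The name call-with-timing is
;;; hypothetical. match-query itself takes its second interval from task-time.
(defun call-with-timing (function)
  "Call FUNCTION with no arguments. Return its primary value, the CPU seconds,
and the wall-clock seconds which the call took."
  (let ((start-run (get-internal-run-time))
        (start-real (get-internal-real-time)))
    (let ((result (funcall function)))
      (values result
              (float (/ (- (get-internal-run-time) start-run)
                        internal-time-units-per-second))
              (float (/ (- (get-internal-real-time) start-real)
                        internal-time-units-per-second))))))
;;; For example, (call-with-timing (lambda () (sleep 1))) should report roughly
;;; one wall-clock second and close to zero CPU seconds.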
1053
;;; internal query execution operators
1055
(defgeneric call-with-query-solutions (function query &key repository repository-id graphs)
1056
(:documentation "Apply FUNCTION to each of the solutions generated by the given QUERY
1059
(:method (function (query string) &rest args &key graphs &allow-other-keys)
1060
(declare (dynamic-extent args))
1061
(declare (ignore graphs))
1062
(multiple-value-bind (sse-expression options)
1063
(parse-sparql query)
1064
(apply #'call-with-query-solutions function
1066
(merge-property-lists args options))))
1068
(:method (function (query list) &rest args &key
1069
(id (format nil "internal-~d" (rdfcache:time-in-thread)))
1070
(operation :|query|)
1072
(declare (dynamic-extent args))
1073
(call-with-query-solutions function
1074
(apply #'make-instance *class.query*
1076
:sse-expression query
1077
:operation operation
1080
(:method (function (task query) &key &allow-other-keys)
1081
(let ((dimensions nil)
1083
(with-task-environment (:task task)
1084
(initialize-task task)
1085
(let* ((generator (task-result-generator task))
1086
(channel (abstract-field-generator-channel generator)))
1087
(setf dimensions (abstract-field-generator-dimensions generator))
1088
(unwind-protect (block :handle-termination
1089
(add-task-thread task (bt:current-thread))
1090
(restart-bind ((terminate-task (lambda (task-to-terminate &optional condition)
1091
(when (or (null task-to-terminate) (eq task task-to-terminate))
1092
(warn "task terminated: ~a" task-to-terminate)
1094
(return-from :handle-termination condition)))))
1095
(query-run-in-thread task generator)
1096
(loop for page = (get-field-page channel)
1098
do (progn (incf count (array-dimension page 0))
1099
(map nil function (term-value-field page)))
1100
finally (funcall function nil))
1102
(unless *management-thread*
1103
(get-accounting-notes)
1104
(channel-get-all *error-condition-channel*))
1105
(finalize-task task))))
1106
(values count dimensions))))
1108
;;; (call-with-query-solutions #'print "select count(*) where { {graph ?g {?s ?p ?o}} union {?s ?p ?o} }" :repository-id "1/1571")
1111
;; (setq *compile-algebra-expression* nil)
1113
#+(or) ;; obsolete - applies to materialization
1114
(defgeneric reduce-query (query)
1115
(:documentation "Double-check that the query preconditions are met - at least, that the arguments
1116
are all set, and apply the algebra function to the arguments and to a response continuation.
1117
That continuation accepts a result term - boolean, triple set, or solution field and responds
1118
to the original query request.")
1120
(:method ((query query))
1121
(let ((state :bind))
1122
(when (query-runnable-p query)
1123
(setf state :reduce)
1124
(send-account-note query :reduce *accounting-io*)
1125
(handler-bind (((or warning error)
1127
(spocq.e:reduction-error :condition c :expression (query-sse-expression query) :query query)
1128
;;; signals to the message processing loop. does not return
1130
(let ((result (funcall (task-initialization-function query) query)))
1131
(when (member result '(:complete :terminate))
1132
(return-from reduce-query (task-state query)))
1133
(let ((now (task-time query))
1134
(start (task-start-time query)))
1135
(case (task-state query)
1136
(:terminate ; reduction aborted or completed after time-out
1137
(log-notice "task ~s. complete (terminate) [~/format-iso-time/ - ~/format-iso-time/) = ~d/~d seconds."
1138
(task-id query) start now
1139
(float (/ (- (get-internal-run-time) (task-start-run-time query)) internal-time-units-per-second))
1140
(float (/ (- (get-internal-real-time) (task-start-real-time query)) internal-time-units-per-second))))
1142
(log-notice "task ~s. complete (to send) [~/format-iso-time/ - ~/format-iso-time/) = ~d/~d seconds."
1143
(task-id query) start now
1144
(float (/ (- (get-internal-run-time) (task-start-run-time query)) internal-time-units-per-second))
1145
(float (/ (- (get-internal-real-time) (task-start-real-time query)) internal-time-units-per-second)))
1146
(when (log-level-qualifies? :debug)
1148
(let ((variables (first result))
1149
(first (first (rest result)))
1150
(last (first (last result)))
1151
(count (length (rest result))))
1152
(log-debug "task ~s result: ~s~@[ . (~s~@[ [... x~d ...]~]~@[ ~s~])~]."
1156
(when (> count 2) (- count 2))
1157
(when (> count 1) last)))
1158
(log-debug "task ~s result: ~s."
1161
(send-query-response query *request-io* result)
1162
(setf now (task-time query))
1163
(log-notice "task ~s. complete (~a) [~/format-iso-time/ - ~/format-iso-time/) = ~d/~d seconds."
1164
(task-id query) (task-state query) start now
1165
(float (/ (- (get-internal-run-time) (task-start-run-time query)) internal-time-units-per-second))
1166
(float (/ (- (get-internal-real-time) (task-start-real-time query)) internal-time-units-per-second)))
1167
(setf state (setf (task-state query) :complete))))))
1168
(when (task-complete-p query)
1169
(let ((time-interval (task-time-interval query)))
1170
(cond ((and time-interval
1171
(< (setf (task-start-time query) (+ (task-time query) time-interval))
1172
(task-end-time query)))
1173
;; if complete, but repeating, rerun at the respective start time
1174
(let ((delay (- (task-start-time query) (task-time query))))
1175
;; reset the argument list
1176
(dolist (agp (query-patterns query))
1181
(reduce-query query))
1183
;; if complete, but not repeating, purge it
1184
(send-account-note query :complete *accounting-io*)
1185
(finalize-task query)))))))
1188
;;; ruby bin/spocq-query '(select (?s ?p ?o) (bgp (triple ?s ?p ?o)))'