Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/message-processing.lisp
| Kind | Covered | All | % |
| expression | 48 | 499 | 9.6 |
| branch | 0 | 10 | 0.0 |
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-

(in-package :org.datagraph.spocq.implementation)

(:documentation "messaging control flow for queries"
 "The 'sparql algebra engine' comprises a set of message handling threads and/or processes.
It manages its resource usage in terms of number of threads and the role(s) for which they are
configured. Each thread is bound at creation with three channels multiplexed over two connections
- one for query requests and responses
- one for store bgp requests and solutions
- one for accounting (on its own connection)

The query channel is used to consume client query messages from a brokered, shared queue bound to a
'universal' routing key. The responses pass back through the same exchange, but are routed to the

The store channel is used to dispatch bgp matching requests to a shared queue and subscribes to a private
queue via a process-specific routing key to receive the responses.

Queries are processed in successive, independent phases, driven by messages. The process starts when a
client publishes a query to its request queue. That message is accepted by one of that queue's consumers,
and compiled into three functions:
- a bgp request generator
- a bgp completion predicate
- a reduction function.
The request generator executes immediately. As each response appears, the processor integrates it into the
respective query task. Once all bgp responses have been integrated, the reduction function executes and
generates the response solution to the client.

Each query request contains the following in addition to the query expression:

* repository_id : designates an owner/repository-name combination to identify the repository to the store
* query_id : identifies the query for accounting purposes and in the response to the client
* routing_id : specifies the return path for the response message

A query request body is the query expression encoded as SSF/BERT. An ssf augments the query sse expression
with information from the SPARQL form

* SELECT : a solution field
* CONSTRUCT : a triple (or optionally quad?) sequence (graph)
* DESCRIBE : a triple sequence (graph)

A store request body is a single BGP or GRAPH form. It can be encoded as SSE/BERT or SSE/UTF8 depending
on the channel encoding. The request header specifies the repository and query identifiers from the query
request, and adds a process-specific routing key for the return path to the same
process and a bgp identifier to specify the binding.

A store response's body is a solution field - a set of variable binding solutions in the channel's encoding.
It can be a singleton field - a 'unit table', to indicate a match for a constant pattern. In
addition it includes the query and bgp identifiers. The provided same-process routing key serves to route
it back to the originating process.

A query response contains a result respective to the query form, encoded as per the channel's encoding.
It includes the query identifier for use by the client. The request routing key serves to route it back

In addition, should a store request or a query request fail, the response is an error message.

If the query request entailed BGP queries, the execution publishes any BGP queries and yields a query state
which indicates that it is still active. In that case, this query task is bound globally to its BGP
identifiers to be available for store responses. If, on the other hand, the query included no BGP,
the execution yields an immediate result - a solution field, a triple set, or a scalar (boolean or numeric),
which is published back to the client immediately, and the query is complete.

Any 'incomplete' query functions remain bound to their pending BGP queries. As each response appears, its
query task is retrieved according to BGP identifier. It integrates the result, and checks if all responses
are complete. If so, it reduces the query and emits the result to the client. If the query is configured
for repeated execution, it is reset and the process begins anew. Each successive
application yields a result analogous to the initial one.")
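
;;; A minimal sketch of the three-function protocol described in the
;;; documentation above: a request generator, a completion predicate, and a
;;; reduction function. It assumes nothing about the actual query compiler.
;;; Every name prefixed SKETCH- is hypothetical and exists only for this
;;; illustration; BGP requests and solutions are modelled as plain lists
;;; rather than as AMQP messages.

(defstruct sketch-task
  generator        ; function of no arguments -> list of (bgp-id . pattern)
  reducer          ; function of an alist (bgp-id . solutions) -> result
  (pending '())    ; ids of BGP requests still awaiting a response
  (responses '())) ; alist of (bgp-id . solutions) integrated so far

(defun sketch-start-task (task publish)
  "Run the request generator and record each BGP id as pending.
PUBLISH is called once with each (bgp-id . pattern) request."
  (dolist (request (funcall (sketch-task-generator task)))
    (push (car request) (sketch-task-pending task))
    (funcall publish request))
  task)

(defun sketch-task-complete-p (task)
  "Completion predicate: true once no BGP response is outstanding."
  (null (sketch-task-pending task)))

(defun sketch-integrate-response (task bgp-id solutions)
  "Integrate one store response into TASK. Return the reduced result once
all responses have arrived, otherwise NIL."
  (setf (sketch-task-pending task)
        (remove bgp-id (sketch-task-pending task)))
  (push (cons bgp-id solutions) (sketch-task-responses task))
  (when (sketch-task-complete-p task)
    (funcall (sketch-task-reducer task) (sketch-task-responses task))))

;;; In this sketch a task whose generator yields no requests is complete as
;;; soon as it starts, which corresponds to the 'immediate result' case in
;;; the documentation.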

;;; channel/exchange/queue management

(defun run-query-request-thread-idle (channel)

(defun run-query-request-thread (&key (broker-uri (or *broker-uri* (error "broker-uri is required.")))
                                      (query-queue (engine-query-queue-name))
                                      (query-exchange (query-exchange))
                                      (query-routing-key *engine-query-routing-key*)
                                      (break-on-signals nil)
                                      (request-content-type *request-content-type*))
  "Accept, parse, and queue query requests.
- connect to a request message broker, define and bind the requests queue, subscribe to those queues
- for each delivery, parse the request, create a query task, and queue that task for further processing
This neither interprets the query nor communicates a response. All requests are acknowledged to the broker
immediately and secondary status messages are sent to accounting. The only substantive response is in the event
the request fails to parse."

  (log-notice "request task reconnecting to exchanges...")
  (setq *run-state* :connect)
  (amqp:with-open-connection (connection :uri broker-uri)
    (amqp:with-open-channel (query-channel connection
                                           :element-type '(unsigned-byte 8)
                                           :content-type request-content-type)
      (log-notice "request task query channel : ~s" query-channel)
      (let ((*query-queue* (amqp:queue query-channel :queue query-queue))
            (*query-exchange* (amqp:exchange query-channel :exchange query-exchange))
            (query-consumer-tag nil))
        (declare (ignorable query-consumer-tag))
        ;; the exchanges should already exist, but the process' queue needs to be bound
        (setf (amqp.u:channel-content-type query-channel) request-content-type)
        ;; instantiate the channels' basic and specify non-persistent delivery mode
        (amqp:basic query-channel :delivery-mode 1)
        ;; removed :auto-delete t on rabbitmq transition to 0.9.1, but it shouldn't be present
        ;; anyway, since the exchanges should be permanent
        (amqp:declare *query-exchange* :type "topic")
        ;; make and bind the queue to receive queries
        (amqp:declare *query-queue* :auto-delete t)
        (amqp:bind *query-queue* :exchange *query-exchange* :queue *query-queue*
                   :routing-key query-routing-key)
        ;; instantiate the query channel and specify a single-message window
        ;; not implemented in rmq (amqp:qos (amqp:basic query-channel) :prefetch-count 1 :global t)
        (amqp:qos (amqp:basic query-channel) :prefetch-count 1)
        ;; enable acknowledgements at the protocol level for query requests, but
        (setf query-consumer-tag (amqp:consume *query-queue* :no-ack nil))
        ;; disable auto-ack to permit ack post-processing
        (setf (amqp.u:channel-acknowledge-messages query-channel) nil)

        (labels ((handle-error (condition)
                   (log-stacktrace "Error processing query request (~s/~s): ~a -> ~a."
                                   query-exchange query-queue *query* condition)
                   ;; if in the context of a query and the channel is still usable
                   ;; send an error response

                     (log-error "run-query-request-thread: Error in query: ~s" (query-sse-expression *query*))
                     (when (open-stream-p query-channel)
                       (send-ack query-channel))
                     (generate-error-note condition :task *query*)
                     (terminate-task *query* condition))
                   ;; clear out the connection
                   (close connection :abort t))

                   ;; acknowledge a message so long as the basic state indicates it is pending
                   ;; this serves in cases where non-local transfer has skipped the ack in amqp:deliver, but
                   ;; the message should still be "consumed".
                   (let* ((basic (amqp:basic channel))
                          (tag (amqp:basic-delivery-tag basic)))
                     (when (typep tag '(integer 1 *))
                       (setf (amqp:basic-delivery-tag basic) 0)
                       (amqp::send-ack basic :delivery-tag tag :multiple 1))))
                 (handle-query-request (channel class method &rest args)
                   (declare (dynamic-extent args)
                            (ignore channel method))
                   (let ((request-content-type nil)
                         (*break-on-signals* break-on-signals))
                     (flet ((receive-message-content (stream content-type)
                              (setf request-content-type content-type)
                              (receive-query-request stream content-type)))
                       (declare (dynamic-extent #'receive-message-content))
                       (handler-bind ((spocq.e:message-syntax-error

                                        (log-warn "responding to message syntax error: ~a" condition)
                                        ;; take the routing information from the message
                                        (send-ack query-channel)
                                        (generate-error-note condition :task (error-task condition))
                                        (generate-accounting-note :terminate :task (error-task condition))
                                        (return-from handle-query-request t)))
                                      (spocq.e:runtime-error

                                        ;; if it is any other error, ack to the broker
                                        (send-ack query-channel)

                                        ;; if a query was created, send an error message to the client and terminate it
                                        (generate-error-note condition)
                                        (terminate-task *query* condition))

                                        (log-stacktrace "run-query-request-thread: Error in request w/o context~% ~a." condition)))
                                        (return-from handle-query-request t))))

                         (multiple-value-bind (operation-or-prototype arguments)
                             (apply #'amqp:respond-to-deliver class
                                    :body #'receive-message-content

                           (typecase operation-or-prototype

                              (setf (task-start-time operation-or-prototype) (get-universal-time)
                                    (task-start-run-time operation-or-prototype) (get-internal-run-time)
                                    (task-start-real-time operation-or-prototype) (get-internal-real-time))
                              (log-notice "query: named prototype: ~s: ~s: ~s"
                                          (list (repository-id (task-repository operation-or-prototype))
                                                (query-signature operation-or-prototype)
                                                (first (query-dynamic-bindings operation-or-prototype)))
                                          operation-or-prototype
                                          (substitute #\space #.(code-char #o012) (query-sparql-expression operation-or-prototype)))
                              (let ((query (apply #'clone-instance operation-or-prototype arguments)))
                                (generate-accounting-note :parse :task query)
                                (initiate-service-task (task-operation query) query)))

                              (log-debug "query: new instance: ~s" arguments)
                              (let ((query (apply #'make-query arguments)))
                                (generate-accounting-note :parse :task query)
                                (initiate-service-task operation-or-prototype query))))))
                       ;; acknowledge once the query task has been initiated.
                       (send-ack query-channel)

                   (run-query-request-thread-idle c)

                     ((:terminate :restart)
                      ;; shut down by first unsubscribing
                      ;;!!! possible race condition: should amqp already have a request in-flight, it could
                      ;;!!! time-out to the client before amqp gets to re-queue it.
                      (cond ((stringp query-consumer-tag)
                             (log-warn "terminate: canceling query consumer.")
                             (amqp:cancel (amqp:channel.basic query-channel) :consumer-tag query-consumer-tag)
                             (setf query-consumer-tag nil))
                            (t ;; and then, if still idle return to stop
                             (log-warn "terminate: messaging processor returning.")
                             (return-from run-query-request-thread *run-state*)))))

          (push #'idle-handler (amqp.u::connection-idle-handlers connection))
          ;; subscribe to the query queue, then run the connection loop
          (setf (de.setf.amqp.implementation::channel-command query-channel 'amqp:deliver)
                #'handle-query-request)
          ;; bind a dynamic handler in order to use any active query
          (block :process-connection
            (setq *run-state* :process)
            (flet ((guarded-handler (c)
                     (handler-case (handle-error c)
                       (serious-condition (c2)
                         (log-error "run-query-request-thread: Reentrant error processing query:~% ~a~% ~a."

                     (return-from :process-connection)))
              ;; set up handlers to catch and log stacktraces for most serious conditions,
              ;; but let interrupts through, and cause storage conditions to first unwind

                (handler-bind (#+sbcl
                               (sb-sys:interactive-interrupt () ) ; do nothing
                               (storage-condition () ) ; let it go to the handler-case
                               (serious-condition #'guarded-handler))
                  ;; don't have amqp handle the errors; shut down the connection and restart
                  (amqp.u:process-connection-loop connection))
                (storage-condition (c) (guarded-handler c)))))))))))

(defun run-response-thread (&key (broker-uri (or *broker-uri* (error "broker-uri is required.")))
                                 ((:service-channel *service-channel*) *service-channel*)
                                 (response-content-type *response-content-type*))
  (flet ((connect-and-service-tasks ()
           (log-notice "response task reconnecting to exchanges...")
           (setq *run-state* :connect)
           (amqp:with-open-connection (connection :uri broker-uri)
             (amqp:with-open-channel (response-channel connection
                                                       :element-type '(unsigned-byte 8)
                                                       :content-type response-content-type)
               (setq *run-state* :process)
               (log-notice "response task query channel : ~s" response-channel)
               ;; no exchange / queue declarations
               ;; instantiate the channels' basic and specify non-persistent delivery mode
               (amqp:basic response-channel :delivery-mode 1)

               ;; set up handlers to catch and log stacktraces for most serious conditions,
               ;; but let interrupts through, and cause storage conditions to first unwind

               (labels ((handle-error (condition)
                          (log-stacktrace "Error scheduling query: ~a -> ~a."

                          ;; if in the context of a query and the channel is still usable
                          ;; send an error response

                            (log-error "Error scheduling query: expression: ~s" (query-sse-expression *query*))
                            (generate-error-note condition :task *query*)
                            (terminate-task *query* condition))
                          ;; clear out the connection
                          (close connection :abort t))

                          (handler-case (handle-error c)
                            (serious-condition (c2)
                              (log-error "reentrant error processing query:~% ~a~% ~a."

                          (return-from connect-and-service-tasks))
                        (terminate-connection ()
                          ;; when the management thread observes that its broker connection has failed,
                          ;; the response tasks must proactively terminate theirs.
                          ;; otherwise their first attempt to send a response fails.
                          (close connection :abort t)
                          (return-from connect-and-service-tasks)))

                 (handler-bind (#+sbcl (sb-sys:interactive-interrupt (lambda (c) (invoke-debugger c)))
                                (storage-condition (lambda (c) (signal c)) ) ; unwind it to the handler-case
                                (serious-condition #'guarded-handler))
                   (loop for task = (channel-get *service-channel*)

                         do (restart-bind ((terminate-task (lambda (task-to-terminate &optional condition)
                                                             (when (eq task task-to-terminate)
                                                               (log-warn "task terminated in response thread: ~a (~a)"
                                                                         task (type-of condition))
                                                               (generate-accounting-note :terminate)
                                                               (close connection :abort t)
                                                               (return-from connect-and-service-tasks)))))
                              (respond-to-task (task-effective-request-operation task (task-operation task))
                                               task response-channel)

                   (storage-condition (c) (guarded-handler c)))
                 (terminate-connection () (terminate-connection))))))))

    (rdfcache:init-thread)

    (loop (connect-and-service-tasks))
    (log-info "response thread returning: ~s" broker-uri)
    (rdfcache:exit-thread))))

(defgeneric respond-to-task (operation task response-channel)
  (:method ((operation t) (query query) (response-stream stream))
    (add-task-thread query (bt:current-thread))
    (with-task-environment (:task query)
      (initiate-task query (task-response-content-type query) :stream response-stream)
      (log-debug "respond-to-task: provenance: ~s" query)
      (process-provenance-information query)
      (let ((now (task-time query))
            (start (task-start-time query)))
        (log-notice "task ~s(~s). complete [~/format-iso-time/ - ~/format-iso-time/) = ~d/~d seconds."
                    (task-id query) (repository-id (task-repository query))

                    (float (/ (- (get-internal-run-time) (task-start-run-time query))
                              internal-time-units-per-second))
                    (float (/ (- (get-internal-real-time) (task-start-real-time query))
                              internal-time-units-per-second))))
      (generate-accounting-note :complete)
      (unschedule-task query))

  (:method ((operation (eql :algebra)) (query query) (response-stream stream))
    (with-task-environment (:task query)
      (send-algebra-response query

                             (query-sse-expression query))
      (generate-accounting-note :complete))
    (finalize-task query))

  (:method ((operation (eql :plan)) (query query) (response-stream stream))
    (with-task-environment (:task query)
      (send-algebra-response query

                             (expand-query (query-sse-expression query) :repository-id (task-repository query)))
      (generate-accounting-note :complete))
    (finalize-task query))

  (:method ((operation (eql :trace)) (query query) (response-stream stream))
    (with-task-environment (:task query)
      (send-trace-response query

                           (run-deconstructed-query query (task-repository query)))
      (generate-accounting-note :complete))
    (finalize-task query)))

(defmethod respond-to-task :after ((operation t) (task t) (channel t))

(:documentation "messaging control flow for a storage service"
 "The RDF graph pattern store accepts AGP messages, performs primitive queries for matching triples,
and returns the solution field as a single message, routed back to the originating requestor.

The requests are sse terms of the form

(spocq.a:|agp| (triple ?s ?p ?o) ...)

for which the solutions are those which unify for the variables and blank nodes, with the
[distinction](http://seaborne.blogspot.com/2006/11/sparql-basic-graph-pattern-matching.html)
that variables are universal and blank nodes are existential.
The result is returned as a solution field, a sequence of variable bindings.

((?VAR1 ?VAR2 ... ?VARn) (value1 value2 ... valuen) ... (value1 value2 ... valuen))

(defun run-store-processor (&key (broker-uri (or *broker-uri* (error "broker-uri is required.")))
                                 ((:store-uri *store-uri*) *store-uri*)
                                 (store-queue (store-store-queue-name))
                                 (store-exchange (store-exchange))
                                 (store-routing-key *store-store-routing-key*)
                                 (content-type *store-content-type*))

  (log-notice "store processor reconnecting to store and exchange...")
  (setq *run-state* :connect)
  (amqp:with-open-connection (connection :uri broker-uri)
    (amqp:with-open-channel (store-channel connection
                                           :element-type '(unsigned-byte 8)
                                           :content-type content-type)
      (let ((*store-queue* (amqp:queue store-channel :queue store-queue))
            (*store-exchange* (amqp:exchange store-channel :exchange store-exchange)))

        (setf (amqp.u:channel-content-type store-channel) content-type)
        ;; removed :auto-delete t on rabbitmq transition to 0.9.1, but it shouldn't be present
        ;; anyway, since the exchanges should be permanent
        (amqp:declare *store-exchange* :type "topic")
        (amqp:declare *store-queue* :auto-delete t)
        (amqp:bind *store-queue* :exchange *store-exchange* :queue *store-queue*
                   :routing-key store-routing-key)

        (labels ((handle-error (condition)
                   (log-stacktrace "Error processing store requests (~s/~s): ~a."
                                   store-exchange store-queue condition)
                   (return-from run-store-processor condition))
                 (handle-store-message (channel class method &rest args)
                   (declare (dynamic-extent args)
                            (ignore channel method))

                     (flet ((receive-message-content (stream content-type)
                              (receive-store-request stream content-type)))
                       (declare (dynamic-extent #'receive-message-content))
                       (handler-bind ((serious-condition #'handle-error))
                         (setf request (apply #'amqp:respond-to-deliver class
                                              :body #'receive-message-content

                         (process-store-request request store-channel)))))))

          ;; subscribe to the query and store queues, then run the connection loop
          (setf (de.setf.amqp.implementation::channel-command store-channel 'amqp:deliver)
                #'handle-store-message)
          (amqp:consume *store-queue*)
          (amqp.u:process-connection-loop connection))))))

(defgeneric process-store-request (request stream)
  (:documentation "Perform operations on/with a store."))
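
;;; A minimal sketch of reading the solution field form described in the
;;; storage service documentation: a list whose first element names the
;;; variables and whose remaining elements are the value rows. The function
;;; names are hypothetical and are not part of the store interface. The
;;; reading of a 'unit table' as a field with no variables and a single
;;; empty row is an assumption made for this illustration.

(defun sketch-solution-field-bindings (field)
  "Return one alist of (variable . value) pairs per solution row of FIELD."
  (destructuring-bind (variables &rest rows) field
    (mapcar (lambda (row)
              ;; each row must supply exactly one value per variable
              (assert (= (length row) (length variables)) ()
                      "Row ~s does not match variables ~s." row variables)
              (mapcar #'cons variables row))
            rows)))

(defun sketch-unit-table-p (field)
  "True if FIELD has no variables and exactly one empty solution row."
  (equal field '(() ())))

;;; For example, (sketch-solution-field-bindings '((?s ?p) (1 2) (3 4)))
;;; pairs ?s with 1 and ?p with 2 in the first row, and ?s with 3 and ?p
;;; with 4 in the second.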

(defgeneric respond-to-task (operation task response-channel)
  (:documentation "Given an operation indicator for an initiated task, feed its results to a response
function through a message queue. When the response is complete, finalize the query to emit
any error and/or accounting messages.
Return the query instance.")

  (:method ((operation (eql :query)) (query query) (response-channel amqp:channel))
    (add-task-thread query (bt:current-thread))
    (with-task-environment (:task query)
      (initialize-task query)
      (let* ((result-or-generator (task-result-generator query)))
        (etypecase result-or-generator
          ;; a concrete result is emitted immediately

           (send-query-response query response-channel result-or-generator))
          ;; while a generated result is emitted as read from the channel
          (abstract-field-generator
           (let ((field-expression (abstract-field-generator-expression result-or-generator))
                 (*thread-operations* (cons (list* 'respond-to-task (task-id query)) *thread-operations*)))
             (query-run-in-thread query field-expression)
             (send-response-message (task-operation query) query response-channel (task-response-content-type query)))))
        (generate-accounting-note :complete))
      (log-debug "respond-to-task: provenance: ~s" query)
      (process-provenance-information query))
    (finalize-task query))

  (:method ((operation symbol) (query t) (response-stream t))
    ;; by default, delegate to the :query response method
    (respond-to-task :query query response-stream))

  (:method ((operation (eql :query)) (query query) (response-stream stream))
    (add-task-thread query (bt:current-thread))
    (with-task-environment (:task query)
      ;; (re)build the data from graph anew each time in order to create fresh queues
      (initialize-task query)
      (let* ((field-generator (task-result-generator query))
             (field-expression (abstract-field-generator-expression field-generator)))
        (let ((*thread-operations* (cons (list* 'respond-to-task (task-id query) field-generator)
                                         *thread-operations*)))
          (query-run-in-thread query field-expression)
          (send-response-message (task-operation query) query response-stream (task-response-content-type query)))
        (generate-accounting-note :complete))
      (log-debug "respond-to-task: provenance: ~s" query)
      (process-provenance-information query))

    (let ((now (task-time query))
          (start (task-start-time query)))
      (log-notice "task ~s. complete [~/format-iso-time/ - ~/format-iso-time/) = ~d/~d seconds."
                  (task-id query) start now
                  (float (/ (- (get-internal-run-time) (task-start-run-time query))
                            internal-time-units-per-second))
                  (float (/ (- (get-internal-real-time) (task-start-real-time query))
                            internal-time-units-per-second)))
      (unschedule-task query)
      (finalize-task query)))

  (:method ((operation (eql :algebra)) (query query) (response-stream stream))
    (with-task-environment (:task query)
      (send-algebra-response query response-stream (query-sse-expression query))
      (generate-accounting-note :complete))
    (finalize-task query))

  (:method ((operation (eql :plan)) (query query) (response-stream stream))
    (with-task-environment (:task query)
      (send-algebra-response query response-stream (expand-query (query-sse-expression query)
                                                                 :repository-id (task-repository query)))
      (generate-accounting-note :complete))
    (finalize-task query))

  (:method ((operation (eql :trace)) (query query) (response-stream stream))
    (with-task-environment (:task query)
      (send-trace-response query response-stream
                           (run-deconstructed-query query (task-repository query)))
      (generate-accounting-note :complete))
    (finalize-task query)))
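
;;; A minimal, self-contained sketch of the dispatch pattern used by
;;; RESPOND-TO-TASK above: EQL specializers select the operation, and a
;;; method specialized on SYMBOL delegates any other operation to :QUERY.
;;; The generic function SKETCH-RESPOND is hypothetical. It returns a
;;; keyword naming the chosen method instead of sending a response.

(defgeneric sketch-respond (operation task)
  (:method ((operation (eql :query)) task)
    (declare (ignore task))
    :query-response)
  (:method ((operation (eql :algebra)) task)
    (declare (ignore task))
    :algebra-response)
  (:method ((operation symbol) task)
    ;; an EQL specializer is more specific than the class SYMBOL, so this
    ;; method runs only for operations without a method of their own
    (sketch-respond :query task)))

;;; With these definitions, (sketch-respond :algebra nil) selects the
;;; :algebra method directly, while an operation without its own method,
;;; such as :update, falls through to the :query method.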