Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/message-processing.lisp

KindCoveredAll%
expression48499 9.6
branch010 0.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 (:documentation "messaging control flow for queries"
6
   "The 'sparql algebra engine' comprises a set of message handling threads and or processes.
7
  It manages its resource usage in terms of number of threads and the role(s) for which they are
8
  configured. Each thread is bound at creation with three channels multiplexed over two connections
9
  - one for query requests and responses
10
  - one for store bgp requests and solutions
11
  - one for accounting (on its own connection)
12
 
13
  The query channel is used to consume query messages clients from brokered, shared queue bound to an
14
  'universal' routing key. The responses pass back though the same exchange, but are routed to the
15
  originating client.
16
  The store channel is used to dispatch bgp matching requests to a shared queue and subscribes to a provate
17
  queue via a process-specific routing key to a receive the responses.
18
 
19
  Queries are processed in successive, independent phases, driven by messages. The process starts when a
20
  client publishes a query to its request queue. That message is accepted by one of that queue's consumers,
21
  and compiled it into a three functions:
22
  - a bgp request generator
23
  - a bgp completion predicate
24
  - a reduction function.
25
  The request generator executes immediately. As each response appears, the processor integrates it into the
26
  respective query task. Once all bgp responses have been integrated, the reduction function executes and
27
  generates the response solution to the client.
28
 
29
  Each query request contains the following in additon to the query expression;
30
 
31
  * repository_id : designates an owner/repostiory-name combination to identify the repository to the store
32
  * query_id : identifies the query for accounting purposes and in the response to the client
33
  * routing_id : specifies the return path for the response message
34
  * priority 
35
  * quota 
36
 
37
  A query request body is the query expression encoded as SSF/BERT. An ssf augmnts the query sse expression
38
  with information from the SPARQL form 
39
 
40
  * SELECT : a solution field
41
  * CONSTRUCT : a triple (or optionally quad?) sequence (graph)
42
  * DESCRIBE : a triple sequence (graph)
43
  * ASK : a boolean
44
 
45
  A store request body is a single BGP or GRAPH form. It can be encoded as SSE/BERT or SSE/UTF8 depending
46
  on the channel encoding. The rquest header specifies the repository and query identifiers from the query
47
  request, and adds a process-specific routing key for the return path to the sae
48
  process and a bdp identifier to specify the binding.
49
 
50
  A store response's body is a solution field - a set of variable binding solutions in the channel's encoding.
51
  It can be a singleton field - a 'unit table', to indicate a match for a constant pattern. In
52
  addition it includes the query and bgp identifiers. The provided sae process routing key serves to route
53
  it back to the originating process. 
54
 
55
  A query response contains a result respective the query form encoded as per the channel's encoding.
56
  It includes the query identifier for use by the client. The request routing key serves the route it back
57
  to the client.
58
 
59
  In addition, should a store request or a query request fail, the response is an error message.
60
 
61
  If the query request entailed BGP queries, the execution publishes any BGP queries and yields a query state
62
  which indicates that it is still active. In that case, this query task is bound globally to its BGP
63
  identifiers to be avaialble for store responses. If, on the other hand, the query included no BGP,
64
  the execution yields an immediate result - a solution field, a triple set, or a scalar (boolean or numeric),
65
  which is published back to the client immediately, and the query is complete.
66
 
67
  Any 'incomplete' query functions remain bound to their pending BGP queries. As each response appears, its
68
  query task is retrieved according to BGP identifier. It integrates the result, and checks if all responses
69
  are complete. If so, it reduces the query and emits the result to the client. If the query is configured
70
  for repeated execution, it is reset and the processes begins anew. Each successive
71
  application yields a result analogous to the initial one.")
72
 
73
 ;;; channel/exchange/queue management
74
 
75
 
76
 
77
 (defun run-query-request-thread-idle (channel)
78
   channel)
79
 
80
 (defun run-query-request-thread (&key (broker-uri (or *broker-uri* (error "broker-uri is required.")))
81
                                       (query-queue (engine-query-queue-name))
82
                                       (query-exchange (query-exchange))
83
                                       (query-routing-key *engine-query-routing-key*)
84
                                       (break-on-signals nil)
85
                                       (request-content-type *request-content-type*))
86
   "Accept, parse, and queue query requests.
87
  - connect to a request message broker, define and bind the requests queue, subscribe to those queues
88
  - for each delivery, parse the request, create a query task, and queue that task for further processing
89
  This neither interprets the query nor communicates a response. All requests are acknowledged to to the broker
90
  immediately and secondary status messages are send to accounting. The only substantive response is in the event the
91
  the request fails to parse." 
92
   
93
   (loop
94
     (log-notice "request task reconnecting to exchanges...")
95
     (setq *run-state* :connect)
96
     (amqp:with-open-connection (connection :uri broker-uri)
97
       (amqp:with-open-channel (query-channel connection
98
                                              :element-type '(unsigned-byte 8)
99
                                              :content-type request-content-type)
100
         (log-notice "request task query channel : ~s" query-channel)
101
         (let ((*query-queue* (amqp:queue query-channel :queue query-queue))
102
               (*query-exchange* (amqp:exchange query-channel :exchange query-exchange))
103
               (query-consumer-tag nil))
104
           (declare (ignorable query-consumer-tag))
105
           ;; the exchanges should already exist, but the process' queue needs to be bound
106
           (setf (amqp.u:channel-content-type query-channel) request-content-type)
107
           ;; instantiate the channels' basic and specify non-persistent delivery mode
108
           (amqp:basic query-channel :delivery-mode 1)
109
           ;; removed :auto-delete t on rabbitmq transition to 0.9.1, but it shouldn't be present
110
           ;; anyway, since the exchanges should be permanent
111
           (amqp:declare *query-exchange*  :type "topic")
112
           ;; make and bind the queue to receive queries
113
           (amqp:declare *query-queue* :auto-delete t)
114
           (amqp:bind *query-queue* :exchange *query-exchange* :queue *query-queue*
115
                      :routing-key query-routing-key)
116
           ;; instantiate the query channel and specify a single-message window
117
           ;; not implemented in rmq (amqp:qos (amqp:basic query-channel) :prefetch-count 1 :global t)
118
           (amqp:qos (amqp:basic query-channel) :prefetch-count 1)
119
           ;; enable acknowledgements at the protocol level for query requests, but
120
           (setf query-consumer-tag (amqp:consume *query-queue* :no-ack nil))
121
           ;; disable auto-ack to pemit ack post-processing
122
           (setf (amqp.u:channel-acknowledge-messages query-channel) nil)
123
           
124
           (labels ((handle-error (condition)
125
                      (log-stacktrace "Error processing query request (~s/~s): ~a -> ~a."
126
                                      query-exchange query-queue *query* condition)
127
                      ;; if in the context of a query and the channel is still usable
128
                      ;; send an error response
129
                      (when *query*
130
                        (log-error "run-query-request-thread: Error in query: ~s" (query-sse-expression *query*))
131
                        (when (open-stream-p query-channel)
132
                          (send-ack query-channel))
133
                        (generate-error-note condition :task *query*)
134
                        (terminate-task *query* condition))
135
                      ;; clear out the connection
136
                      (close connection :abort t))
137
                    (send-ack (channel)
138
                      ;; acknowledge a message so long as the basic state indicates it is pending
139
                      ;; this servces in cases where non-local transfer has skipped the ack in amqp:deliver, but
140
                      ;; the message should still be "consumed".
141
                      (let* ((basic (amqp:basic channel))
142
                             (tag (amqp:basic-delivery-tag basic)))
143
                        (when (typep tag '(integer 1 *))
144
                          (setf (amqp:basic-delivery-tag basic) 0)
145
                          (amqp::send-ack basic :delivery-tag tag :multiple 1))))
146
                    (handle-query-request (channel class method &rest args)
147
                      (declare (dynamic-extent args)
148
                               (ignore channel method))
149
                      (let ((request-content-type nil)
150
                            (*break-on-signals* break-on-signals))
151
                        (flet ((receive-message-content (stream content-type)
152
                                 (setf request-content-type content-type)
153
                                 (receive-query-request stream content-type)))
154
                          (declare (dynamic-extent #'receive-message-content))
155
                          (handler-bind ((spocq.e:message-syntax-error
156
                                          (lambda (condition)
157
                                            (log-warn "responding to message syntax error: ~a" condition)
158
                                            ;; take the routing information from the message
159
                                            (send-ack query-channel)
160
                                            (generate-error-note condition :task (error-task condition))
161
                                            (generate-accounting-note :terminate :task (error-task condition))
162
                                            (return-from handle-query-request t)))
163
                                         (spocq.e:runtime-error
164
                                          (lambda (condition)
165
                                            ;; if it is any other error, ack to the broker
166
                                            (send-ack query-channel)
167
                                            (cond (*query*
168
                                                   ;; is a query was created, send an error message to the client and terminate it
169
                                                   (generate-error-note condition)
170
                                                   (terminate-task *query* condition))
171
                                                  (t
172
                                                   (log-stacktrace "run-query-request-thread: Error in request w/o context~% ~a." condition)))
173
                                            (return-from handle-query-request t))))
174
                            (with-accounting
175
                              (multiple-value-bind (operation-or-prototype arguments)
176
                                                   (apply #'amqp:respond-to-deliver class
177
                                                          :body #'receive-message-content 
178
                                                          args)
179
                                (typecase operation-or-prototype
180
                                  (query
181
                                   (setf (task-start-time operation-or-prototype) (get-universal-time)
182
                                         (task-start-run-time operation-or-prototype) (get-internal-run-time)
183
                                         (task-start-real-time operation-or-prototype) (get-internal-real-time))
184
                                   (log-notice "query: named prototype: ~s: ~s: ~s"
185
                                               (list (repository-id (task-repository operation-or-prototype))
186
                                                     (query-signature operation-or-prototype)
187
                                                     (first (query-dynamic-bindings operation-or-prototype)))
188
                                               operation-or-prototype
189
                                               (substitute #\space #.(code-char #o012) (query-sparql-expression operation-or-prototype)))
190
                                   (let ((query (apply #'clone-instance operation-or-prototype arguments)))
191
                                     (generate-accounting-note :parse :task query)
192
                                     (initiate-service-task (task-operation query) query)))
193
                                  (t
194
                                   (log-debug "query: new instance: ~s" arguments)
195
                                   (let ((query (apply #'make-query arguments)))
196
                                     (generate-accounting-note :parse :task query) 
197
                                     (initiate-service-task operation-or-prototype query))))))
198
                            ;; acknowledge once the query task has been initiated.
199
                            (send-ack query-channel)
200
                            t))))
201
                    (idle-handler (c)
202
                      (run-query-request-thread-idle c)
203
                      (case *run-state*
204
                        ((:terminate :restart)
205
                         ;; shut down by first unsubscribing
206
                         ;;!!! possible race condition: should amqp already have a request in-flight, it could
207
                         ;;!!! time-out to the client before amqp gets to re-queue it.
208
                         (cond ((stringp query-consumer-tag)
209
                                (log-warn "terminate: canceling query consumer.")
210
                                (amqp:cancel (amqp:channel.basic query-channel) :consumer-tag query-consumer-tag)
211
                                (setf query-consumer-tag nil))
212
                               (t        ;; and then, if still idle return to stop
213
                                (log-warn "terminate: messaging processor returning.")
214
                                (return-from run-query-request-thread *run-state*)))))
215
                      ;; return non-null
216
                      *run-state*))
217
             (push #'idle-handler (amqp.u::connection-idle-handlers connection))
218
             ;; subscribe to the query queue, then run the connection loop
219
             (setf (de.setf.amqp.implementation::channel-command query-channel 'amqp:deliver)
220
                   #'handle-query-request)
221
             ;; bind a dynamic handler in order to use any active query
222
             (block :process-connection
223
               (setq *run-state* :process)
224
               (flet ((guarded-handler (c)
225
                        (handler-case (handle-error c)
226
                          (serious-condition (c2)
227
                                             (log-error "run-query-request-thread: Reentrant error processing query:~% ~a~% ~a."
228
                                                        c c2)))
229
                        (return-from :process-connection)))
230
                 ;; set up handlers to catch and log stacktraces for most serious conditions,
231
                 ;; but let interrupts through, and cause storage conditions to first unwind 
232
                 ;; before logging
233
                 (handler-case
234
                   (handler-bind (#+sbcl
235
                                  (sb-sys:interactive-interrupt () )         ; do nothing
236
                                  (storage-condition () )    ; let it go to the handler-case
237
                                  (serious-condition #'guarded-handler))
238
                     ;; don't have amqp handle the errors; shut down the connection and restart
239
                     (amqp.u:process-connection-loop connection))
240
                   (storage-condition (c) (guarded-handler c)))))))))))
241
 
242
 
243
 (defun run-response-thread (&key (broker-uri (or *broker-uri* (error "broker-uri is required.")))
244
                                        ((:service-channel *service-channel*) *service-channel*)
245
                                        (response-content-type *response-content-type*))
246
   (flet ((connect-and-service-tasks ()
247
            (log-notice "response task reconnecting to exchanges...")
248
            (setq *run-state* :connect)
249
            (amqp:with-open-connection (connection :uri broker-uri)
250
              (amqp:with-open-channel (response-channel connection
251
                                                        :element-type '(unsigned-byte 8)
252
                                                        :content-type response-content-type)
253
                (setq *run-state* :process)
254
                (log-notice "response task query channel : ~s" response-channel)
255
                ;; no exchange / queue declarations
256
                ;; instantiate the channels' basic and specify non-persistent delivery mode
257
                (amqp:basic response-channel :delivery-mode 1)
258
                
259
                ;; set up handlers to catch and log stacktraces for most serious conditions,
260
                ;; but let interrupts through, and cause storage conditions to first unwind 
261
                ;; before logging
262
                (labels ((handle-error (condition)
263
                           (log-stacktrace "Error scheduling query: ~a -> ~a."
264
                                           *query* condition)
265
                           ;; if in the context of a query and the channel is still usable
266
                           ;; send an error response
267
                           (when *query*
268
                             (log-error "Error scheduling query: expression: ~s" (query-sse-expression *query*))
269
                             (generate-error-note condition :task *query*)
270
                             (terminate-task *query* condition))
271
                           ;; clear out the connection
272
                           (close connection :abort t))
273
                         (guarded-handler (c)
274
                           (handler-case (handle-error c)
275
                             (serious-condition (c2)
276
                                                (log-error "reentrant error processing query:~% ~a~% ~a."
277
                                                           c c2)))
278
                           (return-from connect-and-service-tasks))
279
                         (terminate-connection ()
280
                           ;; when the mangement thread observes that its broker connection has failed,
281
                           ;; the response tasks must proactively terminate theirs.
282
                           ;; otherwise their first attmpt to send a response fails.
283
                           (close connection :abort t)
284
                           (return-from connect-and-service-tasks)))
285
                  (restart-case
286
                    (handler-case
287
                      (handler-bind (#+sbcl (sb-sys:interactive-interrupt (lambda (c) (invoke-debugger c)))  
288
                                     (storage-condition (lambda (c) (signal c)) )    ; unwind it to the handler-case
289
                                     (serious-condition #'guarded-handler))
290
                        (loop for task = (channel-get *service-channel*)
291
                              until (null task)
292
                              do (restart-bind ((terminate-task (lambda (task-to-terminate &optional condition)
293
                                                                  (when (eq task task-to-terminate)
294
                                                                    (log-warn "task terminated in response thread: ~a (~a)"
295
                                                                              task (type-of condition))
296
                                                                    (generate-accounting-note :terminate)
297
                                                                    (close connection :abort t)
298
                                                                    (return-from connect-and-service-tasks)))))
299
                                   (respond-to-task (task-effective-request-operation task (task-operation task))
300
                                                     task response-channel)
301
                                   (setf task nil))))
302
                      (storage-condition (c) (guarded-handler c)))
303
                    (terminate-connection () (terminate-connection))))))))
304
 
305
     (rdfcache:init-thread)
306
     (unwind-protect
307
       (loop (connect-and-service-tasks))
308
       (log-info "response thread returning: ~s" broker-uri)
309
       (rdfcache:exit-thread))))
310
 
311
 
312
 (defgeneric respond-to-task (operation task response-channel)
313
   (:method ((operation t) (query query) (response-stream stream))
314
     (add-task-thread query (bt:current-thread))
315
     (with-task-environment (:task query)
316
       (initiate-task query (task-response-content-type query) :stream response-stream)
317
       (log-debug "respond-to-task: provenance: ~s" query)
318
       (process-provenance-information query)
319
       (let ((now (task-time query))
320
             (start (task-start-time query)))
321
         (log-notice "task ~s(~s). complete [~/format-iso-time/ - ~/format-iso-time/) = ~d/~d seconds."
322
                     (task-id query) (repository-id (task-repository query))
323
                     start now
324
                     (float (/ (- (get-internal-run-time) (task-start-run-time query))
325
                               internal-time-units-per-second))
326
                     (float (/ (- (get-internal-real-time) (task-start-real-time query))
327
                               internal-time-units-per-second))))
328
       (generate-accounting-note :complete)
329
       (unschedule-task query))
330
     query)
331
 
332
   ;;; special cases
333
   (:method ((operation (eql :algebra)) (query query) (response-stream stream))
334
     (with-task-environment (:task query)
335
       (send-algebra-response query
336
                              response-stream
337
                              (query-sse-expression query))
338
       (generate-accounting-note :complete))
339
     (finalize-task query))
340
 
341
   (:method ((operation (eql :plan)) (query query) (response-stream stream))
342
     (with-task-environment (:task query)
343
       (send-algebra-response query
344
                              response-stream
345
                              (expand-query (query-sse-expression query) :repository-id (task-repository query)))
346
       (generate-accounting-note :complete))
347
     (finalize-task query))
348
 
349
   (:method ((operation (eql :trace)) (query query) (response-stream stream))
350
     (with-task-environment (:task query)
351
       (send-trace-response query
352
                            response-stream
353
                            (run-deconstructed-query query (task-repository query)))
354
       (generate-accounting-note :complete))
355
     (finalize-task query)))
356
 
357
 (defmethod respond-to-task :after ((operation t) (task t) (channel t))
358
   (incf *task-count*))
359
 
360
 
361
 (:documentation "messaging control flow for a storage service"
362
   "The RDF graph pattern store accepts AGP messages which specify, performs primitive queries for matching triples,
363
  and returns the solution field as a single message, routed back to the originating requestor.
364
 
365
  The requests are sse terms of the form
366
 
367
     (spocq.a:|agp| (triple ?s ?p ?o) ...)
368
 
369
  for which the solutions are those which unify for the variables and blank nodes, with the
370
  [distinction](http://seaborne.blogspot.com/2006/11/sparql-basic-graph-pattern-matching.html).
371
  that variables are universal and blank nodes are existential.
372
  The result is returned as a solution field, a sequence of variable bindings.
373
 
374
     ((?VAR1 ?VAR2 ... ?VARn) (value1 value2 ... valuen) ... (value1 value2 ... valuen))
375
 ")
376
 
377
 
378
 (defun run-store-processor (&key (broker-uri (or *broker-uri* (error "broker-uri is required.")))
379
                                  ((:store-uri *store-uri*)  *store-uri*)
380
                                  (store-queue (store-store-queue-name))
381
                                  (store-exchange (store-exchange))
382
                                  (store-routing-key *store-store-routing-key*)
383
                                  (content-type *store-content-type*))
384
   
385
   (log-notice "store processor reconnecting to store and exchange...")
386
   (setq *run-state* :connect)
387
   (amqp:with-open-connection (connection :uri broker-uri)
388
     (amqp:with-open-channel (store-channel connection
389
                                            :element-type '(unsigned-byte 8)
390
                                            :content-type content-type)
391
       (let ((*store-queue* (amqp:queue store-channel :queue store-queue))
392
             (*store-exchange* (amqp:exchange store-channel :exchange store-exchange)))
393
         
394
         (setf (amqp.u:channel-content-type store-channel) content-type)
395
         ;; removed :auto-delete t on rabbitmq transition to 0.9.1, but it shouldn't be present
396
         ;; anyway, since the exchanges should be permanent
397
         (amqp:declare *store-exchange*  :type "topic")
398
         (amqp:declare *store-queue* :auto-delete t)
399
         (amqp:bind *store-queue* :exchange *store-exchange* :queue *store-queue*
400
                    :routing-key store-routing-key)
401
         
402
         (labels ((handle-error (condition)
403
                    (log-stacktrace "Error processing store requests (~s/~s): ~a."
404
                                    store-exchange store-queue condition)
405
                    (return-from run-store-processor condition))
406
                  (handle-store-message (channel class method &rest args)
407
                    (declare (dynamic-extent args)
408
                             (ignore channel method))
409
                    (with-accounting
410
                      (let ((request nil))
411
                        (flet ((receive-message-content (stream content-type)
412
                                 (receive-store-request stream content-type)))
413
                          (declare (dynamic-extent #'receive-message-content))
414
                          (handler-bind ((serious-condition #'handle-error))
415
                            (setf request (apply #'amqp:respond-to-deliver class
416
                                                 :body #'receive-message-content
417
                                                 args))
418
                            (process-store-request request store-channel)))))))
419
           
420
           ;; subscribe to the query and store queues, then run the connection loop
421
           (setf (de.setf.amqp.implementation::channel-command store-channel 'amqp:deliver)
422
                 #'handle-store-message)
423
           (amqp:consume *store-queue*)
424
           (amqp.u:process-connection-loop connection))))))
425
 
426
 
427
 (defgeneric process-store-request (request stream)
428
   (:documentation "Perform operations on/with a store."))
429
 
430
 
431
 ;;;
432
 #+(or)
433
 (defgeneric respond-to-task (operation task response-channel)
434
   (:documentation "Given an operation indicator for an initiated task, feed its results to a response
435
     function through a message queue. when the response is complete, either finalize the query to emit
436
     any error and/or accounting messages.
437
     return the query instance.")
438
   
439
   (:method ((operation (eql :query)) (query query) (response-channel amqp:channel))
440
     (add-task-thread query (bt:current-thread))
441
     (with-task-environment (:task query)
442
       (initialize-task query)
443
       (let* ((result-or-generator (task-result-generator query)))
444
         (etypecase result-or-generator
445
           ;; a concrete result is emitted immdiately
446
           (matrix-field
447
            (send-query-response query response-channel result-or-generator))
448
           ;; while a generated result is emitted as read from the channel
449
           (abstract-field-generator
450
            (let ((field-expression (abstract-field-generator-expression result-or-generator))
451
                  (*thread-operations* (cons (list* 'respond-to-task (task-id query)) *thread-operations*)))
452
              (query-run-in-thread query field-expression)
453
              (send-response-message (task-operation query) query response-channel (task-response-content-type query)))))
454
         (generate-accounting-note :complete))
455
       (log-debug "respond-to-task: provenance: ~s" query)
456
       (process-provenance-information query))
457
     (finalize-task query))
458
   
459
   (:method ((operation symbol) (query t) (response-stream t))
460
     ;; by default, delegate to the :query response method
461
     (respond-to-task :query query response-stream))
462
   
463
   (:method ((operation (eql :query)) (query query) (response-stream stream))
464
     (add-task-thread query (bt:current-thread))
465
     (with-task-environment (:task query)
466
       ;; (re)build the data from graph anew each time in order to creae fresh queues
467
       (initialize-task query)
468
       (let* ((field-generator (task-result-generator query))
469
              (field-expression (abstract-field-generator-expression field-generator)))
470
         (let ((*thread-operations* (cons (list* 'respond-to-task (task-id query) field-generator)
471
                                          *thread-operations*)))
472
           (query-run-in-thread query field-expression)
473
           (send-response-message (task-operation query) query response-stream (task-response-content-type query)))
474
         (generate-accounting-note :complete))
475
       (log-debug "respond-to-task: provenance: ~s" query)
476
       (process-provenance-information query))
477
     
478
     (let ((now (task-time query))
479
           (start (task-start-time query)))
480
       (log-notice "task ~s. complete [~/format-iso-time/ - ~/format-iso-time/) = ~d/~d seconds."
481
                   (task-id query) start now
482
                   (float (/ (- (get-internal-run-time) (task-start-run-time query))
483
                             internal-time-units-per-second))
484
                   (float (/ (- (get-internal-real-time) (task-start-real-time query))
485
                             internal-time-units-per-second)))
486
       (unschedule-task query)
487
       (finalize-task query)))
488
   
489
   (:method ((operation (eql :algebra)) (query query) (response-stream stream))
490
     (with-task-environment (:task query)
491
       (send-algebra-response query response-stream (query-sse-expression query))
492
       (generate-accounting-note :complete))
493
     (finalize-task query))
494
 
495
   (:method ((operation (eql :plan)) (query query) (response-stream stream))
496
     (with-task-environment (:task query)
497
       (send-algebra-response query response-stream (expand-query (query-sse-expression query)
498
                                                                  :repository-id (task-repository query)))
499
       (generate-accounting-note :complete))
500
     (finalize-task query))
501
 
502
   (:method ((operation (eql :trace)) (query query) (response-stream stream))
503
     (with-task-environment (:task query)
504
       (send-trace-response query response-stream
505
                            (run-deconstructed-query query (task-repository query)))
506
       (generate-accounting-note :complete))
507
     (finalize-task query)))