Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/task-processing.lisp
| Kind | Covered | All | % |
| expression | 15 | 471 | 3.2 |
| branch | 0 | 14 | 0.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
5
;;; (load #p"patches/task-processing.lisp")
8
(defmethod (setf configuration-parameter) ((value list) (parameter (eql :script-stack)))
  "Store the list VALUE in ssl::*toplevel-stack* and return it."
  (setf ssl::*toplevel-stack* value))
11
(defmethod (setf configuration-parameter) ((value string) (parameter (eql :script-code)))
  "Set the :script-code parameter given a string.
 The string is passed to load-ssl-graph and the primary result is stored by
 re-dispatching on it. Signals an error if that result is not a cons.
 Returns VALUE."
  ;; The previous body re-dispatched on the same string VALUE, which selected this
  ;; very method again and never terminated.
  ;; NOTE(review): loading through load-ssl-graph mirrors what receive-message does
  ;; for a location in this file; confirm that it accepts a string location and that
  ;; its primary value is the code form.
  (let ((code (load-ssl-graph value)))
    ;; insist on a cons: that dispatches to the cons method and cannot re-enter here
    (unless (consp code)
      (error "Invalid script code loaded from ~s: ~s." value code))
    (setf (configuration-parameter :script-code) code)
    value))
14
(defmethod (setf configuration-parameter) ((value spocq:iri) (parameter (eql :script-code)))
  "Set the :script-code parameter given an IRI.
 The IRI is passed to load-ssl-graph and the primary result is stored by
 re-dispatching on it. Signals an error if that result is not a cons.
 Returns VALUE."
  ;; The previous body re-dispatched on the same IRI VALUE, which selected this
  ;; very method again and never terminated.
  ;; NOTE(review): loading through load-ssl-graph mirrors what receive-message does
  ;; for a location in this file; confirm that it accepts an IRI location and that
  ;; its primary value is the code form.
  (let ((code (load-ssl-graph value)))
    ;; insist on a cons: that dispatches to the cons method and cannot re-enter here
    (unless (consp code)
      (error "Invalid script code loaded from ~s: ~s." value code))
    (setf (configuration-parameter :script-code) code)
    value))
17
(defmethod (setf configuration-parameter) ((value cons) (parameter (eql :script-code)))
  "Store the cons VALUE in ssl::*toplevel-code* and return it."
  (setf ssl::*toplevel-code* value))
20
(defmethod (setf configuration-parameter) ((value list) (parameter (eql :script-environment)))
  "Store the list VALUE in ssl::*toplevel-environment* and return it."
  (setf ssl::*toplevel-environment* value))
24
(:documentation "top-level task processing"
25
"provide definitions for
30
to handle the situations:
32
- sparql query with stdin/stdout
35
- as parsed and expanded
36
- processed as sparql but as traced
37
- script with stdin/stdout
42
(defun decode-sparql-content (source media-type &rest args
43
&key (task-id *task-id*) (agent *agent*)
45
(repository-id *repository-id*)
46
(revision-id *revision-id*)
47
(response-content-type *response-content-type*)
48
(dynamic-bindings *dynamic-bindings*)
50
(declare (dynamic-extent args))
51
(multiple-value-bind (operation arguments)
52
(apply #'receive-message source media-type args)
58
:repository-id repository-id
59
:revision-id revision-id
60
:response-content-type response-content-type
61
:dynamic-bindings dynamic-bindings
62
:request-routing-key nil
64
(case *dataset-source* ; nxp-213
66
(t (list* :dataset-graphs *dataset-graphs* arguments))))))
69
(defgeneric decode-task (source source-media-type result-media-type &rest args)
70
(:argument-precedence-order source-media-type result-media-type source)
72
(:method ((source pathname) (media-type mime:application/sparql) result-media-type &rest args)
73
(with-open-file (stream source :direction :input)
74
(apply #'decode-task stream media-type result-media-type args)))
76
(:method ((source stream) (media-type mime:application/sparql) (result-media-type mime::sparql-results) &rest args
77
&key (response-content-type result-media-type)
79
"sparql->results is a process which intends perform some sort of query"
80
(apply #'decode-sparql-content source media-type :response-content-type response-content-type args))
82
(:method ((source string) (media-type mime:application/sparql) (result-media-type mime::sparql-results) &rest args
83
&key (response-content-type result-media-type)
85
"sparql->results is a process which intends perform some sort of query"
86
(apply #'decode-sparql-content source media-type :response-content-type response-content-type args))
88
(:method ((source stream) (media-type mime:application/sparql) (result-media-type mime::query) &rest args
89
&key (response-content-type result-media-type)
91
"sparql -> some query expression is a process which intends to operate on the query expression itself"
92
(apply #'decode-sparql-content source media-type :response-content-type response-content-type args))
94
(:method ((source string) (media-type mime:application/sparql) (result-media-type mime::query) &rest args
95
&key (response-content-type result-media-type)
97
"sparql -> some query expression is a process which intends to operate on the query expression itself"
98
(apply #'decode-sparql-content source media-type :response-content-type response-content-type args))
100
(:method ((source stream) (media-type mime:application/sparql) (result-media-type mime:rdf) &rest args)
101
"sparql->rdf is a process which intends perform some sort of query"
102
(apply #'decode-sparql-content source media-type args))
104
(:method ((source string) (media-type mime:application/sparql) (result-media-type mime:rdf) &rest args)
105
"sparql->rdf is a process which intends perform some sort of query"
106
(apply #'decode-sparql-content source media-type args))
108
(:method ((source t) (media-type mime:application/sparql-query+ssl) (result-media-type mime:application/sparql-results) &rest args)
109
(multiple-value-bind (operation arguments)
110
(apply #'receive-message source media-type args)
115
:repository-id *repository-id*
116
:revision-id *revision-id*
117
:request-content-type media-type
118
:response-content-type *response-content-type*
119
:dynamic-bindings *dynamic-bindings*
120
:request-routing-key nil
121
:request-exchange nil
125
(defmethod receive-message ((location t) (media-type mime:application/sparql-query+ssl) &rest args)
126
(declare (ignore args))
127
(multiple-value-bind (form request-content) (load-ssl-graph location)
128
(values (find-if #'(lambda (element) (and (symbolp element)
129
(eql (symbol-package element)
130
(find-package '|http://dydra.com/schema/ssl#|))))
132
`(:sse-expression ,form
133
:request-content ,request-content))))
135
(defmethod load-configuration ((source request-processor))
  "Load the configuration for a request processor by delegating to the value of
 processor-configuration-location for it.
 This permits specialization which, for example, loads just one."
  (let ((location (processor-configuration-location source)))
    (load-configuration location)))
141
;;; the central operator once the evaluation environment has been set up
143
(defgeneric pipe-task (source destination source-media-type destination-media-type &rest args)
144
(:documentation "pipe-task is the generic execution operator for queries, scripts, etc.
145
given a generic source, it determines what sort of task to create based on the source media type and then
146
delegates to the respective specialization.")
148
(:method ((source t) (destination t) (source-media-type mime:application/sparql) (destination-media-type t) &rest args)
149
"the base sparql-query method delegates to pipe-query"
150
(declare (dynamic-extent args))
151
(apply #'pipe-query source destination
152
:request-content-type source-media-type
153
:response-content-type destination-media-type
156
(:method ((source t) (destination t) (source-media-type mime:application/sparql-query+ssl) (destination-media-type t) &rest args)
157
(declare (dynamic-extent args))
158
(apply #'pipe-task (apply #'decode-task source source-media-type destination-media-type args) destination
159
source-media-type destination-media-type
162
(:method ((task script) (destination t) (source-media-type mime:application/sparql-query+ssl) (destination-media-type t) &rest args)
163
"given a script, set up the run-time environment and execute it"
164
(declare (ignore args))
165
(register-query task)
166
(add-task-thread task (bt:current-thread))
167
(restart-bind ((terminate-task (lambda (&optional (task-to-terminate task) condition)
168
(warn "pipe-task: terminate task? : ~a." task)
169
(when (eq task task-to-terminate)
170
(warn "pipe-task: terminate task! : ~a." task)
171
(return-from pipe-task condition)))))
173
(let* ((environment ssl::*toplevel-environment*)
174
(bindings (task-dynamic-bindings task)))
175
(loop for variable in (first bindings)
176
for term in (rest bindings)
177
do (push (cons variable term) environment))
178
(setf environment (acons '?::|requestContentType| (task-request-content-type task)
179
(acons '?::|responseContentType| (task-response-content-type task)
180
(acons '?::|requestRepository| (repository-id (task-repository task))
183
(sslr:execute (script-stack task) (script-code task) environment)
184
(generate-accounting-note :complete))))))
187
(defgeneric compile-task (task)
  (:documentation "Compile TASK by delegating according to its class:
 a query is passed to compile-query, a script to compile-script.")
  (:method ((query-task query))
    (compile-query query-task))
  (:method ((script-task script))
    (compile-script script-task)))
193
(defun compile-script (script)
198
(defun main-query-loop (&optional (args (cdr (command-line-argument-list))))
  "Call main-task-loop with ARGS and return its result.
 ARGS defaults to the command line argument list without its first element."
  (main-task-loop args))
202
(defun main-task-loop (&optional (args (rest (command-line-argument-list))))
203
"Repeatedly run a task: decode it from theinput source, execute it and emit results to the output destination.
204
Any errors optionally cause termination. For each iteration, write a soft-eof to standard output and standard error."
206
(when (or (equal args '("-v")) (equal args '("--verbose")))
207
(format *standard-output* "~{~a ~a~%~}" (build-revisions))
208
(return-from main-task-loop nil))
209
(labels ((loop-sigterm-handler (signal code context)
210
(declare (ignore signal code context))
212
(log-info "Stop: SIGTERM ~a." (iso-time))
216
(loop for result = (process-request-toplevel (request-processor))
218
(condition (return result))
220
(with-command-line-configuration (args)
221
(setq *start-timestamp* (iso-time))
222
(initialize-spocq :title (getarg "--title"))
223
(enable-interrupt :sigterm #'loop-sigterm-handler)
224
(log-info "Start ~a." *start-timestamp*)
225
(prog1 (run-processing-threads (cons `(start-task-loop ,#'start-task-loop) *thread-specifications*))
227
(log-info "Stop: return ~a." (iso-time))))))
230
(defgeneric process-request-toplevel (processor)
231
(:documentation "Process a single request task:
232
- establish toplevel handlers
233
- assimilate any request settings and arguments into the runtime environment
234
this includes refreshing the configuration, which could signal an eof, which will be returns and likely terminate the processing
235
- parse the request body per the content type and instantiate a tesk/query/script instance
236
- delegate to it for evaluation
237
- record accounting ")
239
(:method ((processor request-processor))
240
(labels ((handle-error (condition)
241
(log-stacktrace "process-request-toplevel: Error processing task : ~a -> ~a."
243
(complete-output *standard-output*)
246
(maybe-exit-on-error 70)))
247
(generate-error-note condition :task (or (error-task condition) *task*))
248
(terminate-task *task*)
249
(return-from process-request-toplevel condition))
250
(guarded-handler (condition)
251
(handler-case (handle-error condition)
252
(serious-condition (c2)
253
(log-error "process-request-toplevel: Reentrant error processing task:~% ~a~% ~a." condition c2)))
254
(return-from process-request-toplevel condition))
255
(pipe-task-with-termination (query &rest args)
256
(declare (dynamic-extent args))
257
(restart-bind ((terminate-task (lambda (task-to-terminate &optional condition)
258
;; (print (list :pipe-query :terminate-task query condition))
259
(when (eq query task-to-terminate)
260
(return-from process-request-toplevel condition)))))
261
(apply #'pipe-task query args))))
263
;; set up handlers to catch and log stacktraces for most serious conditions,
264
;; but let interrupts through, and cause storage conditions to first unwind
267
(handler-bind (#+sbcl
268
(sb-sys:interactive-interrupt
270
(when (and (streamp *terminal-io*)
271
(interactive-stream-p *terminal-io*))
272
(invoke-debugger c))))
274
(lambda (c) (signal c)) ) ; unwind it to the handler-case
277
(log-info "process-request-toplevel: EOF: return ~a." (iso-time))
278
(return-from process-request-toplevel condition)))
279
(spocq.e::message-syntax-error
280
;; emit a parse error immediately and (optionally) exit
282
(complete-output *standard-output*)
283
(log-warn "responding to message syntax error: ~a" condition)
284
(print-error-conditions (list (cons nil condition)) *error-output*)
285
(complete-output *error-output*)
286
(maybe-exit-on-error 65)
287
;; flush anything else
288
(channel-get-all *error-condition-channel*)
289
(return-from process-request-toplevel condition)))
290
(serious-condition #'guarded-handler))
291
(let* ((accounting-location (processor-accounting-location processor))
292
(error-location (processor-error-location processor))
293
;; rebind the locations, but
294
(*request-location* (processor-request-content-location processor))
295
(*response-location* (processor-response-content-location processor))
296
;; not the types, to permit configuration values to carry through
297
(*request-content-type* (processor-request-content-type processor))
298
(*response-content-type* (processor-response-content-type processor))
300
;; rebind the global configuration for each request
301
(with-configuration ()
302
;; then augment the configuration with that read for the individual request
303
(load-configuration processor)
304
(validate-configuration)
306
(setq *agent* (cond #+(or)
308
;; construct the agent. nb. only the first location will count
309
(user *agent-id* :location *agent-location*))
311
;; construct the agent. nb. only the first location will count
312
(agent-authenticated-by-name *agent-id* :location *agent-location*
313
:if-does-not-exist nil)))
314
((agent-authenticated-by-token (api-key) :location *agent-location*
315
:if-does-not-exist nil))
317
(ensure-agent :location *agent-location*)))))
318
(setq *agent-id* (agent-name *agent*))
319
(log-debug "process-request-toplevel: agent: ~a" *agent*)
320
;; once the request configuration has been read, there are two options which interact to
321
;; determine how to process the request:
322
;; - expression signature : a signature is a hash of the request text.
323
;; if it locates a task, then that request has already been processed - discard the
324
;; document currently in the input stream, clone the prototype, and process it.
325
;; if the signature does not locate a task, parse the stream to create a new prototype,
326
;; register it and proceed as if it had been known.
327
;; - parameter signature : given parameters, the query is compiled to expect them to be
328
;; special variables and is executed with them bound to given values. if the parameters
329
;; locate a query, it can be executed with the respective values bound. otherwise a new
330
;; prototype must be created for the new parameters
332
;; the two aspects constitute a combined signature : (request-signature . (first dynamic-bindings))
333
(prog1 ; return the processed query upon success
334
(cond (*query-signature*
335
(let* ((resolved-revision (rdfcache::resolve-repository *repository-id* :revision *revision-id*))
336
(task-prototype (find-query-prototype resolved-revision *query-signature*
337
(append *dataset-graphs* (first *dynamic-bindings*)))))
338
;; either the prototype is found, in which case it is cloned and reused
339
;; or none exists and a new one is created
340
(cond (task-prototype
341
(setf (task-start-time task-prototype) (get-universal-time)
342
(task-start-run-time task-prototype) (get-internal-run-time)
343
(task-start-real-time task-prototype) (get-internal-real-time))
344
(log-info "process-request-toplevel: named prototype: ~s: ~s: ~s: ~s"
345
(list *repository-id* *query-signature* (first *dynamic-bindings*))
347
(rest *dynamic-bindings*)
348
(substitute #\space #.(code-char #o012) (task-request-content task-prototype)))
349
;; the expression x parameters combination already exists
350
;; discard the query text
351
(read-stream *request-location*)
352
;; reuse the query prototype, but replace the with that from the new request
353
;; modulo that aspects which are compiled-in
354
(assert-prototype-metadata (instance-metadata task-prototype))
355
;; but override things which should be configured for the new request
356
(setf task (clone-instance task-prototype
357
:dynamic-bindings *dynamic-bindings*
358
:dataset-graphs (or *dataset-graphs* (task-dataset-graphs task-prototype))
360
:repository-id *repository-id*
361
:revision-id resolved-revision
362
:response-content-type *response-content-type*
365
:accounting-destination accounting-location
366
:error-destination error-location
368
(generate-accounting-note :parse :task task))
370
(setf task-prototype (decode-task *request-location* *request-content-type* *response-content-type*))
371
(generate-accounting-note :parse :task task-prototype)
372
(log-info "process-request-toplevel: new prototype: ~s: ~s: ~s: ~s"
373
(list *repository-id* *query-signature* (first *dynamic-bindings*))
376
(substitute #\space #.(code-char #o012) (task-request-content task-prototype)))
377
(cond ((operation-read-only-p task-prototype)
378
(log-debug "process-request-toplevel: pre-compiling prototype: ~s" task-prototype)
379
(with-task-environment (:task task-prototype)
380
(compile-task task-prototype)))
382
(log-debug "process-request-toplevel: not compiling prototype: ~s" task-prototype)))
383
(setf (find-query-prototype resolved-revision *query-signature*
384
(append *dataset-graphs* (first *dynamic-bindings*)))
386
(setf task (clone-instance task-prototype
389
:repository-id *repository-id*
390
:revision-id resolved-revision
391
:accounting-destination accounting-location
392
:error-destination error-location))))
393
(pipe-task-with-termination task *response-location*
394
*request-content-type* *response-content-type*)))
395
(t ;; if no signature was provided, don't track
396
(log-info "process-request-toplevel: untracked: ~s: ~s: ~s: ~s"
397
*request-location* *response-location*
398
*request-content-type* *response-content-type*)
400
(pipe-task-with-termination *request-location* *response-location*
401
*request-content-type* *response-content-type*
402
:dataset-graphs *dataset-graphs*
403
:dynamic-bindings *dynamic-bindings*
405
:repository-id *repository-id*
406
:request-content-type *request-content-type*
407
:response-content-type *response-content-type*
408
:revision-id *revision-id*
411
:accounting-destination accounting-location
412
:error-destination error-location))))
414
(complete-output *response-location*)
415
;;(log-debug "process-request-toplevel: complete ~s: ~s." (task-id task) *run-state*)
416
(log-debug "process-request-toplevel: complete ~s: ~s." task *run-state*)
418
(storage-condition (c) (guarded-handler c))))))