Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/pipe-processing.lisp
| Kind | Covered | All | % |
| expression | 57 | 239 | 23.8 |
| branch | 1 | 16 | 6.3 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
5
(:documentation "processing queries from/to streams"
6
"The engine also acts as a stream processor which reads and parses a query expression from a stream,
7
processes it against a store and encodes the result to an output stream." )
10
(defun pipe-query-toplevel (from to &rest args)
11
"parse, compile and processes a query between an arbitrary source and destination, with toplevel handlers
12
for errors, interrruption, and termination."
13
(declare (dynamic-extent args))
14
(labels ((handle-error (condition)
15
;; (print (list :handle-error condition) *error-output*)
16
(log-stacktrace "pipe-query-toplevel: Error processing query request : ~a -> ~a."
18
;; if in the context of a query and the channel is still usable
19
;; send an error response
21
(log-error "error in query: ~s" (query-sse-expression *query*))
22
(generate-error-note condition :task *query*)
23
(terminate-task *query* condition)))
24
(guarded-handler (condition)
25
;; (print (list :guarded-handler condition) *error-output*)
26
(handler-case (handle-error condition)
27
(serious-condition (c2)
28
(log-error "pipe-query-toplevel: Reentrant error processing query:~% ~a~% ~a."
30
(return-from pipe-query-toplevel condition)))
31
;; set up handlers to catch and log stacktraces for most serious conditions,
32
;; but let interrupts through, and cause storage conditions to first unwind
35
(handler-bind ((storage-condition (lambda (c) (signal c)) )
36
#+sbcl (sb-sys:interactive-interrupt
37
(lambda (c) (when (and (streamp *terminal-io*)
38
(interactive-stream-p *terminal-io*))
39
(invoke-debugger c))))
40
(spocq.e::message-syntax-error
42
(log-warn "responding to message syntax error: ~a" condition)
43
(generate-error-note condition :task (error-task condition))
44
(generate-accounting-note :terminate :task (error-task condition))
45
(return-from pipe-query-toplevel condition)))
46
(serious-condition #'guarded-handler))
47
(apply #'pipe-query from to args))
48
(storage-condition (c) (guarded-handler c)))))
51
(defgeneric pipe-query (input-source output-destination &key &allow-other-keys)
52
(:documentation "Execute a query.
53
Take the expression is from the source and the configuration from the arguments and/or dynamic bindings.
54
Emit the result to the destination and return the query, unless the destination is null.
55
In that case, return the result and the query as the second value.")
56
(:method ((input-source cons) (output-source null) &rest args)
58
(loop for (key value) on args by #'cddr
59
unless (member key '(:content-type :request-content-type))
60
append (list key value)))
61
(apply #'run-sparql input-source args))
63
(:method ((input-source string) (output-source null) &rest args)
65
(loop for (key value) on args by #'cddr
66
unless (member key '(:content-type :request-content-type))
67
append (list key value)))
68
(apply #'run-sparql input-source args))
70
(:method ((input-source pathname) (output-source t) &rest args)
71
(with-open-file (input-stream input-source :direction :input :element-type '(unsigned-byte 8))
72
(apply #'pipe-query input-stream output-source args)))
74
(:method ((input-source t) (output-source pathname) &rest args)
75
(with-open-file (output-stream output-source :direction :output :element-type '(unsigned-byte 8)
76
:if-does-not-exist :create :if-exists :supersede)
77
(apply #'pipe-query input-source output-stream args))))
80
(defmethod pipe-query ((input-source t) (output-destination stream) &rest args
82
((:task-id *task-id*) (or *task-id* (make-internal-task-id)))
83
((:user-id *user-id*) *user-id*)
84
((:repository-id *repository-id*) *repository-id*)
85
((:revision-id *revision-id*) *revision-id*)
86
((:response-content-type *response-content-type*) *response-content-type*)
87
((:dynamic-bindings *dynamic-bindings*) *dynamic-bindings*)
88
(content-type *request-content-type*)
89
(request-content-type content-type)
91
"Parse, and interpret query requests.
92
- read an parse the qreuest from a stream
93
- process it with through the same mechanism as brokered a request, but encode the results directly
94
to a destination stream."
95
(assert-argument-type pipe-query *task-id* string)
96
(assert-argument-type pipe-query *repository-id* (or null string))
98
(let ((task (apply #'decode-task input-source request-content-type *response-content-type* args)))
99
(log-notice "task: new: : ~s: ~s: ~s"
102
(substitute #\space #.(code-char #o012) (query-sparql-expression task)))
103
(when (eq *agent* (system-agent))
104
(setf (task-agent task) *agent*))
105
(generate-accounting-note :parse :task task)
106
(apply #'pipe-query task output-destination args))))
109
(defmethod pipe-query ((expression cons) (output-destination t) &rest args &key
110
(id (make-null-task-id)) &allow-other-keys)
111
(declare (dynamic-extent args))
112
(let ((query (apply #'make-query
114
:sse-expression expression
116
(log-notice "query: new: : ~s: ~s: ~s"
119
(substitute #\space #.(code-char #o012) (query-sparql-expression query)))
120
(apply #'pipe-query query output-destination args)))
123
(defmethod pipe-query ((query query) (output-destination stream)
124
&key ((:xslt-stylesheet *xslt-stylesheet*) *xslt-stylesheet*)
126
"Given a query, ensure that it is compiled, run it, and emit the output to the given stream."
127
;; (print (list :pipe-query query))
128
(register-query query)
129
(add-task-thread query (bt:current-thread))
130
(let ((*package* *algebra-package*)
131
(*readtable* *sse-readtable*))
132
(log-notice "pipe ~s, repository ~s~@[, signature ~s~]~@[, expression ~s~], bindings ~s"
134
(repository-id (task-repository query))
135
(query-signature query)
136
(unless (query-signature query) (query-sse-expression query))
137
(query-dynamic-bindings query)))
138
(add-task-thread query (bt:current-thread))
139
(restart-bind ((terminate-task (lambda (&optional (task-to-terminate query) condition)
140
(when (eq query task-to-terminate)
141
(log-warn "pipe-query: terminate task : ~a." query)
142
(return-from pipe-query condition)))))
143
(compile-query query)
144
;; in respond-to-task (initialize-task query)
145
(flet ((pipe-response (output-destination)
146
(respond-to-task (task-effective-request-operation query (task-operation query))
148
output-destination)))
149
(cond (*xslt-stylesheet*
150
(log-warn "pipe-query: xslt post-processing: ~s" *xslt-stylesheet*)
151
(unless (is-http-url-namestring *xslt-stylesheet*)
152
(spocq.e:request-error
153
"Invalid xslt sytlesheet designator: ~s" *xslt-stylesheet*))
154
(let ((xslt-processor (run-program *executable-pathname.xslt*
155
`("-i" "-" "-o" "-" "--stylesheet" ,*xslt-stylesheet*)
156
:output output-destination :input :stream :wait nil)))
157
(cond (xslt-processor
158
(unwind-protect (pipe-response (run-program-input xslt-processor))
159
(close (run-program-input xslt-processor))
160
(run-program-wait xslt-processor)
161
(let ((return-code (run-program-exit-code xslt-processor)))
164
(t (error "XSLT process failed: ~s" return-code))))
165
(run-program-close xslt-processor)))
167
(error "XSLT process not started."))))
170
(pipe-response output-destination))))
171
(log-debug "pipe complete [~a] from ~/format-iso-time/." (task-id query) (task-start-time query))
176
from stream -> stream
177
(flet ((guarded-handler (condition)
178
(handler-case (handle-error condition)
179
(serious-condition (c2)
180
(log-error "reentrant error processing task:~% ~a~% ~a."
182
(return-from pipe-query condition)))
183
;; set up handlers to catch and log stacktraces for most serious conditions,
184
;; but let interrupts through, and cause storage conditions to first unwind
187
(handler-bind ((storage-condition (lambda (c) (signal c)) ) ; unwind it to the handler-case
188
#+sbcl (sb-sys:interactive-interrupt (lambda (c)
189
(when (and (streamp *terminal-io*)
190
(interactive-stream-p *terminal-io*))
191
(invoke-debugger c))))
192
(serious-condition #'guarded-handler))
194
(register-query task)
195
(add-task-thread task (bt:current-thread))
196
(restart-bind ((terminate-task (lambda (task-to-terminate &optional condition)
197
(when (eq task task-to-terminate)
198
(return-from pipe-query condition)))))
199
(with-task-environment (:task task)
200
(let ((*package* *algebra-package*)
201
(*readtable* *sse-readtable*))
202
(log-info "pipe ~s, repository ~s, form ~s, bindings ~s"
204
(repository-id (task-repository task))
205
(query-sse-expression task)
206
(query-dynamic-bindings task)))
207
(unless (task-initialization-function task)
209
(initialize-task task)
210
(generate-accounting-note :abstract)))
211
(respond-to-task (task-effective-request-operation task (task-operation task))
213
output-destination)))
214
(storage-condition (c) (guarded-handler c))))
215
(log-debug "pipe complete [~a] from ~/format-iso-time/." (task-id task) (task-start-time task))
220
(let ((stream (make-instance 'de.setf.utility.implementation::vector-io-stream
221
:vector (map 'vector #'char-code "select * where {?s ?p ?o} limit 10"))))
222
(princ (pipe-query stream *standard-output*
223
:repository-id "3/288"
224
:response-content-type mime:application/sparql-query)))
226
(let ((stream (make-instance 'de.setf.utility.implementation::vector-io-stream
227
:vector (map 'vector #'char-code "select * where {?s ?p ?o} limit 10"))))
228
(time (pipe-query stream *standard-output*
231
:response-content-type mime:application/sparql-query)))
233
(let ((stream (make-instance 'de.setf.utility.implementation::vector-io-stream
234
:vector (map 'vector #'char-code "select * where {?s ?p ?o} limit 10"))))
235
(pipe-query stream *standard-output*
238
:response-content-type mime:application/sparql-query
239
:accept mime:application/sparql-results+term-number))
242
(let ((*standard-input* (make-instance 'de.setf.utility.implementation::vector-io-stream
243
:vector (map 'vector #'char-code "select * where {?s ?p ?o} limit 10")))
244
(sb-ext:*posix-argv* (append sb-ext:*posix-argv*
245
'("--id" "testing" "--repository-id" "5/304"
246
"--content-type" "application/sparql-query"))))
249
time sudo sbcl --core /development/source/library/sbcl-spocq.core \
250
--eval "(spocq.i::run-query)" \
251
--id "testing" --repository-id "5/304" \
252
--content-type "application/sparql-query" << EOF
253
select * where {?s ?p ?o} limit 10
256
sudo sbcl --disable-ldb --noinform --lose-on-corruption \
257
--core /development/source/library/sbcl-spocq.core \
258
--no-userinit --disable-debugger --noprint \
259
--eval "(spocq.i::run-query)" \
260
--id "testing" --repository-id "5/304" \
261
--content-type "application/sparql-query" \
262
--accept "application/sparql-results+term-number" < /home/arto/spocq-cli/construct.rq
264
sudo sbcl --disable-ldb --noinform --lose-on-corruption \
265
--core /development/source/library/sbcl-spocq.core \
266
--no-userinit --disable-debugger --noprint \
267
--eval "(spocq.i::run-query)" \
268
--id "testing" --repository-id "5/304" \
269
--content-type "application/sparql-query" \
270
--accept "application/sparql-results+term-number" << EOF
271
construct {<http://example.org/ephemeral> ?p "3.1415"^^<http://www.w3.org/2001/XMLSchema#decimal>} where {?s ?p ?o} limit 10
274
sudo ./sbcl-spocq-cli \
275
--id "testing" --repository-id "5/304" \
276
--content-type "application/sparql-query" \
277
--accept "application/sparql-results+term-number" << EOF
278
construct {<http://example.org/ephemeral> ?p "3.1415"^^<http://www.w3.org/2001/XMLSchema#decimal>} where {?s ?p ?o} limit 10
281
sudo sbcl --disable-ldb --noinform --lose-on-corruption \
282
--core /development/source/library/sbcl-spocq.core \
283
--no-userinit --disable-debugger --noprint \
284
--eval "(spocq.i::run-query)" \
285
--id "testing" --repository-id "5/199" \
286
--content-type "application/sparql-query" \
287
--accept "application/sparql-results+term-number" << EOF
288
SELECT * { <http://example.org/ns#x> ?p 456. . }
291
sudo ./sbcl-spocq-cli \
292
--id "testing" --repository-id "5/199" \
293
--content-type "application/sparql-query" \
294
--accept "application/sparql-results+term-number" << EOF
295
SELECT * { <http://example.org/ns#x> ?p 456. . }
298
sudo LD_DEBUG=all ./sbcl-spocq-cli \
299
--profile "(rdfcache:load-library algebra-thread-apply run-algebra-thread compile-query sb-ext:gc open-log load-configuration main-query initialize-spocq initialize-channels initialize-store initialize-interned-terms pipe-query receive-query-request respond-to-task query-run-in-thread send-query-response rdfcache:match rdfcache:count rdfcache:next bt:make-thread publish-accounting-notes)" \
300
--id "testing" --repository-id "5/304" \
301
--content-type "application/sparql-query" \
302
--accept "application/sparql-results+term-number" << EOF
303
construct {<http://example.org/ephemeral> ?p "3.1415"^^<http://www.w3.org/2001/XMLSchema#decimal>} where {?s ?p ?o} limit 10
309
(let ((stream (make-instance 'de.setf.utility.implementation::vector-io-stream
310
:vector (map 'vector #'char-code "construct {<http://example.org/ephemeral> ?p '3.1415'^^<http://www.w3.org/2001/XMLSchema#decimal>} where {?s ?p ?o} limit 10"))))
311
(pipe-query stream *standard-output*
314
:request-content-type mime:application/sparql-query
315
:response-content-type mime:application/sparql-results+term-number))
316
(parse-sparql "construct {
317
<http://example.org/ephemeral> ?p '3.1415'^^<http://www.w3.org/2001/XMLSchema#decimal>
319
{?s ?p ?o} limit 10")
322
sudo sbcl --disable-ldb --noinform --lose-on-corruption \
323
--core /development/source/library/sbcl-spocq.core \
324
--no-userinit --disable-debugger \
325
--id "testing" --repository-id "5/304" \
326
--content-type "application/sparql-query" \
327
--accept "application/sparql-results+term-number" << EOF
328
construct {<http://example.org/ephemeral> ?p "3.1415"^^<http://www.w3.org/2001/XMLSchema#decimal>} where {?s ?p ?o} limit 10
331
sudo sbcl --disable-ldb --noinform --lose-on-corruption \
332
--core ./sbcl-spocq.core \
333
--no-userinit --disable-debugger --noprint \
334
--eval "(spocq.i::run-query)" \
335
--id "testing" --repository-id "2/21" \
336
--content-type "application/sparql-query" \
337
--accept "application/sparql-results+term-number" << EOF
338
SELECT * { ?s ?p ?o }
342
sudo sbcl --disable-ldb --noinform --lose-on-corruption \
343
--core /development/source/library/sbcl-spocq.core \
344
--no-userinit --disable-debugger \
345
--eval "(spocq.i::run-query)" \
347
--content-type "application/sparql-query" \
348
--accept "application/sparql-results+term-number" << EOF
349
PRAGMA REPOSITORY "5/304"
350
construct {<http://example.org/ephemeral> ?p "3.1415"^^<http://www.w3.org/2001/XMLSchema#decimal>} where {?s ?p ?o} limit 10