Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/pipe-processing.lisp

Kind | Covered | All | %
expression | 57 | 239 | 23.8
branch | 1 | 16 | 6.3
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 (:documentation "processing queries from/to streams"
6
   "The engine also acts as a stream processor which  reads and parses a query expression from a stream,
7
  processes it against a store and encodes the result to an output stream." )
8
 
9
 
10
 (defun pipe-query-toplevel (from to &rest args)
11
   "parse, compile and processes a query between an arbitrary source and destination, with toplevel handlers
12
    for errors, interrruption, and termination."
13
   (declare (dynamic-extent args))
14
   (labels ((handle-error (condition)
15
              ;; (print (list :handle-error condition) *error-output*)
16
              (log-stacktrace "pipe-query-toplevel: Error processing query request : ~a -> ~a."
17
                              *query* condition)
18
              ;; if in the context of a query and the channel is still usable
19
              ;; send an error response
20
              (when *query*
21
                (log-error "error in query: ~s" (query-sse-expression *query*))
22
                (generate-error-note condition :task *query*)
23
                (terminate-task *query* condition)))
24
            (guarded-handler (condition)
25
              ;; (print (list :guarded-handler condition) *error-output*)
26
              (handler-case (handle-error condition)
27
                (serious-condition (c2)
28
                                   (log-error "pipe-query-toplevel: Reentrant error processing query:~% ~a~% ~a."
29
                                              condition c2)))
30
              (return-from pipe-query-toplevel condition)))
31
     ;; set up handlers to catch and log stacktraces for most serious conditions,
32
     ;; but let interrupts through, and cause storage conditions to first unwind 
33
     ;; before logging
34
     (handler-case
35
         (handler-bind ((storage-condition (lambda (c) (signal c)) )
36
                        #+sbcl (sb-sys:interactive-interrupt
37
                                (lambda (c) (when (and (streamp *terminal-io*)
38
                                                       (interactive-stream-p *terminal-io*))
39
                                              (invoke-debugger c))))
40
                        (spocq.e::message-syntax-error
41
                         (lambda (condition)
42
                           (log-warn "responding to message syntax error: ~a" condition)
43
                           (generate-error-note condition :task (error-task condition))
44
                           (generate-accounting-note :terminate :task (error-task condition))
45
                           (return-from pipe-query-toplevel condition)))
46
                        (serious-condition #'guarded-handler))
47
           (apply #'pipe-query from to args))
48
       (storage-condition (c) (guarded-handler c)))))
49
 
50
 
51
 (defgeneric pipe-query (input-source output-destination &key &allow-other-keys)
52
   (:documentation "Execute a query.
53
    Take the expression is from the source and the configuration from the arguments and/or dynamic bindings.
54
    Emit the result to the destination and return the query, unless the destination is null.
55
    In that case, return the result and the query as the second value.")
56
   (:method ((input-source cons) (output-source null) &rest args)
57
     (setf args
58
           (loop for (key value) on args by #'cddr
59
                 unless (member key '(:content-type :request-content-type))
60
                 append (list key value)))
61
     (apply #'run-sparql input-source args))
62
 
63
   (:method ((input-source string) (output-source null) &rest args)
64
     (setf args
65
           (loop for (key value) on args by #'cddr
66
                 unless (member key '(:content-type :request-content-type))
67
                 append (list key value)))
68
     (apply #'run-sparql input-source args))
69
 
70
   (:method ((input-source pathname) (output-source t) &rest args)
71
     (with-open-file (input-stream input-source :direction :input :element-type '(unsigned-byte 8))
72
       (apply #'pipe-query input-stream output-source args)))
73
 
74
   (:method ((input-source t) (output-source pathname) &rest args)
75
     (with-open-file (output-stream output-source :direction :output :element-type '(unsigned-byte 8)
76
                                    :if-does-not-exist :create :if-exists :supersede)
77
       (apply #'pipe-query input-source output-stream args))))
78
 
79
 
80
 (defmethod pipe-query ((input-source t) (output-destination stream) &rest args
81
                        &key 
82
                        ((:task-id *task-id*) (or *task-id* (make-internal-task-id)))
83
                        ((:user-id *user-id*) *user-id*)
84
                        ((:repository-id *repository-id*)  *repository-id*)
85
                        ((:revision-id *revision-id*) *revision-id*)
86
                        ((:response-content-type *response-content-type*) *response-content-type*)
87
                        ((:dynamic-bindings *dynamic-bindings*) *dynamic-bindings*)
88
                        (content-type *request-content-type*)
89
                        (request-content-type content-type)
90
                        &allow-other-keys)
91
   "Parse, and interpret query requests.
92
  - read an parse the qreuest from a stream
93
  - process it with through the same mechanism as brokered a request, but encode the results directly
94
    to a destination stream."
95
   (assert-argument-type pipe-query *task-id* string) 
96
   (assert-argument-type pipe-query *repository-id* (or null string))
97
   (with-accounting
98
     (let ((task (apply #'decode-task input-source request-content-type *response-content-type* args)))
99
       (log-notice "task: new: : ~s: ~s: ~s"
100
                   task
101
                   *dynamic-bindings*
102
                   (substitute #\space #.(code-char #o012) (query-sparql-expression task)))
103
       (when (eq *agent* (system-agent))
104
         (setf (task-agent task) *agent*))
105
       (generate-accounting-note :parse :task task)
106
       (apply #'pipe-query task output-destination args))))
107
 
108
 
109
 (defmethod pipe-query ((expression cons) (output-destination t) &rest args &key
110
                        (id (make-null-task-id)) &allow-other-keys)
111
   (declare (dynamic-extent args))
112
   (let ((query (apply #'make-query
113
                       :id id
114
                       :sse-expression expression
115
                       args)))
116
     (log-notice "query: new: : ~s: ~s: ~s"
117
                 query
118
                 *dynamic-bindings*
119
                 (substitute #\space #.(code-char #o012) (query-sparql-expression query)))
120
     (apply #'pipe-query query output-destination args)))
121
 
122
 
123
 (defmethod pipe-query ((query query) (output-destination stream)
124
                        &key ((:xslt-stylesheet *xslt-stylesheet*) *xslt-stylesheet*)
125
                        &allow-other-keys)
126
   "Given a query, ensure that it is compiled, run it, and emit the output to the given stream."
127
   ;;  (print (list :pipe-query query))
128
   (register-query query)
129
   (add-task-thread query (bt:current-thread))
130
   (let ((*package* *algebra-package*)
131
         (*readtable* *sse-readtable*))
132
     (log-notice "pipe ~s, repository ~s~@[, signature ~s~]~@[, expression ~s~], bindings ~s"
133
                 (task-id query)
134
                 (repository-id (task-repository query))
135
                 (query-signature query)
136
                 (unless (query-signature query) (query-sse-expression query))
137
                 (query-dynamic-bindings query)))
138
   (add-task-thread query (bt:current-thread))
139
   (restart-bind ((terminate-task (lambda (&optional (task-to-terminate query) condition)
140
                                    (when (eq query task-to-terminate)
141
                                      (log-warn "pipe-query: terminate task : ~a." query)
142
                                      (return-from pipe-query condition)))))
143
     (compile-query query)
144
     ;; in respond-to-task (initialize-task query)
145
     (flet ((pipe-response (output-destination)
146
              (respond-to-task (task-effective-request-operation query (task-operation query))
147
                               query
148
                               output-destination)))
149
       (cond (*xslt-stylesheet*
150
              (log-warn "pipe-query: xslt post-processing: ~s" *xslt-stylesheet*)
151
              (unless (is-http-url-namestring *xslt-stylesheet*)
152
                (spocq.e:request-error
153
                 "Invalid xslt sytlesheet designator: ~s" *xslt-stylesheet*))
154
              (let ((xslt-processor (run-program *executable-pathname.xslt*
155
                                                 `("-i" "-" "-o" "-" "--stylesheet" ,*xslt-stylesheet*)
156
                                                 :output output-destination :input :stream :wait nil)))
157
                  (cond (xslt-processor
158
                         (unwind-protect (pipe-response (run-program-input xslt-processor))
159
                           (close (run-program-input xslt-processor))
160
                           (run-program-wait xslt-processor)
161
                           (let ((return-code (run-program-exit-code xslt-processor)))
162
                             (case return-code
163
                               ((0 nil))
164
                               (t (error "XSLT process failed: ~s" return-code))))
165
                           (run-program-close xslt-processor)))
166
                        (t
167
                         (error "XSLT process not started."))))
168
              )
169
             (t
170
              (pipe-response output-destination))))
171
     (log-debug "pipe complete [~a] from ~/format-iso-time/." (task-id query) (task-start-time query))
172
     query))
173
 
174
 
175
 #|
176
 from stream -> stream
177
             (flet ((guarded-handler (condition)
178
                      (handler-case (handle-error condition)
179
                        (serious-condition (c2)
180
                                           (log-error "reentrant error processing task:~% ~a~% ~a."
181
                                                      condition c2)))
182
                      (return-from pipe-query condition)))
183
               ;; set up handlers to catch and log stacktraces for most serious conditions,
184
               ;; but let interrupts through, and cause storage conditions to first unwind 
185
               ;; before logging
186
               (handler-case
187
                 (handler-bind ((storage-condition (lambda (c) (signal c)) )    ; unwind it to the handler-case
188
                                #+sbcl (sb-sys:interactive-interrupt (lambda (c)
189
                                                                       (when (and (streamp *terminal-io*)
190
                                                                                  (interactive-stream-p *terminal-io*))
191
                                                                         (invoke-debugger c))))
192
                                (serious-condition #'guarded-handler))
193
                   
194
                   (register-query task)
195
                   (add-task-thread task (bt:current-thread))
196
                   (restart-bind ((terminate-task (lambda (task-to-terminate &optional condition)
197
                                                    (when (eq task task-to-terminate)
198
                                                      (return-from pipe-query condition)))))
199
                     (with-task-environment (:task task)
200
                       (let ((*package* *algebra-package*)
201
                             (*readtable* *sse-readtable*))
202
                         (log-info "pipe ~s, repository ~s, form ~s, bindings ~s"
203
                                   (task-id task)
204
                                   (repository-id (task-repository task))
205
                                   (query-sse-expression task)
206
                                   (query-dynamic-bindings task)))
207
                       (unless (task-initialization-function task)
208
                         (compile-query task)
209
                         (initialize-task task)
210
                         (generate-accounting-note :abstract)))
211
                     (respond-to-task (task-effective-request-operation task (task-operation task))
212
                                      task
213
                                      output-destination)))
214
                 (storage-condition (c) (guarded-handler c))))
215
             (log-debug "pipe complete [~a] from ~/format-iso-time/." (task-id task) (task-start-time task))
216
             task)))))
217
 |#
218
 
219
 #|
220
 (let ((stream (make-instance 'de.setf.utility.implementation::vector-io-stream
221
                 :vector (map 'vector #'char-code "select *  where {?s ?p ?o} limit 10"))))
222
   (princ (pipe-query stream *standard-output*
223
                              :repository-id "3/288"
224
                              :response-content-type mime:application/sparql-query)))
225
 
226
 (let ((stream (make-instance 'de.setf.utility.implementation::vector-io-stream
227
                 :vector (map 'vector #'char-code "select *  where {?s ?p ?o} limit 10"))))
228
   (time (pipe-query stream *standard-output*
229
                     :id "test"
230
                     :repository-id "5/5"
231
                     :response-content-type mime:application/sparql-query)))
232
 
233
 (let ((stream (make-instance 'de.setf.utility.implementation::vector-io-stream
234
                 :vector (map 'vector #'char-code "select *  where {?s ?p ?o} limit 10"))))
235
   (pipe-query stream *standard-output*
236
                     :id "test"
237
                     :repository-id "5/5"
238
                     :response-content-type mime:application/sparql-query
239
                     :accept mime:application/sparql-results+term-number))
240
 
241
 
242
 (let ((*standard-input* (make-instance 'de.setf.utility.implementation::vector-io-stream
243
                           :vector (map 'vector #'char-code "select *  where {?s ?p ?o} limit 10")))
244
       (sb-ext:*posix-argv* (append sb-ext:*posix-argv*
245
                                    '("--id" "testing" "--repository-id" "5/304"
246
                                      "--content-type" "application/sparql-query"))))
247
   (time (run-query)))
248
 
249
 time sudo sbcl --core /development/source/library/sbcl-spocq.core \
250
   --eval "(spocq.i::run-query)" \
251
   --id "testing" --repository-id "5/304" \
252
   --content-type "application/sparql-query" << EOF
253
 select *  where {?s ?p ?o} limit 10
254
 EOF
255
 
256
 sudo sbcl --disable-ldb --noinform --lose-on-corruption \
257
   --core /development/source/library/sbcl-spocq.core \
258
   --no-userinit --disable-debugger --noprint \
259
   --eval "(spocq.i::run-query)" \
260
   --id "testing" --repository-id "5/304" \
261
   --content-type "application/sparql-query" \
262
   --accept "application/sparql-results+term-number" < /home/arto/spocq-cli/construct.rq
263
 
264
 sudo sbcl --disable-ldb --noinform --lose-on-corruption \
265
   --core /development/source/library/sbcl-spocq.core \
266
   --no-userinit --disable-debugger --noprint \
267
   --eval "(spocq.i::run-query)" \
268
   --id "testing" --repository-id "5/304" \
269
   --content-type "application/sparql-query" \
270
   --accept "application/sparql-results+term-number" << EOF
271
 construct {<http://example.org/ephemeral> ?p "3.1415"^^<http://www.w3.org/2001/XMLSchema#decimal>} where {?s ?p ?o} limit 10
272
 EOF
273
 
274
 sudo ./sbcl-spocq-cli \
275
   --id "testing" --repository-id "5/304" \
276
   --content-type "application/sparql-query" \
277
   --accept "application/sparql-results+term-number" << EOF
278
 construct {<http://example.org/ephemeral> ?p "3.1415"^^<http://www.w3.org/2001/XMLSchema#decimal>} where {?s ?p ?o} limit 10
279
 EOF
280
 
281
 sudo sbcl --disable-ldb --noinform --lose-on-corruption \
282
   --core /development/source/library/sbcl-spocq.core \
283
   --no-userinit --disable-debugger --noprint \
284
   --eval "(spocq.i::run-query)" \
285
   --id "testing" --repository-id "5/199" \
286
   --content-type "application/sparql-query" \
287
   --accept "application/sparql-results+term-number" << EOF
288
 SELECT * { <http://example.org/ns#x> ?p 456. . }
289
 EOF
290
 
291
 sudo ./sbcl-spocq-cli \
292
   --id "testing" --repository-id "5/199" \
293
   --content-type "application/sparql-query" \
294
   --accept "application/sparql-results+term-number" << EOF
295
 SELECT * { <http://example.org/ns#x> ?p 456. . }
296
 EOF
297
 
298
 sudo LD_DEBUG=all ./sbcl-spocq-cli \
299
   --profile "(rdfcache:load-library algebra-thread-apply run-algebra-thread compile-query sb-ext:gc open-log load-configuration main-query initialize-spocq initialize-channels initialize-store initialize-interned-terms pipe-query receive-query-request respond-to-task query-run-in-thread send-query-response rdfcache:match rdfcache:count rdfcache:next bt:make-thread publish-accounting-notes)" \
300
   --id "testing" --repository-id "5/304" \
301
   --content-type "application/sparql-query" \
302
   --accept "application/sparql-results+term-number" << EOF
303
 construct {<http://example.org/ephemeral> ?p "3.1415"^^<http://www.w3.org/2001/XMLSchema#decimal>} where {?s ?p ?o} limit 10
304
 EOF
305
 
306
      
307
 
308
 
309
 (let ((stream (make-instance 'de.setf.utility.implementation::vector-io-stream
310
                 :vector (map 'vector #'char-code "construct {<http://example.org/ephemeral> ?p '3.1415'^^<http://www.w3.org/2001/XMLSchema#decimal>} where {?s ?p ?o} limit 10"))))
311
   (pipe-query stream *standard-output*
312
                     :id "test"
313
                     :repository-id "5/5"
314
                     :request-content-type mime:application/sparql-query
315
                     :response-content-type mime:application/sparql-results+term-number))
316
 (parse-sparql "construct {
317
  <http://example.org/ephemeral> ?p '3.1415'^^<http://www.w3.org/2001/XMLSchema#decimal>
318
  } where
319
  {?s ?p ?o} limit 10")
320
 
321
 
322
 sudo sbcl --disable-ldb --noinform --lose-on-corruption \
323
   --core /development/source/library/sbcl-spocq.core \
324
   --no-userinit --disable-debugger \
325
   --id "testing" --repository-id "5/304" \
326
   --content-type "application/sparql-query" \
327
   --accept "application/sparql-results+term-number" << EOF
328
 construct {<http://example.org/ephemeral> ?p "3.1415"^^<http://www.w3.org/2001/XMLSchema#decimal>} where {?s ?p ?o} limit 10
329
 EOF
330
 
331
 sudo sbcl --disable-ldb --noinform --lose-on-corruption \
332
   --core ./sbcl-spocq.core \
333
   --no-userinit --disable-debugger --noprint \
334
   --eval "(spocq.i::run-query)" \
335
   --id "testing" --repository-id "2/21" \
336
   --content-type "application/sparql-query" \
337
   --accept "application/sparql-results+term-number" << EOF
338
 SELECT * { ?s ?p ?o }
339
 EOF
340
 
341
 # w/ pragmas
342
 sudo sbcl --disable-ldb --noinform --lose-on-corruption \
343
   --core /development/source/library/sbcl-spocq.core \
344
   --no-userinit --disable-debugger \
345
   --eval "(spocq.i::run-query)" \
346
   --id "testing" \
347
   --content-type "application/sparql-query" \
348
   --accept "application/sparql-results+term-number" << EOF
349
 PRAGMA REPOSITORY "5/304"
350
 construct {<http://example.org/ephemeral> ?p "3.1415"^^<http://www.w3.org/2001/XMLSchema#decimal>} where {?s ?p ?o} limit 10
351
 EOF
352
 
353
 |#
354