Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/task-processing.lisp

Kind        Covered  All  %
expression  15       471  3.2
branch      0        14   0.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 ;;; (load #p"patches/task-processing.lisp")
6
 
7
 
8
 (defmethod (setf configuration-parameter) ((value list) (parameter (eql :script-stack)))
9
   (setq ssl::*toplevel-stack* value))
10
 
11
 (defmethod (setf configuration-parameter) ((value string) (parameter (eql :script-code)))
12
   (setf (configuration-parameter :script-code) value))
13
 
14
 (defmethod (setf configuration-parameter) ((value spocq:iri) (parameter (eql :script-code)))
15
   (setf (configuration-parameter :script-code) value))
16
 
17
 (defmethod (setf configuration-parameter) ((value cons) (parameter (eql :script-code)))
18
   (setq ssl::*toplevel-code* value))
19
 
20
 (defmethod (setf configuration-parameter) ((value list) (parameter (eql :script-environment)))
21
   (setq ssl::*toplevel-environment* value))
22
 
23
 
24
 ;;; Module documentation form. The :DOCUMENTATION operator is presumably a macro supplied
 ;;; elsewhere in the project (a compound form headed by a keyword is not standard CL) -- confirm.
 ;;; NOTE(review): the text below lists PROCESS-TASK, but this file defines DECODE-TASK,
 ;;; PIPE-TASK and PROCESS-REQUEST-TOPLEVEL; consider updating the string to match.
 (:documentation "top-level task processing"
   "provide definitions for
 
    - decode-task
    - process-task
 
    to handle the situations:
 
    - sparql query with stdin/stdout
      - processed as sparql
      - as parsed
      - as parsed and expanded
      - processed as sparql but as traced
    - script with stdin/stdout
   " 
  )
40
 
41
 
42
 (defun decode-sparql-content (source media-type &rest args
43
                                      &key (task-id *task-id*) (agent *agent*)
44
                                      (user-id *user-id*)
45
                                      (repository-id *repository-id*)
46
                                      (revision-id *revision-id*)
47
                                      (response-content-type *response-content-type*)
48
                                      (dynamic-bindings *dynamic-bindings*)
49
                                      &allow-other-keys)
50
   (declare (dynamic-extent args))
51
   (multiple-value-bind (operation arguments)
52
                        (apply #'receive-message source media-type args)
53
     (apply #'make-query
54
            :operation operation
55
            :task-id task-id
56
            :user-id user-id
57
            :agent agent
58
            :repository-id repository-id
59
            :revision-id revision-id
60
            :response-content-type response-content-type
61
            :dynamic-bindings dynamic-bindings
62
            :request-routing-key nil
63
            :request-exchange nil
64
             (case *dataset-source* ; nxp-213
65
               (:query arguments)
66
               (t (list* :dataset-graphs *dataset-graphs* arguments))))))
67
 
68
 
69
 (defgeneric decode-task (source source-media-type result-media-type &rest args)
70
   (:argument-precedence-order source-media-type result-media-type source)
71
 
72
   (:method ((source pathname) (media-type mime:application/sparql) result-media-type &rest args)
73
     (with-open-file (stream source :direction :input)
74
       (apply #'decode-task stream media-type result-media-type args)))
75
 
76
   (:method ((source stream) (media-type mime:application/sparql) (result-media-type mime::sparql-results) &rest args
77
             &key (response-content-type result-media-type)
78
             &allow-other-keys)
79
     "sparql->results is a process which intends perform some sort of query"
80
     (apply #'decode-sparql-content source media-type :response-content-type response-content-type args))
81
 
82
   (:method ((source string) (media-type mime:application/sparql) (result-media-type mime::sparql-results) &rest args
83
             &key (response-content-type result-media-type)
84
             &allow-other-keys)
85
     "sparql->results is a process which intends perform some sort of query"
86
     (apply #'decode-sparql-content source media-type :response-content-type response-content-type args))
87
 
88
   (:method ((source stream) (media-type mime:application/sparql) (result-media-type mime::query) &rest args
89
             &key (response-content-type result-media-type)
90
             &allow-other-keys)
91
     "sparql -> some query expression is a process which intends to operate on the query expression itself"
92
     (apply #'decode-sparql-content source media-type :response-content-type response-content-type args))
93
 
94
   (:method ((source string) (media-type mime:application/sparql) (result-media-type mime::query) &rest args
95
             &key (response-content-type result-media-type)
96
             &allow-other-keys)
97
     "sparql -> some query expression is a process which intends to operate on the query expression itself"
98
     (apply #'decode-sparql-content source media-type :response-content-type response-content-type args))
99
 
100
   (:method ((source stream) (media-type mime:application/sparql) (result-media-type mime:rdf) &rest args)
101
     "sparql->rdf is a process which intends perform some sort of query"
102
     (apply #'decode-sparql-content source media-type args))
103
 
104
   (:method ((source string) (media-type mime:application/sparql) (result-media-type mime:rdf) &rest args)
105
     "sparql->rdf is a process which intends perform some sort of query"
106
     (apply #'decode-sparql-content source media-type args))
107
 
108
   (:method ((source t) (media-type mime:application/sparql-query+ssl)  (result-media-type mime:application/sparql-results) &rest args)
109
     (multiple-value-bind (operation arguments)
110
                          (apply #'receive-message source media-type args)
111
       (apply #'make-script 
112
              :operation operation
113
              :task-id *task-id*
114
              :user-id *user-id*
115
              :repository-id *repository-id*
116
              :revision-id *revision-id*
117
              :request-content-type media-type
118
              :response-content-type *response-content-type*
119
              :dynamic-bindings *dynamic-bindings*
120
              :request-routing-key nil
121
              :request-exchange nil
122
              arguments)))
123
   )
124
 
125
 (defmethod receive-message ((location t) (media-type mime:application/sparql-query+ssl) &rest args)
126
   (declare (ignore args))
127
   (multiple-value-bind (form request-content) (load-ssl-graph location)
128
     (values (find-if #'(lambda (element) (and (symbolp element)
129
                                               (eql (symbol-package element)
130
                                                    (find-package '|http://dydra.com/schema/ssl#|))))
131
                      form)
132
             `(:sse-expression ,form
133
               :request-content ,request-content))))
134
 
135
 (defmethod load-configuration ((source request-processor))
136
   "load a processor's configuration by delegating to whatever it has as a source.
137
    allow specializaion which, eg loads one only"
138
   (load-configuration (processor-configuration-location source)))
139
 
140
 
141
 ;;; the central operator once the evaluation environment has been set up
142
 
143
 (defgeneric pipe-task (source destination source-media-type destination-media-type &rest args)
   (:documentation "pipe-task is the generic execution operator for queries, scripts, etc.
    given a generic source, it determines what sort of task to create based on the source media type and then
    delegates to the respective specialization.")
 
   ;; sparql source: hand everything to PIPE-QUERY, passing the two media types as the
   ;; request/response content types ahead of the caller's ARGS
   (:method ((source t) (destination t) (source-media-type mime:application/sparql) (destination-media-type t) &rest args)
     "the base sparql-query method delegates to pipe-query"
     (declare (dynamic-extent args))
     (apply #'pipe-query source destination
            :request-content-type source-media-type
            :response-content-type destination-media-type
            args))
     
   ;; ssl script source (not yet a task): decode it into a task with DECODE-TASK, then
   ;; re-dispatch PIPE-TASK with that task as the source
   (:method ((source t) (destination t) (source-media-type mime:application/sparql-query+ssl) (destination-media-type t) &rest args)
     (declare (dynamic-extent args))
     (apply #'pipe-task (apply #'decode-task source source-media-type destination-media-type args) destination
            source-media-type destination-media-type
            args))
 
   ;; script task: register it, bind a TERMINATE-TASK restart, build the SSL environment and
   ;; run it via SSLR:EXECUTE inside WITH-ACCOUNTING
   (:method ((task script) (destination t) (source-media-type mime:application/sparql-query+ssl) (destination-media-type t) &rest args)
     "given a script, set up the run-time environment and execute it"
     (declare (ignore args))
     (register-query task)
     (add-task-thread task (bt:current-thread))
     ;; invoking TERMINATE-TASK with no argument, or with this task, returns CONDITION (default NIL)
     ;; from PIPE-TASK; for any other task only the first warning is issued and the restart returns
     (restart-bind ((terminate-task (lambda (&optional (task-to-terminate task) condition)
                                      (warn "pipe-task: terminate task? : ~a." task)
                                      (when (eq task task-to-terminate)
                                        (warn "pipe-task: terminate task! : ~a." task)
                                        (return-from pipe-task condition)))))
       (let ((*task* task))
         (let* ((environment ssl::*toplevel-environment*)
                (bindings (task-dynamic-bindings task)))
           ;; pair variables with terms and prepend them to a local copy of the toplevel environment.
           ;; assumes the dynamic bindings are (variables . terms), consistent with the logging in
           ;; process-request-toplevel -- TODO confirm
           (loop for variable in (first bindings)
             for term in (rest bindings)
             do (push (cons variable term) environment))
           ;; expose the request's content types and repository id to the script
           (setf environment (acons '?::|requestContentType| (task-request-content-type task)
                                    (acons '?::|responseContentType| (task-response-content-type task)
                                           (acons '?::|requestRepository| (repository-id (task-repository task))
                                                  environment))))
           (with-accounting
               (sslr:execute (script-stack task) (script-code task) environment)
             (generate-accounting-note :complete))))))
   )
186
 
187
 (defgeneric compile-task (task)
188
   (:method ((task query))
189
     (compile-query task))
190
   (:method ((task script))
191
     (compile-script task)))
192
 
193
 (defun compile-script (script)
194
   script)
195
 
196
 ;;; processing loop
197
 
198
 (defun main-query-loop (&optional (args (rest (command-line-argument-list))))
199
   (main-task-loop args))
200
 
201
 
202
 (defun main-task-loop (&optional (args (rest (command-line-argument-list))))
203
   "Repeatedly run a task: decode it from theinput source, execute it and emit results to the output destination.
204
  Any errors optionally cause termination. For each iteration, write a soft-eof to standard output and standard error."
205
   
206
   (when (or (equal args '("-v")) (equal args '("--verbose")))
207
     (format *standard-output* "~{~a ~a~%~}" (build-revisions))
208
     (return-from main-task-loop nil))
209
   (labels ((loop-sigterm-handler (signal code context)
210
              (declare (ignore signal code context))
211
              (stop-threads)
212
              (log-info "Stop: SIGTERM ~a." (iso-time))
213
              (exit-lisp 130))
214
            
215
            (start-task-loop ()
216
              (loop for result = (process-request-toplevel (request-processor))
217
                  do (typecase result
218
                       (condition (return result))
219
                       (task)))))
220
   (with-command-line-configuration (args)
221
     (setq *start-timestamp* (iso-time))
222
     (initialize-spocq :title (getarg "--title"))
223
     (enable-interrupt :sigterm #'loop-sigterm-handler)
224
     (log-info "Start ~a." *start-timestamp*)
225
     (prog1 (run-processing-threads (cons `(start-task-loop ,#'start-task-loop) *thread-specifications*))
226
       (stop-threads)
227
       (log-info "Stop: return ~a." (iso-time))))))
228
 
229
 
230
 (defgeneric process-request-toplevel (processor)
   (:documentation "Process a single request task:
 - establish toplevel handlers
 - assimilate any request settings and arguments into the runtime environment
   this includes refreshing the configuration, which could signal an eof, which will be returned and likely terminate the processing
 - parse the request body per the content type and instantiate a task/query/script instance
 - delegate to it for evaluation
 - record accounting ")
 
   ;; Returns a condition when processing is cut short (EOF, message syntax error, other serious
   ;; condition, or a TERMINATE-TASK restart invoked with a condition); MAIN-TASK-LOOP stops on
   ;; any condition result.
   (:method ((processor request-processor))
     (labels ((handle-error (condition)
                ;; log, flush output, optionally exit (when *run-state* is :process), emit an
                ;; error note, terminate the current task and return the condition
                (log-stacktrace "process-request-toplevel: Error processing task : ~a -> ~a."
                                *task* condition)
                (complete-output *standard-output*)
                (case *run-state*
                  (:process
                   (maybe-exit-on-error 70)))
                (generate-error-note condition :task (or (error-task condition) *task*))
                (terminate-task *task*)
                (return-from process-request-toplevel condition))
              (guarded-handler (condition)
                ;; a serious condition raised while handling the error is only logged; the
                ;; original condition is still returned
                (handler-case (handle-error condition)
                  (serious-condition (c2)
                                     (log-error "process-request-toplevel: Reentrant error processing task:~% ~a~% ~a." condition c2)))
                (return-from process-request-toplevel condition))
              (pipe-task-with-termination (query &rest args)
                ;; run PIPE-TASK with a TERMINATE-TASK restart which, when invoked for this QUERY,
                ;; returns CONDITION from process-request-toplevel.
                ;; NOTE(review): unlike the restart in PIPE-TASK's script method, the task argument
                ;; is required here.
                (declare (dynamic-extent args))
                (restart-bind ((terminate-task (lambda (task-to-terminate &optional condition)
                                                 ;; (print (list :pipe-query :terminate-task query condition))
                                                 (when (eq query task-to-terminate)
                                                   (return-from process-request-toplevel condition)))))
                  (apply #'pipe-task query args))))
 
       ;; set up handlers to catch and log stacktraces for most serious conditions,
       ;; but let interrupts through, and cause storage conditions to first unwind 
       ;; before logging.
       ;; The storage-condition handler re-signals from within the handler, where only the outer
       ;; HANDLER-CASE is active, so the stack unwinds before the SERIOUS-CONDITION handler of
       ;; this cluster can run.
       (handler-case 
           (handler-bind (#+sbcl
                          (sb-sys:interactive-interrupt
                           (lambda (c)
                             (when (and (streamp *terminal-io*)
                                        (interactive-stream-p *terminal-io*))
                               (invoke-debugger c))))
                          (storage-condition
                           (lambda (c) (signal c)) )    ; unwind it to the handler-case
                          (end-of-file
                           (lambda (condition)
                             (log-info "process-request-toplevel: EOF: return ~a." (iso-time))
                             (return-from process-request-toplevel condition)))
                          (spocq.e::message-syntax-error
                           ;; emit a parse error immediately and (optionally) exit
                           (lambda (condition)
                             (complete-output *standard-output*)
                             (log-warn "responding to message syntax error: ~a" condition)
                             (print-error-conditions (list (cons nil condition)) *error-output*)
                             (complete-output *error-output*)
                             (maybe-exit-on-error 65)
                             ;; flush anything else
                             (channel-get-all *error-condition-channel*)
                             (return-from process-request-toplevel condition)))
                          (serious-condition #'guarded-handler))
             (let* ((accounting-location (processor-accounting-location processor))
                    (error-location (processor-error-location processor))
                    ;; rebind the locations, but
                    (*request-location* (processor-request-content-location processor))
                    (*response-location* (processor-response-content-location processor))
                    ;; not the types, to permit configuration values to carry through
                    ;; NOTE(review): the two bindings below DO rebind the content types, which
                    ;; contradicts the comment above -- confirm which is intended
                    (*request-content-type* (processor-request-content-type processor))
                    (*response-content-type* (processor-response-content-type processor))
                    (task nil))
               ;; rebind the global configuration for each request
               (with-configuration ()
                 ;; then augment the configuration with that read for the individual request
                 (load-configuration processor)
                 (validate-configuration)
                 ;; establish the agent: by name if *agent-id* is set and authenticates, else by
                 ;; api key token, else ENSURE-AGENT
                 (unless *agent*
                   (setq *agent* (cond #+(or)
                                       (*agent-id*
                                        ;; construct the agent. nb. only the first location will count
                                        (user *agent-id* :location *agent-location*))
                                       ((and *agent-id*
                                             ;; construct the agent. nb. only the first location will count
                                             (agent-authenticated-by-name *agent-id* :location *agent-location*
                                                                          :if-does-not-exist nil)))
                                       ((agent-authenticated-by-token (api-key) :location *agent-location*
                                                                      :if-does-not-exist nil))
                                       (t
                                        (ensure-agent :location *agent-location*)))))
                 (setq *agent-id* (agent-name *agent*))
                 (log-debug "process-request-toplevel: agent: ~a" *agent*)
                 ;; once the request configuration has been read, there are two options which interact to
                 ;; determine how to process the request:
                 ;; -  expression signature : a signature is a hash of the request text.
                 ;;   if it locates a task, then that request has already been processed - discard the
                 ;;   document currently in the input stream, clone the prototype, and process it.
                 ;;   if the signature does not locate a task, parse the stream to create a new prototype,
                 ;;   register it and proceed as if it had been known.
                 ;; - parameter signature : given parameters, the query is compiled to expect them to be
                 ;;   special variables and is executed with them bound to given values. if the parameters
                 ;;   locate a query, it can be executed with the respective values bound. otherwise a new
                 ;;   prototype must be created for the new parameters
                 ;;
                 ;; the two aspects constitute a combined signature : (request-signature . (first dynamic-bindings))
                 ;; NOTE(review): PROG1 returns the value of the COND form -- the PIPE-TASK result in the
                 ;; tracked branch, and the same value assigned to TASK in the untracked branch. The
                 ;; trailing TASK form is evaluated and discarded. Confirm whether PROGN was meant.
                 (prog1                          ; returns the COND's value (see note above)
                   (cond (*query-signature*
                          (let* ((resolved-revision (rdfcache::resolve-repository *repository-id* :revision *revision-id*))
                                 (task-prototype (find-query-prototype resolved-revision *query-signature*
                                                                        (append *dataset-graphs* (first *dynamic-bindings*)))))
                            ;; either the prototype is found, in which case it is cloned and reused
                            ;; or none exists and a new one is created
                            (cond (task-prototype
                                   (setf (task-start-time task-prototype) (get-universal-time)
                                         (task-start-run-time task-prototype) (get-internal-run-time)
                                         (task-start-real-time task-prototype) (get-internal-real-time))
                                   (log-info "process-request-toplevel: named prototype: ~s: ~s: ~s: ~s"
                                             (list *repository-id* *query-signature* (first *dynamic-bindings*))
                                             task-prototype
                                             (rest *dynamic-bindings*)
                                             (substitute #\space #.(code-char #o012) (task-request-content task-prototype)))
                                   ;; the expression x parameters combination already exists
                                   ;; discard the query text
                                   (read-stream *request-location*)
                                   ;; reuse the query prototype, but replace its metadata with that from the new request
                                   ;; modulo those aspects which are compiled-in
                                   (assert-prototype-metadata (instance-metadata task-prototype))
                                   ;; but override things which should be configured for the new request
                                   (setf task (clone-instance task-prototype
                                                              :dynamic-bindings *dynamic-bindings*
                                                              :dataset-graphs (or *dataset-graphs* (task-dataset-graphs task-prototype))
                                                              :metadata *metadata*
                                                              :repository-id *repository-id*
                                                              :revision-id resolved-revision
                                                              :response-content-type *response-content-type*
                                                              :task-id *task-id*
                                                              :user-id *user-id*
                                                              :accounting-destination accounting-location
                                                              :error-destination error-location
                                                              ))
                                   (generate-accounting-note :parse :task task))
                                  (t
                                   ;; no prototype: decode the request, pre-compile it when read-only,
                                   ;; register it under the combined signature and clone it
                                   (setf task-prototype (decode-task *request-location* *request-content-type* *response-content-type*))
                                   (generate-accounting-note :parse :task task-prototype)
                                   (log-info "process-request-toplevel: new prototype: ~s: ~s: ~s: ~s"
                                             (list *repository-id* *query-signature* (first *dynamic-bindings*))
                                             task-prototype
                                             *dynamic-bindings*
                                             (substitute #\space #.(code-char #o012) (task-request-content task-prototype)))
                                   (cond ((operation-read-only-p task-prototype)
                                          (log-debug "process-request-toplevel: pre-compiling prototype: ~s" task-prototype)
                                          (with-task-environment (:task task-prototype)
                                            (compile-task task-prototype)))
                                         (t
                                          (log-debug "process-request-toplevel: not compiling prototype: ~s" task-prototype)))
                                   (setf (find-query-prototype resolved-revision *query-signature*
                                                               (append *dataset-graphs* (first *dynamic-bindings*)))
                                         task-prototype)
                                   (setf task (clone-instance task-prototype
                                                              :task-id *task-id*
                                                              :user-id *user-id*
                                                              :repository-id *repository-id*
                                                              :revision-id resolved-revision
                                                              :accounting-destination accounting-location
                                                              :error-destination error-location))))
                            (pipe-task-with-termination task *response-location*
                                                        *request-content-type* *response-content-type*)))
                         (t ;; if no signature was provided, don't track
                          (log-info "process-request-toplevel: untracked: ~s: ~s: ~s: ~s"
                                    *request-location* *response-location*
                                    *request-content-type* *response-content-type*)
                          (setf task
                                (pipe-task-with-termination *request-location* *response-location*
                                                            *request-content-type* *response-content-type*
                                                            :dataset-graphs *dataset-graphs*
                                                            :dynamic-bindings *dynamic-bindings*
                                                            :metadata *metadata*
                                                            :repository-id *repository-id*
                                                            :request-content-type *request-content-type*
                                                            :response-content-type *response-content-type*
                                                            :revision-id *revision-id*
                                                            :task-id *task-id*
                                                            :user-id *user-id*
                                                            :accounting-destination accounting-location
                                                            :error-destination error-location))))
 
                   (complete-output *response-location*)
                   ;;(log-debug "process-request-toplevel: complete ~s: ~s." (task-id task) *run-state*)
                   (log-debug "process-request-toplevel: complete ~s: ~s." task *run-state*)
                   task))))
         (storage-condition (c) (guarded-handler c))))))
419