Coverage report: /development/source/library/org/datagraph/spocq-shard/src/spocq-server/asynchronous-operations.lisp

KindCoveredAll%
expression0347 0.0
branch012 0.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.server.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.server.implementation)
4
 
5
 #|
6
 perform asynchronous tasks
7
 
8
 imports:
9
 the directory /srv/dydra/runtime/imports/ stages import datasets.
10
 if the request is synchronous, the staged file is consumed and deleted.
11
 if the request is asynchronous, an entry is added to /srv/dydra/runtime/imports/queue/
12
 to describe the request.
13
 
14
 queries:
15
 each repository directory includes a "views" directory,
16
 after each transaction, these are executed for the respective repository.
17
 
18
 
19
 the asynchronous process retrieves each task request entry in turn, performs the task and
20
 sends a notification to the indicated endpoint.
21
 the "imports" characterization stems from the initial use case, which was long-running imports.
22
 in the cases where the upload fit within the request timeout, but the import did not, this permitted
23
 the longer-running imports to be performed through remote requests.
24
 
25
 the background asynchronous process is a service which runs the spocq-listener via the spocq-async.sh bash script.
26
 that establishes a server run-time and then invokes the asynchronous loop with an iteration limit.
27
 when the limit is reached, it exits and permits a new version to start.
28
 the restart is to avoid memory exhaustion issues which may arise with resource-intensive materialization queries.
29
 
30
 http://httpbin.org
31
 $ pip install httpbin
32
 $ gunicorn httpbin:app
33
 
34
 (spocq.i::make-processing-thread 'asynchronous-task-loop
35
                                    :name "asynchronous-task-loop"
36
                                    :initial-bindings (acons '*task* nil
37
                                                             (acons '*thread-name*  "asynchronous toplevel" nil)))
38
 
39
 runs as spocq-async.sh
40
 
41
 (let* ((host-name (dydra:server-host-name))                                                                                
42
        (host-package (or (find-package host-name)                                                                          
43
                          (make-package host-name :use ()))))                                                               
44
   (import *response-functions* host-package)                                                                               
45
   (with-package-iterator (next host-package :internal)                                                                     
46
     (loop (multiple-value-bind (symbol-p symbol) (next)                                                                    
47
             (unless symbol-p (return))                                                                                     
48
             (export symbol host-package))))                                                                                
49
   (let* ((hunchentoot:*acceptor*                                                                                           
50
           (make-instance 'spocq-acceptor                                                                                   
51
             :name "spocq"                                                                                                  
52
             :port *host-port* :address (spocq.i::host-name)                                                                
53
             :request-class *service-request-class*                                                                         
54
             :response-class *service-response-class*                                                                       
55
             :taskmaster (make-instance *class.taskmaster*                                                                  
56
                           :name (concatenate 'string (spocq.i::host-name) "-taskmaster"))))                                
57
          (*spocq-acceptor* hunchentoot:*acceptor*)                                                                         
58
          (*history-directory* #p"/srv/dydra/history/"))                                                                    
59
     (setf (http:acceptor-dispatch-function *spocq-acceptor*) host-package)                                                 
60
     ;;; process queued requests                                                                                            
61
     (spocq.si::asynchronous-task-loop :limit 10)))                                                                         
62
 
63
 
64
 |#
65
 
66
 (defparameter *asynchronous-delete-task-entries* t)
67
 (defparameter *asynchronous-notify-timeout* 10)
68
 (defparameter *asynchronous-task-entry* nil)
69
 
70
 (defun asynchronous-task-loop (&key (limit nil))
71
   (loop for count from 1
72
     until (or (eq *run-state* :terminate) (and limit (> count limit)))
73
     do ;; for each stored request, run it as a request and optionally notify
74
     (let ((task-entries (sort (remove-if-not #'spocq.i::is-uuid-string
75
                                                (directory (merge-pathnames (make-pathname :name :wild)
76
                                                                            (spocq.i::import-queue-root-pathname)))
77
                                                :key #'pathname-name)
78
                                 #'<
79
                                 :key #'file-write-date)))
80
       (when task-entries (dydra:log-notice "asynchronous requests: ~a" task-entries))
81
       (loop for *asynchronous-task-entry* in task-entries
82
         for output-pathname = (response-history-pathname (pathname-name *asynchronous-task-entry*))
83
         do (ensure-directories-exist output-pathname)
84
         collect (handler-case (process-asynchronous-task-entry *spocq-acceptor*
85
                                                                *asynchronous-task-entry*
86
                                                                output-pathname :if-exists :supersede)
87
                   (error (c)
88
                     (dydra:log-error "asynchronous request signaled: ~a:  ~a"
89
                                      *asynchronous-task-entry*
90
                                      c)
91
                     (dydra:log-stacktrace "Runtime error in query request: ~a." c)
92
                     c)))
93
       (sleep spocq.i::*processing-thread-interval*))))
94
 
95
 (defgeneric process-asynchronous-task-entry (acceptor source destination &rest args)
96
   (:documentation "Given an import task entry - either as a pathname or as the respective
97
    request headers, execute the import and send the notification to the indicated
98
    location. The response content is a rdf result field with the operation status the
99
    encoding is as specified as the Asynchronous-Content-Type header.")
100
   (:method ((acceptor t) (source pathname) (destination pathname) &rest args)
101
     (multiple-value-prog1
102
         (with-open-file (source-stream source :direction :input :element-type :default)
103
           (apply #'process-asynchronous-task-entry acceptor source-stream destination args))
104
       ;;; !!! this should be done by the caller
105
       (when *asynchronous-delete-task-entries*
106
         (conditional-delete-file source))))
107
 
108
   (:method ((acceptor t) (source stream) (destination pathname) &rest args)
109
     (dydra:log-notice "asynchronous request: initiate: ~a ~a" source destination)
110
     (let ((processed nil))
111
       (multiple-value-bind (request response)
112
                            (apply #'hunchentoot::process-asynchronous-connection acceptor source destination args)
113
         (setf processed t)
114
         (let ((asynchronous-end-point (http:request-header request "Asynchronous-Location")))
115
           (cond (asynchronous-end-point
116
                  (let* ((length (with-open-file (stream destination :direction :input) (file-length stream)))
117
                         (asynchronous-authorization (or (http:request-header request "Asynchronous-Authorization")
118
                                                         (http:request-header request "Authorization")))
119
                         (asynchronous-method (intern (string (or (http:request-header request "Asynchronous-Method") :post)) :keyword))
120
                         (asynchronous-content-type (or (http:request-header request "Asynchronous-Content-Type") "text/turtle")))
121
                    (dydra:log-notice "asynchronous request: notify: ~a (~a ~a): ~a -> ~a '~a': ~a"
122
                                      asynchronous-end-point asynchronous-method asynchronous-content-type
123
                                      source destination
124
                                      (request-client-request-id request)
125
                                      response)
126
                    (handler-case
127
                        (bt:with-timeout (*asynchronous-notify-timeout*)
128
                          ;; skip the response headers
129
                          (with-open-file (destination-stream destination :external-format :default
130
                                                              :element-type '(unsigned-byte 8))
131
                            (loop for code = (read-byte destination-stream nil nil)
132
                              until (and (eql code (load-time-value (char-code #\linefeed)))
133
                                         (eql (read-byte destination-stream nil nil) (load-time-value (char-code #\return)))
134
                                         (eql (read-byte destination-stream nil nil) (load-time-value (char-code #\linefeed)))))
135
                            (multiple-value-bind (response-content status)
136
                                                 (tbnl::with-open-request-stream (asynchronous-stream asynchronous-end-point
137
                                                                                                      :content-length (- length (file-position destination-stream))
138
                                                                                                      :method asynchronous-method
139
                                                                                                      :accept "*/*"
140
                                                                                                      :content-type asynchronous-content-type
141
                                                                                                      :additional-headers (when asynchronous-authorization
142
                                                                                                                            (acons "Authorization"
143
                                                                                                                                   asynchronous-authorization
144
                                                                                                                                   ())))
145
                                                   (http:copy-stream destination-stream asynchronous-stream))
146
                              (unless (stringp response-content)
147
                                (setf response-content (map 'string #'code-char response-content)))
148
                              (dydra:log-notice "asynchronous request: notify status ~a: ~a -> ~a '~a': ~a ~a"
149
                                                asynchronous-end-point source destination
150
                                                (request-client-request-id request)
151
                                                status response-content)
152
                              (cond ((and *asynchronous-delete-task-entries* (< status 400))
153
                                     (conditional-delete-file destination))
154
                                    (t
155
                                     (dydra:log-notice "asynchronous request: retained")))
156
                              (values status response-content))))
157
                      (sb-ext:timeout (c) (dydra:log-warn "asynchronous request timed out: ~a: ~a" source c)))))
158
                 (t
159
                  (dydra:log-notice "asynchronous request: ~a: ~a"
160
                                    source response)
161
                  response)))))))
162
 
163
 (defmethod process-asynchronous-task ((request string)) 
164
   "construct a runtime context for http, execute the task, return t if it completes successfully"
165
   (with-input-from-string (stream request)
166
     (let* ((host-name (dydra:server-host-name))
167
            ;; presumes a running server
168
            (host-package (or (find-package host-name)
169
                              (make-package host-name :use ())))
170
            (hunchentoot:*acceptor*                                                                        
171
             (make-instance 'spocq-acceptor
172
             :name "spocq"
173
             :port *host-port* :address (spocq.i::host-name)
174
             :request-class *service-request-class*
175
             :response-class *service-response-class*
176
             :taskmaster (make-instance *class.taskmaster*
177
                           :name (concatenate 'string (spocq.i::host-name) "-taskmaster"))))
178
            (*spocq-acceptor* hunchentoot:*acceptor*)
179
            (*history-directory* #p"/srv/dydra/history/")
180
            (tmp-pathname (tmp-import-pathname "system" "system")))
181
       (setf (http:acceptor-dispatch-function *spocq-acceptor*) host-package)
182
       ;;; process queued requests
183
       (multiple-value-bind (status response-content)
184
                            (process-asynchronous-task-entry *spocq-acceptor* stream tmp-pathname)
185
         (declare (ignore response-content))
186
         (< status 400)))))
187
   
188
 
189
 ;;; disabled: the #+(or) feature expression causes the reader to skip the following form.
 ;;; NOTE(review): the second method passes LOCATION, which is not bound in that method;
 ;;; RESOURCE is presumably intended - confirm before this form is re-enabled.
 #+(or)  ;;; simplified to reuse process-asynchronous-task-entry
 (defgeneric process-asynchronous-task (method resource content-type headers)
   (:documentation "Given an import task entry - either as a pathname or as the respective
    request headers, execute the import and send the notification to the indicated
    location. The content is a sparql result field with a single solution and the
    encoding is as specified in the original request.")
   ;; a method name given as a string is resolved against http:+method-keys+
   (:method ((method string) (resource t) (content-type t) headers)
     (process-asynchronous-task (or (find method http:+method-keys+ :test #'string-equal)
                                    (error "process-asynchronous-task: invalid method: ~s" method))
                                resource
                                content-type
                                headers))
   ;; a content type given as a string is resolved through media-type
   (:method ((method symbol) (resource t) (content-type string) headers)
     (process-asynchronous-task method location
                                (or (media-type content-type)
                                    (log-warn "process-asynchronous-task-entry: invalid content type present: ~s ~s ~s . ~s"
                                              method resource content-type headers))
                                headers))
   (:method ((method symbol) (resource string) (content-type mime:rdf) headers)
     "Treat rdf content as an import"
     (process-asynchronous-import method resource content-type headers)
     )

   (:method ((method symbol) (resource string) (content-type mime:query) headers)
     "Treat any query form as a query request"
     (process-asynchronous-query method resource content-type headers)))
215
 
216
 ;;; disabled: the #+(or) feature expression causes the reader to skip the following form.
 ;;; NOTE(review): the form is an incomplete sketch. as written,
 ;;; - REPOSITORY, REQUEST, RESPONSE and ASYNCHRONOUS-CONTENT-TYPE are not bound in the method;
 ;;; - RESOURCE and http:*request* are bound without a value form;
 ;;; - the final log-notice control string has a ~s directive and no argument;
 ;;; - the (error (c) ...) form is nested in the with-http-request-stream body, not
 ;;;   a clause of the handler-case.
 ;;; these need to be resolved before it is re-enabled.
 #+(or)
 (defgeneric process-asynchronous-query (method resource content-type headers)
   (:method ((method symbol) (repository-id string) (content-type mime:sparql) headers)
     "given an asynchronous get request, take the query expression from the request url,
     set up the execution environment (handlers, agent, output stream, header context, ...) and
     execute the query as if it were a synchronous request, emitting the response to the
     asynchronous destination."
     (let* ((location (http:request-header headers :location))
            ;; the query text is read from the file named by the location header
            (query-string (read-file location))
            (asynchronous-end-point (or (http:request-header headers "Asynchronous-Location")
                                        (http:request-header headers "Asynchronous-End-Point")))
            (asynchronous-method (http:request-header headers "Asynchronous-Method"))
            (client-request-id (http:request-header headers "Client-Request-ID"))
            (state :success))
       (handler-case
           (spocq.i::with-http-request-stream (request-stream response-stream  asynchronous-end-point
                                                              :method asynchronous-method
                                                              :content-type content-type)
             (let ((tag (dydra:resolve-repository-revision-id repository))
                   (resource )
                   (output-stream (make-instance 'http:output-stream :real-stream response-stream))
                   (http:*response* (http:make-response *spocq-acceptor*
                                                        :server-protocol "HTTP/1.1"
                                                        ;; create the output stream which supports character output for the headers
                                                        ;; with the initial character encoding set to ascii
                                                        :content-stream response-stream))
                   (http:*request* )
            )
             (setf (http:response-header response :tag) tag)
             (graph-store-query resource query-string request response content-type asynchronous-content-type)
             (log-notice "process-asynchronous-query: status: ~s"))
         (error (c)
             (log-warn "process-asynchronous-query: request failed: ~s" c)))))))
249
 
250
 ;;; disabled: the #+(or) feature expression causes the reader to skip the following form.
 ;;; the method performs the import by patch, post or put, records whether it signaled
 ;;; an error, and sends a one-solution result with the status, tag, client request id and
 ;;; message to the asynchronous end point.
 ;;; NOTE(review): as written, REPOSITORY is not bound where the revision id is
 ;;; resolved (elsewhere the method uses (repository repository-id)), and CONS is called
 ;;; with a single argument. both need to be resolved before it is re-enabled.
 #+(or)
 (defgeneric process-asynchronous-import (method resource content-type headers)
   (:method ((method symbol) (repository-id string) (content-type mime:rdf) headers)
     (let ((location (http:request-header headers :location))
           (asynchronous-end-point (or (http:request-header headers "Asynchronous-Location")
                                       (http:request-header headers "Asynchronous-End-Point")))
           (asynchronous-content-type (http:request-header headers "Asynchronous-Content-Type"))
           (asynchronous-method (http:request-header headers "Asynchronous-Method"))
           (graph-name (http:request-header headers :graph))
           (client-request-id (http:request-header headers "Client-Request-ID"))
           (state :success)
           (condition nil)
           )
       ;; an error during the import is captured for the notification and does not propagate
       (handler-case
           (ecase method
             (:patch (repository-patch-graph-content (repository repository-id) location
                                                     :context graph-name
                                                     :content-type content-type))
             (:post (repository-post-graph-content (repository repository-id) location
                                                   :context graph-name
                                                   :content-type content-type))
             (:put (repository-put-graph-content (repository repository-id) location
                                                 :context graph-name
                                                 :content-type content-type)))
         (error (c) (setf state :failed
                          condition c)))
       (let* ((tag (dydra:resolve-repository-revision-id repository))
              ;; the message is empty on success and the printed condition on failure
              (message  (case state
                          (:success "")
                          (:failed (format nil "~a" condition))))
              (response-content
               (cons `((?::|status| ?::|ETag| ?::|Client-Request-ID| ?::|message|)
                       (,state  ,tag ,client-request-id ,message)))))
         (spocq.i::with-http-request-stream (request-stream response-stream  asynchronous-end-point
                                                   :method asynchronous-method
                                                   :content-type asynchronous-content-type)
           (send-request-headers request-stream headers)
           (spocq.i::send-response-message :log response-content request-stream asynchronous-content-type))))))
288
 
289
 
290
 ;;; notification support
291
 
292
 (defparameter *asynchronous-monitors*
293
   (make-hash-table :test 'eql))
294
 
295
 (defvar *notifyer* nil)
296
 
297
 (defun asynchronous-notification-monitor ()
298
   "collect all registered views and start a file system monitor for each respective lmdb file.
299
   upon receipt of a modify event, run each of the views as an asyschronous task."
300
   (setq *notifyer* (cl-inotify:make-inotify))
301
   (loop for view-directory in (directory (merge-pathnames (make-pathname :directory '(:relative "repositories" :wild "views") :name :wild)
302
                                                           *catalog-root-pathname*))
303
     for repository-pathname = (make-pathname :directory (butlast (pathname-directory view-directory)))
304
     for count from 1
305
     do (setf (gethash count *asynchronous-monitors*)
306
              (make-instance 'asynchronous-monitor
307
                :id count
308
                :repository-pathname repository-pathname
309
                :view-files (directory (merge-pathnames (make-pathname :name :wild) view-directory)))))
310
   (loop until (eq *run-state* :terminate)
311
     do (loop for event in (cl-inotify:next-events *notifyer*)
312
          for id = (cl-inotify:inotify-event-wd event)
313
          for monitor = (gethash id *asynchronous-monitors*)
314
          do (loop for *asynchronous-task-entry* in (directory (asynchronous-monitor-view-files monitor))
315
               for output-pathname = #p"/dev/null"
316
               do (handler-case
317
                      (process-asynchronous-task-entry *spocq-acceptor*
318
                                                       *asynchronous-task-entry*
319
                                                       output-pathname :if-exists :supersede)
320
                    (error (c)
321
                           (dydra:log-error "asynchronous view signaled: ~a:  ~a"
322
                                            *asynchronous-task-entry*
323
                                            c)
324
                           (dydra:log-stacktrace "Runtime error in view request: ~a." c)
325
                           c))))))
326
           
327
 
328
 (defclass asynchronous-monitor ()
329
   ((id
330
     :initarg :id
331
     :reader asynchronous-monitor-id)
332
    (repository-pathname
333
     :initarg :pathname :initform (error "pathname is required")
334
     :reader asynchronous-monitor-repository-pathname)
335
    (view-files
336
     :initarg :view-files :initform (error "view-files is required")
337
     :reader asynchronous-monitor-view-files)
338
    (mdb-pathname
339
     :initarg :mdb-pathname :initform (error "view-pathname is required")
340
     :reader asynchronous-monitor-mdb-pathname)))
341
 
342
 (defmethod initialize-instance ((instance asynchronous-monitor) &rest initargs
343
                                 &key repository-pathname)
344
   (let ((mdb-pathname (merge-pathnames spocq.i::*lmdb-filename* repository-pathname)))
345
     (apply #'call-next-method instance
346
            :mdb-pathname mdb-pathname
347
            initargs)
348
     (cl-inotify:watch *notifyer* mdb-pathname :modify)))
349
 
350
 
351
 (defun spocq-acceptor ()
352
   (or *spocq-acceptor*
353
       (let* ((host-name (dydra:server-host-name))                                                                             
354
             (host-package (or (find-package host-name)
355
                               (let ((host-package (make-package host-name :use ())))
356
                                 (import *response-functions* host-package)
357
                                 (with-package-iterator (next host-package :internal)                                                                   
358
                                   (loop (multiple-value-bind (symbol-p symbol) (next)
359
                                           (unless symbol-p (return))
360
                                           (export symbol host-package))))
361
                                 host-package)))
362
             (acceptor (make-instance 'spocq-acceptor
363
                         :name "spocq"
364
                         :port *host-port* :address (spocq.i::host-name)
365
                         :request-class *service-request-class*
366
                         :response-class *service-response-class*
367
                         :taskmaster (make-instance *class.taskmaster*
368
                                       :name (concatenate 'string (spocq.i::host-name) "-taskmaster")))))
369
         (setf (http:acceptor-dispatch-function acceptor) host-package)
370
         (setf *spocq-acceptor* acceptor))))
371
 
372
             
373
 (defgeneric test-sparql-request (resource method content headers destination)
374
   (:method ((resource string) method content headers (destination stream))
375
     (let* ((*spocq-acceptor* (spocq-acceptor))
376
            (hunchentoot:*acceptor* *spocq-acceptor*)
377
            (crlf (map 'string #'code-char CHUNGA::+CRLF+))
378
            (input (format nil "~a ~a HTTP/1.1~a~{~{~a: ~a~a~}~}~a~a~a"
379
                           method resource crlf
380
                           (loop for header in headers collect (append header (list crlf))) crlf
381
                           content crlf)))
382
       ;;(print input)
383
       (let ((source (make-instance 'de.setf.utility.implementation::vector-input-stream :vector input))
384
             (spocq.i::*agent* (spocq.i::system-agent)))
385
         (hunchentoot::process-asynchronous-connection hunchentoot:*acceptor* source destination)))))
386
 
387
 ;;; disabled: the #+(or) feature expression causes the reader to skip the following
 ;;; parenthesized group. it collects forms for interactive evaluation.
 #+(or)
 (
 ;; report input as available as long as the vector stream is not at eof
 (defmethod sb-gray:stream-listen ((stream DE.SETF.UTILITY.IMPLEMENTATION::VECTOR-INPUT-STREAM))
   (not (de.setf.utility.implementation::stream-eofp stream)))

 ;; the query text for the test requests below
 (defparameter *q* "
 #BASE <https://www.not-datahub.nexperia.com/nexperia/>
 PREFIX nxp: <http://purl.org/nxp/schema/v1/>
 PREFIX plm: <http://www.data.nexperia.com/def/plm/>
 
 select *
 WHERE {
   SERVICE <test> {
     ?s ?p 1
   }
 }
 ")

 ;; base iri as a request parameter
 (test-sparql-request "http://nl10.dydra.com/james/test/sparql?base_iri=https://localhost/james/"
                      :post *q*
                      '(("Accept" "application/VND.DYDRA.SPARQL-QUERY") ("Content-Type" "application/sparql-query"))
                      *trace-output*)

 ;; base iri as a request header
 (test-sparql-request "http://nl10.dydra.com/james/test/sparql"
                      :post *q*
                      '(("Accept" "application/VND.DYDRA.SPARQL-QUERY")
                        ("Content-Type" "application/sparql-query")
                        ("Base-IRI" "https://localhost/jamestoo/"))
                      *trace-output*)

 ;; json results
 (test-sparql-request "http://nl10.dydra.com/james/test/sparql?base_iri=https://localhost/james/"
                      :post *q*
                      '(("Accept" "application/sparql-results+json") ("Content-Type" "application/sparql-query"))
                      *trace-output*)

 ;; copy from a vector stream to a stream and to an adjustable string
 (http:copy-stream (make-instance 'de.setf.utility.implementation::vector-input-stream :vector "qwertz") *trace-output*)
 (http:copy-stream (make-instance 'de.setf.utility.implementation::vector-input-stream :vector "qwertz")
                   (make-array 10 :element-type 'character :adjustable t :fill-pointer 0))

 ;; print bytes read from a vector stream, at most ten, until a non-integer is read
 (loop with stream = (make-instance 'de.setf.utility.implementation::vector-input-stream :vector "qwertz")
   for byte = (read-byte stream)
   for i below 10
   until (not (integerp byte)) do (print byte))
 )