Coverage report: /development/source/library/org/datagraph/spocq-shard/src/spocq-server/asynchronous-operations.lisp
| Kind | Covered | All | % |
| expression | 0 | 347 | 0.0 |
| branch | 0 | 12 | 0.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.server.implementation)
6
perform asynchronous tasks
9
the directory /srv/dydra/runtime/imports/ stages imports datasets.
10
if the request is synchronous, the staged file is consumed and deleted.
11
if the request is asynchronous, an entry is added to /srv/dydra/runtime/imports/queue/
12
to describe the request.
15
each repository directory includes a "views" directors,
16
after each transaction, these are executed for the respctive repository.
19
the asynchronous process retrieves each task request entry in turn, performs the task and
20
sends a notification to the indicated endpoint.
21
the "imports" characterization stems from the initial use case, which was long-running imports.
22
in the cases where the upload fit withnt the request timeout, but the import did not, this permitted
23
the longer-running imports to be performed through remote requests.
25
the background asynchronous process is a service which runs the spocq-listener via the spocq-async.sh bash script.
26
that establishes a server run-time and then invokes the asynchronous loop wth an iteration limt.
27
when the limit is reached, it exits and permits a new version to start.
28
the restart is to avoid memory exhaustion issues which may arise with resource-intensive materialization queries.
32
$ gunicorn httpbin:app
34
(spocq.i::make-processing-thread 'asynchronous-task-loop
35
:name "asynchronous-task-loop"
36
:initial-bindings (acons '*task* nil
37
(acons '*thread-name* "asynchronous toplevel" nil)))
39
runs as spocq-async.sh
41
(let* ((host-name (dydra:server-host-name))
42
(host-package (or (find-package host-name)
43
(make-package host-name :use ()))))
44
(import *response-functions* host-package)
45
(with-package-iterator (next host-package :internal)
46
(loop (multiple-value-bind (symbol-p symbol) (next)
47
(unless symbol-p (return))
48
(export symbol host-package))))
49
(let* ((hunchentoot:*acceptor*
50
(make-instance 'spocq-acceptor
52
:port *host-port* :address (spocq.i::host-name)
53
:request-class *service-request-class*
54
:response-class *service-response-class*
55
:taskmaster (make-instance *class.taskmaster*
56
:name (concatenate 'string (spocq.i::host-name) "-taskmaster"))))
57
(*spocq-acceptor* hunchentoot:*acceptor*)
58
(*history-directory* #p"/srv/dydra/history/"))
59
(setf (http:acceptor-dispatch-function *spocq-acceptor*) host-package)
60
;;; process queued requests
61
(spocq.si::asynchronous-task-loop :limit 10)))
66
(defparameter *asynchronous-delete-task-entries* t)
67
(defparameter *asynchronous-notify-timeout* 10)
68
(defparameter *asynchronous-task-entry* nil)
70
(defun asynchronous-task-loop (&key (limit nil))
71
(loop for count from 1
72
until (or (eq *run-state* :terminate) (and limit (> count limit)))
73
do ;; for each stored request, run it as a request and optionally notify
74
(let ((task-entries (sort (remove-if-not #'spocq.i::is-uuid-string
75
(directory (merge-pathnames (make-pathname :name :wild)
76
(spocq.i::import-queue-root-pathname)))
79
:key #'file-write-date)))
80
(when task-entries (dydra:log-notice "asynchronous requests: ~a" task-entries))
81
(loop for *asynchronous-task-entry* in task-entries
82
for output-pathname = (response-history-pathname (pathname-name *asynchronous-task-entry*))
83
do (ensure-directories-exist output-pathname)
84
collect (handler-case (process-asynchronous-task-entry *spocq-acceptor*
85
*asynchronous-task-entry*
86
output-pathname :if-exists :supersede)
88
(dydra:log-error "asynchronous request signaled: ~a: ~a"
89
*asynchronous-task-entry*
91
(dydra:log-stacktrace "Runtime error in query request: ~a." c)
93
(sleep spocq.i::*processing-thread-interval*))))
95
(defgeneric process-asynchronous-task-entry (acceptor source destination &rest args)
96
(:documentation "Given an import task entry - either as a pathname or as the respective
97
request headers, execute the import and send the notification to the indicated
98
location. The response content is a rdf result field with the operation status the
99
encoding is as specified as the Asynchronous-Content-Type header.")
100
(:method ((acceptor t) (source pathname) (destination pathname) &rest args)
101
(multiple-value-prog1
102
(with-open-file (source-stream source :direction :input :element-type :default)
103
(apply #'process-asynchronous-task-entry acceptor source-stream destination args))
104
;;; !!! this should be done by the caller
105
(when *asynchronous-delete-task-entries*
106
(conditional-delete-file source))))
108
(:method ((acceptor t) (source stream) (destination pathname) &rest args)
109
(dydra:log-notice "asynchronous request: initiate: ~a ~a" source destination)
110
(let ((processed nil))
111
(multiple-value-bind (request response)
112
(apply #'hunchentoot::process-asynchronous-connection acceptor source destination args)
114
(let ((asynchronous-end-point (http:request-header request "Asynchronous-Location")))
115
(cond (asynchronous-end-point
116
(let* ((length (with-open-file (stream destination :direction :input) (file-length stream)))
117
(asynchronous-authorization (or (http:request-header request "Asynchronous-Authorization")
118
(http:request-header request "Authorization")))
119
(asynchronous-method (intern (string (or (http:request-header request "Asynchronous-Method") :post)) :keyword))
120
(asynchronous-content-type (or (http:request-header request "Asynchronous-Content-Type") "text/turtle")))
121
(dydra:log-notice "asynchronous request: notify: ~a (~a ~a): ~a -> ~a '~a': ~a"
122
asynchronous-end-point asynchronous-method asynchronous-content-type
124
(request-client-request-id request)
127
(bt:with-timeout (*asynchronous-notify-timeout*)
128
;; skip the response headers
129
(with-open-file (destination-stream destination :external-format :default
130
:element-type '(unsigned-byte 8))
131
(loop for code = (read-byte destination-stream nil nil)
132
until (and (eql code (load-time-value (char-code #\linefeed)))
133
(eql (read-byte destination-stream nil nil) (load-time-value (char-code #\return)))
134
(eql (read-byte destination-stream nil nil) (load-time-value (char-code #\linefeed)))))
135
(multiple-value-bind (response-content status)
136
(tbnl::with-open-request-stream (asynchronous-stream asynchronous-end-point
137
:content-length (- length (file-position destination-stream))
138
:method asynchronous-method
140
:content-type asynchronous-content-type
141
:additional-headers (when asynchronous-authorization
142
(acons "Authorization"
143
asynchronous-authorization
145
(http:copy-stream destination-stream asynchronous-stream))
146
(unless (stringp response-content)
147
(setf response-content (map 'string #'code-char response-content)))
148
(dydra:log-notice "asynchronous request: notify status ~a: ~a -> ~a '~a': ~a ~a"
149
asynchronous-end-point source destination
150
(request-client-request-id request)
151
status response-content)
152
(cond ((and *asynchronous-delete-task-entries* (< status 400))
153
(conditional-delete-file destination))
155
(dydra:log-notice "asynchronous request: retained")))
156
(values status response-content))))
157
(sb-ext:timeout (c) (dydra:log-warn "asynchronous request timed out: ~a: ~a" source c)))))
159
(dydra:log-notice "asynchronous request: ~a: ~a"
163
(defmethod process-asynchronous-task ((request string))
164
"construct a runtime context for http, execute the task, return t if it completes successfully"
165
(with-input-from-string (stream request)
166
(let* ((host-name (dydra:server-host-name))
167
;; presumes a running server
168
(host-package (or (find-package host-name)
169
(make-package host-name :use ())))
170
(hunchentoot:*acceptor*
171
(make-instance 'spocq-acceptor
173
:port *host-port* :address (spocq.i::host-name)
174
:request-class *service-request-class*
175
:response-class *service-response-class*
176
:taskmaster (make-instance *class.taskmaster*
177
:name (concatenate 'string (spocq.i::host-name) "-taskmaster"))))
178
(*spocq-acceptor* hunchentoot:*acceptor*)
179
(*history-directory* #p"/srv/dydra/history/")
180
(tmp-pathname (tmp-import-pathname "system" "system")))
181
(setf (http:acceptor-dispatch-function *spocq-acceptor*) host-package)
182
;;; process queued requests
183
(multiple-value-bind (status response-content)
184
(process-asynchronous-task-entry *spocq-acceptor* stream tmp-pathname)
185
(declare (ignore response-content))
189
#+(or) ;;; simplified to reuse process-asynchronous-task-entry
190
(defgeneric process-asynchronous-task (method resource content-type headers)
191
(:documentation "Given an import task entry - either as a pathname or as the respective
192
request headers, execute the import and send the notification to the indicated
193
location. Thecontent is a sparql result field with a ssingle solution and the
194
encoding is as specified in the original request.")
195
(:method ((method string) (resource t) (content-type t) headers)
196
(process-asynchronous-task (or (find method http:+method-keys+ :test #'string-equal)
197
(error "process-asynchronous-task: invalid method: ~s" method))
201
(:method ((method symbol) (resource t) (content-type string) headers)
202
(process-asynchronous-task method location
203
(or (media-type content-type)
204
(log-warn "process-asynchronous-task-entry: invalid content type present: ~s ~s ~s . ~s"
205
method resource content-type headers))
207
(:method ((method symbol) (resource string) (content-type mime:rdf) headers)
208
"Treat rdf content as an import"
209
(process-asynchronous-import method resource content-type headers)
212
(:method ((method symbol) (resource string) (content-type mime:query) headers)
213
"Treat any query form as a query request"
214
(process-asynchronous-query method resource content-type headers)))
217
(defgeneric process-asynchronous-query (method resource content-type headers)
218
(:method ((method symbol) (repository-id string) (content-type mime:sparql) headers)
219
"given an asynchronous get request, take the query expression from the request url,
220
set up the execution environment (handlers, agent, output stream, header context, ...) and
221
execute the query as if it were a synchronous request, emitting the response to the
222
asynchronous destination."
223
(let* ((location (http:request-header headers :location))
224
(query-string (read-file location))
225
(asynchronous-end-point (or (http:request-header headers "Asynchronous-Location")
226
(http:request-header headers "Asynchronous-End-Point")))
227
(asynchronous-method (http:request-header headers "Asynchronous-Method"))
228
(client-request-id (http:request-header headers "Client-Request-ID"))
231
(spocq.i::with-http-request-stream (request-stream response-stream asynchronous-end-point
232
:method asynchronous-method
233
:content-type content-type)
234
(let ((tag (dydra:resolve-repository-revision-id repository))
236
(output-stream (make-instance 'http:output-stream :real-stream response-stream))
237
(http:*response* (http:make-response *spocq-acceptor*
238
:server-protocol "HTTP/1.1"
239
;; create the output stream which supports character output for the headers
240
;; with the initial character encoding set to ascii
241
:content-stream response-stream))
244
(setf (http:response-header response :tag) tag)
245
(graph-store-query resource query-string request response content-type asynchronous-content-type)
246
(log-notice "process-asynchronous-query: status: ~s"))
248
(log-warn "process-asynchronous-query: request failed: ~s" c)))))))
251
(defgeneric process-asynchronous-import (method resource content-type headers)
252
(:method ((method symbol) (repository-id string) (content-type mime:rdf) headers)
253
(let ((location (http:request-header headers :location))
254
(asynchronous-end-point (or (http:request-header headers "Asynchronous-Location")
255
(http:request-header headers "Asynchronous-End-Point")))
256
(asynchronous-content-type (http:request-header headers "Asynchronous-Content-Type"))
257
(asynchronous-method (http:request-header headers "Asynchronous-Method"))
258
(graph-name (http:request-header headers :graph))
259
(client-request-id (http:request-header headers "Client-Request-ID"))
265
(:patch (repository-patch-graph-content (repository repository-id) location
267
:content-type content-type))
268
(:post (repository-post-graph-content (repository repository-id) location
270
:content-type content-type))
271
(:put (repository-put-graph-content (repository repository-id) location
273
:content-type content-type)))
274
(error (c) (setf state :failed
276
(let* ((tag (dydra:resolve-repository-revision-id repository))
279
(:failed (format nil "~a" condition))))
281
(cons `((?::|status| ?::|ETag| ?::|Client-Request-ID| ?::|message|)
282
(,state ,tag ,client-request-id ,message)))))
283
(spocq.i::with-http-request-stream (request-stream response-stream asynchronous-end-point
284
:method asynchronous-method
285
:content-type asynchronous-content-type)
286
(send-request-headers request-stream headers)
287
(spocq.i::send-response-message :log response-content request-stream asynchronous-content-type))))))
290
;;; notification support
292
(defparameter *asynchronous-monitors*
293
(make-hash-table :test 'eql))
295
(defvar *notifyer* nil)
297
(defun asynchronous-notification-monitor ()
298
"collect all registered views and start a file system monitor for each respective lmdb file.
299
upon receipt of a modify event, run each of the views as an asyschronous task."
300
(setq *notifyer* (cl-inotify:make-inotify))
301
(loop for view-directory in (directory (merge-pathnames (make-pathname :directory '(:relative "repositories" :wild "views") :name :wild)
302
*catalog-root-pathname*))
303
for repository-pathname = (make-pathname :directory (butlast (pathname-directory view-directory)))
305
do (setf (gethash count *asynchronous-monitors*)
306
(make-instance 'asynchronous-monitor
308
:repository-pathname repository-pathname
309
:view-files (directory (merge-pathnames (make-pathname :name :wild) view-directory)))))
310
(loop until (eq *run-state* :terminate)
311
do (loop for event in (cl-inotify:next-events *notifyer*)
312
for id = (cl-inotify:inotify-event-wd event)
313
for monitor = (gethash id *asynchronous-monitors*)
314
do (loop for *asynchronous-task-entry* in (directory (asynchronous-monitor-view-files monitor))
315
for output-pathname = #p"/dev/null"
317
(process-asynchronous-task-entry *spocq-acceptor*
318
*asynchronous-task-entry*
319
output-pathname :if-exists :supersede)
321
(dydra:log-error "asynchronous view signaled: ~a: ~a"
322
*asynchronous-task-entry*
324
(dydra:log-stacktrace "Runtime error in view request: ~a." c)
328
(defclass asynchronous-monitor ()
331
:reader asynchronous-monitor-id)
333
:initarg :pathname :initform (error "pathname is required")
334
:reader asynchronous-monitor-repository-pathname)
336
:initarg :view-files :initform (error "view-files is required")
337
:reader asynchronous-monitor-view-files)
339
:initarg :mdb-pathname :initform (error "view-pathname is required")
340
:reader asynchronous-monitor-mdb-pathname)))
342
(defmethod initialize-instance ((instance asynchronous-monitor) &rest initargs
343
&key repository-pathname)
344
(let ((mdb-pathname (merge-pathnames spocq.i::*lmdb-filename* repository-pathname)))
345
(apply #'call-next-method instance
346
:mdb-pathname mdb-pathname
348
(cl-inotify:watch *notifyer* mdb-pathname :modify)))
351
(defun spocq-acceptor ()
353
(let* ((host-name (dydra:server-host-name))
354
(host-package (or (find-package host-name)
355
(let ((host-package (make-package host-name :use ())))
356
(import *response-functions* host-package)
357
(with-package-iterator (next host-package :internal)
358
(loop (multiple-value-bind (symbol-p symbol) (next)
359
(unless symbol-p (return))
360
(export symbol host-package))))
362
(acceptor (make-instance 'spocq-acceptor
364
:port *host-port* :address (spocq.i::host-name)
365
:request-class *service-request-class*
366
:response-class *service-response-class*
367
:taskmaster (make-instance *class.taskmaster*
368
:name (concatenate 'string (spocq.i::host-name) "-taskmaster")))))
369
(setf (http:acceptor-dispatch-function acceptor) host-package)
370
(setf *spocq-acceptor* acceptor))))
373
(defgeneric test-sparql-request (resource method content headers destination)
374
(:method ((resource string) method content headers (destination stream))
375
(let* ((*spocq-acceptor* (spocq-acceptor))
376
(hunchentoot:*acceptor* *spocq-acceptor*)
377
(crlf (map 'string #'code-char CHUNGA::+CRLF+))
378
(input (format nil "~a ~a HTTP/1.1~a~{~{~a: ~a~a~}~}~a~a~a"
380
(loop for header in headers collect (append header (list crlf))) crlf
383
(let ((source (make-instance 'de.setf.utility.implementation::vector-input-stream :vector input))
384
(spocq.i::*agent* (spocq.i::system-agent)))
385
(hunchentoot::process-asynchronous-connection hunchentoot:*acceptor* source destination)))))
389
(defmethod sb-gray:stream-listen ((stream DE.SETF.UTILITY.IMPLEMENTATION::VECTOR-INPUT-STREAM))
390
(not (de.setf.utility.implementation::stream-eofp stream)))
393
#BASE <https://www.not-datahub.nexperia.com/nexperia/>
394
PREFIX nxp: <http://purl.org/nxp/schema/v1/>
395
PREFIX plm: <http://www.data.nexperia.com/def/plm/>
405
(test-sparql-request "http://nl10.dydra.com/james/test/sparql?base_iri=https://localhost/james/"
407
'(("Accept" "application/VND.DYDRA.SPARQL-QUERY") ("Content-Type" "application/sparql-query"))
410
(test-sparql-request "http://nl10.dydra.com/james/test/sparql"
412
'(("Accept" "application/VND.DYDRA.SPARQL-QUERY")
413
("Content-Type" "application/sparql-query")
414
("Base-IRI" "https://localhost/jamestoo/"))
417
(test-sparql-request "http://nl10.dydra.com/james/test/sparql?base_iri=https://localhost/james/"
419
'(("Accept" "application/sparql-results+json") ("Content-Type" "application/sparql-query"))
422
(http:copy-stream (make-instance 'de.setf.utility.implementation::vector-input-stream :vector "qwertz") *trace-output*)
423
(http:copy-stream (make-instance 'de.setf.utility.implementation::vector-input-stream :vector "qwertz")
424
(make-array 10 :element-type 'character :adjustable t :fill-pointer 0))
426
(loop with stream = (make-instance 'de.setf.utility.implementation::vector-input-stream :vector "qwertz")
427
for byte = (read-byte stream)
429
until (not (integerp byte)) do (print byte))