Coverage report: /development/source/library/org/datagraph/spocq-shard/src/spocq-server/extensions/linked-data-fragments.lisp
| Kind | Covered | All | % |
| expression | 142 | 398 | 35.7 |
| branch | 10 | 24 | 41.7 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
5
;;; This file implements the response method for a triple fragment server
6
;;; - http://www.hydra-cg.com/spec/latest/triple-pattern-fragments/
7
;;; - http://www.hydra-cg.com/spec/latest/linked-data-fragments/#sparql-query-results
9
;;; Copyright 2015 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved
11
;;; # /opt/sbcl/1.2.2/bin/sbcl --core /development/source/library/sbcl-spocq-server.core
12
;;; (load "patches/20150501-property-paths/mime-types.lisp")
13
;;; (load "/development/source/library/org/datagraph/spocq/src/core/encoding/nquads.lisp")
14
;;; (load "/development/source/library/org/datagraph/spocq/src/core/encoding/nquads-grammar.lisp")
16
;;; (load "/development/source/library/org/datagraph/spocq/src/extensions/triple-fragments.lisp")
18
;;; (load "patches/triple-fragments.lisp")
20
;;; * (format nil "~{ ~x ~}" (map 'list #'char-code " {}<>\"")) => " 20 7B 7D 3C 3E 22 "
25
(defclass ldf-request-processor (http-request-processor)
28
(defclass ldf-query (query)
30
(:metaclass applicable-query-class))
33
(defparameter *ldf-page-size* 128
34
"Use as the default statement count for a response page")
36
(defparameter *request-iri* nil
37
"Bind the request iri as global for the dynamic extent of the request processing.")
39
(defmethod decode-task :around
40
((source t) (media-type mime:application/sparql-query) (result-media-type mime:application/ldf) &rest args)
41
"given a ldf result, augment the query with the metadata"
42
(declare (ignore args))
43
(let* ((query (call-next-method))
44
(expression (query-sse-expression query))
45
(repository (repository *repository-id*))
46
(metadata-solutions (compute-ldf-metadata-solutions repository)))
47
(assert (find (task-operation query) '(spocq.a:|construct| spocq.a:|describe|)) ()
48
"Operation combination not permitted ~a: ~a" (task-operation query) result-media-type)
49
(setf expression `(spocq.a:|project|
51
(spocq.a:|bindings| ,metadata-solutions (?::|s| ?::|p| ?::|o| ?::|c|))
52
,(if *response-offset*
53
`(spocq.a:|slice| ,expression :offset ,*response-offset* :count ,*ldf-page-size*)
55
(?::|s| ?::|p| ?::|o| ?::|c|)))
56
(setf (slot-value query 'sse-expression) expression)
61
(defun ldf-main-query-loop (&rest args)
62
"Repeatedly read a configuration and a query from standard input, run the query and emit results to standard output
63
as a linked data fragment
64
Any errors optionally cause termination. For each iteration, write a soft-eof to standard output and standard error."
66
(let ((loop-thread nil))
67
(declare (ignorable loop-thread))
68
(labels ((loop-sigterm-handler (signal code context)
69
(declare (ignore signal code context))
71
(log-info "Stop: SIGTERM ~a." (iso-time))
74
(management-thread-loop ()
75
(loop (when (eq *run-state* :terminate) (return))
77
(sleep *management-thread-interval*)))
80
(with-command-line-configuration (args)
82
do (unless (ldf-query-loop-step)
83
(setq *run-state* :terminate)
84
(log-debug "query-loop: return...")
86
when (zerop (mod i 50))
87
do (constrain-resources)))))
89
(setq *start-timestamp* (iso-time))
90
(initialize-spocq :title (getarg "--title"))
91
(enable-interrupt :sigterm #'loop-sigterm-handler)
92
(log-info "Start ~a." *start-timestamp*)
93
;; (setq loop-thread (bt:make-thread #'query-loop))
94
;; (management-thread-loop)
97
;; run the management loop in the initial thread, leaving the streams for the query loop
98
;; (run-topelvel-loop :toplevel-function 'run-management-thread)
100
(log-info "Stop: return ~a." (iso-time)))))
102
(defmethod task-loop-step ((processor ldf-request-processor))
103
(handler-bind ((end-of-file
105
(declare (ignore condition))
106
(log-info "EOF: return ~a." (iso-time))
107
(return-from task-loop-step nil)))
108
(spocq.e::message-syntax-error
109
;; emit a parse error immediately and (optionally) exit
111
(complete-output *standard-output*)
112
(log-warn "responding to message syntax error: ~a" condition)
113
(print-error-conditions (list (cons nil condition)) *error-output*)
114
(complete-output *error-output*)
115
(maybe-exit-on-error 65)
116
;; flush anything else
117
(channel-get-all *error-condition-channel*)
118
(return-from task-loop-step condition)))
121
;; (format *error-output* "run-query-loop-times: top-level error:~%~a" condition)
122
(log-stacktrace "Error running query: ~a -> ~a."
124
(complete-output *standard-output*)
127
(maybe-exit-on-error 70)
128
(return-from task-loop-step condition))
129
(t (return-from task-loop-step nil))))))
130
(let* ((configuration-stream (processor-configuration-location processor)))
131
;; rebind the global configuration for each request
132
(with-configuration (:allow-other-keys t)
133
(setq *run-state* :initialize)
134
;; then agument the configuration with that read for the individual request
135
(unless (plusp (load-configuration configuration-stream))
136
(log-info "task-loop-step: no configuration: return ~a." (iso-time))
137
(return-from task-loop-step (make-condition 'end-of-file :stream configuration-stream)))
138
(validate-configuration)
139
(setq *run-state* :process)
140
(setq *agent* (or (agent-authenticated-by-token (api-key) :location *agent-location*)
141
(make-instance *class.agent* :id nil :location *agent-location*)))
143
(log-info "ldf-request-process-step: top-level [~a]" *thread-name*)
144
;; once the request configuration has been read, read the successive s-p-o constraints, generate a query and execute it
145
(pipe-ldf-query (processor-request-content-location processor)
146
(processor-response-content-location processor))))))
149
(defgeneric compute-ldf-metadata-solutions (repository request-iri &key offset)
151
"Return a quad solution field which covers the ldf metadata for the respective repository.
152
When the media type profile is 'http://www.hydra-cg.com/spec/latest/linked-data-fragments/',
153
include this data in the response.")
155
(:method ((repository-id string) request-iri &rest args)
156
(apply #'compute-ldf-metadata-solutions (repository repository-id) request-iri args))
158
(:method ((repository-id t) (request-iri string) &rest args)
159
(apply #'compute-ldf-metadata-solutions repository-id (intern-iri request-iri) args))
161
(:method ((repository repository) (query query) &rest args)
162
(declare (ignore args))
163
(labels ((find-form (op sse)
165
(if (eq op (first sse))
167
(find-form op (case (first sse)
168
(spocq.a:|graph| (third sse))
169
(t (second sse)))))))
170
(expression-bgp-form (sse)
171
(find-form 'spocq.a:|bgp| sse))
172
(expression-graph-form (sse)
173
(find-form 'spocq.a:|graph| sse))
174
(expression-slice-form (sse)
175
(find-form 'spocq.a:|slice| sse)))
176
(let* ((repository-uri (repository-uri repository))
177
(request-iri repository-uri)
178
(sse (query-sse-expression query))
179
(bgp-form (expression-bgp-form sse))
180
(graph-form (expression-graph-form sse))
181
(slice-form (expression-slice-form sse))
182
(statement-form (second bgp-form))
183
(s (second statement-form))
184
(p (third statement-form))
185
(o (fourth statement-form))
186
(g (when graph-form (second graph-form)))
187
(metadata-graph (intern-iri (concatenate 'string (iri-lexical-form repository-uri) "#metadata")))
188
(dataset-uri (intern-iri (concatenate 'string (iri-lexical-form repository-uri) "#dataset")))
189
(template-pattern (concatenate 'string (iri-lexical-form repository-uri) "/ldf{?s,p,o}"))
190
(offset (when slice-form (getf (cddr slice-form) :offset)))
191
(page (when offset (floor offset *ldf-page-size*)))
192
(count (repository-pattern-count repository s p o (or g |urn:dydra|:|all|)))
197
`((,request-iri <void:triples> ,count ,metadata-graph)
198
(,request-iri <hydra:totalItems> ,count ,metadata-graph)
199
(,metadata-graph <foaf:primaryTopic> ,request-iri ,metadata-graph)
200
(,dataset-uri <http://rdfs.org/ns/void#subset> ,request-iri ,metadata-graph)
201
(,dataset-uri <http://www.w3.org/ns/hydra/core#search> ,genid4 ,metadata-graph)
202
(,genid1 <http://www.w3.org/ns/hydra/core#variable> "s" ,metadata-graph)
203
(,genid1 <http://www.w3.org/ns/hydra/core#property> <http://www.w3.org/1999/02/22-rdf-syntax-ns#subject> ,metadata-graph)
204
(,genid2 <http://www.w3.org/ns/hydra/core#variable> "p" ,metadata-graph)
205
(,genid2 <http://www.w3.org/ns/hydra/core#property> <http://www.w3.org/1999/02/22-rdf-syntax-ns#predicate> ,metadata-graph)
206
(,genid3 <http://www.w3.org/ns/hydra/core#variable> "o" ,metadata-graph)
207
(,genid3 <http://www.w3.org/ns/hydra/core#property> <http://www.w3.org/1999/02/22-rdf-syntax-ns#object> ,metadata-graph)
208
(,genid4 <http://www.w3.org/ns/hydra/core#template> ,template-pattern ,metadata-graph)
209
(,genid4 <http://www.w3.org/ns/hydra/core#mapping> ,genid1 ,metadata-graph)
210
(,genid4 <http://www.w3.org/ns/hydra/core#mapping> ,genid2 ,metadata-graph)
211
(,genid4 <http://www.w3.org/ns/hydra/core#mapping> ,genid3 ,metadata-graph)
212
,@(when (and page (> page 1))
213
(let ((previous-uri (puri:uri (iri-lexical-form request-iri))))
214
(setf (iri-query-argument previous-uri :page) (1- page))
215
`((,request-iri <hydra:previousPage> ,(intern-iri previous-uri) ,metadata-graph))))
216
,@(when (and page (< page (/ count *ldf-page-size*)))
217
(let ((next-uri (puri:uri (iri-lexical-form request-iri))))
218
(setf (iri-query-argument next-uri :page) (1+ page))
219
`((,request-iri <hydra:nextPage> ,(intern-iri next-uri) ,metadata-graph)))))))))
222
(defgeneric pipe-ldf-query (request-location response-location &rest options)
223
(:method ((request-stream stream) (response-stream stream) &rest options)
224
(declare (dynamic-extent options))
225
(apply #'pipe-ldf-query (intern-iri (read-line request-stream)) response-stream options))
227
(:method ((*request-iri* spocq::iri) (response-stream stream) &rest options &key ((:response-content-type *response-content-type*) *response-content-type*))
228
(declare (dynamic-extent options))
229
(let* ((puri (puri:uri *request-iri*))
230
(s (iri-query-argument puri :s))
231
(p (iri-query-argument puri :p))
232
(o (iri-query-argument puri :o))
233
(g (or (iri-query-argument puri :c) (iri-query-argument puri :graph)))
234
(pattern-sparql (parse-sparql (format nil "select * where { ~:[?s~;~:*~a~] ~:[?p~;~:*~a~] ~:[?o~;~:*~a~] }" s p o)))
235
(bgp (second pattern-sparql))
236
(statement-pattern (rest (second bgp)))
237
(bindings (loop for term in statement-pattern
238
for variable in '(?::|s| ?::|p| ?::|o|)
239
unless (variable-p term)
240
collect (cons term variable)))
241
(base-form (let ((graph-pattern `(spocq.a:|extend| (spocq.a:|graph| ,(or g |urn:dydra|:|all|) ,bgp)
242
?::|c| |urn:dydra|:|default|)))
243
(loop for (term . variable) in bindings
244
do (setf graph-pattern `(spocq.a:|extend| ,graph-pattern ,variable ,term)))
246
(ldf-sparql `(spocq.a:|project|
249
,(compute-ldf-metadata-solutions *repository-id* puri)
250
(?::|s| ?::|p| ?::|o| ?::|c|))
251
,(if *response-offset*
252
`(spocq.a:|slice| ,base-form :offset ,*response-offset* :count ,*ldf-page-size*)
254
(?::|s| ?::|p| ?::|o| ?::|c|))))
255
(prog1 (with-accounting (apply #'pipe-query ldf-sparql response-stream
256
:dataset-graphs *dataset-graphs*
257
:dynamic-bindings *dynamic-bindings*
259
:repository-id *repository-id*
260
:response-content-type *response-content-type*
261
:revision-id *revision-id*
262
:task-id dydra:*task-id*
263
:user-id dydra:*user-id*
266
(complete-output response-stream)
267
(log-debug "complete-output stdout. accounting [~s->~a]..." (accounting-note-count) *accounting-destination*)
268
(when (and *accounting-destination*
269
(plusp (accounting-note-count)))
270
(publish-accounting-notes (get-accounting-notes)
271
*accounting-destination*)
272
(complete-output *accounting-destination*))
273
(log-debug "complete-output accounting. errors...")
274
(cond ((plusp (channel-count *error-condition-channel*))
275
;; print the errors and return the last condition
276
(let ((error-notes (channel-get-all *error-condition-channel*)))
277
(print-error-conditions error-notes *error-output*)
278
(complete-output *error-output*)
279
(log-error-notes-trig error-notes))
280
(maybe-exit-on-error))
282
(complete-output *error-output*)))
283
(log-debug "complete-output errors. state ~s." *run-state*)))))
286
(defgeneric pipe-spoc-query (request-location response-location &rest options)
287
(:method ((request-stream stream) (response-stream stream) &rest options)
288
(declare (dynamic-extent options))
289
(apply #'pipe-spoc-query (intern-iri (read-line request-stream)) response-stream options))
291
(:method ((repository spocq.i:repository) (response-stream stream)
292
&key ((:response-content-type *response-content-type*) *response-content-type*)
294
s p o c (from c) (graph from) page (task-id (spocq.i::make-task-id)))
295
(let* ((bgp `(spocq.a:|bgp| (spocq.a:|triple| ,(or s '?::|s|) ,(or p '?::|p|) ,(or o '?::|o|))))
296
(statement-pattern (rest (second bgp)))
297
(bindings (loop for term in statement-pattern
298
for variable in '(?::|s| ?::|p| ?::|o|)
299
unless (variable-p term)
300
collect (cons term variable)))
301
(base-form (let ((graph-pattern `(spocq.a:|extend| (spocq.a:|graph| ,(or graph |urn:dydra|:|all|) ,bgp)
302
?::|c| |urn:dydra|:|default|)))
303
(loop for (term . variable) in bindings
304
do (setf graph-pattern `(spocq.a:|extend| ,graph-pattern ,variable ,term)))
306
(ldf-sparql `(spocq.a:|project|
308
`(spocq.a:|slice| ,base-form :offset ,(* page *ldf-page-size*) :count ,*ldf-page-size*)
310
(?::|s| ?::|p| ?::|o| ?::|c|)))
311
(*class.query* 'ldf-query))
312
(prog1 (with-accounting (pipe-query
313
ldf-sparql response-stream
315
:dataset-graphs *dataset-graphs*
316
:dynamic-bindings *dynamic-bindings*
318
:repository-id *repository-id*
319
:response-content-type *response-content-type*
320
:revision-id *revision-id*
322
:user-id dydra:*user-id*
325
(complete-output response-stream)
326
(log-debug "psq: complete-output stdout. accounting [~s->~a]..." (accounting-note-count) *accounting-destination*)
327
(when (and *accounting-destination*
328
(plusp (accounting-note-count)))
329
(publish-accounting-notes (get-accounting-notes)
330
*accounting-destination*)
331
(complete-output *accounting-destination*))
332
(log-debug "psq: complete-output accounting. errors...")
333
(cond ((plusp (channel-count *error-condition-channel*))
334
;; print the errors and return the last condition
335
(let ((error-notes (channel-get-all *error-condition-channel*)))
336
(print-error-conditions error-notes *error-output*)
337
(complete-output *error-output*)
338
(log-error-notes-trig error-notes))
339
(maybe-exit-on-error))
341
(complete-output *error-output*)))
342
(log-debug "psq: complete-output errors. state ~s." *run-state*)))))
346
(defmethod pipe-query ((expression cons) (output-destination t) &rest args &key
348
(declare (dynamic-extent args))
349
(when (find *repository-id* *disabled-repositories* :test #'string-equal)
350
(error 'spocq.e:runtime-error
352
:expression (format nil "The repository has been disabled: ~s." *repository-id*)))
354
(labels ((handle-error (condition)
355
(log-stacktrace "Error processing query request : ~a -> ~a."
357
;; if in the context of a query and the channel is still usable
358
;; send an error response
360
(log-error "error in query: ~s" (query-sse-expression *query*))
361
(generate-error-note condition :task *query*)
362
(terminate-task *query*))))
363
(handler-bind ((spocq.e:runtime-error
365
;; if it is any other error, ack to the broker
367
;; is a query was created, send an error message to the client and terminate it
368
(generate-error-note condition)
369
(terminate-task *query*))
371
(log-stacktrace "Error in request w/o context~% ~a." condition)))
372
(return-from pipe-query condition))))
373
(let ((query (apply #'query :sse-expression expression
375
(log-notice "query: new: ~s: ~s: ~s"
378
(substitute #\space #.(code-char #o012) (query-sparql-expression query)))
379
(generate-accounting-note :parse :task query)
380
(setq *run-state* :process)
381
(flet ((guarded-handler (condition)
382
(handler-case (handle-error condition)
383
(serious-condition (c2)
384
(log-error "reentrant error processing query:~% ~a~% ~a."
386
(return-from pipe-query condition)))
387
;; set up handlers to catch and log stacktraces for most serious conditions,
388
;; but let interrupts through, and cause storage conditions to first unwind
391
(handler-bind ((storage-condition (lambda (c) (signal c)) ) ; unwind it to the handler-case
392
#+sbcl (sb-sys:interactive-interrupt (lambda (c) (invoke-debugger c)))
393
(serious-condition #'guarded-handler))
395
(register-query query)
396
(add-task-thread query (bt:current-thread))
397
(restart-bind ((terminate-task (lambda (task-to-terminate &optional condition)
398
(when (eq query task-to-terminate)
399
(return-from pipe-query condition)))))
400
(with-task-environment (:task query)
401
(let ((*package* *algebra-package*)
402
(*readtable* *sse-readtable*))
403
(log-info "pipe ~s, repository ~s, form ~s, bindings ~s"
405
(repository-id (task-repository query))
406
(query-sse-expression query)
407
(query-dynamic-bindings query)))
408
(initialize-task query))
409
(respond-to-task :query
411
output-destination)))
412
(storage-condition (c) (guarded-handler c))))
413
(log-debug "pipe complete [~a] from ~/format-iso-time/." (task-id query) (task-start-time query))
418
(in-package :spocq.i)
419
(mime:mime-type "application/n-triples;profile='http://www.hydra-cg.com/spec/latest/linked-data-fragments/'")
420
(let ((*request-iri* (spocq:make-iri "http://example.org/example?page=3&s=a")))
421
(ldf-main-query-loop))
422
./ldf --request-iri "http://example.org/example?page=3&s=a" <<EOF
423
((:task-id #.(spocq.i::make-task-id))
424
(:repository-id "james/test")
426
(:agent-location "94.219.67.94")
427
(:response-content-type "application/n-quads"))
428
?what <http://example.org/location> ?where
429
?what <http://example.org/location> ?where
430
?what <http://example.org/location> ?where
431
?what <http://example.org/location> ?where
432
?what <http://example.org/location> ?where
433
?what <http://example.org/location> ?where
434
?what <http://example.org/location> ?where
435
?what <http://example.org/location> ?where
436
?what <http://example.org/location> ?where
437
?what <http://example.org/location> ?where
438
?what <http://example.org/location> ?where
454
/opt/sbcl/1.2.6/bin/sbcl --core /development/source/library/sbcl-spocq-server.core <<EOF
455
(in-package :spocq.i)
457
(setq *run-state* :process)
458
(setq *agent* (system-agent))
460
(let ((*repository-id* "openrdf-sesame/mem-rdf")
461
(*response-content-type* mime:application/n-quads)
462
(*response-header-types* ()))
463
(pipe-ldf-query *standard-input* *standard-output*))
464
http://dydra.com/openrdf-sesame/mem-rdf
468
(let ((*repository-id* "openrdf-sesame/mem-rdf")
469
(*response-content-type* mime:application/n-quads)
470
(*response-header-types* ())
471
(pipe-ldf-query *standard-input* *standard-output*))
472
http://dydra.com/openrdf-sesame/mem-rdf?s=%3chttp://example.com/named-subject%3e
475
(defparameter *old-print* #'print)
476
(defparameter *in-print* nil)
477
(defun new-print (value &rest args)
478
(if (or value *in-print*)
479
(apply *old-print* value args)
480
(let ((*in-print* t))
481
(break "in-print"))))
482
(unwind-protect (let ((*repository-id* "openrdf-sesame/mem-rdf")
483
(*response-content-type* mime:application/n-quads))
484
(setf (symbol-function 'print) #'new-print)
485
(pipe-ldf-query *standard-input* *standard-output*))
486
(setf (symbol-function 'print) *old-print*))