Coverage report: /development/source/library/org/datagraph/spocq-shard/src/store/rdfcache/processing.lisp
| Kind | Covered | All | % |
| expression | 99 | 430 | 23.0 |
| branch | 7 | 42 | 16.7 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
5
;;; triple rates count (user+system time)
6
;;; 20110224 spocq 2700 (spo materialized 991 solutions) 0.296018
7
;;; 20110224 spocq 33100 (s x4 po materialized 9619 solutions) 0.268017
8
;;; agraph 40000 (s x10 po per sparql) [http://www.franz.com/products/allegrograph/allegrograph.datasheet.pdf]
10
(:documentation "top-level store processing"
13
;;; Two special variables, each given NIL as its initial value (DEFVAR does
;;; not reassign a variable that is already bound).
;;; NOTE(review): presumably these hold the current repository object and its
;;; name -- confirm against the code that binds them; nothing here shows it.
(defvar *repository* nil)
14
(defvar *repository-name* nil)
16
;;; Generic function of one argument, STORE, answering a symbol that names a
;;; query operator. The only method visible here specializes on NULL.
(defgeneric test-store-query-operator (store)
17
;; when STORE is NIL, answer the symbol TRIVIAL-QUAD-MATCH
(:method ((store null)) 'trivial-quad-match))
20
(defmacro with-term-literal ((cursor term type-code string) &body body)
21
`(let* ((,type-code (quad-term-type ,cursor ',term))
22
(.term-length. (quad-term-decoded-length ,cursor ',term))
23
(,string (make-array .term-length. :element-type 'character)))
24
(declare (fixnum .term-length.) (dynamic-extent ,string))
25
(==foreign-string-to-lisp== (quad-term-value ,cursor ',term) ,string)
28
;;; Process a BGP-MATCH request whose second argument is a stream.
;;; The steps run strictly in this order:
;;;   1. send a :start account note
;;;   2. compile the match (compile-bgp-match)
;;;   3. send a :compiled account note
;;;   4. apply the compiled function against the stream (apply-bgp-match-function)
;;;   5. send a :complete account note
;;; Every account note goes to *accounting-io*. The value of the method is
;;; whatever the final send-account-note call returns.
;;; NOTE(review): the second parameter is named *engine-io*; if that symbol is
;;; proclaimed special elsewhere, the stream is bound dynamically for the
;;; duration of the call -- confirm.
(defmethod process-store-request ((request bgp-match) (*engine-io* stream))
29
;; mark the beginning of processing
(send-account-note request :start *accounting-io*)
30
(compile-bgp-match request)
31
;; compilation finished without a non-local exit
(send-account-note request :compiled *accounting-io*)
32
;; run the compiled match; responses are written to the stream argument
(apply-bgp-match-function request *engine-io*)
33
(send-account-note request :complete *accounting-io*))
36
;;; Build and compile the function for a BGP-match task and record both the
;;; source and the compiled result on the task.
;;;
;;; The visible method
;;;   - reads the task expression (bgp-match-expression) and its variables
;;;     (expression-variables),
;;;   - constructs a LAMBDA expression with parameters (*task* continuation),
;;;   - stores that expression with (setf bgp-match-lambda) and the result of
;;;     spocq-compile with (setf bgp-match-function),
;;;   - returns two values: the compiled function and the lambda expression.
;;;
;;; NOTE(review): the method parameter is named *query*; if that symbol is
;;; proclaimed special elsewhere, the task is bound dynamically while the
;;; method runs -- confirm.
(defgeneric compile-bgp-match (task)
37
(:documentation "Compile a BGP-match expression into a filtered series of queries.
38
see compute-bgp-lambda.")
40
(:method ((*query* bgp-match))
41
(let* ((expression (bgp-match-expression *query*))
42
(variables (expression-variables expression))
43
;; source form of the function to be compiled
(lambda-expression `(lambda (*task* continuation)
44
(log-debug "bgp-match ~s, to reduce: ~s" (task-id *task*) ',expression)
45
(spocq.a:|select| ,variables
;; NOTE(review): the coverage report omits source lines 46-47 here, i.e. the
;; remainder of the generated select form and the end of the bindings.
48
(log-debug "bgp-match ~a, lambda expression: ~s" (task-id *query*) lambda-expression)
49
;; keep the source form on the task
(setf (bgp-match-lambda *query*) lambda-expression)
50
;; keep the compiled form on the task
(setf (bgp-match-function *query*) (spocq-compile lambda-expression))
51
(values (bgp-match-function *query*)
52
(bgp-match-lambda *query*)))))
55
;;; Call the compiled match function stored on a BGP-match task.
;;;
;;; The function (bgp-match-function) is called with two arguments: the task
;;; revision (task-revision) and a local continuation. When the continuation is
;;; invoked with a solution field it
;;;   - sends a :send account note to *accounting-io*,
;;;   - passes the solution field to send-store-response together with the
;;;     request and RESPONSE-STREAM,
;;;   - logs completion at info level,
;;;   - and, only when the :debug log level qualifies, logs a summary of the
;;;     result.
;;; NOTE(review): the coverage report omits source lines 68-70 and 74-75, which
;;; hold the leading arguments of the two log-debug calls.
(defgeneric apply-bgp-match-function (match-task response-stream)
56
(:method ((request bgp-match) (response-stream stream))
57
(flet ((response-continuation (solution-field)
58
(send-account-note request :send *accounting-io*)
59
(send-store-response request response-stream solution-field)
60
(log-info "match ~s. complete." (task-id request))
61
(when (log-level-qualifies? :debug)
62
(if (consp solution-field)
63
;; a cons-shaped field is read as (variables . solutions)
(let ((variables (first solution-field))
64
;; first solution
(first (first (rest solution-field)))
65
;; last element of the field
(last (first (last solution-field)))
66
;; number of solutions, not counting the variables entry
(count (length (rest solution-field))))
67
(log-debug "match ~s result: ~s~@[ . (~s~@[ [... x~d ...]~]~@[ ~s~])~]."
71
;; number of solutions between the first and the last, when there are any
(when (> count 2) (- count 2))
72
;; the last solution is passed only when it differs from the first
(when (> count 1) last)))
73
(log-debug "match ~s result: ~s."
76
;; the continuation is declared to have dynamic extent
(declare (dynamic-extent #'response-continuation))
77
(funcall (bgp-match-function request) (task-revision request) #'response-continuation))))
80
;;;!!! does not correctly implement the standard dataset graph logic
81
(defmethod compute-bgp-lambda ((store repository-cache) statement-patterns &key
87
(projection-dimensions (expression-variables (cons graph statement-patterns)))
88
(query-operator 'spocq.e::query)
91
(declare (ignore graphs dataset-graphs environment))
93
(let* ((graph-variable (if (variable-p graph) graph nil))
94
;; bind it if it's a variable
95
(bind-graph-variable (when graph-variable `((,graph-variable (graph stmt))))))
97
(labels ((compute-filter-continuation (form rest-triples accumulated-variables)
98
(let ((cc (compute-next-continuation rest-triples accumulated-variables))
99
(test-expression (second form)))
100
`(when (ignore-errors (spocq.a:|&&| ,test-expression t))
102
(compute-statement-continuation (form rest-triples accumulated-variables)
103
(flet ((local-variable-p (v) (and (variable-p v) (not (member v accumulated-variables)))))
104
(let* ((all-variables (substitute-if nil (complement #'local-variable-p) (rest form)))
105
(uniqued-variables (loop for vars on all-variables
107
if (and v (member v (rest vars)))
110
(pattern (substitute-if nil #'local-variable-p (rest form)))
111
(query `(,query-operator repository ,@(loop for term in pattern
112
for key in '(:subject :predicate :object)
114
nconc (list key (cond ((variable-p term) term)
115
((symbolp term) `(quote ,term))
117
:context ,(cond (bind-graph-variable wildcard-term)
118
(graph-variable graph-variable)
120
(t default-graph-term))))
121
(extended-variables (append accumulated-variables (remove nil (remove-duplicates all-variables))))
122
(body `(progn (incf match-requests)
123
(dolist (stmt ,query)
124
(when (zerop (logand (incf match-requests) #x1fff))
125
(check-query-status))
126
,@(unless (some #'identity uniqued-variables) '((declare (ignorable stmt))))
127
(let (,@(loop for v in uniqued-variables
128
for accessor in '(triple-subject triple-predicate triple-object)
130
collect `(,v (,accessor stmt)))
131
,@(shiftf bind-graph-variable nil))
132
;; outer iteration only, iff the context is a variable, set it from the first triple
133
,(let ((continuation (compute-next-continuation rest-triples extended-variables)))
134
(if (equal uniqued-variables all-variables)
136
`(when (and ,@(loop for v in all-variables
137
for u in uniqued-variables
138
when (and v (not (eq u v)))
139
collect `(spocq.e:equal ,u ,v)))
140
,continuation))))))))
142
(compute-base-continuation ()
143
`(collect (list ,@(loop for variable in projection-dimensions
144
nconc `((quote ,variable) ,variable))
145
,@(when (variable-p graph-variable)
146
;; if the found statements had nothing substitute something
147
;; otherwise the query will fail
148
`((quote ,graph-variable)
149
(or ,graph-variable (de.setf.rdf:mediator-default-context repository)))))))
150
(compute-next-continuation (triples accumulated-variables)
152
(let* ((form (first triples))) ; (triple ?s ?p ?o)
154
((spocq.a:|triple| spocq.a:|quad|)
155
(compute-statement-continuation form (rest triples) accumulated-variables))
157
(compute-filter-continuation form (rest triples) accumulated-variables))))
158
(compute-base-continuation))))
160
`(lambda (repository &key (solutions '(nil)))
161
(declare (ignorable solutions)) ; if no solution variables were supplied
162
(let ((match-requests 0)
164
(values ,(if base-dimensions
165
`(collect-solutions (collect)
166
(dolist (solution solutions)
167
(destructuring-bind (&key ,@(mapcar #'(lambda (var) `((,var ,var)
168
(load-time-value (spocq:make-unbound-variable ',var))))
172
,(compute-next-continuation statement-patterns base-dimensions))))
173
`(collect-solutions (collect )
174
,(compute-next-continuation statement-patterns nil)))
180
(defmethod compute-bgp-lambda ((repo amqp-repository) statement-patterns &key
181
(base-dimensions ()) ; initial solution field variables
182
(projection-dimensions (expression-variables statement-patterns))
184
graph ; if in a graph clause, then either a variable or a literal
187
(graphs (cond ((null graph) default-graphs)
188
((variable-p graph) named-graphs)
189
((typep graph 'iri) (list graph))
191
(error "Invalid dataset specification: graph ~s, default ~s, named ~s."
192
graph default-graphs named-graphs)))))
193
(declare (ignore wildcard-term))
194
`(lambda (repository &key (solutions nil solutions-s))
195
(declare (ignorable solutions)) ; if no solution variables were supplied
196
(let* ((agp-form (agp-form agp))
197
(projection-dimensions ',projection-dimensions)
198
(base-dimensions ',base-dimensions)
201
(effective-agp-form `(,(first agp-form)
202
(spocq.a:|project| ,projection-dimensions)
203
,@(when solutions-s `((spocq.a::|solutions| ,base-dimensions ,@solutions)))
204
,@(when graph `((spocq.a::|graph| ,graph)))
205
,@(when graphs `((spocq.a::|graphs| ,@graphs)))
207
(send-store-request (agp-query agp) (repository-channel repository) (agp-id agp) effective-agp-form
208
:repository (repository-id repository))
209
(values nil 0 0 (setf (agp-state agp) :delegate)))))
213
;;; the cursor / skip logic below is wrong: each thread needs its own cursor(s)
214
;;; Special variables with their initial values: a thread count of 1, two
;;; variables initialized to NIL, and a flag initialized to T.
;;; NOTE(review): *bgp-threads* and *bgp-results* are declared a second time
;;; with DEFPARAMETER further down in this file. Unlike DEFVAR, DEFPARAMETER
;;; assigns its value on every load, so both are reset to NIL whenever the
;;; file is reloaded.
(defvar *agp-thread-count* 1)
215
(defvar *bgp-threads* nil)
216
(defvar *bgp-results* nil)
217
;; NOTE(review): presumably switches between the terms-only and the
;; terms+identifiers modes timed in the comments that follow -- confirm.
(defvar *enable-term-identifiers* t)
220
;;; w/ terms only (time, gcs during bgp processing)
221
;;; sp2b-q7@50k 4.86s 2gc
222
;;; sp2b-q7@250k 21.04s 12gc
223
;;; sp2b-q7@250k 127.70s 35gc
224
;;; sp2b-q4@1m 315.39s 423gc
226
;;; w/ terms+identifiers
227
;;; sp2b-q7@50k 4.51s 0gc
228
;;; sp2b-q7@250k 19.94s 4gc
229
;;; sp2b-q7@1m 83.63s 12gc
230
;;; sp2b-q4@1m 283.59s 305gc
232
#+(or) ; to suppress processing
233
(defmethod compute-bgp-lambda :around ((repo rdfcache-repository) body &key &allow-other-keys)
234
(declare (ignore body))
235
`(lambda (destination ,@(when base-dimensions '(source)))
236
(declare (ignore destination ,@(when base-dimensions '(source))))
237
(put-field-page destination nil)
241
;;; Both variables are assigned NIL each time this file is loaded
;;; (DEFPARAMETER always evaluates and assigns its value form).
;;; NOTE(review): the same two variables are also declared with DEFVAR earlier
;;; in this file; these forms override whatever value they held at load time.
(defparameter *bgp-results* nil)
242
(defparameter *bgp-threads* nil)
244
(defgeneric expand-query (form &key agent dataset repository-id
245
default-graphs named-graphs graphs dataset-graphs
246
query expand-bgps version dynamic-bindings
249
(:method ((form cons) &key
250
(default-graphs ()) (named-graphs ())
252
(dataset-graphs (or graphs (list default-graphs named-graphs)))
253
(operation (first form))
254
(dataset "jhacker/basic-term-1")
255
(repository-id dataset)
257
(dynamic-bindings nil)
258
(query (make-query :id (make-null-task-id)
262
:repository-id repository-id
263
:dataset-graphs dataset-graphs
264
:request-exchange "test" :request-routing-key "key"
265
:store-routing-key "host.app.pid"
266
:trace-routing-key nil
267
:accept "application/sparql-query+sse"
268
:dynamic-bindings dynamic-bindings))
270
(retain-declarations nil)
272
(labels ((expand-body (body env)
273
(loop for sub-form in body collect (expand-query-form sub-form env)))
274
(expand-query-form (form &optional env)
277
(destructuring-bind (op . body) form
280
(spocq.i::agp-generator
281
;(expand-query-form (first body) env)
282
`(spocq.e:bgp ,@(second (first body))))
283
(spocq.e::with-join-scope
284
`(,op ,(first body) ,@(expand-body (rest body) env)))
285
(compress-solution-field
286
`(,op ,@(expand-body body env)))
287
((let let* flet labels)
288
`(,op ,(first body) ,@(expand-body (rest body) env)))
290
(loop for body on body
291
for (op . args) = (first body)
295
(setf env (augment-environment env :declare args))
296
(push `(,op . ,args) declares))
298
(if retain-declarations
299
(return `(locally ,@declares
300
,@(expand-body body env)))
301
(return (if (rest body)
302
`(locally ,@(expand-body body env))
303
(first (expand-body body env))))
307
`(,(compute-agp-pattern-lambda (apply #'make-agp body)
308
(agp-repository form))
309
,(agp-repository form))
310
`(spocq.e:bgp ,@body)))
312
(cond (;;(eq (symbol-package op) *algebra-package*)
314
(flet ((macroexpand-subexpressions (expander form env)
315
(expand-query-form (funcall expander form env) env)))
316
(let ((*macroexpand-hook* #'macroexpand-subexpressions))
317
(multiple-value-bind (expansion macro-p)
318
(macroexpand form env)
320
(expand-query-form expansion env)
322
((eq (symbol-package op) (find-package :spocq.e))
323
`(,op ,@(expand-body body env)))
331
`(,(compute-agp-pattern-lambda form (agp-repository form))
332
,(agp-repository form))
335
(with-task-environment (:task query :normal-disposition :abort)
336
(expand-query-form form nil))))
338
(:method ((query-string string) &rest args)
339
(with-configuration ()
340
(multiple-value-bind (sse-expression options)
341
(parse-sparql query-string)
342
(apply #'expand-query sse-expression
343
(append args (list* :dataset-graphs *dataset-graphs* options))))))
345
(:method ((query-source pathname) &rest args
346
&key (repository-id (format nil "~a/~a" (first (last (pathname-directory query-source))) (pathname-name query-source)))
348
(with-configuration ()
349
(multiple-value-bind (sse-expression options)
350
(parse-sparql query-source)
351
(apply #'expand-query sse-expression
352
:repository-id repository-id
353
(append args (list* :dataset-graphs *dataset-graphs* options)))))))
355
;;; Compare two terms, either of which may be an rdfcache pointer.
;;; An argument for which rdfcache::pointerp is true is first converted with
;;; intern-term and the comparison is retried; TERM1 is checked before TERM2.
;;; Once neither argument is a pointer the result is that of spocq.e:equal.
(defun term-equal (term1 term2)
356
(cond ((rdfcache::pointerp term1)
357
(term-equal (intern-term term1) term2))
358
((rdfcache::pointerp term2)
359
(term-equal term1 (intern-term term2)))
;; NOTE(review): the coverage report omits source line 360 here, presumably
;; the head of the default clause.
361
(spocq.e:equal term1 term2))))
364
;;; Tracing variant of a match: clears the cursor, writes the cursor and the
;;; context, subject, predicate and object terms to *trace-output*, flushes
;;; the stream, and ends with a call to rdfcache:match on the same arguments.
;;; NOTE(review): UNDEFUN is not defined in the visible text -- confirm whether
;;; it defines the function or removes it.
;;; NOTE(review): the coverage report omits source lines 365-366, 370-371 and
;;; 374; the two clauses below presumably belong to a type dispatch inside a
;;; local TERM function, which the format calls use.
(undefun rdfcache-query-verbose (transaction cursor context subject predicate object)
367
;; an integer is paired with the interned form of its numbered term
(integer (with-numbered-term (temp-term term)
368
(cons term (intern-term temp-term))))
369
;; a foreign pointer is interned directly
(cffi-sys:foreign-pointer (intern-term term))
372
(rdfcache::%clear-cursor cursor)
373
(format *trace-output* "~&query : ~a ~a"
375
(rdfcache::print-cursor cursor *trace-output*)
376
(format *trace-output* "~% c: ~a" (term context))
377
(format *trace-output* "~% s: ~a" (term subject))
378
(format *trace-output* "~% p: ~a" (term predicate))
379
(format *trace-output* "~% o: ~a" (term object))
380
;; end the trace record and make sure it is written out before matching
(terpri *trace-output*) (finish-output *trace-output*)
381
(rdfcache:match transaction cursor context subject predicate object)))
383
;;; Tracing variant of a match that prints the arguments as given: writes the
;;; context, subject, predicate and object to *trace-output* with ~s, flushes
;;; the stream, and ends with a call to rdfcache:match on the same arguments.
;;; TRANSACTION and CURSOR are passed through and not printed.
;;; NOTE(review): UNDEFUN is not defined in the visible text -- confirm whether
;;; it defines the function or removes it.
(undefun rdfcache-query-verbose-id (transaction cursor context subject predicate object)
384
(format *trace-output* "~&query : ~s ~s ~s ~s~%"
385
context subject predicate object)
386
;; make sure the trace line is written before the match runs
(finish-output *trace-output*)
387
(rdfcache:match transaction cursor context subject predicate object))
390
;;; Tracing variant of a count: writes the transaction and the context,
;;; subject, predicate and object terms to *trace-output*, flushes the stream,
;;; and ends with a call to rdfcache:count on the same arguments.
;;; NOTE(review): UNDEFUN is not defined in the visible text -- confirm whether
;;; it defines the function or removes it.
;;; NOTE(review): the coverage report omits source lines 391-392 and 396-397;
;;; the two clauses below presumably belong to a type dispatch inside a local
;;; TERM function, which the format calls use.
(undefun rdfcache-count-verbose (transaction context subject predicate object)
393
;; an integer is paired with the interned form of its numbered term
(integer (with-numbered-term (temp-term term)
394
(cons term (intern-term temp-term))))
395
;; a foreign pointer is interned directly
(cffi-sys:foreign-pointer (intern-term term))
398
(format *trace-output* "~&count : ~a " transaction)
399
(format *trace-output* "~% c: ~a" (term context))
400
(format *trace-output* "~% s: ~a" (term subject))
401
(format *trace-output* "~% p: ~a" (term predicate))
402
(format *trace-output* "~% o: ~a" (term object))
403
;; end the trace record and make sure it is written out before counting
(terpri *trace-output*) (finish-output *trace-output*)
404
(rdfcache:count transaction context subject predicate object)))
406
;;; Tracing variant of a count that prints the arguments as given: writes the
;;; context, subject, predicate and object to *trace-output* with ~s, flushes
;;; the stream, and ends with a call to rdfcache:count on the same arguments.
;;; TRANSACTION is passed through and not printed.
;;; NOTE(review): UNDEFUN is not defined in the visible text -- confirm whether
;;; it defines the function or removes it.
(undefun rdfcache-count-verbose-id (transaction context subject predicate object)
407
(format *trace-output* "~&count : ~s ~s ~s ~s~%"
408
context subject predicate object)
409
;; make sure the trace line is written before the count runs
(finish-output *trace-output*)
410
(rdfcache:count transaction context subject predicate object))
413
(undefun rdfcache-next-verbose (cursor)
414
(let ((result (rdfcache::next cursor)))
415
(format *trace-output* "~&next :")
417
(format *trace-output* "~% c: ~s" (intern-term (rdfcache:cursor-context-pointer cursor)))
418
(format *trace-output* "~% s: ~s" (intern-term (rdfcache:cursor-subject-pointer cursor)))
419
(format *trace-output* "~% p: ~s" (intern-term (rdfcache:cursor-predicate-pointer cursor)))
420
(format *trace-output* "~% o: ~s" (intern-term (rdfcache:cursor-object-pointer cursor))))
421
(terpri *trace-output*) (finish-output *trace-output*)
424
(undefun rdfcache-next-verbose-id (cursor)
425
(let ((result (rdfcache::next cursor)))
426
(format *trace-output* "~&next : ")
428
(format *trace-output* "~s ~s ~s ~s~%"
429
(rdfcache:cursor-context-number cursor)
430
(rdfcache:cursor-subject-number cursor)
431
(rdfcache:cursor-predicate-number cursor)
432
(rdfcache:cursor-object-number cursor))
433
(format *trace-output* "done~%"))
434
(finish-output *trace-output*)