Coverage report: /development/source/library/org/datagraph/spocq-shard/src/store/rdfcache/processing.lisp

KindCoveredAll%
expression99430 23.0
branch742 16.7
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 ;;; triple rates       count                                        (user+system time)
6
 ;;; 20110224 spocq     2700 (spo materialized 991 solutions)        0.296018
7
 ;;; 20110224 spocq    33100 (s x4 po materialized 9619 solutions)   0.268017 
8
 ;;;          agraph   40000 (s x10 po per sparql)                           [http://www.franz.com/products/allegrograph/allegrograph.datasheet.pdf]
9
 
10
 ;;; File-level documentation form: a title string followed by an empty description string.
 ;;; NOTE(review): the :documentation operator is not defined in this listing -
 ;;; presumably a project macro; confirm.
 (:documentation "top-level store processing"
11
   "")
12
 
13
 ;;; Special variables, both initially nil.
 ;;; Nothing else in this listing reads or binds them.
 (defvar *repository* nil)
14
 (defvar *repository-name* nil)
15
 
16
 ;;; Generic function of one argument, STORE.
 ;;; The only method visible here specializes on NULL and returns the symbol TRIVIAL-QUAD-MATCH.
 (defgeneric test-store-query-operator (store)
17
   (:method ((store null)) 'trivial-quad-match))
18
 
19
 ;;; Disabled: the #+(or) feature expression is never true, so the reader skips this macro.
 ;;; As written, the expansion
 ;;;  - binds TYPE-CODE to (quad-term-type cursor 'term),
 ;;;  - binds .term-length. to (quad-term-decoded-length cursor 'term),
 ;;;  - binds STRING to a dynamic-extent character array of that length,
 ;;;  - passes (quad-term-value cursor 'term) and STRING to ==foreign-string-to-lisp==
 ;;;    (presumably to copy the foreign string into the array - confirm), and
 ;;;  - evaluates BODY inside LOCALLY.
 ;;; NOTE(review): the operator name ==foreign-string-to-lisp== is not defined in this listing;
 ;;; confirm the spelling before re-enabling.
 #+(or)
20
 (defmacro with-term-literal ((cursor term type-code string) &body body)
21
   `(let* ((,type-code (quad-term-type ,cursor ',term))
22
           (.term-length. (quad-term-decoded-length ,cursor ',term))
23
           (,string (make-array .term-length. :element-type 'character)))
24
      (declare (fixnum .term-length.) (dynamic-extent ,string))
25
      (==foreign-string-to-lisp== (quad-term-value ,cursor ',term) ,string)
26
      (locally ,@body)))
27
 
28
 ;;; Handle a BGP-MATCH request arriving on a stream.
 ;;; The stream parameter is named *engine-io*, so the special variable is bound to it for the call.
 ;;; Steps, each bracketed by an accounting note sent to *accounting-io*:
 ;;;   :start -> compile the match -> :compiled -> apply the compiled function -> :complete.
 ;;; Returns whatever the final send-account-note call returns.
 (defmethod process-store-request ((request bgp-match) (*engine-io* stream))
29
   (send-account-note request :start *accounting-io*)
30
   (compile-bgp-match request)
31
   (send-account-note request :compiled *accounting-io*)
32
   ;; the request stream doubles as the response stream
   (apply-bgp-match-function request *engine-io*)
33
   (send-account-note request :complete *accounting-io*))
34
 
35
 
36
 ;;; The BGP-MATCH method binds the task to *query* (the parameter is the special variable itself).
 ;;; It builds a lambda expression of two parameters, (*task* continuation), which logs the
 ;;; expression and then invokes spocq.a:|select| over the expression's variables.
 ;;; Side effects: stores the lambda expression in (bgp-match-lambda *query*) and the result of
 ;;; spocq-compile in (bgp-match-function *query*).
 ;;; Returns two values: the compiled function and the lambda expression.
 (defgeneric compile-bgp-match (task)
37
   (:documentation "Compile a BGP-match expression into a filtered series of queries.
38
     see compute-bgp-lambda.")
39
 
40
   (:method ((*query* bgp-match))
41
     (let* ((expression (bgp-match-expression *query*))
42
            (variables (expression-variables expression))
43
            ;; the expression is spliced in twice: quoted for the log message, and as code for |select|
            (lambda-expression `(lambda (*task* continuation)
44
                                  (log-debug "bgp-match ~s, to reduce: ~s" (task-id *task*) ',expression)
45
                                  (spocq.a:|select| ,variables
46
                                                    ,expression
47
                                                    continuation))))
48
       (log-debug "bgp-match ~a, lambda expression: ~s" (task-id *query*) lambda-expression)
49
       (setf (bgp-match-lambda *query*) lambda-expression)
50
       (setf (bgp-match-function *query*) (spocq-compile lambda-expression))
51
       (values (bgp-match-function *query*)
52
               (bgp-match-lambda *query*)))))
53
 
54
 
55
 ;;; Invoke the compiled match function stored in the request.
 ;;; The function is called with (task-revision request) and a local continuation.
 ;;; The continuation receives the solution field and
 ;;;  - sends a :send accounting note to *accounting-io*,
 ;;;  - sends the solution field to RESPONSE-STREAM via send-store-response,
 ;;;  - logs completion at info level, and
 ;;;  - when the :debug level qualifies, logs an abbreviated view of the result.
 ;;; Returns whatever the compiled function returns.
 (defgeneric apply-bgp-match-function (match-task response-stream)
56
   (:method ((request bgp-match) (response-stream stream))
57
     (flet ((response-continuation (solution-field)
58
              (send-account-note request :send *accounting-io*)
59
              (send-store-response request response-stream solution-field)
60
              (log-info "match ~s. complete." (task-id request))
61
              (when (log-level-qualifies? :debug)
62
                (if (consp solution-field)
63
                  ;; a cons field is treated as (variables . solutions);
                  ;; FIRST, LAST and COUNT are local variables here, distinct from the functions
                  (let ((variables (first solution-field))
64
                        (first (first (rest solution-field)))
65
                        (last (first (last solution-field)))
66
                        (count (length (rest solution-field))))
67
                    ;; prints the first solution, the number elided in between (only if count > 2),
                    ;; and the last solution (only if count > 1); ~@[ skips the nil arguments
                    (log-debug  "match ~s result: ~s~@[ . (~s~@[ [... x~d ...]~]~@[ ~s~])~]."
68
                                (task-id request)
69
                                variables
70
                                first
71
                                (when (> count 2) (- count 2))
72
                                (when (> count 1) last)))
73
                  (log-debug "match ~s result: ~s."
74
                             (task-id request)
75
                             solution-field)))))
76
       ;; the continuation is declared dynamic-extent: it must not be retained after this call returns
       (declare (dynamic-extent #'response-continuation))
77
       (funcall (bgp-match-function request) (task-revision request) #'response-continuation))))
78
 
79
 
80
 ;;;!!! does not correctly implement the standard dataset graph logic
81
 (defmethod compute-bgp-lambda ((store repository-cache) statement-patterns &key
82
                                default-graph-term
83
                                graph
84
                                graphs
85
                                dataset-graphs
86
                                (base-dimensions ())
87
                                (projection-dimensions (expression-variables (cons graph statement-patterns)))
88
                                (query-operator 'spocq.e::query)
89
                                (wildcard-term nil)
90
                                environment)
91
   (declare (ignore graphs dataset-graphs environment))
92
   
93
   (let* ((graph-variable (if (variable-p graph) graph nil))
94
          ;; bind it if it's a variable
95
          (bind-graph-variable (when graph-variable `((,graph-variable (graph stmt))))))
96
     
97
     (labels ((compute-filter-continuation (form rest-triples accumulated-variables)
98
                (let ((cc (compute-next-continuation rest-triples accumulated-variables))
99
                      (test-expression (second form)))
100
                  `(when (ignore-errors (spocq.a:|&&| ,test-expression t))
101
                     ,cc)))
102
              (compute-statement-continuation (form rest-triples accumulated-variables)
103
                (flet ((local-variable-p (v) (and (variable-p v) (not (member v accumulated-variables)))))
104
                  (let* ((all-variables (substitute-if nil (complement #'local-variable-p) (rest form)))
105
                         (uniqued-variables (loop for vars on all-variables
106
                                                  for v = (first vars)
107
                                                  if (and v (member v (rest vars)))
108
                                                  collect (gensym)
109
                                                  else collect v))
110
                         (pattern (substitute-if nil #'local-variable-p (rest form)))
111
                         (query `(,query-operator repository ,@(loop for term in pattern
112
                                                                     for key in '(:subject :predicate :object)
113
                                                                     when term
114
                                                                     nconc (list key (cond ((variable-p term) term)
115
                                                                                           ((symbolp term) `(quote ,term))
116
                                                                                           (t term))))
117
                                                  :context ,(cond (bind-graph-variable wildcard-term)
118
                                                                  (graph-variable graph-variable)
119
                                                                  (graph)
120
                                                                  (t default-graph-term))))
121
                         (extended-variables (append accumulated-variables (remove nil (remove-duplicates all-variables))))
122
                         (body `(progn (incf match-requests)
123
                                       (dolist (stmt ,query)
124
                                         (when (zerop (logand (incf match-requests) #x1fff))
125
                                           (check-query-status))
126
                                         ,@(unless (some #'identity uniqued-variables) '((declare (ignorable stmt))))
127
                                         (let (,@(loop for v in uniqued-variables
128
                                                       for accessor in '(triple-subject triple-predicate triple-object)
129
                                                       when v
130
                                                       collect `(,v (,accessor stmt)))
131
                                               ,@(shiftf bind-graph-variable nil))
132
                                           ;; outer iteration only, iff the context is a variable, set it from the first triple
133
                                           ,(let ((continuation (compute-next-continuation rest-triples extended-variables)))
134
                                              (if (equal uniqued-variables all-variables)
135
                                                continuation
136
                                                `(when (and ,@(loop for v in all-variables
137
                                                                    for u in uniqued-variables
138
                                                                    when (and v (not (eq u v)))
139
                                                                    collect `(spocq.e:equal ,u ,v)))
140
                                                   ,continuation))))))))
141
                    body)))
142
              (compute-base-continuation ()
143
                `(collect (list ,@(loop for variable in projection-dimensions
144
                                        nconc `((quote ,variable) ,variable))
145
                                ,@(when (variable-p graph-variable)
146
                                    ;; if the found statements had nothing substitute something
147
                                    ;; otherwise the query will fail
148
                                    `((quote ,graph-variable)
149
                                      (or ,graph-variable (de.setf.rdf:mediator-default-context repository)))))))
150
              (compute-next-continuation (triples accumulated-variables)
151
                (if triples
152
                  (let* ((form (first triples)))         ; (triple ?s ?p ?o)
153
                    (ecase (first form)
154
                      ((spocq.a:|triple| spocq.a:|quad|)
155
                       (compute-statement-continuation form (rest triples) accumulated-variables))
156
                      (spocq.a:|filter|
157
                       (compute-filter-continuation form (rest triples) accumulated-variables))))
158
                  (compute-base-continuation))))
159
       
160
       `(lambda (repository &key (solutions '(nil)))
161
          (declare (ignorable solutions))      ; if no solution variables were supplied
162
          (let ((match-requests 0)
163
                (match-results 0))
164
            (values ,(if base-dimensions
165
                       `(collect-solutions (collect)
166
                          (dolist (solution solutions)
167
                            (destructuring-bind (&key ,@(mapcar #'(lambda (var) `((,var ,var)
168
                                                                                  (load-time-value (spocq:make-unbound-variable ',var))))
169
                                                                base-dimensions)
170
                                                      &allow-other-keys)
171
                                                solution
172
                              ,(compute-next-continuation statement-patterns base-dimensions))))
173
                       `(collect-solutions (collect )
174
                          ,(compute-next-continuation statement-patterns nil)))
175
                    match-requests
176
                    match-results
177
                    :complete))))))
178
 
179
 ;;; Disabled: the #+(or) feature expression is never true, so the reader skips this method.
 ;;; As written it generates a lambda which does not match locally. It builds an extended AGP form
 ;;; (project / solutions / graph / graphs clauses followed by the rest of the original form),
 ;;; sends it with send-store-request, and returns (values nil 0 0 :delegate) after setting the
 ;;; AGP state to :delegate.
 ;;; GRAPHS defaults to DEFAULT-GRAPHS when GRAPH is nil, NAMED-GRAPHS when GRAPH is a variable,
 ;;; a one-element list when GRAPH is an IRI, and signals an error otherwise.
 ;;; NOTE(review): the generated lambda refers to AGP, which neither the lambda nor this method
 ;;; binds - confirm where it is expected to come from before re-enabling.
 #+(or)                                  ; unused
180
 (defmethod compute-bgp-lambda ((repo amqp-repository) statement-patterns &key
181
                                 (base-dimensions ())          ; initial solution field variables
182
                                 (projection-dimensions (expression-variables statement-patterns))
183
                                 (wildcard-term nil)
184
                                 graph    ; if in a graph clause, then either a variable or a literal
185
                                 (default-graphs nil)
186
                                 (named-graphs nil)
187
                                 (graphs (cond ((null graph)       default-graphs)
188
                                               ((variable-p graph) named-graphs)
189
                                               ((typep graph 'iri) (list graph))
190
                                               (t
191
                                                (error "Invalid dataset specification: graph ~s, default ~s, named ~s."
192
                                                       graph default-graphs named-graphs)))))
193
   (declare (ignore wildcard-term))
194
   `(lambda (repository &key (solutions nil solutions-s))
195
      (declare (ignorable solutions))      ; if no solution variables were supplied
196
      (let* ((agp-form (agp-form agp))
197
             (projection-dimensions ',projection-dimensions)
198
             (base-dimensions ',base-dimensions)
199
             (graph ',graph)
200
             (graphs ',graphs)
201
             ;; nested backquote: this form is assembled when the generated lambda runs
             (effective-agp-form `(,(first agp-form)
202
                                   (spocq.a:|project| ,projection-dimensions)
203
                                   ,@(when solutions-s `((spocq.a::|solutions| ,base-dimensions ,@solutions)))
204
                                   ,@(when graph `((spocq.a::|graph| ,graph)))
205
                                   ,@(when graphs `((spocq.a::|graphs| ,@graphs)))
206
                                   ,@(rest agp-form))))
207
        (send-store-request (agp-query agp) (repository-channel repository) (agp-id agp) effective-agp-form
208
                            :repository (repository-id repository))
209
        (values nil 0 0 (setf (agp-state agp) :delegate)))))
210
 
211
 
212
 ;;; disabled for now
213
 ;;; the cursor / skip logic below is wrong: each thread needs its own cursor(s)
214
 ;;; Special variables with their initial values.
 ;;; *bgp-threads* and *bgp-results* are defined again with DEFPARAMETER further down in this file.
 ;;; None of the four is read by the code visible in this listing.
 (defvar *agp-thread-count* 1)
215
 (defvar *bgp-threads* nil)
216
 (defvar *bgp-results* nil)
217
 (defvar *enable-term-identifiers* t)
218
 
219
 ;;; 20110503 hetzner
220
 ;;; w/ terms only (time, gcs during bgp processing)
221
 ;;;  sp2b-q7@50k    4.86s   2gc
222
 ;;;  sp2b-q7@250k  21.04s  12gc
223
 ;;;  sp2b-q7@1m   127.70s  35gc
224
 ;;;  sp2b-q4@1m   315.39s 423gc
225
 
226
 ;;; w/ terms+identifiers
227
 ;;;  sp2b-q7@50k    4.51s   0gc
228
 ;;;  sp2b-q7@250k  19.94s   4gc
229
 ;;;  sp2b-q7@1m    83.63s  12gc
230
 ;;;  sp2b-q4@1m   283.59s 305gc
231
 
232
 ;;; Disabled: the #+(or) feature expression is never true, so the reader skips this method.
 ;;; As written it ignores BODY and generates a lambda which calls (put-field-page destination nil)
 ;;; and returns 0.
 ;;; NOTE(review): BASE-DIMENSIONS is referenced but is not a parameter of this method, and the
 ;;; generated lambda declares DESTINATION ignored yet passes it to put-field-page - both would
 ;;; need fixing before re-enabling.
 #+(or)                                  ; to suppress processing
233
 (defmethod compute-bgp-lambda :around ((repo rdfcache-repository) body &key &allow-other-keys)
234
   (declare (ignore body))
235
   `(lambda (destination ,@(when base-dimensions '(source)))
236
      (declare (ignore destination ,@(when base-dimensions '(source))))
237
      (put-field-page destination nil)
238
      0))
239
 
240
 
241
 ;;; Both variables are also defined with DEFVAR earlier in this file.
 ;;; Unlike DEFVAR, DEFPARAMETER assigns the value each time the form is evaluated,
 ;;; so these forms reset both variables to nil whenever the file is loaded.
 (defparameter *bgp-results* nil)
242
 (defparameter *bgp-threads* nil)
243
 
244
 ;;; Expand a query into its executable form.
 ;;;
 ;;; Methods:
 ;;;   cons      walks the SSE form, expanding macros and selected operators (see below), inside
 ;;;             with-task-environment with QUERY as the task
 ;;;   string    parses the SPARQL text with parse-sparql and re-dispatches on the result
 ;;;   pathname  as for string, with REPOSITORY-ID defaulting to "<last directory>/<file name>"
 ;;;
 ;;; For the string and pathname methods the caller's arguments come first in the argument list,
 ;;; ahead of :dataset-graphs and the parser options, so they take precedence over both.
 (defgeneric expand-query (form &key agent dataset repository-id
245
                                default-graphs named-graphs graphs dataset-graphs
246
                                query expand-bgps version dynamic-bindings
247
                                retain-declarations)
248
 
249
   (:method ((form cons) &key
250
             (default-graphs ()) (named-graphs ())
251
             graphs
252
             (dataset-graphs (or graphs (list default-graphs named-graphs)))
253
             (operation (first form))
254
             (dataset "jhacker/basic-term-1")
255
             (repository-id dataset)
256
             (agent *agent*)
257
             (dynamic-bindings nil)
258
             ;; unless a query is supplied, one is constructed from the other arguments
             ;; with fixed exchange and routing-key strings
             (query (make-query :id (make-null-task-id)
259
                                :agent agent
260
                                :operation operation
261
                                :sse-expression form
262
                                :repository-id repository-id
263
                                :dataset-graphs dataset-graphs
264
                                :request-exchange "test" :request-routing-key "key"
265
                                :store-routing-key "host.app.pid"
266
                                :trace-routing-key nil
267
                                :accept "application/sparql-query+sse"
268
                                :dynamic-bindings dynamic-bindings))
269
             (expand-bgps nil)
270
             (retain-declarations nil)
271
             &allow-other-keys)
272
     (labels ((expand-body (body env)
273
                (loop for sub-form in body collect (expand-query-form sub-form env)))
274
              (expand-query-form (form &optional env)
275
                ;; dispatch on the form: conses are walked by operator, function objects and
                ;; other atoms are returned unchanged, agp instances are replaced
                (typecase form
276
                  (cons
277
                   (destructuring-bind (op . body) form
278
                     (if (symbolp op)
279
                       (case op
280
                         (spocq.i::agp-generator
281
                          ;(expand-query-form (first body) env)
282
                          `(spocq.e:bgp ,@(second (first body))))
283
                         ;; the first argument is kept as is; the remaining forms are expanded
                         (spocq.e::with-join-scope
284
                          `(,op ,(first body) ,@(expand-body (rest body) env)))
285
                         (compress-solution-field
286
                          `(,op ,@(expand-body body env)))
287
                         ;; binding forms: the binding list is kept as is, the body forms are expanded
                         ((let let* flet labels)
288
                          `(,op ,(first body) ,@(expand-body (rest body) env)))
289
                         ;; leading DECLARE forms augment ENV and are accumulated (by PUSH, so in
                         ;; reverse order); the first other form ends the scan and the remaining
                         ;; body is expanded. If the body holds only declarations the loop runs
                         ;; off the end and the result is nil.
                         (locally
290
                          (loop for body on body
291
                            for (op . args) = (first body)
292
                            with declares = ()
293
                            do (case op
294
                                 (declare
295
                                  (setf env (augment-environment env :declare args))
296
                                  (push `(,op . ,args) declares))
297
                                 (t
298
                                  (if retain-declarations
299
                                      (return `(locally ,@declares
300
                                                 ,@(expand-body body env)))
301
                                      ;; declarations dropped: a single remaining form replaces the LOCALLY
                                      (return (if (rest body)
302
                                                  `(locally ,@(expand-body body env))
303
                                                  (first (expand-body body env))))
304
                                      )))))
305
                         ;; NOTE(review): agp-repository is applied to FORM, which is a cons in this
                         ;; branch, not to the agp instance created by make-agp - confirm intent
                         (spocq.a:|agp|
306
                          (if expand-bgps
307
                              `(,(compute-agp-pattern-lambda (apply #'make-agp body)
308
                                                             (agp-repository form))
309
                                ,(agp-repository form))
310
                              `(spocq.e:bgp ,@body)))
311
                         (t
312
                          (cond (;;(eq (symbol-package op) *algebra-package*)
313
                                 (macro-function op)
314
                                 ;; the hook re-walks the result of every macro expansion step
                                 (flet ((macroexpand-subexpressions (expander form env)
315
                                          (expand-query-form (funcall expander form env) env)))
316
                                   (let ((*macroexpand-hook* #'macroexpand-subexpressions))
317
                                     (multiple-value-bind (expansion macro-p)
318
                                                          (macroexpand form env)
319
                                       (if macro-p
320
                                         (expand-query-form expansion env)
321
                                         expansion)))))
322
                                ;; operators from the SPOCQ.E package: expand the arguments only
                                ((eq (symbol-package op) (find-package :spocq.e))
323
                                 `(,op ,@(expand-body body env)))
324
                                (t
325
                                 form))))
326
                       form)))
327
                  (function
328
                   form)
329
                  (agp
330
                   (if expand-bgps
331
                     `(,(compute-agp-pattern-lambda form (agp-repository form))
332
                       ,(agp-repository form))
333
                     (agp-form form)))
334
                  (t form))))
335
       (with-task-environment (:task query :normal-disposition :abort)
336
         (expand-query-form form nil))))
337
 
338
   (:method ((query-string string) &rest args)
339
     (with-configuration ()
340
       (multiple-value-bind (sse-expression options)
341
                            (parse-sparql query-string)
342
         (apply #'expand-query sse-expression
343
                (append args (list* :dataset-graphs *dataset-graphs* options))))))
344
 
345
   (:method ((query-source pathname) &rest args
346
             &key (repository-id (format nil "~a/~a" (first (last (pathname-directory query-source))) (pathname-name query-source)))
347
             &allow-other-keys)
348
     (with-configuration ()
349
       (multiple-value-bind (sse-expression options)
350
                            (parse-sparql query-source)
351
         (apply #'expand-query sse-expression
352
                :repository-id repository-id
353
                (append args (list* :dataset-graphs *dataset-graphs* options)))))))
354
 
355
 ;;; Compare two terms with spocq.e:equal.
 ;;; An argument for which rdfcache::pointerp is true is first replaced by its intern-term
 ;;; result; the function recurses until neither argument is a pointer.
 (defun term-equal (term1 term2)
356
   (cond ((rdfcache::pointerp term1)
357
          (term-equal (intern-term term1) term2))
358
         ((rdfcache::pointerp term2)
359
          (term-equal term1 (intern-term term2)))
360
         (t
361
          (spocq.e:equal term1 term2))))
362
 
363
 ;;; for testing
364
 ;;; Tracing wrapper around rdfcache:match.
 ;;; Clears CURSOR with rdfcache::%clear-cursor, prints the transaction, the cursor and the four
 ;;; match terms to *trace-output*, then returns the result of rdfcache:match.
 ;;; The local TERM function renders an argument for printing:
 ;;;   integer          -> (number . interned-term), via with-numbered-term
 ;;;   foreign pointer  -> the interned term
 ;;;   nil              -> nil
 ;;;   anything else    -> "<?>"
 ;;; NOTE(review): defined with UNDEFUN rather than DEFUN. UNDEFUN is not defined in this listing -
 ;;; presumably a project macro; confirm whether it installs or suppresses the definition.
 (undefun rdfcache-query-verbose (transaction cursor context subject predicate object)
365
   (flet ((term (term)
366
            (typecase term
367
              (integer (with-numbered-term (temp-term term)
368
                         (cons term (intern-term temp-term))))
369
              (cffi-sys:foreign-pointer (intern-term term))
370
              (null nil)
371
              (t "<?>"))))
372
     (rdfcache::%clear-cursor cursor)
373
     (format *trace-output* "~&query : ~a ~a"
374
             transaction cursor)
375
     (rdfcache::print-cursor cursor *trace-output*)
376
     (format *trace-output* "~%     c: ~a" (term context))
377
     (format *trace-output* "~%     s: ~a" (term subject))
378
     (format *trace-output* "~%     p: ~a" (term predicate))
379
     (format *trace-output* "~%     o: ~a" (term object))
380
     (terpri *trace-output*) (finish-output *trace-output*)
381
     (rdfcache:match transaction cursor context subject predicate object)))
382
 
383
 ;;; Tracing wrapper around rdfcache:match which prints the four match arguments as given
 ;;; (with ~s, no term lookup) to *trace-output* and returns the result of rdfcache:match.
 ;;; NOTE(review): defined with UNDEFUN rather than DEFUN. UNDEFUN is not defined in this listing -
 ;;; presumably a project macro; confirm whether it installs or suppresses the definition.
 (undefun rdfcache-query-verbose-id (transaction cursor context subject predicate object)
384
   (format *trace-output* "~&query : ~s ~s ~s ~s~%"
385
           context subject predicate object)
386
   (finish-output *trace-output*)
387
   (rdfcache:match transaction cursor context subject predicate object))
388
 
389
 
390
 ;;; Tracing wrapper around rdfcache:count.
 ;;; Prints the transaction and the four terms to *trace-output*, then returns the result of
 ;;; rdfcache:count.
 ;;; The local TERM function renders an argument for printing:
 ;;;   integer          -> (number . interned-term), via with-numbered-term
 ;;;   foreign pointer  -> the interned term
 ;;;   nil              -> nil
 ;;;   anything else    -> "<?>"
 ;;; NOTE(review): defined with UNDEFUN rather than DEFUN. UNDEFUN is not defined in this listing -
 ;;; presumably a project macro; confirm whether it installs or suppresses the definition.
 (undefun rdfcache-count-verbose (transaction context subject predicate object)
391
   (flet ((term (term)
392
            (typecase term
393
              (integer (with-numbered-term (temp-term term)
394
                         (cons term (intern-term temp-term))))
395
              (cffi-sys:foreign-pointer (intern-term term))
396
              (null nil)
397
              (t "<?>"))))
398
     (format *trace-output* "~&count : ~a " transaction)
399
     (format *trace-output* "~%     c: ~a" (term context))
400
     (format *trace-output* "~%     s: ~a" (term subject))
401
     (format *trace-output* "~%     p: ~a" (term predicate))
402
     (format *trace-output* "~%     o: ~a" (term object))
403
     (terpri *trace-output*) (finish-output *trace-output*)
404
     (rdfcache:count transaction context subject predicate object)))
405
 
406
 ;;; Tracing wrapper around rdfcache:count which prints the four arguments as given
 ;;; (with ~s, no term lookup) to *trace-output* and returns the result of rdfcache:count.
 ;;; NOTE(review): defined with UNDEFUN rather than DEFUN. UNDEFUN is not defined in this listing -
 ;;; presumably a project macro; confirm whether it installs or suppresses the definition.
 (undefun rdfcache-count-verbose-id (transaction context subject predicate object)
407
   (format *trace-output* "~&count : ~s ~s ~s ~s~%"
408
           context subject predicate object)
409
   (finish-output *trace-output*)
410
   (rdfcache:count transaction context subject predicate object))
411
 
412
 
413
 ;;; Tracing wrapper around rdfcache::next.
 ;;; Advances CURSOR and, when the result is non-nil, prints the interned context, subject,
 ;;; predicate and object terms read from the cursor's pointers to *trace-output*.
 ;;; Returns the result of rdfcache::next.
 ;;; NOTE(review): defined with UNDEFUN rather than DEFUN. UNDEFUN is not defined in this listing -
 ;;; presumably a project macro; confirm whether it installs or suppresses the definition.
 (undefun rdfcache-next-verbose (cursor)
414
   (let ((result (rdfcache::next cursor)))
415
     (format *trace-output* "~&next :")
416
     (when result
417
       (format *trace-output* "~%    c: ~s" (intern-term (rdfcache:cursor-context-pointer cursor)))
418
       (format *trace-output* "~%    s: ~s" (intern-term (rdfcache:cursor-subject-pointer cursor)))
419
       (format *trace-output* "~%    p: ~s" (intern-term (rdfcache:cursor-predicate-pointer cursor)))
420
       (format *trace-output* "~%    o: ~s" (intern-term (rdfcache:cursor-object-pointer cursor))))
421
     (terpri *trace-output*) (finish-output *trace-output*)
422
     result))
423
 
424
 ;;; Tracing wrapper around rdfcache::next which prints term numbers instead of interned terms.
 ;;; When the result is non-nil it prints the cursor's context, subject, predicate and object
 ;;; numbers to *trace-output*; otherwise it prints "done". Returns the result of rdfcache::next.
 ;;; NOTE(review): defined with UNDEFUN rather than DEFUN. UNDEFUN is not defined in this listing -
 ;;; presumably a project macro; confirm whether it installs or suppresses the definition.
 (undefun rdfcache-next-verbose-id (cursor)
425
   (let ((result (rdfcache::next cursor)))
426
     (format *trace-output* "~&next :  ")
427
     (if result
428
       (format *trace-output* "~s ~s ~s ~s~%"
429
             (rdfcache:cursor-context-number cursor)
430
             (rdfcache:cursor-subject-number cursor)
431
             (rdfcache:cursor-predicate-number cursor)
432
             (rdfcache:cursor-object-number cursor))
433
       (format *trace-output* "done~%"))
434
     (finish-output *trace-output*)
435
     result))
436
 
437
 
438
                                                                                
439
                                                                                
440