Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/processing.lisp

KindCoveredAll%
expression4261101 38.7
branch3076 39.5
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 (:documentation "top-level query processing"
6
   "The 'processor' comprises a set of message handling threads.
7
  It manages its resource usage in terms of number of threads and the role(s) for which they are configured.
8
  There are two kinds of thread, request threads and response threads.
9
  Each configured is upon creation to serve a specific purpose:
10
 
11
  * A request thread is intended to accept query requests. It creates as its input, a stream/channel which
12
    to consume query messages from a given brokered domain. It prepares each parsed request, in particular,
13
    by compiling it, and then sends it through the *service-channel* to a response thread for execution.
14
    with a single queue to delegate BGP operations to the store.
15
  * A response thread administers query execution by dispatching the constituent algebra operations to
16
    individual threads and conecting the result solution stream to a response stream/channel to the
17
    originating broker.
18
 
19
 
20
  Each request and response comprises at least three required message properties,
21
 
22
  * account_id 
23
  * repository_id 
24
  * query_id
25
 
26
  and a body. Additional, optional properties are taken from the request header:
27
  
28
  * priority 
29
  * quota 
30
 
31
  A query request's body is the query expression encoded either as SPARQL or as SSE/BERT, as per the
32
  request MIME type.  A query response message's body depends on the query operator
33
 
34
  * SELECT : a solution field
35
  * CONSTRUCT : a triple (or optionally quad?) sequence (graph)
36
  * DESCRIBE : a triple sequence (graph)
37
  * ASK : a boolean
38
 
39
  Should a store request or a query request fail, the response is an error message.
40
 
41
  The managed execution process involves constructung graph of connected threads, each of which
42
  processes one node in the algebra graph. Internal nodes accept arguments are streams and produce
43
  a result stream. The leaves perform BGP matching and iterate over match results from the store to
44
  emit a result stream. The response thread connects the root node output stream to an encoding
45
  filter as per the response MIME type, which emits the result to the response stream/channel.
46
 
47
  Each algebra node involves three processing phases:
48
  - compilation : the sse expression is expanded/translated into one in terms of spocq.e runtime operations
49
    and compiled from lisp into an native function.
50
  - propagation network construction : each first-level spocq.e operator constructs an algebra graph
51
    node which encapsulates the operations parameters - eg., the specific reduction function, projection
52
    dimensions, slice specification, and the links to the argument source nodes. This is a 'static' graph,
53
    which is intended to be reused, and requires store access for selectivity data only, in order to sort
54
    BGP constituent patterns.
55
  - reduction execution : a top-down control process delegates the respective operation for each node to
56
    a thread, which applies the reduction function to source channels - or in the case of BGP leaves, to the
57
    store, to emt the result field stream to the destination.
58
 
59
  The second and third phases are split, rather than initiating the execution simultaneous with network
60
  construction in order that
61
  - the respective reduction thread is active prior to the argument thread(s), and
62
  - the network can be reused by just flushing its channels of ay unused results and restarting the
63
    topmost node.
64
 
65
  Some notes about optimization.
66
  - arq[1] uses 'selectivity' criteria to order related bgps.
67
    - arq/vc : static values by variable position
68
    - arq/vcp : augmented with values for predicate values, for predicate classes
69
    - arq/gsh : uses exact statistics for any literal term. could be extended to term combinations
70
    - the others (pfj, pfn) constitute unclear variations of the above
71
 
72
  [2] presents three aspects of optimization
73
  - abstract symbolic which follows from operator simplification patterns (from perez)
74
  - symbolic transformation based on certain/probable variables
75
  - semantic optimization, which follows from tgd/egd-based inference
76
 
77
 
78
  NB. There is code included to delegate processing for query components to remote processes, but that code
79
  is not up-to-date:
80
  If the query request entailes BGP queries, the execution publishes any BGP queries and yields a function as
81
  the immediate local result. This function is bound globally to its BGP identifiers to be avaialble for
82
  a response thread to combine with response from the store. If, on the other hand, the query included no BGP,
83
  the execution yields an immediate result - a solution field, a triple set, or a scalar (boolean or numeric),
84
  which is published back to the client immediately.
85
 
86
  Any 'incomplete' query functions remain bound to their pending BGP queries. As each response appears, its
87
  query function is retrieved according to BGP identifier and applied to the store result. Each successive
88
  application yields a result analogous to the initial one. As long as a the result is incomplete, the
89
  function remains registered. Any results are returned immediately. Once the function is complete, any
90
  final results are sent as a completion response.
91
 
92
  A store request's body is a single
93
  BGP encoded as SSE/BERT. A store response's body is a solution field - a set of variable binding solutions.
94
 
95
 working with streaming revisions
96
 - the revision has a repeat interval - with or without an end.
97
 this means after it completes, the min/max ordinals are adjusted and it runs again.
98
 - completely? : no without initialization, just the query-run-in-thread suffices? (it is used for internal service forms as well)
99
 - results are emitted to its destination queue as normal
100
 - the query must indicate how to mark them temporally for combination with other streams
101
 
102
 - algebra-thread-apply wiuld be premature - it does initialte execution, but not completion,
103
  which means the transaction parameters cannot be changed.
104
 
105
   this should be captured as a function
106
               (query-run-in-thread task result-or-generator)
107
               (send-response-message (task-operation task) task stream encoding)))
108
   for repreted intervals, update the bounds around it. actually need to mediate the response stream
109
   to take thee-o-s marker as the cueue to reiterate.
110
 
111
 options
112
  begin,end,repeat,window
113
  begin,window
114
  begin,end
115
  begin,repeat,window
116
 
117
 
118
  ---
119
  [1] : http://www2008.org/papers/pdf/p595-stocker1.pdf
120
  [2] : tcdt2010_cd.pdf")
121
 
122
 
123
 ;;; consolidated interface to execute a single query
124
 
125
 (defun test-sparql (query-string &rest args &key (repository-id "system/system") (agent (system-agent))
126
                                  &allow-other-keys)
127
   (apply #'run-sparql query-string
128
          :repository-id repository-id :agent agent
129
          args))
130
 
131
 (defun count-system-quads (&key (account "system"))
132
   (test-sparql "select count(*) from <urn:dydra:all> where {?s ?p ?o}"
133
                :repository-id (concatenate 'string account "/system")))
134
 
135
 (defun rqpl (repository)
136
   (loop
137
     (write-string "? " *terminal-io*)
138
     (finish-output *terminal-io*)
139
     (let ((query (loop with input = ()
140
                    for line = (read-line *terminal-io* nil nil)
141
                    if (zerop (length line))
142
                    return (format nil "~{~a~^~%~}" (reverse input))
143
                    else do (push line input))))
144
       (unless (plusp (length query)) (return))
145
       (handler-case (let ((*print-pretty* nil))
146
                       (multiple-value-bind (solutions variables) (run-sparql query :repository-id repository)
147
                         (format *terminal-io* "~{~a~^ ~}~%" variables)
148
                         (format *terminal-io* "~{~{~s~^ ~}~%~}" solutions)))
149
         (error (c) (warn "error: ~%~a~%" c))))))
150
 
151
 (defun run-sparql-internal (query &rest args)
152
   "run sparql with auth controls, but suppress all extensions - inference, views, etc
153
    which would be recognized during a request.
154
    This, in particular to permit sparql request for extension metadata without
155
    infinite recursion to retrieve it."
156
   (let ((*library-path* ())
157
         (*macroexpand-bgp-phases* *macroexpand-bgp-base-phases*))
158
     (declare (dynamic-extent args))
159
     (apply #'run-sparql query :library-path nil args)))
160
 
161
 (defgeneric run-sparql (query-string &key repository-id graphs library-path dynamic-bindings id operation
162
                                      revision-id revision-windows
163
                                      accounting-handler error-handler solution-handler user-id
164
                                      continuation agent response-content-type
165
                                      &allow-other-keys)
166
 
167
   (:method ((location pathname) &rest args )
168
     (apply #'run-sparql (read-file location) args))
169
 
170
   (:method ((query-string string) &rest args)
171
     (with-configuration ()              ; intended for internal use
172
       (multiple-value-bind (sse-expression options)
173
                            (parse-sparql query-string)
174
         (apply #'run-sparql sse-expression
175
                :request-content query-string
176
                (append args
177
                        ;; take the dataset freom the environment or from the query text
178
                        (case *dataset-source*
179
                          (:query options)
180
                          (t
181
                           (list* :dataset-graphs *dataset-graphs* options))))))))
182
 
183
   (:method ((sse-expression cons) &rest args &key (id (make-task-id))
184
             (operation (first sse-expression))
185
             (response-content-type "application/sparql-query+sse")
186
             &allow-other-keys)
187
     (apply #'run-sparql (apply #'make-query
188
                                :id  id
189
                                :operation operation
190
                                :sse-expression sse-expression
191
                                :request-exchange "test" :request-routing-key "key"
192
                                :store-routing-key "host.app.pid"
193
                                :trace-routing-key nil
194
                                :response-content-type response-content-type
195
                                (plist-difference args '(:accounting-handler :error-handler :solution-handler :continuation
196
                                                                             ;; should be adopted for the initargs
197
                                                                             :revision-windows)))
198
            args))
199
   #+(or) ;; revision windows here
200
   (:method ((query query) &key (accounting-handler nil ah-s) (error-handler nil eh-s) (continuation nil c-s)
201
             (dynamic-bindings nil db-s) (solution-handler #'term-value-field) response-content-type
202
             revision-windows
203
             &allow-other-keys)
204
     (let* ((post-result ())
205
            (dimensions ())
206
            (repository (task-repository query))
207
            (windowed-revision (when revision-windows
208
                                 (compute-repository-revision repository revision-windows))))
209
       (restart-case
210
         (progn (block :handle-termination
211
                  (add-task-thread query (bt:current-thread))
212
                  (restart-bind ((terminate-task (lambda (task-to-terminate &optional condition)
213
                                                   (when (eq query task-to-terminate)
214
                                                     (warn "query task terminated: ~a" task-to-terminate)
215
                                                     (return-from run-sparql (values nil nil query condition))))))
216
                    (with-task-environment (:task query)
217
                      (when db-s  ;; modify a clone's bindings
218
                        (setf (task-dynamic-bindings query) dynamic-bindings))
219
                      (block :collect-result
220
                        (labels ((collect-result (element)
221
                                   (if element
222
                                       (let ((element-result (funcall solution-handler element)))
223
                                         (when (consp element-result)
224
                                           (setf post-result (if c-s
225
                                                                 (when continuation
226
                                                                   (funcall continuation element))
227
                                                                 (append post-result element-result)))))
228
                                     (return-from :collect-result)))
229
                                 (report-errors (task.condition)
230
                                   (destructuring-bind (task . condition) task.condition
231
                                     (declare (ignore task))
232
                                     (warn "query error: ~a: ~a" query condition)))
233
                                 (report-accounting (task.properties)
234
                                   (destructuring-bind (task . properties) task.properties
235
                                     (declare (ignore task))
236
                                     (when *accounting-io*
237
                                       (let ((*print-pretty* nil)) (format *accounting-io* "~&~a~%" properties)))
238
                                     (labels ((clear-list (list)
239
                                                (typecase list
240
                                                  (null nil)
241
                                                  (cons (clear-list (rest list))
242
                                                        (setf (first list) nil (rest list) nil))
243
                                                  (t ))))
244
                                       ;; test retention
245
                                       (clear-list properties))))
246
                                 (run-task-for-revision (effective-revision)
247
                                   (let ((effective-revision-id (spocq.i::repository-revision-id effective-revision)))
248
                                     (setf (spocq.i::task-revision query) effective-revision)
249
                                     (setf (spocq.i::task-revision-id query) effective-revision-id)
250
                                     (log-debug "initiate-task.mqtt: run-task-for-revision: ~a" effective-revision-id))
251
                                   (cond (response-content-type
252
                                          (initiate-task query response-content-type))
253
                                         (t
254
                                          (initiate-task query #'collect-result
255
                                                         :dimension-handler #'(lambda (d) (setf dimensions d))
256
                                                         :accounting-handler (if ah-s accounting-handler #'report-accounting)
257
                                                         :error-handler (if eh-s error-handler #'report-errors)))))
258
                                 (for-each-revision (min-record max-record)
259
                                   (log-debug "initiate-task.run-sparql: for-each-revision: ~a -- ~a" min-record max-record)
260
                                   (unless max-record (setf max-record min-record))
261
                                   (let* ((min-id (ORG.DATAGRAPH.RDF.LMDB.IMPLEMENTATION::REVISION-RECORD-UUID min-record))
262
                                          (max-id (ORG.DATAGRAPH.RDF.LMDB.IMPLEMENTATION::REVISION-RECORD-UUID max-record))
263
                                          (id (if (equalp min-id max-id) min-id (concatenate 'string min-id "-" max-id)))
264
                                          (effective-revision (spocq.i::compute-repository-revision repository id)))
265
                                     (run-task-for-revision effective-revision)
266
                                     (spocq.i::reinitialize-task query))))
267
                          (if windowed-revision
268
                              (map-repository-revision-intervals #'for-each-revision windowed-revision)
269
                              (run-task-for-revision (spocq.i::task-revision query))))))
270
                    (when (fboundp 'process-provenance-information)
271
                      (funcall 'process-provenance-information query))))
272
                (values post-result dimensions query t))
273
         (return-query () (values nil dimensions query nil)))))
274
 
275
   ;; revision windows in initialize task
276
   (:method ((query query) &key (accounting-handler nil ah-s) (error-handler nil eh-s) (continuation nil c-s)
277
             (dynamic-bindings nil db-s) (solution-handler #'term-value-field) response-content-type
278
             revision-windows
279
             &allow-other-keys)
280
     (let ((post-result ())
281
           (dimensions ())
282
           (windowed-revision (when revision-windows
283
                                (spocq.i::compute-repository-revision (task-repository query) revision-windows))))
284
       (restart-case
285
         (progn (block :handle-termination
286
                  (add-task-thread query (bt:current-thread))
287
                  (restart-bind ((terminate-task (lambda (task-to-terminate &optional condition)
288
                                                   (when (eq query task-to-terminate)
289
                                                     (warn "query task terminated: ~a" task-to-terminate)
290
                                                     (return-from run-sparql (values nil nil query condition))))))
291
                    (with-task-environment (:task query)
292
                      (when db-s  ;; modify a clone's bindings
293
                        (setf (task-dynamic-bindings query) dynamic-bindings))
294
                      (block :collect-result
295
                        (labels ((collect-result (element)
296
                                   (if element
297
                                       (let ((element-result (funcall solution-handler element)))
298
                                         (when (consp element-result)
299
                                           (setf post-result (if c-s
300
                                                                 (when continuation
301
                                                                   (funcall continuation element))
302
                                                                 (append post-result element-result)))))
303
                                     (return-from :collect-result)))
304
                                 (report-errors (task.condition)
305
                                   (destructuring-bind (task . condition) task.condition
306
                                     (declare (ignore task))
307
                                     (warn "query error: ~a: ~a" query condition)))
308
                                 (report-accounting (task.properties)
309
                                   (destructuring-bind (task . properties) task.properties
310
                                     (declare (ignore task))
311
                                     (when *accounting-io*
312
                                       (let ((*print-pretty* nil)) (format *accounting-io* "~&~a~%" properties)))
313
                                     (labels ((clear-list (list)
314
                                                (typecase list
315
                                                  (null nil)
316
                                                  (cons (clear-list (rest list))
317
                                                        (setf (first list) nil (rest list) nil))
318
                                                  (t ))))
319
                                       ;; test retention
320
                                       (clear-list properties)))))
321
                          (when windowed-revision
322
                            (let ((effective-revision-id (spocq.i::repository-revision-id windowed-revision)))
323
                              (setf (spocq.i::task-revision query) windowed-revision)
324
                              (setf (spocq.i::task-revision-id query) effective-revision-id)
325
                              (log-debug "run-sparql: revision windows: ~a" windowed-revision)))
326
                          (cond (response-content-type
327
                                 (initiate-task query response-content-type))
328
                                (t
329
                                 (initiate-task query #'collect-result
330
                                                         :dimension-handler #'(lambda (d) (setf dimensions d))
331
                                                         :accounting-handler (if ah-s accounting-handler #'report-accounting)
332
                                                         :error-handler (if eh-s error-handler #'report-errors)))))))
333
                    (when (fboundp 'process-provenance-information)
334
                      (funcall 'process-provenance-information query))))
335
                (values post-result dimensions query t))
336
         (return-query () (values nil dimensions query nil))))))
337
 
338
 ;;; (dydra:run-sparql "select count(*) where {?s ?p ?o}" :repository-id "james/test" :agent (dydra:system-agent))
339
 
340
 ;;; operational components
341
 
342
 (defgeneric initiate-service-task (operation task)
343
   (:documentation "Run a task as an asynchronous thread with the results to be emitted as a
344
  response. Performs just registration and setup immediately and delegates other aspects
345
  through a channel to an asynchronous thread which performs actual processing, response generation,
346
  and clean-up. ")
347
 
348
   (:method ((operation symbol) (query t))
349
     ;; by default, delegate to the :query response method
350
     (initiate-service-task :query query))
351
 
352
   (:method ((operation (eql :query)) (query query))
353
     "Compile, register and initiate processing for a query. If it included a trace key,
354
      send the parsed query expression. Emit accounting notes as per progress."
355
 
356
     (register-query query)
357
     (with-task-environment (:task query)
358
       ;; any commit must happen in the response thread - prior to responding
359
       (let ((*package* *algebra-package*)
360
             (*readtable* *sse-readtable*))
361
         (log-notice "task ~s~@[/~s~], repository ~s registered."
362
                     (task-id query)
363
                     (task-user-id query)
364
                     (repository-id (task-repository query)))
365
         (log-debug "task ~s, form ~s"
366
                     (task-id query)
367
                     (query-sse-expression query)))
368
       (unless (task-initialization-function query)
369
         (compile-query query)
370
         (generate-accounting-note :abstract))
371
       (log-debug "query compiled: ~s, ~s" (task-id query) (query-sse-expression query))
372
       (channel-put *service-channel* query)))
373
 
374
   (:method ((operation (eql :plan)) (query query))
375
     "Expand a query into its plan and return that as the result."
376
     
377
     (register-query query)
378
     (with-task-environment (:task query)
379
       (let ((*package* *algebra-package*)
380
             (*readtable* *sse-readtable*))
381
         (log-notice "plan ~s, repository ~s, form ~s"
382
                     (task-id query)
383
                     (repository-id (task-repository query))
384
                     (query-sse-expression query)))
385
       (unless (task-initialization-function query)
386
         (compile-query query)
387
         (generate-accounting-note :abstract))
388
       (channel-put *service-channel* query))))
389
 
390
 
391
 (defun initiate-internal-task (task modifier &rest args)
392
   "stub to continue to the generic operator"
393
   (declare (dynamic-extent args))
394
   (apply #'initiate-task task modifier args))
395
 
396
 (defgeneric run-precommit-test (query revision)
397
   (:method ((task task) (revision repository-revision))
398
     (flet ((accept-result (result)
399
              (declare (ignore result))
400
              ;; if it yielded a result, then the test succeeded
401
              (return-from run-precommit-test t)))
402
       (initiate-task task #'accept-result)
403
       ;; if it returns without a result, then the test failed
404
       nil))
405
 
406
   (:method ((view view) revision)
407
     (run-precommit-test (view-query view) revision))
408
 
409
   (:method ((test-string string) revision)
410
     (let ((http-method-scanner (load-time-value (cl-ppcre:create-scanner `(:sequence :start-anchor
411
                                                                                      (:alternation "get" "post" "put"))
412
                                                                          :case-insensitive-mode t))))
413
       (cond ((cl-ppcre:scan-to-strings http-method-scanner test-string)
414
              (process-asynchronous-task test-string))
415
             (t
416
              ;; need the dataset
417
              (with-configuration ()              ; intended for internal use
418
                (multiple-value-bind (sse-expression options)
419
                                     (parse-sparql test-string)
420
                  (run-precommit-test (apply #'make-query
421
                                             :id  (make-service-task-id)
422
                                             :dynamic-bindings (query-dynamic-bindings *task*)
423
                                             :repository (repository-revision-reference revision)
424
                                             :revision revision
425
                                             :revision-id (repository-revision-id revision)
426
                                             :operation (first sse-expression)
427
                                             :sse-expression sse-expression
428
                                             :response-content-type nil
429
                                             :user-id (task-user-id *task*)
430
                                             :sparql-expression test-string
431
                                             :agent (task-agent *task*)
432
                                             options)
433
                                      revision))))))))
434
 
435
 (defgeneric process-asynchronous-task (request-string)
436
   (:documentation "Define a stub for the eventual spocq.si implementation")
437
   (:method ((request t))
438
     "This place-holder always returns nil"
439
     nil))
440
 
441
 (defgeneric call-for-task-revisions (operator task revision)
442
   (:method (operator (task task) (revision repository-revision))
443
     "The base method just call the function"
444
     (funcall operator task revision)))
445
 
446
 (defgeneric initiate-task (task continuation &key accounting-handler error-handler dimension-handler)
447
   (:documentation "
448
     This serves as the top-level internal operator to process requests.
449
     Given a task and either a continuation which accepts a result stream or a media type to specify encoding,
450
     compile the query and either initiate the query to yield its generator function and initiate its processing
451
     or, depending on media type, initiate and delegate directly to the encoding operation, or
452
     just delegate for the media type to perform necessary intermediate steps and then emit the result.")
453
 
454
   (:method :around (task continuation &rest args)
455
     (declare (ignore args))
456
     (with-task-environment (:task task)
457
       (register-query task)
458
       ;; note the principal task in order to receive signals
459
       (setf (task-thread task) (bt:current-thread))
460
       (unwind-protect (multiple-value-prog1 (call-next-method)
461
                         (setf (task-thread task) nil))
462
         (when (task-thread task)
463
           (log-notice "initiate-task: ~s. incomplete" (task-id task)))
464
         (finalize-task task))))
465
   
466
   (:method ((task task) (continuation function) &key error-handler accounting-handler dimension-handler)
467
     "Given a continuation as the disposition, invoke it for each solution"
468
     (labels ((result-mediator (result)
469
                (when (null result)
470
                  (generate-accounting-note :complete)   ; set the final task state
471
                  ;; clear any in-process threads
472
                  (log-notice "initiate-task: ~s. complete [~/format-iso-time/ - ~/format-iso-time/) = ~d/~d seconds."
473
                              (task-id task) (task-start-time task) (task-time task)
474
                              (float (/ (- (get-internal-run-time) (task-start-run-time task))
475
                                        internal-time-units-per-second))
476
                              (float (/ (- (get-internal-real-time) (task-start-real-time task))
477
                                        internal-time-units-per-second)))
478
                  (unschedule-task task)
479
                  (when (and error-handler *error-condition-channel*)
480
                    (filter-notes error-handler (channel-get-all *error-condition-channel*)))
481
                  (when accounting-handler
482
                    (filter-notes accounting-handler (get-accounting-notes))))
483
                (funcall continuation result))
484
              (filter-notes (handler notes)
485
                (loop for (note-task . properties) in notes
486
                  until (null note-task)
487
                  if (eq note-task task)
488
                  do (when handler
489
                       (locally (declare (type function handler))
490
                         (assert-argument-types initiate-task
491
                                                (handler function))
492
                         (funcall handler (cons note-task properties)))))))
493
       (initialize-task task)
494
       (let* ((result-or-generator (task-result-generator task)))
495
         (etypecase result-or-generator
496
           (matrix-field
497
            (result-mediator result-or-generator))
498
           (abstract-field-generator
499
            (let ((channel (abstract-field-generator-channel result-or-generator)))
500
              (when dimension-handler
501
                (funcall dimension-handler (abstract-field-generator-dimensions result-or-generator)))
502
              (flet ((initiate-task-for-revision (task revision)
503
                       (declare (ignore revision))
504
                       (query-run-in-thread task result-or-generator)
505
                       (do-pages (page channel)
506
                                 (result-mediator page))))
507
                (declare (dynamic-extent #'initiate-task-for-revision))
508
                (call-for-task-revisions #'initiate-task-for-revision task (task-revision task)))))))
509
       (result-mediator nil)))
510
 
511
    (:method ((task task) (encoding mime:mime-type) &key (stream *standard-output*)  &allow-other-keys)
512
      "In general, for mime types, run the query and send the generated result as the response"
513
      (initialize-task task)
514
      (let ((result-or-generator (task-result-generator task))
515
            (*thread-operations* (cons (list 'initiate-task (task-id task))
516
                                       *thread-operations*)))
517
        (flet ((initiate-task-for-revision (task revision)
518
                 (declare (ignore revision))
519
                 (query-run-in-thread task result-or-generator)
520
                 (send-response-message (task-operation task) task stream encoding)))
521
          (declare (dynamic-extent #'initiate-task-for-revision))
522
          (call-for-task-revisions #'initiate-task-for-revision task (task-revision task))))
523
      t)
524
 
525
   (:method ((task query) (encoding mime::query) &key (stream *standard-output*)  &allow-other-keys)
526
     "if the response is strictly the query, then send the response directly, without processing"
527
     (send-response-message (task-operation task) (query-sse-expression task) stream encoding)))
528
 
529
 
530
 #+sbcl
531
 (defun thread-task (thread)
532
   (when (bt:thread-alive-p thread)
533
     (sb-thread:symbol-value-in-thread '*task* thread nil)))
534
 
535
 #-sbcl
536
 (defun thread-task (thread)
537
   (log-error "no thread-task implementation")
538
   nil)
539
 
540
 
541
 (defun cancel-algebra-threads ()
542
   (loop for thread in (bt:all-threads)
543
     count (unless (or (eq thread *management-thread*) ; 
544
                       (eq thread (bt:current-thread)))
545
             (ignore-errors (cancel-thread thread :task nil)))))
546
 
547
 
548
 ;;;
549
 ;;; page size : 512
550
 ;;; 5 : 76.59 65.29 92.98
551
 ;;; 6 : 66.48 55.6 74.04
552
 ;;; 7 : 66.96 66.48 72.39
553
 ;;; w/10 all results were materialized in a single thread
554
 ;;; 10: 75.95 59.98 65.12
555
 ;;;
556
 ;;; page size : 32
557
 ;;; 10: 81.4 81.33 75.46 59.07
558
 ;;;
559
 ;;; page size 64
560
 ;;; 10: 77.72 67.31 96.93
561
 
562
 (defgeneric get-task-operation (task)
563
   (:method ((task task))
564
     "ensure that the task is still active - otherwise, delayed process creation can leave a hanging process"
565
     (when (task-active-p task)
566
       ;; must wait as the thread will be created before the operation
567
       (channel-get (task-operations task) :wait t))))
568
 
569
 (defgeneric put-task-operation (task operation)
570
   (:method ((task task) operation)
571
     "allow the task to be picked up by an idle thread, suspend to allow a thread
572
     to queue to fetch an operation and if non is waiting create a new one.
573
     queue operations per task in order to permit eventual prioritzation"
574
     (let ((operations-queue (task-operations task)))
575
       (bt:with-lock-held (*algebra-task-lock*)
576
         (channel-put *algebra-task-channel* task)
577
         (unless (plusp (channel-get-wait-count *algebra-task-channel*))
578
           ;; (let ((*print-pretty* nil)) (format *trace-output* "~%starting thread for ~s." source))
579
           (start-algebra-thread #'(lambda () (run-algebra-thread-loop)))
580
           (bt:thread-yield)))
581
       (channel-put operations-queue operation)
582
       operation)))
583
 
584
 
585
 (defparameter *cache-channel-threads* nil)
586
 
587
 (defgeneric query-run-in-thread (task task-activation)
588
   (:documentation "Arrange to add an operation to a query to make it available for another thread to execute.
589
    This may be the initial task operation or some indiviaul algepra operation.")
590
 
591
   (:method ((task task) (source null))
592
     nil)
593
   
594
   (:method ((task task) (source abstract-field-generator))
595
     (put-task-operation task (abstract-field-generator-expression source)))
596
 
597
   (:method ((task task) (expression cons))
598
     (when *cache-channel-threads*
599
       (loop for object in expression
600
             when (typep object 'channel)
601
             do (setf (channel-writer object) (bt:current-thread))
602
             when (typep object 'solution-generator)
603
             do (let ((channel (solution-generator-channel object)))
604
                  (when channel (setf (channel-reader channel) (bt:current-thread))))))
605
     (put-task-operation task expression))
606
   
607
   (:method ((task task) (task-activation function))
608
     (put-task-operation task task-activation)))
609
 
610
 
611
 (defun start-algebra-thread (function &optional (initial-bindings nil))
612
   (make-processing-thread function
613
                           :name (format nil "algebra-dynamic-~d" (rdfcache:time-in-thread))
614
                           :initial-bindings initial-bindings))
615
 
616
 
617
 (defun make-processing-thread (function &rest args &key initial-bindings name)
618
   "create a new thread which ensures thread-saftey for global bits
619
  - *random-state*, for use in blank node generation
620
  - *task*, to keep it local"
621
   (declare (ignore name))
622
   (let ((thread
623
          (apply #'bt:make-thread function
624
                 :initial-bindings (loop for (variable value)
625
                                     in `((*print-array* ,*print-array*)
626
                                          (*random-state* ,(make-random-state t))
627
                                          (*thread-string-buffer* nil)
628
                                          (*thread-byte-buffer* nil)
629
                                          (*task* nil))
630
                                     unless (assoc variable initial-bindings)
631
                                     do (setf initial-bindings
632
                                              (acons variable value initial-bindings))
633
                                     finally (return initial-bindings))
634
                 args)))
635
     (unless (bt:threadp thread)
636
       (error "thread creation failed: ~s" function))
637
     thread))
638
     
639
 ;;; (dotimes (x 3) (make-processing-thread #'(lambda () (print (random 10)))))
640
 ;;; (time (dotimes (x 100) (make-processing-thread #'(lambda () nil)))) == ca 45ms/call
641
 
642
 #+(and use-affinity sbcl)
643
 (progn
644
   (defparameter *affinity-masks* (make-array (sb-cpu-affinity:cpu-count)))
645
   (dotimes (i (length *affinity-masks*))
646
     (setf (aref *affinity-masks* i) ;; (sb-cpu-affinity::make-cpu-mask)))
647
           (sb-alien:make-alien (sb-alien:unsigned 8) 1024))
648
     (sb-cpu-affinity::%get-cpu-affinity-mask (aref *affinity-masks* i))
649
     (sb-cpu-affinity::set-cpu-affinity i (aref *affinity-masks* i))))
650
 
651
 
652
 ;; (defparameter *run-algebra-thread.lock* (bt:make-lock "run-algebra-thread"))
653
 
654
 (defun algebra-thread-apply (task expression-or-function &key (priority *algebra-running-priority*))
655
   "Apply the task operation in an existing thread.
656
  Modify the thread priority and bind the task environment around the call.
657
  Emit an accounting note and trace messages for the operation completion and return the
658
  operatiosn completion state"
659
 
660
   (log-trace "reduce aspect: ~a: ~a"
661
              (task-id task) expression-or-function)
662
   (let ((active-priority *algebra-idle-priority*))
663
     (with-task-environment (:task task)
664
       (unwind-protect (progn
665
                         (setf active-priority (setf (thread-priority) priority))
666
                         (let ((return-state (multiple-value-list (etypecase expression-or-function
667
                                                                    (function (funcall expression-or-function))
668
                                                                    (null nil)
669
                                                                    (cons (apply (first expression-or-function)
670
                                                                                 (rest expression-or-function)))))))
671
                           (generate-accounting-note (first return-state) :task task)
672
                           (log-trace "reduced aspect: ~a: ~a -> ~a"
673
                                      (task-id task) expression-or-function return-state)
674
                           (apply #'values return-state)))
675
         (unless (eql active-priority *algebra-idle-priority*)
676
           (handler-case (setf (thread-priority) *algebra-idle-priority*)
677
             (error (c) (log-warn "error resetting thread priority: ~s: ~a."
678
                                  task c))))))))
679
 
680
 (defparameter *run-algebra-thread.init-thread* t)
681
 (defparameter *run-algebra-thread.exit-thread* nil)
682
 
683
 ;;; 2019-05-04: observing htat each thrad creation required 30-50ms, a single-operation thread model
684
 ;;; was not realistic for short-lived requests.
685
 ;;; as a consequence, refactored run-algebra-thread(task)  into run-algebra-thread-loop + get-task -> task-run-algebra-thread
686
 
687
 (defun run-state-is-processing ()
688
   (eq *run-state* :processing))
689
 (defun run-state-is-terminate ()
690
   (eq *run-state* :terminate))
691
 
692
 (defun run-algebra-thread-loop ()
693
   "run as the toplevel thread loop to request a task which needs processing,
694
    attach to it and process a task for it."
695
   (loop for task = (channel-get *algebra-task-channel*)
696
     do (task-run-algebra-thread task)
697
     until (run-state-is-terminate)))
698
 
699
 
700
 (defun task-run-algebra-thread (task)
701
   (declare (optimize (debug 2) (safety 3)))
702
   (log-trace "algebra thread starting: ~s." task)
703
   (assert-argument-type task-run-algebra-thread task task)
704
   (trace-threads
705
    #'(lambda (stream)
706
        (format stream "~&rat.start: ~s: ~s~%" task (bt:thread-name (bt:current-thread)))
707
        (write-char #\+ *trace-output*) (finish-output *trace-output*)))
708
   (let ((*thread-operations* nil)
709
         (unwind-count 0))
710
     (when *run-algebra-thread.init-thread*
711
       (rdfcache:init-thread) )          ; required for rdfcache channel setup
712
     ;; note that the thread accepts operations from the tasks's queue w/o setting up the respective
713
     ;; environment. given which, terminate task
714
     (restart-bind ((cancel-thread (lambda (&key ((:task cancel-task) nil))
715
                                     (trace-threads task-run-algebra-thread.cancel-thread :current-thread (bt:current-thread) :canceled)
716
                                     (cond ((or (null cancel-task) (eq cancel-task task))
717
                                            (log-info "task thread canceled: returning: ~a ~a" *task* (bt:current-thread ))
718
                                            (return-from task-run-algebra-thread))
719
                                           (t
720
                                            ;; this happens on rare occasions.
721
                                            ;; it indicates that this thread has been associate with a task which is cancelling its threads,
722
                                            ;; but the association is no longer true.
723
                                            ;; this could happen where the cancellation is a spawned thread while the thread itself
724
                                            ;; has returned, disassociated itself from the task and commenced operation for another.
725
                                            ;; this agrees with the pronted state, where the thread's task is active and the cancel-task is complete.
726
                                            (log-warn "task thread canceled ... not: ~a/~a/~a ~a"
727
                                                      *task* task cancel-task (bt:current-thread )))))))
728
       (unwind-protect
729
         (flet ((handle-error (condition)
730
                  (typecase condition
731
                    (spocq.e:compilation-error
732
                     (log-warn "task-run-algebra-thread: compilation error in query thread: ~a -> ~a." task condition)
733
                     (if *log-compilation-errors*
734
                       (write-tracelog "task-run-algebra-thread: compilation error in query thread: ~a -> ~a." task condition)))
735
                    (t
736
                     (log-stacktrace "Error in query thread: ~a -> ~a." task condition)))
737
                  ;; send an error response
738
                  (generate-error-note condition :task task)
739
                  (trace-threads task-run-algebra-thread.handle-error :task-terminated :current-thread (bt:current-thread) :task task)
740
                  (log-warn "task-run-algebra-thread.handle-error: terminating task: ~a: ~a: ~a" (bt:current-thread) task *thread-operations*)
741
                  (generate-accounting-note :terminate :task task)
742
                  (terminate-task task condition)
743
                  (return-from task-run-algebra-thread condition)))
744
                    
745
           (add-task-thread task (bt:current-thread))
746
           (flet ((guarded-handler (c)
747
                    ;;(print (list :task-run-algebra-thread :guarded-handler c) *error-output*)
748
                    (handler-case (handle-error c)
749
                      (serious-condition (c2)
750
                                         (log-error "reentrant error processing query:~% ~a~% ~a."
751
                                                    c c2)))
752
                    (return-from task-run-algebra-thread)))
753
             (declare (dynamic-extent #'guarded-handler))
754
             (handler-case
755
               (handler-bind ((storage-condition
756
                               (lambda (c)
757
                                 (SB-DEBUG:PRINT-BACKTRACE :count 8)
758
                                 (signal c)) )    ; unwind it to the handler-case
759
                              (serious-condition
760
                               ;; handle the error in the dynamic context in order to terminate other threads
761
                               ;; then, as the last this one, in order to not close the transaction until
762
                               ;; no other thread needs it.
763
                               #'guarded-handler))
764
                  (let ((operation (get-task-operation task)))
765
                   ;;(trace-when *trace-output* "~&rat.next ~s.~s: ~s" task (task-operation task) (bt:current-thread))
766
                    (when operation ;; allow that the task has been terminated
767
                      (setq *thread-operations* (list operation))
768
                      ;; run exactly one operation per thread
769
                       (algebra-thread-apply task operation))))
770
               (storage-condition (c) (guarded-handler c)))))
771
         (log-info "algebra thread returning: ~s." task)
772
         (remove-task-thread task (bt:current-thread))
773
         (trace-threads
774
          #'(lambda (stream)
775
              (format stream "~&rat.complete: ~s: ~s @~d~%" task (bt:thread-name (bt:current-thread)) (incf unwind-count))
776
              (print *thread-trace-output* *terminal-io*)
777
              (finish-output *terminal-io*)))
778
         (when *run-algebra-thread.exit-thread*
779
           (rdfcache:exit-thread))       ; channel housekeeping
780
         task))))
781
 
782
 (defmacro process-query-request (&rest args)
783
   (error "NYI: process-query-request . ~s" args))
784
 
785
 #+(or); obsolete
786
 (defun process-query-request (query *request-io* *store-io*)
787
   "Mediate query requests against a store.
788
 
789
  The request channel is active based on subscriptions
790
  * to service a request domain - with respective access and priority
791
  * to field store responses
792
  The store queue is bound to a brokered queue for a triple store, which performs the BGP processing.
793
  Each incoming message is dispatched as either a new request or a store response for an existing
794
  request.
795
 
796
  To process a request, compile and execute it. The execution yields two values:
797
  * a solution set, which may be empty, and
798
  * a boolean to indicate completion.
799
  If either the solution is not empty, or the request is complete, publish the result."
800
 
801
   (funcall (task-operation query) query *request-io* *store-io*))
802
 
803
 
804
 (defgeneric process-store-response (response-type message *request-io* *store-io*)
805
   (:documentation "Process a store message This allows for BGP solutions and for errors.")
806
 
807
   (:method ((type (eql 'spocq:|bgpfield|)) message (*request-io* t) (*store-io* t))
808
     "A reply message contains a solution field. Integrate it into an ongoing query, capture any
809
  (possibly intermediate) results and publish them to any subscribers. If the query remains complete,
810
  terminate the client streams. Otherwise leave the augmented query standing."
811
 
812
     (handler-bind ((spocq.e:runtime-error
813
                     (lambda (condition)
814
                       ;; if it is a spocq error, send an error message to the client and terminate
815
                       ;; the query.
816
                       (let ((query (error-query condition)))
817
                         (generate-error-note condition :task query)
818
                         (generate-accounting-note :terminate :task query)
819
                         (finalize-task query)
820
                         ;; return t to indicate that the message was handled
821
                         (return-from process-store-response t)))))
822
       (error "Not supported: ~s."
823
              `(spocq:|bgpfield| ,message ,*request-io* ,*store-io*))))
824
 
825
   (:method ((type (eql 'spocq:|error|)) message *store-io* *request-io*)
826
     (spocq:|error| message *request-io* *store-io*)))
827
 
828
 #+(or)
829
 (defgeneric spocq:|query| (task request-io store-io)
830
   (:method ((*query* query) request-io store-io)
831
     "Compile, register and initiate processing for a query. If it included a trace information,
832
  send the parsed query expression. Emit accounting notes as per progress."
833
 
834
     (send-account-note *query* :abstract *accounting-io*)
835
     (compile-query *query*)
836
     (when (task-trace-routing-key *query*)
837
       (send-trace-response *query* request-io :|query-form| (query-sse-expression *query*)))
838
     (let ((*package* *ALGEBRA-PACKAGE*)
839
           (*readtable* *sse-readtable*))
840
       (log-notice "task ~s, repository ~s, form ~s"
841
                   (task-id *query*)
842
                   (repository-id (task-repository *query*))
843
                   (query-sse-expression *query*)))
844
     (register-query *query*)
845
     ;; initiate any bgp matching
846
     (match-query *query*)
847
     ;; attempt immediate reduction - in case bgps were literal or repository in direct
848
     (reduce-query *query*)
849
     (task-state *query*)))
850
 
851
 #+(or)
852
 (defgeneric spocq:|bgpfield| (message request-io store-io)
853
   (:method ((task store-reply) request-io store-io)
854
     (let* ((bgp-id (task-bgp-id task))
855
            (solution-field (task-solution-field task))
856
            (*query* (find-query bgp-id)))
857
       (cond (*query*
858
              (when (task-trace-routing-key *query*)
859
                (send-trace-response *query* request-io :|bgp-complete| (list bgp-id (length solution-field))))
860
              (log-info "task ~s, bgp ~s, count ~s."
861
                        (task-id *query*) bgp-id (solution-field-row-count solution-field))
862
              (setf (query-pattern-solutions *query* bgp-id) solution-field)
863
              (if (every #'agp-completed-p (query-patterns *query*))
864
                (reduce-query *query*)
865
                (match-query *query*))
866
              (task-state *query*))
867
             (t
868
              (log-warn "No query found for store response: ~s" task)
869
              bgp-id)))))
870
 
871
 
872
 (defgeneric spocq:|error| (message request-io store-io)
873
   (:method ((task store-error-task) request-io store-io)
874
     (let* ((bgp-id (task-bgp-id task))
875
            (*query* (find-query bgp-id)))
876
       (cond (*query*
877
              (log-notice "query: ~s. terminated. (~a ~a)"
878
                          (task-id *query*) (task-condition task) (task-detail task))
879
              (generate-error-note (task-condition task) :task *query*)
880
              (generate-accounting-note :terminate :task *query*)
881
              (finalize-task *query*))
882
             (t
883
              (log-warn "No query found for store error: ~s" task))))))
884
 
885
 
886
 (defgeneric spocq:|terminate| (message request-io store-io)
887
   (:documentation "The interface operator for terminate defines a query-request method which
888
    locates a designated query and unregisters it. This allows in-progress algebra computations
889
    to complete, but prevents future operations. The operation is logs and makes an account note.")
890
 
891
   (:method ((request terminate-request) request-io (store-io t))
892
     (let* ((id (task-id request))
893
            (query (find-query id)))
894
       (cond (query
895
              (terminate-task query))
896
             (t
897
              (log-warn "No query found for terminate: ~s" request)))))
898
 
899
   (:method ((query query) (request-io t) (store-io t))
900
     (terminate-task query)))
901
 
902
 (defparameter *repository-purge-threshold* 3)
903
 
904
 (defun constrain-resources ()
905
   (let* ((now (get-universal-time ))
906
          (terminated-tasks ())
907
          (deleted-repositories ())
908
          (deleted-prototypes ()))
909
     (flet ((constrain-query (query)
910
              (when (task-quota-exceeded-p query)
911
                (log-notice "Task over quota: ~a (~a ~a ~a)."
912
                            query (task-start-time query) (task-time query) (task-time-limit query))
913
                (terminate-task query
914
                                (make-condition 'spocq.e::quota-error
915
                                                :task query
916
                                                :detail (list :solutions (task-solution-limit query)
917
                                                              :time (task-time-limit query))))
918
                (push query terminated-tasks))))
919
       (declare (dynamic-extent #'constrain-query))
920
       ;; first constrain active queries
921
       (map-queries #'constrain-query)
922
       ;; then limit prototype collected
923
       (when (> (hash-table-count *query-prototypes*) *query-prototypes-maximum-count*)
924
         (setf deleted-prototypes
925
               (loop for (prototype repository signature bindings)
926
                     in (subseq (sort (loop for key being each hash-key of *query-prototypes*
927
                                            using (hash-value prototype)
928
                                            collect (cons prototype key))
929
                                      #'< :key #'(lambda (cell) (task-start-run-time (first cell))))
930
                                0 (floor (/ *query-prototypes-maximum-count* 2)))
931
                     do (progn (setf (find-query-prototype repository signature bindings) nil)
932
                               )
933
                     collect prototype)))
934
       ;; then limit the repositores saved
935
       (when (zerop (logand now 15))
936
         (cond ((> (hash-table-count *repositories*) *repository-purge-threshold*)
937
                (setf deleted-repositories (release-repository-resources *repositories*)))
938
               #+sbcl
939
               ((> (sb-kernel::dynamic-usage) 2500000000)
940
                (setf deleted-repositories (release-repository-resources *repositories*)))))
941
       (when (or terminated-tasks deleted-repositories deleted-prototypes)
942
         (log-info "terminated tasks: ~s, deleted repositories: ~s, deleted prototypes: ~s"
943
                    (length terminated-tasks)
944
                    (length deleted-repositories)
945
                    (length deleted-prototypes)))
946
       (values terminated-tasks deleted-repositories deleted-prototypes))))
947
   
948
 (defmethod task-quota-exceeded-p (task)
949
   (let ((time-limit (task-time-limit task))
950
         (start-time (task-start-time task))
951
         (task-time (task-time task)))
952
     (cond ((eq (task-agent task) (system-agent))
953
            nil)
954
           ((null time-limit) nil)
955
           (start-time
956
            (> (- task-time start-time) time-limit))
957
           (t
958
            (setf (task-start-time task) task-time)
959
            nil))))
960
 
961
 
962
 (defparameter *saved-expression* nil)
963
 (defparameter *saved-function* nil)
964
 
965
 ;;; (setq *query-evaluation-mode* :interpreted)
966
 
967
 (defun compile-query (query)
968
   (compile-query-for-repository query (task-repository query)))
969
 
970
 (defgeneric compile-query-for-repository (query repository)
971
   (:documentation "Compile the top-level query algebra expression.
972
  The macro-expands each operator in the expression tree which transforms the algebra operations
973
  into the respective run-time function calls. In the process, where bgp expansion happens, it will
974
  re-order, slice, and/or combine graph patterns accoring to the transaction-specific repository
975
  statistics. this means
976
  Uses abstract-query-expression to abstract the sse over its bgp forms. Use the bgp bindings to construct a
977
  function (query-arguments-function) which issues asynchronous bgpmatch requests the repository's store, and
978
  a predicate (query-predicate-function) which tests whether the bgpmatch results are complete. Use the
979
  abstract sse to construct a function over the bgp results and a response continuation to compute the
980
  results and generate a query reply.
981
 
982
  Note that a describe form generates no immediate response. It constructs an additional query for the indicated
983
  subjects on-the-fly based on the initial response and evaluates it subsequent to the original query task.")
984
 
985
   (:method ((query query) (repository repository))
986
     (if (task-initialization-function query)
987
       (task-initialization-function query)
988
       (let* ((expression (query-sse-expression query))
989
              (lambda-expression
990
               ;; the initialization function generates the propagation tree. this happens
991
               ;; in the scope of the task transaciton, but must leave it to be continued
992
               ;, once other leaf operators run
993
               (ecase *query-evaluation-mode*
994
                 (:compiled
995
                  `(lambda (query)
996
                     ;;(print ',expression)
997
                     (with-task-environment (:task query :normal-disposition :continue)
998
                       ;; no longer necessary to prime bgp matches, as they now return standard
999
                       ;; generators with initiation functions which the combination operators
1000
                       ;; pass to run-in-thread...
1001
                       ,expression)))
1002
                 (:interpreted
1003
                  `(lambda (query)
1004
                     (with-task-environment (:task query :normal-disposition :continue)
1005
                       ;; no longer necessary to prime bgp matches, as they now return standard
1006
                       ;; generators with initiation functions which the combination operators
1007
                       ;; pass to run-in-thread...
1008
                       (eval ',expression)))))))
1009
         (with-task-environment (:task query :normal-disposition :continue)
1010
           (with-accounting
1011
               (log-debug "compile-query ~a: ~s" (task-id query) expression)
1012
             (setf (query-patterns query) nil)
1013
             (setf (task-operator-count query) (length (expression-algebra-operators expression :unique-p nil)))
1014
             ;; compile the query form and collect the agp instances as a side-effect
1015
             (let ((initialization-function 
1016
                    (if *compile-algebra-expression*
1017
                        (spocq-compile lambda-expression)
1018
                        #'(lambda (query) (eval (list lambda-expression query))))))
1019
               (setf (task-initialization-function query) initialization-function)
1020
               (generate-accounting-note :compile :task query)
1021
               (values initialization-function
1022
                       lambda-expression))))))))
1023
 
1024
 
1025
 
1026
 
1027
 (defgeneric match-query (agp-or-query)
1028
   (:documentation "Compute or retrieve the values for abstracted bgp/abp patterns. This is specialized for
1029
  query instances to iterate over all of its apgs, and for agps to obtain the respective solution field
1030
  either by local retrieval or by delegating the match to the remote store.")
1031
 
1032
   (:method ((query query))
1033
     (let ((match-start (task-time query))
1034
           (match-start-run-time (get-internal-run-time)))
1035
       (log-debug "match-query: ~s: start." (task-id query))
1036
       (send-account-note query :delegate *accounting-io*)
1037
       ;; reorder the agps
1038
       (setf (query-patterns query) (compute-agp-order (query-patterns query)))
1039
       (dolist (agp (query-patterns query))
1040
         ;; resolve each agp which is itself still un-resolved, but has completed predecessors
1041
         (when (and (agp-initialized-p agp)
1042
                    (every #'agp-completed-p (agp-predecessors agp)))
1043
           (agp-solutions agp)))
1044
       (send-account-note query :bind *accounting-io*)
1045
       (let ((match-end (task-time query)))
1046
         (log-debug "query: ~s: matched: [~/format-iso-time/ - ~/format-iso-time/) = ~d/~d seconds."
1047
                    (task-id query) match-start match-end
1048
                    (float (/ (- (get-internal-run-time) match-start-run-time) internal-time-units-per-second))
1049
                    (- match-end match-start))))))
1050
 
1051
 
1052
 ;;;
1053
 ;;; internal query execution operators
1054
 
1055
 (defgeneric call-with-query-solutions (function query &key repository repository-id graphs)
1056
   (:documentation "Apply FUNCTION to each of the solutions generated by the given QUERY
1057
 ")
1058
 
1059
   (:method (function (query string) &rest args &key graphs &allow-other-keys)
1060
     (declare (dynamic-extent args))
1061
     (declare (ignore graphs))
1062
     (multiple-value-bind (sse-expression options)
1063
                          (parse-sparql query)
1064
       (apply #'call-with-query-solutions function
1065
              sse-expression
1066
              (merge-property-lists args options))))
1067
 
1068
   (:method (function (query list) &rest args &key
1069
                      (id (format nil "internal-~d" (rdfcache:time-in-thread)))
1070
                      (operation :|query|)
1071
                      &allow-other-keys)
1072
     (declare (dynamic-extent args))
1073
     (call-with-query-solutions function
1074
            (apply #'make-instance *class.query*
1075
                   :id id
1076
                   :sse-expression query
1077
                   :operation operation
1078
                   args)))
1079
 
1080
   (:method (function (task query) &key &allow-other-keys)
1081
     (let ((dimensions nil)
1082
           (count 0))
1083
       (with-task-environment (:task task)
1084
         (initialize-task task)
1085
         (let* ((generator (task-result-generator task))
1086
                (channel (abstract-field-generator-channel generator)))
1087
           (setf dimensions (abstract-field-generator-dimensions generator))
1088
           (unwind-protect (block :handle-termination
1089
                             (add-task-thread task (bt:current-thread))
1090
                             (restart-bind ((terminate-task (lambda (task-to-terminate &optional condition)
1091
                                                              (when (or (null task-to-terminate) (eq task task-to-terminate))
1092
                                                                (warn "task terminated: ~a" task-to-terminate)
1093
                                                                (setf count -1)
1094
                                                                (return-from :handle-termination condition)))))
1095
                               (query-run-in-thread task generator)
1096
                               (loop for page = (get-field-page channel)
1097
                                     until (null page)
1098
                                     do (progn (incf count (array-dimension page 0))
1099
                                               (map nil function (term-value-field page)))
1100
                                     finally (funcall function nil))
1101
                               task))
1102
             (unless *management-thread*
1103
               (get-accounting-notes)
1104
               (channel-get-all *error-condition-channel*))
1105
             (finalize-task task))))
1106
       (values count dimensions))))
1107
 
1108
 ;;; (call-with-query-solutions #'print "select count(*) where { {graph ?g {?s ?p ?o}} union {?s ?p ?o} }" :repository-id "1/1571")
1109
 
1110
 
1111
 ;; (setq *compile-algebra-expression* nil)
1112
 
1113
 #+(or) ;; obsolete - applies to materialization
1114
 (defgeneric reduce-query (query)
1115
   (:documentation "Double-check, that the query preconditions are met - at least, that the arguments
1116
  are all set, and apply the algebra function to the arguments and to a response continuation.
1117
  That continuation accepts a result term - boolean, triple set, or solution field and responds
1118
  to the original query request.")
1119
   
1120
   (:method ((query query))
1121
     (let ((state :bind))
1122
       (when (query-runnable-p query)
1123
         (setf state :reduce)
1124
         (send-account-note query :reduce *accounting-io*)
1125
         (handler-bind (((or warning error)
1126
                         (lambda (c)
1127
                           (spocq.e:reduction-error :condition c :expression (query-sse-expression query) :query query)
1128
                           ;;; signals to the message processing loop. does not return
1129
                           )))
1130
           (let ((result (funcall (task-initialization-function query) query)))
1131
             (when (member result '(:complete :terminate))
1132
               (return-from reduce-query (task-state query)))
1133
             (let ((now (task-time query))
1134
                      (start (task-start-time query)))
1135
                  (case (task-state query)
1136
                    (:terminate            ; reduction aborted or completed after time-out
1137
                     (log-notice "task ~s. complete (terminate) [~/format-iso-time/ - ~/format-iso-time/) = ~d/~d seconds."
1138
                                 (task-id query) start now
1139
                                 (float (/ (- (get-internal-run-time) (task-start-run-time query)) internal-time-units-per-second))
1140
                                 (float (/ (- (get-internal-real-time) (task-start-real-time query)) internal-time-units-per-second))))
1141
                    (t
1142
                     (log-notice "task ~s. complete (to send) [~/format-iso-time/ - ~/format-iso-time/) = ~d/~d seconds."
1143
                                 (task-id query) start now
1144
                                 (float (/ (- (get-internal-run-time) (task-start-run-time query)) internal-time-units-per-second))
1145
                                 (float (/ (- (get-internal-real-time) (task-start-real-time query)) internal-time-units-per-second)))
1146
                     (when (log-level-qualifies? :debug)
1147
                       (if (consp result)
1148
                         (let ((variables (first result))
1149
                               (first (first (rest result)))
1150
                               (last (first (last result)))
1151
                               (count (length (rest result))))
1152
                           (log-debug  "task ~s result: ~s~@[ . (~s~@[ [... x~d ...]~]~@[ ~s~])~]."
1153
                                       (task-id query)
1154
                                       variables
1155
                                       first
1156
                                       (when (> count 2) (- count 2))
1157
                                       (when (> count 1) last)))
1158
                         (log-debug "task ~s result: ~s."
1159
                                    (task-id query)
1160
                                    result)))
1161
                     (send-query-response query *request-io* result)
1162
                     (setf now (task-time query))
1163
                     (log-notice "task ~s. complete (~a) [~/format-iso-time/ - ~/format-iso-time/) = ~d/~d seconds."
1164
                                 (task-id query) (task-state query) start now
1165
                                 (float (/ (- (get-internal-run-time) (task-start-run-time query)) internal-time-units-per-second))
1166
                                 (float (/ (- (get-internal-real-time) (task-start-real-time query)) internal-time-units-per-second)))
1167
                     (setf state (setf (task-state query) :complete))))))
1168
           (when (task-complete-p query)
1169
             (let ((time-interval (task-time-interval query)))
1170
               (cond ((and time-interval
1171
                           (< (setf (task-start-time query) (+ (task-time query) time-interval))
1172
                              (task-end-time query)))
1173
                      ;; if complete, but repeating, rerun at the respective start time
1174
                      (let ((delay (- (task-start-time query) (task-time query))))
1175
                        ;; reset the argument list
1176
                        (dolist (agp (query-patterns query))
1177
                          (agp-reset agp))
1178
                        (when (plusp delay)
1179
                          (sleep delay)))
1180
                      (match-query query)
1181
                      (reduce-query query))
1182
                   (t
1183
                    ;; if complete, but not repeating, purge it
1184
                    (send-account-note query :complete *accounting-io*)
1185
                    (finalize-task query)))))))
1186
       state)))
1187
 
1188
 ;;; ruby bin/spocq-query '(select (?s ?p ?o) (bgp (triple ?s ?p ?o)))'