Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/service.lisp

KindCoveredAll%
expression1502418 6.2
branch4160 2.5
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 (:documentation "This file defines the SERVICE operator for the 'org.datagraph.spocq'
6
  RDF engine."
7
 
8
  (copyright
9
   "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved.")
10
 
11
  (long-description
12
   "The service operator involves sending a query request to a sparql endpoint and incorporating
13
  the result solution field into the current query as if it were the product of an internal
14
  BGP. This implementation limits the service designator to a constant, as the seantics for the
15
  variable form is undefined."))
16
 
17
 
18
 (defun roqet-pathname ()
19
   (or *roqet-pathname*
20
       (when (probe-file "/usr/local/bin/roqet")
21
         (setq *roqet-pathname* "/usr/local/bin/roqet"))
22
       (when (probe-file "/usr/bin/roqet")
23
         (setq *roqet-pathname* "/usr/bin/roqet"))
24
       (error "roqet-pathname: no executable found")))
25
 
26
 (defmacro spocq.a:|service| (iri group-graph-pattern &rest args &environment env)
27
   "Perform the remote query and return the result field."
28
   (apply #'macroexpand-service iri group-graph-pattern env args))
29
 
30
 (defmacro spocq.a::|servicejoin| (iri group-graph-pattern sip-source &rest args &environment env)
31
   "Perform the remote query and return the result field."
32
   (apply #'macroexpand-service-join iri group-graph-pattern sip-source env args))
33
 
34
 (defmacro spocq.a::|serviceleftjoin| (iri group-graph-pattern sip-source &rest args &environment env)
35
   "Perform the remote query and return the result field.
36
    nb. the argument order is opposite of a simple leftjoin: the sip-source, the second argument, is the base field"
37
   (apply #'macroexpand-service-leftjoin iri group-graph-pattern sip-source env args))
38
 
39
 (defun expression-projection-variables (expression)
40
   (multiple-value-bind (all bound free projection)
41
                        (parse-expression-variables expression)
42
     (declare (ignore all bound free))
43
     projection))
44
 
45
 (defgeneric field-propagation-dimensions (field base-dimension-declarations)
46
   (:documentation "given a field expression and a list of declared base dimensions,
47
    compute the possible sip input dimensions. this largest intersection between
48
    the field input dimensions an the respective base dimensions.
49
    The input dimensions are the variables free in the expression - this is,
50
    the difference between the complete dimension set and those in the projection set.")
51
   (:method ((field list) (base-dimension-declarations cons))
52
     (multiple-value-bind (all bound free projection)
53
                          (parse-expression-variables field)
54
       (declare (ignore all bound free))
55
       (loop with maximal = ()
56
         for dimensions in base-dimension-declarations
57
         ;; only the projected dimensions count
58
         for intersection = (intersection dimensions projection)
59
         when (> (length intersection) (length maximal))
60
         do (setf maximal intersection)
61
         finally (return maximal))))
62
   (:method ((field list) (base-dimension-declarations null))
63
     ()))
64
 
65
 (defgeneric compute-endpoint-sparql-expression (endpoint sparql bindings)
66
   (:documentation "serialize the sse as a sparql expression, with possible bindings.
67
    iff a base was provided in the metadata, prepend that. (nyi: see spocq#306)")
68
   (:method ((endpoint substitution-service-endpoint) (sparql-expression cons) (bindings cons))
69
     (with-output-to-string (s)
70
       (format-sparql-sse s (bind-sparql-expression sparql-expression bindings))))
71
   (:method ((endpoint t) (sparql-expression cons) (bindings cons))
72
     (with-output-to-string (s)
73
       (format-sparql-sse s (add-sparql-bindings bindings sparql-expression))))
74
   (:method ((endpoint t) (sparql-expression cons) (bindings null))
75
     sparql-expression))
76
 
77
 (setf (service-location-class "www.ws-qa.tibco.nxp.com/DummyTestingURL/RootsSparql")
78
       'substitution-service-endpoint)
79
 
80
 (defun macroexpand-service (iri group-graph-pattern env &rest args)
81
   "Expand a service form into a contour which rebinds the current repository to that of the
82
  service and then compiles the graph pattern in that context, computes its generator and
83
  invokes the generic service operator. This will distinguish between local and remote
84
  repositories to either evaluate the bgp as in internal query or use the original sparql
85
  string to perform a remote query request.
86
   Take also note of the binding context in order to distinguish between dependent and
87
  independent federation operations.
88
 
89
  The failed SERVICE clause is treated as if it had a result of a single solution with no bindings
90
   "
91
   (declare (ignore env))
92
   (case (federation-mode)
93
     (|urn:dydra|:|none|
94
      `(spocq.a:|null| ,(expression-dimensions group-graph-pattern)))
95
     (t
96
      (cond ((or (null args) (keywordp (first args)))
97
             (destructuring-bind (&key (silent nil s-p) query-text) args
98
               `(spocq.e::service ',iri
99
                                  (quote ,(if (or (select-form-p group-graph-pattern)
100
                                                  (distinct-form-p group-graph-pattern)
101
                                                  (slice-form-p group-graph-pattern))
102
                                              group-graph-pattern
103
                                              `(spocq.a:|select| ,group-graph-pattern ,(expression-dimensions group-graph-pattern))))
104
                                  ,@(when query-text `(:query-text ,query-text))
105
                                  ,@(when s-p `(:verbose ,(not silent))))))
106
            ;; otherwise the next argument is a field expression,
107
            ;; be prepared to sip-join it
108
            (t
109
             (let* ((group-dimensions (expression-dimensions group-graph-pattern))
110
                    (location-dimensions (when (variable-p iri) (list iri)))
111
                    (reference-dimensions (union-dimensions group-dimensions location-dimensions)))
112
               (destructuring-bind (field &key (silent nil s-p) query-text) args
113
                 `(spocq.e::service-join ',iri
114
                                         (quote ,(if (select-form-p group-graph-pattern)
115
                                                     group-graph-pattern
116
                                                     `(spocq.a:|select| ,group-graph-pattern ,group-dimensions)))
117
                                         ,(if reference-dimensions
118
                                              `(spocq.e::with-reference-dimensions ,reference-dimensions ,field)
119
                                              field)
120
                                       ,@(when query-text `(:query-text ,query-text))
121
                                       ,@(when s-p `(:verbose ,(not silent)))))))))))
122
 
123
 (defun macroexpand-service-join (iri group-graph-pattern sip-source env &rest args)
124
   (declare (ignore env))
125
   (case (federation-mode)
126
     (|urn:dydra|:|none|
127
      `(spocq.a:|null| ,(union-dimensions (when (variable-p iri) (list iri))
128
                                          (union-dimensions (expression-dimensions group-graph-pattern)
129
                                                            (expression-dimensions sip-source)))))
130
     (t
131
      (let* ((group-dimensions (expression-dimensions group-graph-pattern))
132
             (location-dimensions (when (variable-p iri) (list iri)))
133
             (reference-dimensions (union-dimensions group-dimensions location-dimensions)))
134
        `(spocq.e::service-join ',iri
135
                                (quote ,(if (select-form-p group-graph-pattern)
136
                                            group-graph-pattern
137
                                            `(spocq.a:|select| ,group-graph-pattern ,group-dimensions)))
138
                                ,(if reference-dimensions
139
                                         `(spocq.e::with-reference-dimensions ,reference-dimensions ,sip-source)
140
                                         sip-source)
141
                              ,@args)))))
142
 
143
 (defun macroexpand-service-leftjoin (iri group-graph-pattern sip-source env &rest args)
144
   (declare (ignore env))
145
   (case (federation-mode)
146
     (|urn:dydra|:|none|
147
      `(spocq.a:|null| ,(union-dimensions (when (variable-p iri) (list iri))
148
                                          (union-dimensions (expression-dimensions group-graph-pattern)
149
                                                            (expression-dimensions sip-source)))))
150
     (t 
151
      (let* ((group-dimensions (expression-dimensions group-graph-pattern))
152
             (location-dimensions (when (variable-p iri) (list iri)))
153
             (reference-dimensions (union-dimensions group-dimensions location-dimensions)))
154
        `(spocq.e::service-leftjoin ',iri
155
                                    (quote ,(if (select-form-p group-graph-pattern)
156
                                                group-graph-pattern
157
                                                `(spocq.a:|select| ,group-graph-pattern ,group-dimensions)))
158
                                    ,(if reference-dimensions
159
                                         `(spocq.e::with-reference-dimensions ,reference-dimensions ,sip-source)
160
                                         sip-source)
161
                                    ,@args)))))
162
 
163
 
164
 (defparameter *use-service-description-repository-p* nil)
165
 
166
 (defgeneric spocq.e:service (location query-expression &key verbose user-id query-text)
167
   (:documentation "Given a location and a query expression - as an sse and as a sparql string,
168
     distinguish local from remote requests and either evaluate the sse pattern or delegate the request
169
     to a remote processor.")
170
 
171
   (:method :around (location query &rest args &key (verbose t) user-id query-text)
172
     (declare (ignore user-id query-text))
173
     (apply #'call-next-method location query
174
            :verbose verbose
175
            args))
176
 
177
   (:method ((service (eql |urn:dydra|:|service-description|)) (query-expression cons) &rest args)
178
     ;; should make an ephemeral repository and run the query on that.
179
     (cond (*use-service-description-repository-p*
180
            (let ((ephemeral-repository (ensure-ephemeral-repository (task-repository *task*) |urn:dydra|:|service-description|)))
181
              (apply #'spocq.e:service ephemeral-repository query-expression args)))
182
           (t
183
            (when (select-form-p query-expression) (setf query-expression (second query-expression)))
184
            (let* ((statement-pattern (assoc 'spocq.a:|triple| (rest query-expression)))
185
                   (service-description (repository-service-description (repository-id (task-repository *task*)))))
186
              (assert (every #'variable-p (rest statement-pattern)) ()
187
                      "service: Invalid service description query : ~s" query-expression)
188
              (spocq.e:bindings (service-description-solutions service-description)
189
                                (rest statement-pattern))))))
190
 
191
 
192
   (:method ((service (eql |urn:dydra|:|timemap|)) query-expression &rest args)
193
     (declare (ignore args))
194
     (when (select-form-p query-expression) (setf query-expression (second query-expression)))
195
     (let* ((statement-pattern (assoc 'spocq.a:|triple| (rest query-expression)))
196
            (timemap (compute-timemap (repository-id (task-repository *task*)))))
197
       (assert (every #'variable-p (rest statement-pattern)) ()
198
               "service: Invalid timemap query : ~s" query-expression)
199
       (spocq.e:bindings timemap (rest statement-pattern))))
200
 
201
   (:method ((service (eql |urn:dydra|:|revisionId|)) query-expression &rest args)
202
     (declare (ignore args))
203
     (assert (select-form-p query-expression)
204
             ()
205
             "service: Invalid revision id query : ~s" query-expression)
206
     (let* ((dimensions (third query-expression))
207
            (revisions (repository-list-revision-ids (task-repository *task*))))
208
       (assert (and (every #'variable-p dimensions)
209
                    (= (length dimensions) 1))
210
               ()
211
               "service: Invalid revision id query : ~s" query-expression)
212
       (spocq.e:bindings (mapcar #'list revisions) dimensions)))
213
   
214
   (:method ((service (eql |urn:dydra|:|service-functions|)) (query-expression cons) &rest args)
215
     ;;; (loop for p in spocq.i:*iri-package-names* when (loop for s being the external-symbols in p when (fboundp s) return t) collect p)
216
     (declare (ignore args))
217
     (assert (select-form-p query-expression)
218
             ()
219
             "service: Invalid service functions query : ~s" query-expression)
220
     (let ((dimensions (third query-expression))
221
           (operators (sort (loop for p in spocq.i:*iri-package-names*
222
                              append (loop for s being the external-symbols in p when (fboundp s) collect s))
223
                            #'string-lessp)))
224
       (assert (and (every #'variable-p dimensions)
225
                    (= (length dimensions) 2))
226
               ()
227
               "service: Invalid service functions query : ~s" query-expression)
228
       (spocq.e:bindings (loop for symbol in operators
229
                           collect (list symbol (or (documentation symbol 'function) "")))
230
                         dimensions)))
231
 
232
   ;; parser iri locations from a query expression must allow decoding
233
   (:method ((service-location spocq:iri) query-expression &rest args)
234
     (declare (dynamic-extent args))
235
     (apply #'spocq.e:service (url-decode (spocq:iri-lexical-form service-location)) query-expression args))
236
   ;; while internal ones do not
237
   (:method ((service-location puri:uri) query-expression &rest args)
238
     (declare (dynamic-extent args))
239
     (apply #'spocq.e:service (iri-lexical-form service-location) query-expression args))
240
 
241
   (:method ((service-location string) query-expression &rest args &key (verbose t) user-id query-text)
242
     (declare (dynamic-extent args))
243
     (declare (ignore user-id query-text))
244
     (flet ((call-with-repository ()
245
              (let ((repository (handler-case (service-repository service-location)
246
                                  (error (c) (spocq.e:request-error "service: invalid service location: ~s: ~a"
247
                                                                    service-location c))))
248
                    (revision (service-repository-revision service-location)))
249
              (apply #'spocq.e::service repository query-expression
250
                     :location service-location
251
                     :revision-id (or revision "HEAD")
252
                     args))))
253
       (if verbose
254
         (call-with-repository)
255
         (handler-bind ((error (lambda (c)
256
                             (log-warn "service failure: ~a: ~a" service-location c)
257
                             (unless verbose
258
                               (return-from spocq.e::service
259
                                 ;; https://www.w3.org/TR/2013/REC-sparql11-federated-query-20130321/#serviceFailure
260
                                 (singleton-generator ())
261
                                 )))))
262
           (call-with-repository)))))
263
 
264
   (:method ((service-repository service-repository) query-expression &key (verbose t) user-id query-text location)
265
     "For external service processing, compute a generator which will start an external process to run
266
      the remote query and intern the result soultion stream.
267
 
268
      Given a unit table expression, attempt to locate a view."
269
     (declare (ignore user-id))
270
     (let* ((qdb (query-dynamic-bindings *task*))
271
            (bindings (mapcar #'cons (first qdb) (rest qdb))))
272
       (authorize-service-request *task* service-repository)
273
       ;; replace a unit table with the view definition - for remote locations as well.
274
       (when (or (equal query-expression *select-table-unit-sse*)
275
                 (equal query-expression *table-unit-sse*))
276
         (let* ((view-name (repository-resource-view-name service-repository location))
277
                (view-text (when view-name (authorized-repository-view service-repository view-name (task-agent *task*)))))
278
           (when view-text
279
             (setf query-expression (parse-sparql view-text))
280
             (setf query-text view-text))))
281
       (service-generator service-repository (expression-dimensions query-expression)
282
                          ;;!!! nb. if bindings are present, this will drop any graphs specified in the view.
283
                          ;;!!! they will need to be captured as for internal federation and incorporated into the query text
284
                          (cond (bindings
285
                                 (compute-endpoint-sparql-expression service-repository query-expression bindings))
286
                                (query-text )
287
                                (t
288
                                 (with-output-to-string (s) (format-sparql-sse s query-expression))))
289
                          :verbose verbose)))
290
 
291
   (:method  ((service-repository repository) (query-expression cons) &key (verbose t) query-text location
292
              (user-id (task-user-id *task*))
293
              (revision-id "HEAD")
294
              (dataset-graphs ()))
295
     "For internal service processing compile the query, if necessary, and return its generator.
296
      In the event of error, unless verbose, suppress it and proceed with a table solution field.
297
      When verbose, decline to handle the error and delegate it to the context.
298
 
299
      Given a unit table expression, attempt to locate a view.
300
      !!!HDT : based on the service-repository class interpose term mapping."
301
     (let* ((qdb (query-dynamic-bindings *task*)))
302
       (authorize-service-request *task* service-repository)
303
       ;; replace a unit table with the view definition
304
       (when (or (equal query-expression *select-table-unit-sse*)
305
                 (equal query-expression *table-unit-sse*))
306
         (let* ((view-name (repository-resource-view-name service-repository location))
307
                (view-text (when view-name (authorized-repository-view service-repository view-name (task-agent *task*)))))
308
           (when view-text
309
             (multiple-value-bind (view-query-expression view-options)
310
                                  (parse-sparql view-text)
311
               (setf dataset-graphs (getf view-options :dataset-graphs))
312
               (setf query-expression view-query-expression)
313
               (setf query-text view-text)))))
314
       (flet ((service-task-generator ()
315
                (let* ((metadata (instance-metadata *task*))
316
                       (service-task (make-query :sse-expression query-expression
317
                                                 :id (make-service-task-id)
318
                                                 :dynamic-bindings qdb
319
                                                 :dataset-graphs dataset-graphs
320
                                                 :repository service-repository
321
                                                 :revision-id revision-id
322
                                                 :user-id user-id
323
                                                 :sparql-expression query-text
324
                                                 :agent (task-agent *task*)
325
                                                 :parent-task *task*
326
                                                 :metadata (when metadata (clone-instance metadata))
327
                                                 :response-content-type (task-response-content-type *task*))))
328
                  (with-task-environment (:task service-task)
329
                    ;; initialze the task in order that it has its generator
330
                    (initialize-task service-task)
331
                    ;; a generator is returned as for a standard algebra operation, but
332
                    ;; the expressed operation is wrapped in order to establish the service query
333
                    ;; as the wrapping task
334
                    (setf (solution-generator-expression (task-result-generator service-task))
335
                          (list #'run-service-task service-task
336
                                (solution-generator-expression (task-result-generator service-task))))
337
                    (log-debug "query: wrapped for retask ~a -> ~a" *task* service-task)
338
                    ;;; reduction yields the solution generator to be initiated my the respective operation
339
                    ;;; !!!HDT based on the service-repository class interpose term mapping.
340
                    (task-result-generator service-task)))))
341
         (handler-bind ((error (lambda (c)
342
                                 (log-warn "service: internal service failure: ~a: ~a" service-repository c)
343
                                 (unless verbose
344
                                   (return-from spocq.e::service
345
                                     ;; https://www.w3.org/TR/2013/REC-sparql11-federated-query-20130321/#serviceFailure
346
                                     (singleton-generator ())
347
                                     )))))
348
           (service-task-generator)))))
349
 
350
   (:method  ((service-repository repository-revision) (query-expression cons) &rest args &key &allow-other-keys)
351
     ;; indirectly through the revision to the repository, with the revision ad as an argument
352
     ;; will cause the revision to be reconstituted when the query is initialized, but leaves
353
     ;; the methods otherwise as-they-are
354
     (declare (dynamic-extent args))
355
     (apply #'spocq.e::service (repository-revision-reference service-repository) query-expression
356
            :revision-id (repository-revision-id service-repository)
357
            args))
358
 
359
   ;; this should not arise in the standard case as the macro expands to the join form given a variable...
360
   (:method ((location symbol) query-expression &rest args)
361
     (if (variable-p location)
362
         (let ((location-value (query-binding-value location)))
363
           (if (spocq:unbound-variable-p location-value )
364
               (case (undefined-variable-behavior)
365
                 (spocq.e:compilation-error :expression location
366
                                            :condition (make-condition 'spocq.e:undefined-variable-error
367
                                                                       :variables (list location)))
368
                 (|urn:dydra|:|warning| (log-warn "~@[~a: ~]~a"
369
                                                  *query*
370
                                                  (make-condition 'spocq.e:undefined-variable-error
371
                                                                  :expression location
372
                                                                  :variables (list location)))
373
                  (null-generator (union-dimensions (list location) (expression-dimensions query-expression))))
374
                 (|urn:dydra|:|dynamicBinding|
375
                  (null-generator (union-dimensions (list location) (expression-dimensions query-expression)))))
376
               ;; iff it was a bound variable
377
               (apply #'spocq.e::service location-value query-expression args)))
378
         ;; iff it was not a variable
379
         (spocq.e:compilation-error :expression location
380
                                    :condition (make-condition 'spocq.e:resource-not-found-error
381
                                                               :identifier location)))))
382
 
383
 (defgeneric run-service-task (task initial-operation)
384
   (:documentation "run the query as a service sub-select by starting its algebra thread,
385
     waiting for that to complete, and then finalizing it.
386
     this last is necessary in order to clean up after it - particularly transactions.")
387
   (:method ((task query) initial-operation)
388
     (channel-put (task-operations task) initial-operation)
389
     (unwind-protect (progn
390
                       (task-run-algebra-thread task)
391
                       'spocq.a:|service|)
392
       (setf (task-state task) :complete)
393
       ;;; !!! finalization should be in the task thread ???
394
       (finalize-task task))))
395
 
396
 (defun spocq.e::service-join (location query-expression field &rest args)
397
   (declare (dynamic-extent args))
398
   (apply #'spocq.e::service-generic-join #'spocq.e:join location query-expression field args))
399
 
400
 (defun spocq.e::service-leftjoin (location query-expression field &rest args)
401
   (declare (dynamic-extent args))
402
   (apply #'spocq.e::service-generic-join #'spocq.e:leftjoin location query-expression field args))
403
 
404
   
405
 (defgeneric spocq.e::service-generic-join (join-operator location query-expression field &key verbose user-id query-text)
406
   (:documentation "Given a service form which includes a source field, use that field content for some combination of
407
     - a variable for a location
408
     - values to propagate to the service field expression.
409
     - as an sse and as a sparql string,
410
     distinguish local from remote requests and either evaluate the sse pattern or delegate the request
411
     to a remote processor.
412
     If the variable is among the source field dimensions, for each solution in the source field,
413
     issue the remote the service request and join the result field with the solution.
414
     If the variable is not bound, attempt a dynamic binding and then delegate to a standard service operator.
415
     If other variables correspond, then instantiate the pattern for each solution for those bindings and
416
     reiterate the requests.")
417
 
418
   (:method (join-operator (location symbol) group-graph-pattern source-field &rest args)
419
     (declare (dynamic-extent args))
420
     (if (variable-p location)
421
         (let* ((service-variables (expression-projection-variables group-graph-pattern))
422
                (extended-variables (cond ((member location service-variables) service-variables)
423
                                          ((variable-p location) (cons location service-variables))
424
                                          (t service-variables)))
425
                (source-variables (expression-projection-variables source-field))
426
                (sip-variables (intersection extended-variables
427
                                             ;;source-variables
428
                                             (union-dimensions source-variables
429
                                                               (first (task-dynamic-bindings *task*))))))
430
           ; (trace-algebra spocq.e::service-join
431
           (log-debug "service-generic-join: (location symbol) ~s" location)
432
           (log-debug "service-generic-join: pattern variables ~s" service-variables)
433
           (log-debug "service-generic-join: extended variables ~s" extended-variables)
434
           (log-debug "service-generic-join: source variables: ~s" source-variables)
435
           (log-debug "service-generic-join: sip variabes: ~s" sip-variables)
436
           (if (find location sip-variables)
437
               ;; if the location can be taken from the source field, extract solutions
438
               ;; and join each with the results of the respective service
439
               (if (rest sip-variables)
440
                   ;; if other dimensions are propagated, arrange to integrate them into the service request
441
                   (apply #'service-sip-generic-join-generator join-operator location
442
                          group-graph-pattern source-field
443
                          :sip-variables sip-variables
444
                          args)
445
                   ;; otherwise, expect just the location
446
                   (apply #'service-location-join-generator join-operator location
447
                          group-graph-pattern source-field args))
448
               ;; otherwise attempt to get the value elsewhere.
449
               ;; if it is a request parameter, use that to performa static service operation
450
               ;; if no value is found follow the defined behaviour for undefined variable,
451
               ;; whereby if continued, the result is a null solution field
452
               (let ((location-value (query-binding-value location)))
453
                 (if (spocq:unbound-variable-p location-value )
454
                     (case (undefined-variable-behavior)
455
                       (spocq.e:compilation-error :expression location
456
                                                  :condition (make-condition 'spocq.e:undefined-variable-error
457
                                                                             :variables (list location)))
458
                       (|urn:dydra|:|warning| (log-warn "~@[~a: ~]~a"
459
                                                        *query*
460
                                                        (make-condition 'spocq.e:undefined-variable-error
461
                                                                        :expression location
462
                                                                        :variables (list location)))
463
                        (null-generator (union-dimensions (list location) (solution-generator-dimensions source-field))))
464
                       (|urn:dydra|:|dynamicBinding|
465
                        (null-generator (union-dimensions (list location) (solution-generator-dimensions source-field)))))
466
                     (funcall join-operator source-field
467
                              (apply #'spocq.e:service location-value group-graph-pattern args))))))
468
         ;; if not a variable location, delegate to the base method as a constant
469
         (call-next-method)))
470
 
471
   (:method (join-operator (location t) group-graph-pattern source-field &rest args)
472
     "The default method allows both symbol and iri constant constant locations.
473
      just propagate solutions"
474
     (let* ((service-variables (expression-projection-variables group-graph-pattern))
475
            (source-variables (expression-projection-variables source-field))
476
            (sip-variables (intersection service-variables source-variables)))
477
       (log-debug "service-generic-join: (location t) ~s" location)
478
       (log-debug "service-generic-join: pattern variables ~s" service-variables)
479
       (log-debug "service-generic-join: source variables: ~s" source-variables)
480
       (log-debug "service-generic-join: sip variabes: ~s" sip-variables)
481
       ;; constant location
482
       (if sip-variables
483
           (apply #'service-sip-generic-join-generator join-operator location
484
                  group-graph-pattern source-field
485
                  :sip-variables sip-variables
486
                  args)
487
           ;; no sip: fall-back on standard service
488
           (apply #'spocq.e:service location group-graph-pattern args)))))
489
 
490
 
491
 
492
 (defun run-service (location sparql-expression &rest args &key repository-id agent)
493
   (declare (ignore repository-id agent))
494
   (apply #'run-sparql `(spocq.a:|service| ,location ,sparql-expression)
495
          args))
496
 #+(or)
497
 (run-service <http://localhost/james/test2>
498
              '(spocq.a:|select| (spocq.a:|bgp| (spocq.a:|triple| ?::s ?::p ?::o)) (?::s ?::p ?::o))
499
              :repository-id "james/test"
500
              :agent (system-agent))
501
 
502
 
503
 ;; (setq *authorize-service-access* nil)
504
 
505
 (defparameter *authorizations* (make-registry :test #'equalp))
506
 
507
 (defgeneric authorize-service-request (client-repository service-location &key if-unauthorized)
508
   (:documentation "Authorize cross-repository access for a service request.
509
     Require some one of (for mysql metadata)
510
     - the agent is an administrator
511
     - the agent account is the same as the target account
512
     - the base repository account is the same as the target account
513
     - the agent is authorized to read the target repository
514
     - the agent is an authorized read collaborator
515
     - the base repository account is authorized to read the target repository
516
     - the base repository account is an authorized read collaborator
517
     (for acl metadata)
518
     - the agent account has read access
519
     - the base repository account has read access
520
     If the tests succeed, return T, otherwise signal a request error.")
521
 
522
   (:method ((task task) (service-location service-repository) &key if-unauthorized)
523
     (declare (ignore if-unauthorized))
524
     #+(or)
525
     (if *mysql-host*
526
         t ; based on mysql authorizations, remote requests are allowed based on configuration
527
         (call-next-method))
528
     (call-next-method))
529
 
530
   (:method ((task task) (service-location repository) &key (if-unauthorized :error))
531
     (let* ((agent (task-agent task))
532
            (agent-name (agent-name agent))
533
            (agent-location (agent-location agent))
534
            (task-repository (task-repository task))
535
            (task-account (repository-account task-repository)) 
536
            (task-account-name (account-name task-account))
537
            (location-id (repository-id service-location))
538
            (location-name (repository-name service-location))
539
            (location-account-name (account-name (repository-account service-location)))
540
            (authorization-key (cons (or agent-name (when agent-location (list agent-location))) t))
541
            (view (task-request-location task)))
542
       (cond ((equal agent-name location-account-name)
543
              (log-info "service authorization as owner ~s -> ~s" agent-name location-name)
544
              t)
545
             ((administrator-p agent)
546
              (log-info "service authorization as admin ~s -> ~s" agent-name location-name)
547
              t)
548
             ((equal task-account-name location-account-name)
549
              (log-info "service authorization as intra-account ~s -> ~s" task-account-name location-name)
550
              t)
551
             ((authorize-query-request agent service-location :if-unauthorized nil))
552
             #+(or)  ;; mysql -based version superseded by acl graph
553
             ((multiple-value-bind (authorization known-p) (gethash (cons location-id authorization-key) *authorizations*)
554
                (unless known-p
555
                  (setf authorization
556
                        (if *mysql-host*
557
                            (or
558
                             ;; if the target repository permits read
559
                             (let* ((query-command
560
                                     (if (authenticated-agent-p agent)
561
                                         (format nil  "mysql -h ~a -u root ~a -BNe \"select repositories.id from repositories, accounts where repositories.name = '~a' and accounts.name = '~a' and repositories.account_id = accounts.id and (repositories.privacy_setting = 5 or repositories.privacy_setting = 3);\""
562
                                                 *mysql-host* *mysql-database*
563
                                                 location-name location-account-name)
564
                                         (format nil  "mysql -h ~a -u root ~a -BNe \"select repositories.id from repositories, accounts where repositories.name = '~a' and accounts.name = '~a' and repositories.account_id = accounts.id and (repositories.privacy_setting = 5);\""
565
                                                 *mysql-host* *mysql-database*
566
                                                 location-name location-account-name)))
567
                                    (process (run-program "/bin/sh" (list "-c" query-command)
568
                                                          :input nil :output :stream))
569
                                    (id (unwind-protect (read-line (sb-ext:process-output process) nil)
570
                                          (sb-ext:process-close process))))
571
                               (when (and (stringp id) (ignore-errors (parse-integer  id)))
572
                                 t))
573
                             ;; if collaboration permits the base repository account
574
                             (let* ((query-command
575
                                     (format nil  "mysql -h ~a -u root ~a -BNe \"select collaborations.id from collaborations where  collaborations.account_id = (select accounts.id from accounts  where accounts.name = '~a')  and  collaborations.repository_id = (select repositories.id from repositories, accounts where repositories.name = '~a' and accounts.name = '~a' and repositories.account_id = accounts.id) and collaborations.read = 1;\""
576
                                             *mysql-host* *mysql-database*
577
                                             task-account-name location-name location-account-name))
578
                                    (process (run-program "/bin/sh" (list "-c" query-command)
579
                                                          :input nil :output :stream))
580
                                    (id (unwind-protect (read-line (run-program-output process) nil)
581
                                          (run-program-close process))))
582
                               (when (and (stringp id) (ignore-errors (parse-integer  id)))
583
                                 t)))
584
                            ;; test authorization via the agent - which can fallback to the
585
                            ;; account or origin repository
586
                            (access-authorized-p service-location agent |acl|:|Read|)))
587
                  (setf (gethash (cons location-id authorization-key) *authorizations*) authorization))
588
                authorization)
589
              (log-info "service authorization ~s -> t"  (cons (cons location-id authorization-key) t))
590
              t)
591
             ((access-authorized-p service-location agent |acl|:|Read|))
592
             ((access-authorized-p service-location task-account |acl|:|Read|))
593
             ((and view
594
                   (access-authorized-p service-location
595
                                        (typecase view
596
                                          (string (make-view :repository task-repository :name view))
597
                                          (view view)
598
                                          (iri view))
599
                                        |acl|:|Read|)))
600
             (t
601
              (log-info "service authorization ~s -> nil"  (cons (cons location-id authorization-key) t))
602
              (ecase if-unauthorized
603
                (:error
604
                 (spocq.e:task-authorization-error :task task :operation 'spocq.a:|service| :location service-location))
605
                ((nil)
606
                 nil)))))))
607
 
608
 (defgeneric authorize-query-request (context repository &key read-only-p if-unauthorized)
609
   (:method ((task task) (repository repository) &rest args &key (read-only-p (operation-read-only-p task)) if-unauthorized)
610
     (declare (dynamic-extent args) (ignore if-unauthorized))
611
     (apply #'authorize-query-request (task-agent task) repository
612
            :read-only-p read-only-p
613
            args))
614
   (:method ((agent agent) (repository repository) &key (read-only-p nil) (if-unauthorized :error))
615
     (let* ((agent-name (agent-name agent))
616
            (repository-id (repository-id repository))
617
            (repository-name (repository-name repository))
618
            (repository-account-name (account-name (repository-account repository)))
619
            (location (agent-location agent))
620
            (authorization-key (cons (or agent-name (when location (list location))) read-only-p)))
621
       (cond ((equal agent-name repository-account-name)
622
              (log-info "authorization as owner ~s" agent-name)
623
              t)
624
             ((administrator-p agent)
625
              (log-info "authorization as admin ~s" agent-name)
626
              t)
627
             ((multiple-value-bind (authorization known-p) (gethash (list repository-id authorization-key read-only-p) *authorizations*)
628
                (unless known-p
629
                  (log-notice "authorize-query-request: ~s(~s)@~s ~s ~s"
630
                              agent-name (authenticated-agent-p agent) location
631
                              repository-id
632
                              read-only-p)
633
                  (setf authorization
634
                        (or (and read-only-p
635
                                 ;; see if the repository permits the agent to read
636
                                 (let* ((query-command
637
                                         (if (authenticated-agent-p agent)
638
                                             (format nil  "mysql -h ~a -u root ~a -BNe \"select repositories.id from repositories, accounts where repositories.name = '~a' and accounts.name = '~a' and repositories.account_id = accounts.id and (repositories.privacy_setting = 5 or repositories.privacy_setting = 3);\""
639
                                                     *mysql-host* *mysql-database*
640
                                                     repository-name repository-account-name)
641
                                             (format nil  "mysql -h ~a -u root ~a -BNe \"select repositories.id from repositories, accounts where repositories.name = '~a' and accounts.name = '~a' and repositories.account_id = accounts.id and (repositories.privacy_setting = 5);\""
642
                                                     *mysql-host* *mysql-database*
643
                                                     repository-name repository-account-name)))
644
                                        (process (sb-ext:run-program "/bin/sh" (list "-c" query-command)
645
                                                                     :input nil :output :stream))
646
                                        (id (unwind-protect (read-line (sb-ext:process-output process) nil)
647
                                              (sb-ext:process-close process))))
648
                                   (when (and (stringp id(ignore-errors (parse-integer  id)))
649
                                     t)))
650
                            ;; see if collaboration permits
651
                            (and (authenticated-agent-p agent)
652
                                 (let* ((query-command
653
                                         (if read-only-p
654
                                             (format nil  "mysql -h ~a -u root ~a -BNe \"select collaborations.id from collaborations where  collaborations.account_id = (select accounts.id from accounts  where accounts.name = '~a')  and  collaborations.repository_id = (select repositories.id from repositories, accounts where repositories.name = '~a' and accounts.name = '~a' and repositories.account_id = accounts.id) and collaborations.read = 1;\""
655
                                                     *mysql-host* *mysql-database*
656
                                                     agent-name repository-name repository-account-name)
657
                                             (format nil  "mysql -h ~a -u root ~a -BNe \"select collaborations.id from, collaborations where  collaborations.account_id = (select accounts.id from accounts  where accounts.name = '~a')  and  collaborations.repository_id = (select repositories.id from repositories, accounts where repositories.name = '~a' and accounts.name = '~a' and repositories.account_id = accounts.id) and collaborations.write = 1;\""
658
                                                     *mysql-host* *mysql-database*
659
                                                     agent-name repository-name repository-account-name)))
660
                                        (process (sb-ext:run-program "/bin/sh" (list "-c" query-command)
661
                                                                     :input nil :output :stream))
662
                                        (id (unwind-protect (read-line (sb-ext:process-output process) nil)
663
                                              (sb-ext:process-close process))))
664
                                   (when (and (stringp id(ignore-errors (parse-integer  id)))
665
                                     t)))
666
                            ;; see if location permits
667
                            (and read-only-p
668
                                 (located-user-p agent)
669
                                 (let* ((query-command
670
                                         (format nil  "mysql -h ~a -u root ~a -BNe \"select repositories.id from repositories, accounts where repositories.name = '~a' and accounts.name = '~a' and repositories.account_id = accounts.id and repositories.permissable_ip_addresses regexp('.*~a.*');\""
671
                                                 *mysql-host* *mysql-database*
672
                                                 repository-name repository-account-name
673
                                                 location))
674
                                        (process (sb-ext:run-program "/bin/sh" (list "-c" query-command)
675
                                                                     :input nil :output :stream))
676
                                        (id (unwind-protect (read-line (sb-ext:process-output process) nil)
677
                                              (sb-ext:process-close process))))
678
                                   (when (and (stringp id(ignore-errors (parse-integer  id)))
679
                                     t)))))
680
                  (setf (gethash (cons repository-id authorization-key) *authorizations*) authorization))
681
                authorization)
682
              ;; read-only-p is redundant here
683
              (log-info "authorization ~s -> t"  (cons (cons repository-id authorization-key) read-only-p))
684
              t)
685
             (t
686
              (log-info "authorization ~s -> nil"  (cons (cons repository-id authorization-key) read-only-p))
687
              (ecase if-unauthorized
688
                (:error
689
                 (error "Access to repository is not authorized: ~s -> ~s." agent repository))
690
                ((nil)
691
                 nil)))))))
692
 
693
 
694
 
695
 (defun service-location-join-generator (join-operator location query-expression base-field-generator &rest args)
696
   "Delegate a service request with a propagated location to a join operation supplying it two generators:
697
    - one yields the solutions from the base field - after each has provided a service location;
698
    - the other yields the union of the solutions from the iterated service operations - if necessary extended with the location."
699
   (assert (variable-p location) ()
700
           "Invalid service join location variable: ~a" location)
701
   (let* ((base-dimensions (solution-generator-dimensions base-field-generator))
702
          (query-dimensions (expression-dimensions query-expression))
703
          (extended-service-dimensions (union-dimensions (list location) query-dimensions))
704
          (base-channel (make-channel :name (list 'spocq.a:|service| (task-id *query*))
705
                                      :dimensions base-dimensions))
706
          (location-sip-channel (make-solution-channel :name (list 'spocq.a:|service| (task-id *query*))
707
                                                       :dimensions (list location)))
708
          (result-channel (make-channel :name (list 'spocq.a:|service| (task-id *query*))
709
                                        :dimensions extended-service-dimensions))
710
          ;; the base field wrapper involves an operator which accepts the base channel and the sip channel to the service operator,
711
          ;; reads base solutions and for each one, writes the location to the service channel an the original solution
712
          ;; to the wrapped base channel
713
          (base-field-generator-wrapped (make-solution-generator :operator 'spocq.a:|copy|
714
                                                                 :dimensions base-dimensions
715
                                                                 :expression (list #'run-service-location-join-base
716
                                                                                   base-channel
717
                                                                                   location
718
                                                                                   base-field-generator
719
                                                                                   location-sip-channel)
720
                                                                 :channel base-channel
721
                                                                 :constituents (list base-field-generator)))
722
          ;; the service field jenerator involves an operator which accepts a channel from which to read locations
723
          ;; and for each one, initiates the service operation and transcribes solutions
724
          (service-field-generator (make-solution-generator :operator 'spocq.a::|service|
725
                                                            :dimensions extended-service-dimensions
726
                                                            :channel result-channel
727
                                                            :expression (list #'run-service-location-join-service
728
                                                                              result-channel
729
                                                                              location
730
                                                                              location-sip-channel
731
                                                                              query-expression
732
                                                                              args))))
733
     ;; delegate to join the two fields
734
     (funcall join-operator base-field-generator-wrapped service-field-generator)))
735
 
736
 
737
 (defun run-service-location-join-base (destination location-variable base-generator location-sip-channel)
738
   "Initiate the source processing, read each solution and propagate each unique value to the
739
  service location processor before passing the complete page to the destination."
740
 
741
   (let ((base-channel (solution-generator-channel base-generator))
742
         (location-cache (make-hash-table :test'eql))
743
         (location-index (position location-variable (solution-generator-dimensions base-generator))))
744
     (query-run-in-thread *query* (solution-generator-expression base-generator))
745
     (do-pages (solutions base-channel)
746
               ;; (print (cons :rsljb (field-term-objects solutions)))
747
               (loop for index from 0 below (array-dimension solutions 0)
748
                 for location-term = (aref solutions index location-index)
749
                 unless (gethash location-term location-cache)
750
                 do (progn (setf (gethash location-term location-cache) location-term)
751
                      ;; (print (cons :sip-put (rlmdb:term-number-value location-term)))
752
                      (channel-put location-sip-channel (list location-term))))
753
               (let ((result-page (make-page (array-dimension solutions 0) (array-dimension solutions 1))))
754
                 (copy-page solutions result-page)
755
                 (put-field-page destination result-page)))
756
     (complete-field location-sip-channel)
757
     (complete-field destination)))
758
 
759
 
760
 (defun run-service-location-join-service (destination location-variable location-sip-channel query-expression args)
761
   "Implement the service request thread for a service clause in which the locations are passed
762
   from a joined field. If the location is in the service bgp, then pass that reault as-is.
763
   Otherwise extend it to include the location."
764
 
765
   (loop for (location-term) = (channel-get location-sip-channel)
766
     until (null location-term)
767
     for location-object = (term-number-object location-term)
768
     do (let* ((service-generator (apply #'spocq.e::service location-object query-expression args))
769
               (service-dimensions (solution-generator-dimensions service-generator))
770
               (service-source (solution-generator-channel service-generator))
771
               (location-index (position location-variable service-dimensions)))
772
          (query-run-in-thread *query* (solution-generator-expression service-generator))
773
          (if location-index
774
              ;; just copy the pages
775
              (do-pages (solutions service-source)
776
                        (check-query-status *query*)
777
                        (let* ((result-page (make-page (array-dimension solutions 0) (array-dimension solutions 1))))
778
                          (copy-page solutions result-page)
779
                          (put-field-page destination result-page)))
780
              ;; otherwise, extend the page with the respective location
781
              (let* ((destination-dimensions (channel-dimensions destination))
782
                    (destination-width (channel-page-width destination))
783
                    (dimension-map (make-array (length service-dimensions)
784
                                               :initial-contents (loop for dimension in service-dimensions
785
                                                                   collect (or (position dimension destination-dimensions)
786
                                                                               (error "service != destination: ~a ~a"
787
                                                                                      dimension destination-dimensions)))))
788
                    (destination-location-index (or (position location-variable destination-dimensions)
789
                                                    (error  "service location != destination: ~a ~a"
790
                                                            location-variable destination-dimensions))))
791
                (do-pages (solutions service-source)
792
                          (check-query-status *query*)
793
                          (let* ((length (array-dimension solutions 0))
794
                                 (width (array-dimension solutions 1))
795
                                 (result-page (make-page length destination-width)))
796
                            (assert (= width (1- destination-width)) ()
797
                                    "run-service-location-join-service: service page width mismatch: ~a, ~a:  ~a: ~a"
798
                                    width destination-width
799
                                    location-variable service-generator)
800
                            (loop for solution-index below length
801
                              do (progn
802
                                   (loop for service-index below width
803
                                     do (setf (aref result-page solution-index (aref dimension-map service-index))
804
                                              (aref solutions solution-index service-index)))
805
                                   (setf (aref result-page solution-index destination-location-index)
806
                                         location-term)))
807
                            (put-field-page destination result-page)))))))
808
   (complete-field destination))
809
 
810
 
811
 (defun service-generator (repository base-dimensions query-text &key (verbose t))
812
   "Construct a generator which will delegate request/response for a remote query
813
    to an external thread.
814
    In the event of error, unless verbose, suppress it and proceed with a table solution field.
815
    When verbose, decline to handle the error and delegate it to the context."
816
   
817
   (handler-bind ((error (lambda (c)
818
                           (log-warn "external service failure: ~a: ~a" repository c)
819
                           (unless verbose
820
                             (return-from service-generator
821
                               (singleton-generator base-dimensions))))))
822
     (multiple-value-bind (result-solutions result-dimensions)
823
                          (request-sparql repository query-text)
824
       (if (and (null (set-difference result-dimensions base-dimensions))
825
                (null (set-difference base-dimensions result-dimensions)))
826
         (typecase *transaction*
827
           #+matrix-fields
828
           (matrix-transaction
829
            (let ((result-field (make-matrix-field :dimensions result-dimensions)))
830
              (set-solution-field-solutions result-field result-solutions)
831
              (complete-field-data result-field)
832
              result-field))
833
           (t
834
            (let ((result-channel (make-channel :name (list 'spocq.a:|service| (task-id *query*))
835
                                                :dimensions base-dimensions)))
836
              (make-solution-generator :operator 'spocq.a:|service|
837
                                       :dimensions base-dimensions
838
                                       :expression (list #'run-service-thread result-channel result-solutions result-dimensions)
839
                                       :channel result-channel
840
                                       :constituents ()))))
841
         (null-generator base-dimensions)))))
842
 
843
 
844
 (defgeneric run-service-thread (destination result-solutions &optional result-dimensions)
845
   (:method (destination (result-solutions list) &optional (result-dimensions (channel-dimensions destination)))
846
     (run-service-thread destination
847
                         (if (equal result-dimensions (channel-dimensions destination))
848
                           (term-number-field result-solutions)
849
                           (transaction-intern-shuffeled-field *transaction*
850
                                                               result-solutions result-dimensions
851
                                                               (channel-dimensions destination)))))
852
 
853
   (:method (destination (solutions array) &optional (result-dimensions (channel-dimensions destination)))
854
     (declare (type array solutions))
855
     (incf-stat *algebra-operations*) 
856
     (assert-argument-types run-service-thread
857
       (destination (or channel function)))
858
     (trace-algebra run-service-thread solutions)
859
 
860
     (when (> (array-dimension solutions 0) 0)
861
       (assert (= (array-dimension solutions 1) (length (channel-dimensions destination))) ()
862
               "Inconsistent solutions / dimensions: ~s ~s" (array-dimensions solutions) result-dimensions))
863
     (new-field-page destination (array-dimension solutions 0) (array-dimension solutions 1))
864
     (put-field-page destination solutions)
865
     (incf-stat *solutions-processed* (array-dimension solutions 0))
866
     (incf-stat *solutions-constructed* (array-dimension solutions 0))
867
     (complete-field destination)))
868
 
869
 
870
 
871
 (defparameter *curl-pathname* "/usr/bin/curl")
872
 ;;; (defparameter *curl-pathname*  "/development/source/library/cat-sparql-results.sh")
873
 
874
 (defgeneric agent-curl-authentication (authority &key auth-token agent-token agent-password
875
                                                  agent timeout arguments)
876
   (:documentation "Given an authority agent, return the list of curl arguments for its auth data.")
877
   (:method ((authority t) &key auth-token (agent-token auth-token) agent-password agent timeout arguments)
878
     (declare (ignore agent timeout arguments))
879
     (flet ((as-password (key)
880
              (when key `("--user" ,(concatenate 'string ":" key))))
881
            (as-token (key)
882
              (when key `("--user" ,(concatenate 'string key ":")))))
883
       (or (as-token agent-token)
884
           (as-password agent-password))))
885
   (:method ((authority string) &rest args)
886
     (declare (dynamic-extent args))
887
     (apply #'agent-curl-authentication (authority authority) args))
888
   (:method ((authority authority) &key auth-token (agent-token auth-token) agent-password
889
             agent timeout arguments)
890
     (declare (ignore agent timeout arguments)) ; just to allow pass-through from service operators
891
     (flet ((as-password (key)
892
              (when key `("--user" ,(concatenate 'string (or (agent-name authority) "") ":" key))))
893
            (as-token (key)
894
              ;; as of release 03.2018 the key is in the second position
895
              (when key `("--user" ,(concatenate 'string ":" key)))))
896
       (or (as-token agent-token)
897
           (as-password agent-password)
898
           ;; if no auth information is provided, attempt to locate it respective the authority identifer
899
           (as-token (agent-token authority))
900
           (as-password (agent-password authority))))))
901
 ;;; (agent-curl-authentication *create-authority-id*)
902
 
903
 (defgeneric agent-request-authentication (authority &key auth-token agent-token agent-password)
904
   (:documentation "Given an authority agent, return the tbnl property list with arguments for its auth data.")
905
   (:method ((authority string) &rest args)
906
     (declare (dynamic-extent args))
907
     (apply #'agent-request-authentication (authority authority) args))
908
   (:method ((authority authority) &key (auth-token (agent-token authority)) (agent-token auth-token)
909
             (agent-password (agent-password authority)))
910
     (flet ((as-password (key)
911
              (when key `(:basic-authentication ,(concatenate 'string (or (agent-name authority) "") ":" key))))
912
            (as-token (key)
913
              ;; as of release 03.2018 the key is in the second position
914
              (when key `(:basic-authentication ,(concatenate 'string ":" key)))))
915
       (or (as-token agent-token)
916
           (as-password agent-password)))))
917
 
918
 
919
 ;;; remote service request
920
 
921
 ;;; "-w" "%{http_code}\\n"
922
 ;;; is not useful as the code follows the response
923
 
924
 (defun encode-query-url (base-url arguments &key (urlencode t))
925
   (format nil "~a~@[~a~]~{~a=~a~^&~}"
926
           ;; do not add the extension as some urls do without one
927
           base-url
928
           (when arguments (if (position #\? base-url) "&" "?"))
929
           (loop for (keyword . value) in arguments
930
             for string-value = (typecase value (string value) (t (prin1-to-string value)))
931
             collect (if (variable-p keyword) (concatenate 'string "\$" (string keyword)) keyword)
932
             collect (if urlencode (url-encode string-value) string-value))))
933
 ;;; (encode-query-url "http://example.org" '(a 1 ?::|b| "http://dydra.com"))
934
 
935
 
936
 (defgeneric run-external-view-request (method iri &key
937
                                         auth-token agent-token agent-password agent
938
                                         arguments timeout retry)
939
   (:documentation "Execute a remote query based on a view given its IRI and optional
940
     authentication and query arguments.
941
     parse the result and return it as a symbolic field")
942
 
943
   (:method  :before ((method t) (iri-string string) &rest args)
944
     (declare (dynamic-extent args))
945
     (log-notice "run-external-view-request:  ~s  ~s . ~s" method iri-string args))
946
 
947
   (:method ((method (eql :curl-csv)) (iri-string string) &rest args
948
             &key auth-token agent-token agent-password
949
             (agent (authority (parse-url-authority iri-string)))
950
             (arguments nil)
951
             (retry *external-request-retry*)
952
             (timeout *external-request-timeout*))
953
     (declare (dynamic-extent args)
954
              (ignore auth-token agent-token agent-password))
955
     (let* ((accept  (load-time-value (concatenate 'string "Accept: " (mime:mime-type-namestring mime:text/csv)))))
956
       (labels ((parse-dimensions (line)
957
                  (mapcar #'(lambda (name) (intern name *variable-package*))
958
                          (split-string line ",")))
959
                (parse-sparql-query-solution (line)
960
                  (mapcar #'parse-csv-term
961
                          (spocq.i::parse-csv line :start-name 'odcsv::|recordfields| :separator #\,)))
962
                (read-csv-line (stream)
963
                  (let ((line (read-line stream nil nil)))
964
                    (when line
965
                      (string-trim #(#\return #\newline #\space) line))))
966
                (run-curl (arguments)
967
                  (dotimes (x retry)
968
                    (let ((process  (run-program *curl-pathname* arguments :input nil :output :stream :wait nil
969
                                                 ;; needs the proxy environemnt variables
970
                                                 ;; :environment ()
971
                                                 )))
972
                      (unless process (error "run-external-view-request: run-curl failed."))
973
                      (case (run-program-exit-code process)
974
                        ((0 nil) (return-from run-curl process))
975
                        ((35 28) ) ;;; retry
976
                        (t (error "run-external-view-request: run-curl failed: ~s" (run-program-exit-code process))))))
977
                  (error "run-external-view-request: run-curl retries exhausted: ~s" arguments)))
978
         (let* ((timeout (etypecase timeout
979
                         (null nil)
980
                         (number (write-to-string (ceiling timeout)))
981
                         (string timeout)))
982
                (concrete-url (encode-query-url iri-string arguments :urlencode t))
983
                (process (run-curl (append (apply #'agent-curl-authentication agent args)
984
                                           `("-k" "-L" "--silent" "-H" ,accept)
985
                                           (when timeout `("--max-time" ,timeout))
986
                                           (list concrete-url))))
987
                (result-stream (run-program-output process)))
988
           (unwind-protect
989
               (let* ((header (read-csv-line result-stream))
990
                      (dimensions (parse-dimensions header))
991
                      (count 0))
992
                 ;; (print dimensions)
993
                 ;; (print (cons :dimensisons dimensions))
994
                 (values (loop for line = (read-csv-line result-stream)
995
                           until (null line)
996
                           collect (parse-sparql-query-solution line)
997
                           do (incf count))
998
                         dimensions
999
                         count))
1000
             (run-program-close process))))))
1001
 
1002
   (:method :around ((method t) (iri-string string) &key
1003
                     auth-token agent-token agent-password agent
1004
                     arguments timeout retry)
1005
     (declare (ignore agent auth-token agent-token agent-password timeout retry))
1006
     (if *cache-path*
1007
         (let* ((authority (parse-url-authority iri-string))
1008
                (key (list iri-string arguments))
1009
                (hash (make-sha1-digest (write-to-string key)))
1010
                (cache-directory (make-pathname :directory `(:relative ,@*cache-path* ,authority)))
1011
                (cache-pathname (make-pathname :directory (pathname-directory cache-directory)
1012
                                               :name (format nil "~32,'0x" hash))))
1013
           (log-trace "cache entry for ~s: ~s" key cache-pathname)
1014
           ; (format *trace-output* "~&cache entry for ~s: ~s~%" key cache-pathname)
1015
           (if (probe-file cache-pathname)
1016
               (with-open-file (stream cache-pathname :direction :input)
1017
                 (let ((*package* (find-package :spocq.i)))
1018
                   (apply #'values (read stream))))
1019
               (let ((results (multiple-value-list (call-next-method))))
1020
                 (when (probe-file cache-directory)
1021
                   (with-open-file (stream cache-pathname :direction :output :if-does-not-exist :create)
1022
                     (let ((*package* (find-package :spocq.i)))
1023
                       (write results :stream stream))))
1024
                 (apply #'values results))))
1025
         (call-next-method))))
1026
 
1027
 ;;; with a stub script which returns a static file
1028
 ;;; (run-external-view-request :curl-csv "http://example.org" :arguments '(a 1 ?::|b| "http://dydra.com"))
1029
 
1030
 (defun request-sparql (iri query-text)
1031
   "given an iri and a query string, send a request to the remote service and parse the response into a list of triples."
1032
   (run-external-service-request *external-service-request-method* iri query-text))
1033
 
1034
 
1035
 (defgeneric run-external-service-request (method iri query-text &key 
1036
                                            auth-token agent-token agent-password agent
1037
                                            arguments)
1038
   (:documentation "Execute a remote query given the IRI and the QUERY-TEXT,
1039
     parse the result and return it as a symbolic field.
1040
     Given the external program limitations, this supports neither views nor authentication.")
1041
 
1042
   (:method  :before ((method t) (iri-string string) (query-text string) &rest args)
1043
     (declare (dynamic-extent args))
1044
     (log-notice "run-external-service-request:  ~s  ~s:  ~s . ~s" method iri-string query-text args))
1045
 
1046
   (:method ((method t) (iri spocq:iri) query-text &rest args)
1047
     (declare (dynamic-extent args))
1048
     (apply #'run-external-service-request method (spocq:iri-lexical-form iri) query-text args))
1049
 
1050
   (:method ((method t) (repository service-repository) query-text &rest args)
1051
     (declare (dynamic-extent args))
1052
     ;; (apply #'run-external-service-request method (repository-id repository) query-text args))
1053
     (apply #'run-external-service-request method (repository-external-name repository) query-text args))
1054
 
1055
   (:method ((method (eql :curl)) (repository t) query-text &rest args)
1056
     (declare (dynamic-extent args))
1057
     (apply #'run-external-service-request :curl-csv repository query-text args))
1058
 
1059
   ;; nb. no password authentcation
1060
   (:method ((method (eql :sparql-query)) (iri-string string) (query-text string) &key 
1061
             agent-token (auth-token agent-token) agent-password agent
1062
             arguments
1063
             (timeout *external-request-timeout*))
1064
     (declare (ignore agent-password agent))
1065
     (flet ((parse-dimensions (line)
1066
              (mapcar #'(lambda (name) (intern name *variable-package*))
1067
                      (split-string line #(#\space #.(code-char 9474)))))
1068
            (parse-sparql-query-solution (line)
1069
              (flet ((parse-sparql-term (string)
1070
                       (or (ignore-errors (parse-term string))
1071
                           (parse-term (concatenate 'string "\"" (string-trim #(#\space) string) "\"")))))
1072
                (mapcar #'parse-sparql-term (split-string line #.(code-char 9474))))))
1073
       (let* ((timeout (etypecase timeout (null nil) (number timeout) (string (read-from-string timeout))))
1074
              (concrete-iri
1075
               (encode-query-url iri-string (if auth-token (acons "auth_token" auth-token arguments) arguments)
1076
                                 :urlencode nil))
1077
              (process (if timeout
1078
                           (bt:with-timeout (timeout)
1079
                             (run-program *sparql-query-pathname* (list concrete-iri query-text)
1080
                                          :input nil :output :stream))
1081
                           (run-program *sparql-query-pathname* (list concrete-iri query-text)
1082
                                          :input nil :output :stream)))
1083
              (result-stream (run-program-output process))
1084
              (header (progn (read-line result-stream)           ; skip decoration
1085
                             (read-line result-stream)))
1086
              (dimensions (parse-dimensions header)))
1087
         (read-line result-stream)           ; skip decoration
1088
         (values (loop for line = (read-line result-stream nil nil)
1089
                   until (or (null line) (not (eql (char line 0) #.(code-char 9474))))
1090
                   collect (parse-sparql-query-solution line)
1091
                   finally (run-program-close process))
1092
                 dimensions))))
1093
 
1094
   ;; nb. no password authentication
1095
   (:method ((method (eql :roqet)) (iri-string string) (query-text string) &key
1096
             agent-token (auth-token agent-token) agent-password agent
1097
             arguments
1098
             (timeout *external-request-timeout*) (retry *external-request-retry*))
1099
     (declare (ignore agent-password agent))
1100
     (labels ((parse-dimensions (line)
1101
                (mapcar #'(lambda (name) (intern name *variable-package*))
1102
                        (split-string line #(#\space #\|))))
1103
              (parse-positions (line)
1104
                (loop for i from 0
1105
                  for char across line
1106
                  when (eql char #\|)
1107
                  collect i))
1108
              (parse-sparql-query-solution (line positions)
1109
                (flet ((parse-column (start end)
1110
                         (cond ((= end (+ start 3))
1111
                                nil)
1112
                               ((string-equal "uri" line :start2 (+ start 2) :end2 (min (length line) (+ start 5)))
1113
                                (let ((start (+ start 6))
1114
                                      (end (position #\> line :end (- end 1) :from-end t)))
1115
                                  (intern-iri (subseq line start end))))
1116
                               ((string-equal "blank" line :start2 (+ start 2) :end2 (min (length line) (+ start 7)))
1117
                                (let ((start (+ start 8))
1118
                                      (end (- end 1)))
1119
                                  (intern-blank-node (string-trim #(#\space) (subseq line start end)))))
1120
                               ((string-equal "string" line :start2 (+ start 2) :end2 (min (length line) (+ start 8)))
1121
                                (parse-roqet-term (subseq line (+ start 9) (- end 2))))
1122
                               (t
1123
                                (error "invalid term: ~s: (~s,~s)" line start end)))))
1124
                  (loop for start in positions
1125
                    for end in (rest positions)
1126
                    collect (parse-column start end))))
1127
              (parse-roqet-term (term)
1128
                (parse-term term))
1129
              (read-roqet-line (result-stream &rest args)
1130
                (let ((line (apply #'read-line result-stream args)))
1131
                  line))
1132
              (run-roqet (arguments)
1133
                (dotimes (x retry)
1134
                    (let ((process (run-program (roqet-pathname) arguments :input :stream :output :stream :wait nil)))
1135
                      (unless process (error "run-external-service-request: run-roqet failed."))
1136
                      (case (run-program-exit-code process)
1137
                        ((0 nil)
1138
                         (case (peek-char nil (run-program-output process) nil nil)
1139
                           (#\- (return-from run-roqet process))
1140
                           (nil (error "run-external-service-request: run-roqet failed: ~s" (run-program-exit-code process)))
1141
                           (t (run-program-close process)))) ;; retry
1142
                        ((35 (run-program-close process)) ) ;; retry
1143
                        (t (error "run-external-service-request: run-roqet failed: ~s" (run-program-exit-code process))))))
1144
                  (error "run-external-service-request: run-roqet retries exhausted: ~s" arguments)))
1145
       (let* ((timeout (etypecase timeout (null nil) (number timeout) (string (read-from-string timeout))))
1146
              (concrete-iri
1147
               (encode-query-url iri-string (if auth-token (acons "auth_token" auth-token arguments) arguments)
1148
                                 :urlencode nil))
1149
              (process (or (if timeout
1150
                               (bt:with-timeout (timeout)
1151
                                 (run-roqet (list "-p" concrete-iri "-e" query-text "-r" "table" "-q")))
1152
                               (run-roqet (list "-p" concrete-iri "-e" query-text "-r" "table" "-q")))
1153
                           (error "run-external-service-request: run-roqet timeout."))))
1154
         (unwind-protect
1155
             (if (typep (run-program-exit-code process) '(integer 1))
1156
                 (error "run-external-service-request: run-program failed: ~s" (run-program-exit-code process))
1157
                 (let* ((result-stream (run-program-output process))
1158
                        (header (progn (read-roqet-line result-stream)           ; skip decoration
1159
                                  (read-roqet-line result-stream)))
1160
                        (positions (parse-positions header))
1161
                        (dimensions (parse-dimensions header)))
1162
                   (when dimensions                ; no results yield a degenerate table
1163
                     (read-line result-stream)           ; skip decoration
1164
                     (values (loop for line = (read-roqet-line result-stream nil nil)
1165
                               until (or (null line)
1166
                                         (not (and (>= (length line) 3)
1167
                                                   (eql (char line 0) #\|)
1168
                                                   (eql (char line 1) #\space)
1169
                                                   (eql (char line (1- (length line))) #\|))))
1170
                               collect (parse-sparql-query-solution line positions))
1171
                             dimensions))))
1172
           (run-program-close process)))))
1173
 
1174
   (:method ((method (eql :curl-csv)) (iri-string string) (query-text string) &rest args &key
1175
             auth-token agent-token agent-password
1176
             (agent (authority (parse-url-authority iri-string)))
1177
             arguments
1178
             (timeout *external-request-timeout*) (retry *external-request-retry*)
1179
             (http-method :get))
1180
     "invoke curl with the query text as its input stream, specify csv,json response type, parse it and return
1181
      the interned field as a list"
1182
     (declare (ignore auth-token agent-token agent-password))
1183
     (let* ((accept (load-time-value (concatenate 'string "Accept: "
1184
                                                  (string-downcase (type-of mime:text/csv))  ;; d2rq permits lower case only
1185
                                                  ","
1186
                                                  (string-downcase (type-of mime:application/sparql-results+json)))))
1187
            (content (load-time-value (concatenate 'string "Content-Type: "
1188
                                                   (string-downcase (type-of mime:application/sparql-query))))))
1189
       (labels ((run-curl (arguments)
1190
                  (dotimes (x retry)
1191
                    (let ((process (run-program *curl-pathname*
1192
                                                arguments
1193
                                                :input (when (eq http-method :post) :stream)
1194
                                                :output :stream :wait nil)))
1195
                      (unless process (error "run-external-service-request: run-curl failed."))
1196
                      (case (run-program-exit-code process)
1197
                        ((0 nil) (if (peek-char t (run-program-output process) nil nil)
1198
                                     (return-from run-curl process)
1199
                                     (run-program-close process)))
1200
                        ((35 28) (run-program-close process)) ;;; retry
1201
                        (t (error "run-external-service-request: run-curl failed: ~s" (run-program-exit-code process))))))
1202
                  (error "run-external-service-request: run-curl retries exhausted: ~s" arguments))
1203
                (read-content-type (stream)
1204
                  (let ((type (rest (assoc :content-type (read-http-headers stream :content-type)))))
1205
                    (when type (mime:mime-type type)))))
1206
         (let* ((concrete-url (encode-query-url iri-string
1207
                                                (acons "query" (substitute-if #\space #'(lambda (c) (or (eql c #\return) (eql c #\linefeed)))
1208
                                                                              query-text)
1209
                                                       arguments)
1210
                                                :urlencode t))
1211
                (timeout (etypecase timeout
1212
                           (null nil)
1213
                           (real (write-to-string (ceiling timeout)))
1214
                           (string timeout))))
1215
           (dotimes (y retry  ;; retry http return code
1216
                      (error "run-external-service-request: http status of retries exhausted: ~s" concrete-url))
1217
             (let* ((process (run-curl (append (apply #'agent-curl-authentication agent args)
1218
                                               `("-L" "-X" "GET" ;; "POST"
1219
                                                "-D" "-"
1220
                                                "-k" "--silent" "-H" ,accept)
1221
                                              (when (eq http-method :post) `( "-H" ,content "--data-binary" "@-"))
1222
                                              (when timeout `("--max-time" ,timeout))
1223
                                              (list concrete-url))))
1224
                   (query-stream  (when (and process (eq http-method :post)) (run-program-input process)))
1225
                   (result-stream (when process (run-program-output process))))
1226
               (when query-stream
1227
                 (write-string query-text query-stream)
1228
                 (close query-stream))
1229
               (unwind-protect 
1230
                   (let ((start-character (peek-char nil result-stream)))
1231
                     (case start-character
1232
                       ((#\h #\H)
1233
                        (let ((status (second (split-string (read-line result-stream) #(#\space)))))
1234
                          (when (eql (elt status 0) #\2)
1235
                            (return (receive-solutions result-stream (or (read-content-type result-stream)
1236
                                                                       mime:text/csv))))))
1237
                       (#\{ (return (receive-solutions result-stream mime:application/sparql-results+json)))
1238
                       (t (return (receive-solutions result-stream mime:text/csv)))))
1239
                 (progn
1240
                   (close result-stream)
1241
                   (run-program-close process))))))))))
1242
 
1243
 (defgeneric receive-solutions (source media-type)
1244
   (:documentation "wrap receive-message with transformations necesary to yield two values
1245
    - a list of solutions
1246
    - a dimension list")
1247
   (:method ((source t) (content-type mime:application/sparql-results+json))
1248
     (parse-json-sparql-results source))
1249
   (:method ((source t) (content-type t))
1250
     (receive-message source content-type)))
1251
 
1252
 
1253
 (defgeneric map-external-service-response (operator service-location query-text &rest args)
1254
   (:documentation
1255
    "perform the external service request, collect the results, invoke the continuation for
1256
  each _external_ solution in turn")
1257
 
1258
   (:method (operator (iri-string string) (query-text string) &rest args)
1259
     (multiple-value-bind (solutions dimensions)
1260
                          (apply #'run-external-service-request :curl-csv iri-string query-text args)
1261
       (loop for solution in solutions
1262
         do (apply operator solution))
1263
       (values solutions dimensions)))
1264
 
1265
   (:method (operator (repository service-repository) (query-text string) &rest args)
1266
     (apply #'map-external-service-response operator (repository-external-name repository) query-text args))
1267
 
1268
   (:method (operator (iri spocq:iri) (query-text string) &rest args)
1269
     (apply #'map-external-service-response operator (spocq:iri-lexical-form iri) query-text args)))
1270
 
1271
 ;;; (map-external-service-response #'(lambda (&rest solution) (print solution)) (service-repository "https://www.dydra.com/test/test/sparql") "select ?s ?p ?o where {?s ?p ?o}")
1272
 
1273
 
1274
 
1275
 ;;; sip service support
1276
 ;;; join v/s leftjoin sip abstracted over the concrete join operator
1277
 
1278
 (defun service-sip-join-generator (join-operator location query-expression base-field-generator &rest args)
1279
  "Generate the join results with solutions from the first constituent passed
1280
   both to the second argument and to the concrete join operator"
1281
   (declare (dynamic-extent args))
1282
   (apply #'service-sip-generic-join-generator #'spocq.e:join
1283
          join-operator location query-expression base-field-generator
1284
          args))
1285
 
1286
 (defun service-sip-leftjoin-generator (join-operator location query-expression base-field-generator &rest args)
1287
  "Generate the optional join results with solutions from the first constituent
1288
   passed both to the second argument and to the concrete leftjoin operator"
1289
   (declare (dynamic-extent args))
1290
   (apply #'service-sip-generic-join-generator #'spocq.e:leftjoin
1291
          join-operator location query-expression base-field-generator
1292
          args))
1293
 
1294
 (defun service-sip-generic-join-generator (join-operator location query-expression base-field-generator &rest args
1295
                                                          &key sip-variables &allow-other-keys)
1296
   "Delegate a service request with a propagated location to a join operation supplying it two generators:
1297
    - one yields the solutions from the base field - after each has provided a service location;
1298
    - the other yields the union of the solutions from the iterated service operations - if necessary extended with the location."
1299
   ;; allow both a variable location and a constant
1300
   (let* ((base-dimensions (solution-generator-dimensions base-field-generator))
1301
          (query-dimensions (expression-dimensions query-expression))
1302
          (extended-service-dimensions (if (variable-p location)
1303
                                           (union-dimensions (list location) query-dimensions)
1304
                                           query-dimensions))
1305
          (base-channel (make-channel :name (list 'spocq.a:|service| (task-id *query*))
1306
                                      :dimensions base-dimensions))
1307
          (sip-dimensions (sort (copy-list sip-variables) #'string-lessp))
1308
          (sip-channel (make-solution-channel :name (list 'spocq.a:|service| (task-id *query*))
1309
                                          :dimensions sip-dimensions))
1310
          (service-channel (make-channel :name (list 'spocq.a:|service| (task-id *query*))
1311
                                                  :dimensions extended-service-dimensions))
1312
          ;; the base field wrapper involves an operator which accepts the base channel and the sip channel to the service operator,
1313
          ;; reads base solutions and for each one, writes the location to the service channel and the original solution
1314
          ;; to the wrapped base channel
1315
          (base-field-generator-wrapped (make-solution-generator :operator 'spocq.a:|copy|
1316
                                                                 :dimensions base-dimensions
1317
                                                                 :expression (list #'run-service-sip-join-base
1318
                                                                                   base-channel
1319
                                                                                   base-field-generator
1320
                                                                                   sip-channel)
1321
                                                                 :channel base-channel
1322
                                                                 :constituents (list base-field-generator)))
1323
          ;; the service field jenerator involves an operator which accepts a channel from which to read locations
1324
          ;; and for each one, initiates the service operation and transcribes solutions
1325
          (service-field-generator (make-solution-generator :operator 'spocq.a::|service|
1326
                                                            :dimensions extended-service-dimensions
1327
                                                            :channel service-channel
1328
                                                            :expression (list #'run-service-sip-join-service
1329
                                                                              service-channel
1330
                                                                              location
1331
                                                                              sip-channel
1332
                                                                              query-expression
1333
                                                                              args))))
1334
     ;; delegate to join the two fields and return that generator
1335
     (trace-algebra service-sip-join-generator :base base-field-generator-wrapped :service service-field-generator)
1336
     ;; the base field generator is primary as the operator may be a leftjoin
1337
     (funcall join-operator base-field-generator-wrapped service-field-generator)))
1338
 
1339
 (defun run-service-sip-join-base (destination base-generator sip-channel)
1340
   "Initiate the source processing, use a projection operator to extract source solutions,
1341
  map them to the sip dimensions and emit distinct solutions to the sip channel before
1342
  passing the complete solution page to the destination."
1343
 
1344
   (let* ((base-channel (solution-generator-channel base-generator))
1345
          (sip-cache (make-hash-table :test 'equal))
1346
          (projection (compute-unary-extractor (channel-dimensions sip-channel)
1347
                                               (channel-dimensions base-channel))))
1348
     (query-run-in-thread *query* (solution-generator-expression base-generator))
1349
     ;; (print (cons :rssjb.sip-channel sip-channel))
1350
     (do-pages (solutions base-channel)
1351
               ;; (print (cons :rssjb (field-term-objects solutions)))
1352
               (loop for index from 0 below (array-dimension solutions 0)
1353
                 for sip-solution = (funcall projection solutions index)
1354
                 unless (gethash sip-solution sip-cache)
1355
                 do (progn (setf (gethash sip-solution sip-cache) t)
1356
                      ;; emit the term-number solution sequence
1357
                      (channel-put sip-channel sip-solution)
1358
                      ;; (print (cons :rssjb (loop for term in sip-solution collect (rlmdb:term-number-value term))))
1359
                      ))
1360
               (let ((result-page (make-page (array-dimension solutions 0) (array-dimension solutions 1))))
1361
                 (copy-page solutions result-page)
1362
                 (put-field-page destination result-page)
1363
                 ;; (print (cons :rssjb result-page))
1364
                 ))
1365
     (complete-field sip-channel)
1366
     (complete-field destination)))
1367
 
1368
 
1369
 (defun run-service-sip-join-service (destination location sip-channel query-expression args)
1370
   "Implement the service request thread for a service clause in which the location and other
1371
   values are passed from a joined field.
1372
   For each solution received from the base source, combine it with any dynamic bindings,
1373
   arrange to execute the service operation with those bindings and emit the results to the destination.
1374
   The intended precedence is
1375
   1. constants in the query expression
1376
   2. dynamic request arguments, as they appear in dynamic bindings
1377
   3. sip bindings.
1378
   "
1379
 
1380
   (let* ((sip-dimensions (channel-dimensions sip-channel))
1381
          ;; iff the location is a variable
1382
          (location-index (position location sip-dimensions)))
1383
     (loop for solution = (channel-get sip-channel)
1384
       until (null solution)
1385
       ;; for each solution, construct the equivalent dynamic bindings from the passed solution
1386
       ;; and any dynamic bindings. provide those to the concrete service call,
1387
       ;; for it to (somehow) incorporate in the query.
1388
       for location-object = (if location-index
1389
                                 (term-number-object (elt solution location-index))
1390
                                 location)
1391
       ;; pass bindings which combine the dynamic bindings with step-specific bindings
1392
       ;; such that the query dynamic bindings take precedence
1393
       for bindings = (loop for dimension in sip-dimensions for term-number in solution
1394
                        for dynamic-term-number = (query-binding-term-number dimension)
1395
                        if (not (= +NULL-TERM-ID+ dynamic-term-number))
1396
                        collect (cons dimension (term-number-object dynamic-term-number))
1397
                        else if (not (eql +NULL-TERM-ID+ term-number))
1398
                        collect (cons dimension (term-number-object term-number)))
1399
       do (apply #'run-sip-service-step location-object destination query-expression bindings args)))
1400
   (complete-field destination))
1401
 
1402
 #+(or)
1403
 (defmethod run-sip-service-step :around (location destination query-expression bindings &rest args)
1404
   (print (list location destination query-expression bindings args))
1405
   (print (compute-applicable-methods #'run-sip-service-step
1406
                                      (list location destination query-expression bindings))))
1407
 
1408
 (defgeneric run-sip-service-step (location destination query-expression bindings &key verbose user-id query-text sip-variables)
1409
   (:documentation "Given a location, a query expression - as an sse and as a sparql string,
1410
     -and- bindings per iteration,
1411
     distinguish local from remote requests and either evaluate the sse pattern or delegate the request
1412
     to a remote processor.")
1413
 
1414
   (:method ((service-location spocq:iri) destination query-expression bindings &rest args &key (verbose t) user-id query-text sip-variables)
1415
     (declare (dynamic-extent args))
1416
     (declare (ignore user-id query-text sip-variables))
1417
     (flet ((call-with-repository ()
1418
              (let ((repository (service-repository service-location))
1419
                    (revision (service-repository-revision service-location)))
1420
                (apply #'run-sip-service-step repository destination query-expression bindings
1421
                       :revision-id (or revision "HEAD")
1422
                       args))))
1423
       (if verbose
1424
         (call-with-repository)
1425
         (handler-bind ((error (lambda (c)
1426
                                 (log-warn "service failure: ~a: ~a" service-location c)
1427
                                 (unless verbose
1428
                                   (return-from run-sip-service-step
1429
                                     (null-generator (expression-dimensions query-expression))
1430
                                     ;; (singleton-generator ())
1431
                                     ;; (singleton-generator (expression-dimensions query-expression))
1432
                                     )))))
1433
           (call-with-repository)))))
1434
 
1435
   (:method ((service-location string) destination query-expression bindings &rest args &key (verbose t) user-id query-text sip-variables)
1436
     (declare (dynamic-extent args))
1437
     (declare (ignore user-id query-text sip-variables))
1438
     (flet ((call-with-repository ()
1439
              (let ((repository (service-repository service-location))
1440
                    (revision (service-repository-revision service-location)))
1441
                (apply #'run-sip-service-step repository destination query-expression bindings
1442
                       :revision-id (or revision "HEAD")
1443
                       args))))
1444
       (if verbose
1445
         (call-with-repository)
1446
         (handler-bind ((error (lambda (c)
1447
                             (log-warn "service failure: ~a: ~a" service-location c)
1448
                             (unless verbose
1449
                               (return-from run-sip-service-step
1450
                                 (singleton-generator ())
1451
                                 ;; (singleton-generator (expression-dimensions query-expression))
1452
                                 )))))
1453
           (call-with-repository)))))
1454
 
1455
   (:method ((service-repository service-repository) destination (query-expression cons) bindings &rest args)
1456
     "For external service processing, compute a generator which will start an external process to run
1457
      the remote query and intern the result soultion stream.
1458
      use a query text rewritten to reflect the bindings."
1459
     (declare (ignore args))
1460
     (authorize-service-request *task* service-repository)
1461
     (multiple-value-bind (solutions result-dimensions result-count)
1462
                          (request-sparql service-repository
1463
                                          (compute-endpoint-sparql-expression service-repository
1464
                                                                              query-expression
1465
                                                                              bindings))
1466
       (when solutions
1467
         (new-field-page destination result-count (length (first solutions)))
1468
         (put-field-page destination
1469
                         (if (equal result-dimensions (channel-dimensions destination))
1470
                             (term-number-field solutions)
1471
                             (transaction-intern-shuffeled-field *transaction*
1472
                                                                 solutions result-dimensions
1473
                                                                 (channel-dimensions destination)))))))
1474
 
1475
   (:method  ((service-repository repository-revision) destination (query-expression cons) bindings &rest args)
1476
     ;; indirectly through the revision to the repository, with the revision ad as an argument
1477
     ;; will cause the revision to be reconstituted when the query is initialized, but leaves
1478
     ;; the methods otherwise as-they-are
1479
     (declare (dynamic-extent args))
1480
     (apply #'run-sip-service-step (repository-revision-reference service-repository) destination query-expression bindings
1481
            :revision-id (repository-revision-id service-repository)
1482
            args))
1483
 
1484
   (:method  ((service-repository repository) destination (query-expression cons) bindings &key (verbose t) query-text sip-variables
1485
              (user-id (task-user-id *task*))
1486
              (revision-id "HEAD")
1487
              (timeout *external-request-timeout*))
1488
     "For internal service processing compile the query, if necessary, and return its generator.
1489
      In the event of error, unless verbose, suppress it and proceed with a table solution field.
1490
      When verbose, decline to handle the error and delegate it to the context."
1491
     (declare (ignore sip-variables))
1492
     (authorize-service-request *task* service-repository)
1493
     ;;; this wrecks havoc with variable-directed analyses at the remote end
1494
     ;;; (setf query-expression (sublis bindings query-expression))
1495
     ;; for internal use, always perform substitution
1496
     (setf query-expression (bind-sparql-expression query-expression bindings))
1497
     (flet ((execute-task ()
1498
              (let* ((metadata (instance-metadata *task*))
1499
                    (service-task (make-query :sse-expression query-expression
1500
                                              :id (make-service-task-id)
1501
                                              :dynamic-bindings (cons (mapcar #'first bindings) (mapcar #'rest bindings))
1502
                                              :repository service-repository
1503
                                              :revision-id revision-id
1504
                                              :user-id user-id
1505
                                              :sparql-expression query-text
1506
                                              :agent (task-agent *task*)
1507
                                              :parent-task *task*
1508
                                              :metadata (when metadata (clone-instance metadata))
1509
                                              :response-content-type (task-response-content-type *task*))))
1510
                (with-task-environment (:task service-task)
1511
                  ;;(update-repository-revision-id (task-revision service-task))
1512
                  (initialize-task service-task)
1513
                  ;;  return the generator wrapped to initiate a new query context
1514
                  (flet ((retask (&rest args)
1515
                           ; interposes an extra thread
1516
                           (query-run-in-thread service-task args)
1517
                           'spocq.a:|service|))
1518
                    (push #'retask (solution-generator-expression (task-result-generator service-task)))
1519
                    (log-debug "query: wrapped for retask ~a -> ~a" *task* service-task)
1520
                    ;;;!!! reduction requires immediate execution and materialization...
1521
                    (let* ((service-generator (task-result-generator service-task))
1522
                           (service-source (solution-generator-channel service-generator)))
1523
                      ;; just copy the pages
1524
                      ; (print (list* :rsss.startdopages (repository-id service-repository) (solution-generator-dimensions service-generator)))
1525
                      (query-run-in-thread *query* (task-result-generator service-task))
1526
                      (do-pages (solutions service-source)
1527
                                ;; (print (cons :rsss (field-term-objects solutions)))
1528
                                (check-query-status *query*)
1529
                                (let* ((solution-count (array-dimension solutions 0))
1530
                                       (result-page (make-page solution-count (array-dimension solutions 1))))
1531
                                  (copy-page solutions result-page)
1532
                                  (put-field-page destination result-page)
1533
                                  (incf-stat *solutions-processed* solution-count)
1534
                                  (incf-stat *solutions-constructed* solution-count)))
1535
                      ;; (print :rsss.enddopages)
1536
                      ;; no, do not complete it. that is for the caller
1537
                      ;; (complete-field destination)
1538
                      ))))))
1539
       (handler-bind ((error (lambda (c)
1540
                               (log-warn "service: request failure: ~a: ~a" service-repository c)
1541
                               (unless verbose
1542
                                 (return-from run-sip-service-step
1543
                                   ;;(singleton-generator ())
1544
                                   (singleton-generator (expression-dimensions query-expression))
1545
                                   )))))
1546
         (bt:with-timeout (timeout)
1547
           (execute-task)))))
1548
 
1549
   ;; this should not arise in the standard case as the macro expands to the join form given a variable...
1550
   (:method ((location symbol) destination query-expression (bindings t) &rest args)
1551
     (declare (ignore args))
1552
     (null-generator (union-dimensions (list location) (expression-dimensions query-expression)))))
1553
 
1554
 #| degenerate roqet results
1555
 (let ((line "|"))
1556
   (mapcar #'(lambda (name) (intern name *variable-package*))
1557
           (split-string line #(#\space #\|))))
1558
 (let ((line "|"))
1559
   (loop for i from 0
1560
         for char across line
1561
         when (eql char #\|)
1562
         collect i))
1563
 
1564
 (defparameter *p*
1565
   (sb-ext:RUN-PROGRAM "/usr/local/bin/roqet"
1566
                       '("-p" "http://dbpedia.org/sparql" "-e"
1567
                         "select * where { ?film <http://purl.org/dc/elements/1.1/subject> <http://dbpedia.org/resource/Category:French_films> } "
1568
                         "-r" "table" "-q")
1569
                       :INPUT NIL :OUTPUT :STREAM :WAIT NIL))
1570
 
1571
 |#
1572
 
1573
 
1574
 ;; from dev.dydra.com
1575
 ;; (sb-ext:run-program "/development/bin/sparql-query" '("http://1jStOy40J9151En2MsYQ:@wulff.dev/jhacker/functions-strlang03/sparql" "SELECT * WHERE { ?s ?p ?o } LIMIT 10") :input nil :output :stream)
1576
 ;; (request-sparql "http://dydra.com/jhacker/foaf/sparql" "SELECT count(*) WHERE { ?s ?p ?o }")
1577
 ;; (request-sparql "http://dydra.com/jhacker/foaf/sparql" "SELECT ?s ?p ?o WHERE { ?s ?p ?o }")
1578
 ;; (request-sparql "http://dydra.com/jhacker/bnode-coreference-dawg-bnode-coref-001/sparql" "SELECT ?s ?p ?o WHERE { ?s ?p ?o }")
1579
 
1580
 ;; roqet -p "http://dydra.com/jhacker/foaf/sparql" -e 'SELECT count(*) WHERE { ?s ?p ?o }' -r table
1581