Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/service.lisp
| Kind | Covered | All | % |
| expression | 150 | 2418 | 6.2 |
| branch | 4 | 160 | 2.5 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
5
(:documentation "This file defines the SERVICE operator for the 'org.datagraph.spocq'
9
"Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved.")
12
"The service operator involves sending a query request to a sparql endpoint and incorporating
13
the result solution field into the current query as if it were the product of an internal
14
BGP. This implementation limits the service designator to a constant, as the semantics for the
15
variable form is undefined."))
18
(defun roqet-pathname ()
20
(when (probe-file "/usr/local/bin/roqet")
21
(setq *roqet-pathname* "/usr/local/bin/roqet"))
22
(when (probe-file "/usr/bin/roqet")
23
(setq *roqet-pathname* "/usr/bin/roqet"))
24
(error "roqet-pathname: no executable found")))
26
;;; Macro for the algebra SERVICE form.
;;; The expansion is whatever MACROEXPAND-SERVICE returns when applied to
;;; IRI, GROUP-GRAPH-PATTERN, the macro environment ENV and the remaining ARGS.
(defmacro spocq.a:|service| (iri group-graph-pattern &rest args &environment env)
27
"Perform the remote query and return the result field."
28
;; ENV is passed positionally ahead of the spread &rest arguments.
(apply #'macroexpand-service iri group-graph-pattern env args))
30
;;; Macro for the algebra form which combines a service with a source field.
;;; The expansion is whatever MACROEXPAND-SERVICE-JOIN returns when applied to
;;; IRI, GROUP-GRAPH-PATTERN, SIP-SOURCE, the macro environment ENV and the
;;; remaining ARGS.
(defmacro spocq.a::|servicejoin| (iri group-graph-pattern sip-source &rest args &environment env)
31
"Perform the remote query and return the result field."
32
;; ENV is passed positionally between SIP-SOURCE and the spread &rest arguments.
(apply #'macroexpand-service-join iri group-graph-pattern sip-source env args))
34
;;; Left-join counterpart of the service join macro.
;;; The expansion is whatever MACROEXPAND-SERVICE-LEFTJOIN returns when applied
;;; to IRI, GROUP-GRAPH-PATTERN, SIP-SOURCE, the macro environment ENV and the
;;; remaining ARGS.
(defmacro spocq.a::|serviceleftjoin| (iri group-graph-pattern sip-source &rest args &environment env)
35
"Perform the remote query and return the result field.
36
nb. the argument order is opposite of a simple leftjoin: the sip-source, the second argument, is the base field"
37
;; ENV is passed positionally between SIP-SOURCE and the spread &rest arguments.
(apply #'macroexpand-service-leftjoin iri group-graph-pattern sip-source env args))
39
(defun expression-projection-variables (expression)
40
(multiple-value-bind (all bound free projection)
41
(parse-expression-variables expression)
42
(declare (ignore all bound free))
45
(defgeneric field-propagation-dimensions (field base-dimension-declarations)
46
(:documentation "given a field expression and a list of declared base dimensions,
47
compute the possible sip input dimensions. this is the largest intersection between
48
the field input dimensions and the respective base dimensions.
49
The input dimensions are the variables free in the expression - that is,
50
the difference between the complete dimension set and those in the projection set.")
51
(:method ((field list) (base-dimension-declarations cons))
52
(multiple-value-bind (all bound free projection)
53
(parse-expression-variables field)
54
(declare (ignore all bound free))
55
(loop with maximal = ()
56
for dimensions in base-dimension-declarations
57
;; only the projected dimensions count
58
for intersection = (intersection dimensions projection)
59
when (> (length intersection) (length maximal))
60
do (setf maximal intersection)
61
finally (return maximal))))
62
(:method ((field list) (base-dimension-declarations null))
65
(defgeneric compute-endpoint-sparql-expression (endpoint sparql bindings)
66
(:documentation "serialize the sse as a sparql expression, with possible bindings.
67
iff a base was provided in the metadata, prepend that. (nyi: see spocq#306)")
68
(:method ((endpoint substitution-service-endpoint) (sparql-expression cons) (bindings cons))
69
(with-output-to-string (s)
70
(format-sparql-sse s (bind-sparql-expression sparql-expression bindings))))
71
(:method ((endpoint t) (sparql-expression cons) (bindings cons))
72
(with-output-to-string (s)
73
(format-sparql-sse s (add-sparql-bindings bindings sparql-expression))))
74
(:method ((endpoint t) (sparql-expression cons) (bindings null))
77
;;; Load-time registration: associate this one location string with the class
;;; name SUBSTITUTION-SERVICE-ENDPOINT. COMPUTE-ENDPOINT-SPARQL-EXPRESSION has a
;;; method specialized on that class which serializes the result of
;;; BIND-SPARQL-EXPRESSION instead of ADD-SPARQL-BINDINGS.
;;; NOTE(review): how SERVICE-LOCATION-CLASS is consulted is not visible here - confirm.
(setf (service-location-class "www.ws-qa.tibco.nxp.com/DummyTestingURL/RootsSparql")
78
'substitution-service-endpoint)
80
(defun macroexpand-service (iri group-graph-pattern env &rest args)
81
"Expand a service form into a contour which rebinds the current repository to that of the
82
service and then compiles the graph pattern in that context, computes its generator and
83
invokes the generic service operator. This will distinguish between local and remote
84
repositories to either evaluate the bgp as in internal query or use the original sparql
85
string to perform a remote query request.
86
Take also note of the binding context in order to distinguish between dependent and
87
independent federation operations.
89
The failed SERVICE clause is treated as if it had a result of a single solution with no bindings
91
(declare (ignore env))
92
(case (federation-mode)
94
`(spocq.a:|null| ,(expression-dimensions group-graph-pattern)))
96
(cond ((or (null args) (keywordp (first args)))
97
(destructuring-bind (&key (silent nil s-p) query-text) args
98
`(spocq.e::service ',iri
99
(quote ,(if (or (select-form-p group-graph-pattern)
100
(distinct-form-p group-graph-pattern)
101
(slice-form-p group-graph-pattern))
103
`(spocq.a:|select| ,group-graph-pattern ,(expression-dimensions group-graph-pattern))))
104
,@(when query-text `(:query-text ,query-text))
105
,@(when s-p `(:verbose ,(not silent))))))
106
;; otherwise the next argument is a field expression,
107
;; be prepared to sip-join it
109
(let* ((group-dimensions (expression-dimensions group-graph-pattern))
110
(location-dimensions (when (variable-p iri) (list iri)))
111
(reference-dimensions (union-dimensions group-dimensions location-dimensions)))
112
(destructuring-bind (field &key (silent nil s-p) query-text) args
113
`(spocq.e::service-join ',iri
114
(quote ,(if (select-form-p group-graph-pattern)
116
`(spocq.a:|select| ,group-graph-pattern ,group-dimensions)))
117
,(if reference-dimensions
118
`(spocq.e::with-reference-dimensions ,reference-dimensions ,field)
120
,@(when query-text `(:query-text ,query-text))
121
,@(when s-p `(:verbose ,(not silent)))))))))))
123
(defun macroexpand-service-join (iri group-graph-pattern sip-source env &rest args)
124
(declare (ignore env))
125
(case (federation-mode)
127
`(spocq.a:|null| ,(union-dimensions (when (variable-p iri) (list iri))
128
(union-dimensions (expression-dimensions group-graph-pattern)
129
(expression-dimensions sip-source)))))
131
(let* ((group-dimensions (expression-dimensions group-graph-pattern))
132
(location-dimensions (when (variable-p iri) (list iri)))
133
(reference-dimensions (union-dimensions group-dimensions location-dimensions)))
134
`(spocq.e::service-join ',iri
135
(quote ,(if (select-form-p group-graph-pattern)
137
`(spocq.a:|select| ,group-graph-pattern ,group-dimensions)))
138
,(if reference-dimensions
139
`(spocq.e::with-reference-dimensions ,reference-dimensions ,sip-source)
143
(defun macroexpand-service-leftjoin (iri group-graph-pattern sip-source env &rest args)
144
(declare (ignore env))
145
(case (federation-mode)
147
`(spocq.a:|null| ,(union-dimensions (when (variable-p iri) (list iri))
148
(union-dimensions (expression-dimensions group-graph-pattern)
149
(expression-dimensions sip-source)))))
151
(let* ((group-dimensions (expression-dimensions group-graph-pattern))
152
(location-dimensions (when (variable-p iri) (list iri)))
153
(reference-dimensions (union-dimensions group-dimensions location-dimensions)))
154
`(spocq.e::service-leftjoin ',iri
155
(quote ,(if (select-form-p group-graph-pattern)
157
`(spocq.a:|select| ,group-graph-pattern ,group-dimensions)))
158
,(if reference-dimensions
159
`(spocq.e::with-reference-dimensions ,reference-dimensions ,sip-source)
164
;;; Switch read by the |urn:dydra|:|service-description| method of SPOCQ.E:SERVICE.
;;; When true, that method obtains an ephemeral repository via
;;; ENSURE-EPHEMERAL-REPOSITORY and re-applies SPOCQ.E:SERVICE to it.
;;; Defaults to NIL.
(defparameter *use-service-description-repository-p* nil)
166
(defgeneric spocq.e:service (location query-expression &key verbose user-id query-text)
167
(:documentation "Given a location and a query expression - as an sse and as a sparql string,
168
distinguish local from remote requests and either evaluate the sse pattern or delegate the request
169
to a remote processor.")
171
(:method :around (location query &rest args &key (verbose t) user-id query-text)
172
(declare (ignore user-id query-text))
173
(apply #'call-next-method location query
177
(:method ((service (eql |urn:dydra|:|service-description|)) (query-expression cons) &rest args)
178
;; should make an ephemeral repository and run the query on that.
179
(cond (*use-service-description-repository-p*
180
(let ((ephemeral-repository (ensure-ephemeral-repository (task-repository *task*) |urn:dydra|:|service-description|)))
181
(apply #'spocq.e:service ephemeral-repository query-expression args)))
183
(when (select-form-p query-expression) (setf query-expression (second query-expression)))
184
(let* ((statement-pattern (assoc 'spocq.a:|triple| (rest query-expression)))
185
(service-description (repository-service-description (repository-id (task-repository *task*)))))
186
(assert (every #'variable-p (rest statement-pattern)) ()
187
"service: Invalid service description query : ~s" query-expression)
188
(spocq.e:bindings (service-description-solutions service-description)
189
(rest statement-pattern))))))
192
(:method ((service (eql |urn:dydra|:|timemap|)) query-expression &rest args)
193
(declare (ignore args))
194
(when (select-form-p query-expression) (setf query-expression (second query-expression)))
195
(let* ((statement-pattern (assoc 'spocq.a:|triple| (rest query-expression)))
196
(timemap (compute-timemap (repository-id (task-repository *task*)))))
197
(assert (every #'variable-p (rest statement-pattern)) ()
198
"service: Invalid timemap query : ~s" query-expression)
199
(spocq.e:bindings timemap (rest statement-pattern))))
201
(:method ((service (eql |urn:dydra|:|revisionId|)) query-expression &rest args)
202
(declare (ignore args))
203
(assert (select-form-p query-expression)
205
"service: Invalid revision id query : ~s" query-expression)
206
(let* ((dimensions (third query-expression))
207
(revisions (repository-list-revision-ids (task-repository *task*))))
208
(assert (and (every #'variable-p dimensions)
209
(= (length dimensions) 1))
211
"service: Invalid revision id query : ~s" query-expression)
212
(spocq.e:bindings (mapcar #'list revisions) dimensions)))
214
(:method ((service (eql |urn:dydra|:|service-functions|)) (query-expression cons) &rest args)
215
;;; (loop for p in spocq.i:*iri-package-names* when (loop for s being the external-symbols in p when (fboundp s) return t) collect p)
216
(declare (ignore args))
217
(assert (select-form-p query-expression)
219
"service: Invalid service functions query : ~s" query-expression)
220
(let ((dimensions (third query-expression))
221
(operators (sort (loop for p in spocq.i:*iri-package-names*
222
append (loop for s being the external-symbols in p when (fboundp s) collect s))
224
(assert (and (every #'variable-p dimensions)
225
(= (length dimensions) 2))
227
"service: Invalid service functions query : ~s" query-expression)
228
(spocq.e:bindings (loop for symbol in operators
229
collect (list symbol (or (documentation symbol 'function) "")))
232
;; parser iri locations from a query expression must allow decoding
233
(:method ((service-location spocq:iri) query-expression &rest args)
234
(declare (dynamic-extent args))
235
(apply #'spocq.e:service (url-decode (spocq:iri-lexical-form service-location)) query-expression args))
236
;; while internal ones do not
237
(:method ((service-location puri:uri) query-expression &rest args)
238
(declare (dynamic-extent args))
239
(apply #'spocq.e:service (iri-lexical-form service-location) query-expression args))
241
(:method ((service-location string) query-expression &rest args &key (verbose t) user-id query-text)
242
(declare (dynamic-extent args))
243
(declare (ignore user-id query-text))
244
(flet ((call-with-repository ()
245
(let ((repository (handler-case (service-repository service-location)
246
(error (c) (spocq.e:request-error "service: invalid service location: ~s: ~a"
247
service-location c))))
248
(revision (service-repository-revision service-location)))
249
(apply #'spocq.e::service repository query-expression
250
:location service-location
251
:revision-id (or revision "HEAD")
254
(call-with-repository)
255
(handler-bind ((error (lambda (c)
256
(log-warn "service failure: ~a: ~a" service-location c)
258
(return-from spocq.e::service
259
;; https://www.w3.org/TR/2013/REC-sparql11-federated-query-20130321/#serviceFailure
260
(singleton-generator ())
262
(call-with-repository)))))
264
(:method ((service-repository service-repository) query-expression &key (verbose t) user-id query-text location)
265
"For external service processing, compute a generator which will start an external process to run
266
the remote query and intern the result solution stream.
268
Given a unit table expression, attempt to locate a view."
269
(declare (ignore user-id))
270
(let* ((qdb (query-dynamic-bindings *task*))
271
(bindings (mapcar #'cons (first qdb) (rest qdb))))
272
(authorize-service-request *task* service-repository)
273
;; replace a unit table with the view definition - for remote locations as well.
274
(when (or (equal query-expression *select-table-unit-sse*)
275
(equal query-expression *table-unit-sse*))
276
(let* ((view-name (repository-resource-view-name service-repository location))
277
(view-text (when view-name (authorized-repository-view service-repository view-name (task-agent *task*)))))
279
(setf query-expression (parse-sparql view-text))
280
(setf query-text view-text))))
281
(service-generator service-repository (expression-dimensions query-expression)
282
;;!!! nb. if bindings are present, this will drop any graphs specified in the view.
283
;;!!! they will need to be captured as for internal federation and incorporated into the query text
285
(compute-endpoint-sparql-expression service-repository query-expression bindings))
288
(with-output-to-string (s) (format-sparql-sse s query-expression))))
291
(:method ((service-repository repository) (query-expression cons) &key (verbose t) query-text location
292
(user-id (task-user-id *task*))
295
"For internal service processing compile the query, if necessary, and return its generator.
296
In the event of error, unless verbose, suppress it and proceed with a table solution field.
297
When verbose, decline to handle the error and delegate it to the context.
299
Given a unit table expression, attempt to locate a view.
300
!!!HDT : based on the service-repository class interpose term mapping."
301
(let* ((qdb (query-dynamic-bindings *task*)))
302
(authorize-service-request *task* service-repository)
303
;; replace a unit table with the view definition
304
(when (or (equal query-expression *select-table-unit-sse*)
305
(equal query-expression *table-unit-sse*))
306
(let* ((view-name (repository-resource-view-name service-repository location))
307
(view-text (when view-name (authorized-repository-view service-repository view-name (task-agent *task*)))))
309
(multiple-value-bind (view-query-expression view-options)
310
(parse-sparql view-text)
311
(setf dataset-graphs (getf view-options :dataset-graphs))
312
(setf query-expression view-query-expression)
313
(setf query-text view-text)))))
314
(flet ((service-task-generator ()
315
(let* ((metadata (instance-metadata *task*))
316
(service-task (make-query :sse-expression query-expression
317
:id (make-service-task-id)
318
:dynamic-bindings qdb
319
:dataset-graphs dataset-graphs
320
:repository service-repository
321
:revision-id revision-id
323
:sparql-expression query-text
324
:agent (task-agent *task*)
326
:metadata (when metadata (clone-instance metadata))
327
:response-content-type (task-response-content-type *task*))))
328
(with-task-environment (:task service-task)
329
;; initialize the task in order that it has its generator
330
(initialize-task service-task)
331
;; a generator is returned as for a standard algebra operation, but
332
;; the expressed operation is wrapped in order to establish the service query
333
;; as the wrapping task
334
(setf (solution-generator-expression (task-result-generator service-task))
335
(list #'run-service-task service-task
336
(solution-generator-expression (task-result-generator service-task))))
337
(log-debug "query: wrapped for retask ~a -> ~a" *task* service-task)
338
;;; reduction yields the solution generator to be initiated by the respective operation
339
;;; !!!HDT based on the service-repository class interpose term mapping.
340
(task-result-generator service-task)))))
341
(handler-bind ((error (lambda (c)
342
(log-warn "service: internal service failure: ~a: ~a" service-repository c)
344
(return-from spocq.e::service
345
;; https://www.w3.org/TR/2013/REC-sparql11-federated-query-20130321/#serviceFailure
346
(singleton-generator ())
348
(service-task-generator)))))
350
(:method ((service-repository repository-revision) (query-expression cons) &rest args &key &allow-other-keys)
351
;; indirectly through the revision to the repository, with the revision id as an argument
352
;; will cause the revision to be reconstituted when the query is initialized, but leaves
353
;; the methods otherwise as-they-are
354
(declare (dynamic-extent args))
355
(apply #'spocq.e::service (repository-revision-reference service-repository) query-expression
356
:revision-id (repository-revision-id service-repository)
359
;; this should not arise in the standard case as the macro expands to the join form given a variable...
360
(:method ((location symbol) query-expression &rest args)
361
(if (variable-p location)
362
(let ((location-value (query-binding-value location)))
363
(if (spocq:unbound-variable-p location-value )
364
(case (undefined-variable-behavior)
365
(spocq.e:compilation-error :expression location
366
:condition (make-condition 'spocq.e:undefined-variable-error
367
:variables (list location)))
368
(|urn:dydra|:|warning| (log-warn "~@[~a: ~]~a"
370
(make-condition 'spocq.e:undefined-variable-error
372
:variables (list location)))
373
(null-generator (union-dimensions (list location) (expression-dimensions query-expression))))
374
(|urn:dydra|:|dynamicBinding|
375
(null-generator (union-dimensions (list location) (expression-dimensions query-expression)))))
376
;; iff it was a bound variable
377
(apply #'spocq.e::service location-value query-expression args)))
378
;; iff it was not a variable
379
(spocq.e:compilation-error :expression location
380
:condition (make-condition 'spocq.e:resource-not-found-error
381
:identifier location)))))
383
(defgeneric run-service-task (task initial-operation)
384
(:documentation "run the query as a service sub-select by starting its algebra thread,
385
waiting for that to complete, and then finalizing it.
386
this last is necessary in order to clean up after it - particularly transactions.")
387
(:method ((task query) initial-operation)
388
(channel-put (task-operations task) initial-operation)
389
(unwind-protect (progn
390
(task-run-algebra-thread task)
392
(setf (task-state task) :complete)
393
;;; !!! finalization should be in the task thread ???
394
(finalize-task task))))
396
;;; Join variant of the service operator: delegates to
;;; SPOCQ.E::SERVICE-GENERIC-JOIN with #'SPOCQ.E:JOIN as the join operator,
;;; passing LOCATION, QUERY-EXPRESSION, FIELD and the remaining ARGS through unchanged.
(defun spocq.e::service-join (location query-expression field &rest args)
397
;; the rest list is only spread by APPLY below
(declare (dynamic-extent args))
398
(apply #'spocq.e::service-generic-join #'spocq.e:join location query-expression field args))
400
;;; Left-join variant of the service operator: delegates to
;;; SPOCQ.E::SERVICE-GENERIC-JOIN with #'SPOCQ.E:LEFTJOIN as the join operator,
;;; passing LOCATION, QUERY-EXPRESSION, FIELD and the remaining ARGS through unchanged.
(defun spocq.e::service-leftjoin (location query-expression field &rest args)
401
;; the rest list is only spread by APPLY below
(declare (dynamic-extent args))
402
(apply #'spocq.e::service-generic-join #'spocq.e:leftjoin location query-expression field args))
405
(defgeneric spocq.e::service-generic-join (join-operator location query-expression field &key verbose user-id query-text)
406
(:documentation "Given a service form which includes a source field, use that field content for some combination of
407
- a variable for a location
408
- values to propagate to the service field expression.
409
- as an sse and as a sparql string,
410
distinguish local from remote requests and either evaluate the sse pattern or delegate the request
411
to a remote processor.
412
If the variable is among the source field dimensions, for each solution in the source field,
413
issue the remote service request and join the result field with the solution.
414
If the variable is not bound, attempt a dynamic binding and then delegate to a standard service operator.
415
If other variables correspond, then instantiate the pattern for each solution for those bindings and
416
reiterate the requests.")
418
(:method (join-operator (location symbol) group-graph-pattern source-field &rest args)
419
(declare (dynamic-extent args))
420
(if (variable-p location)
421
(let* ((service-variables (expression-projection-variables group-graph-pattern))
422
(extended-variables (cond ((member location service-variables) service-variables)
423
((variable-p location) (cons location service-variables))
424
(t service-variables)))
425
(source-variables (expression-projection-variables source-field))
426
(sip-variables (intersection extended-variables
428
(union-dimensions source-variables
429
(first (task-dynamic-bindings *task*))))))
430
; (trace-algebra spocq.e::service-join
431
(log-debug "service-generic-join: (location symbol) ~s" location)
432
(log-debug "service-generic-join: pattern variables ~s" service-variables)
433
(log-debug "service-generic-join: extended variables ~s" extended-variables)
434
(log-debug "service-generic-join: source variables: ~s" source-variables)
435
(log-debug "service-generic-join: sip variabes: ~s" sip-variables)
436
(if (find location sip-variables)
437
;; if the location can be taken from the source field, extract solutions
438
;; and join each with the results of the respective service
439
(if (rest sip-variables)
440
;; if other dimensions are propagated, arrange to integrate them into the service request
441
(apply #'service-sip-generic-join-generator join-operator location
442
group-graph-pattern source-field
443
:sip-variables sip-variables
445
;; otherwise, expect just the location
446
(apply #'service-location-join-generator join-operator location
447
group-graph-pattern source-field args))
448
;; otherwise attempt to get the value elsewhere.
449
;; if it is a request parameter, use that to perform a static service operation
450
;; if no value is found follow the defined behaviour for undefined variable,
451
;; whereby if continued, the result is a null solution field
452
(let ((location-value (query-binding-value location)))
453
(if (spocq:unbound-variable-p location-value )
454
(case (undefined-variable-behavior)
455
(spocq.e:compilation-error :expression location
456
:condition (make-condition 'spocq.e:undefined-variable-error
457
:variables (list location)))
458
(|urn:dydra|:|warning| (log-warn "~@[~a: ~]~a"
460
(make-condition 'spocq.e:undefined-variable-error
462
:variables (list location)))
463
(null-generator (union-dimensions (list location) (solution-generator-dimensions source-field))))
464
(|urn:dydra|:|dynamicBinding|
465
(null-generator (union-dimensions (list location) (solution-generator-dimensions source-field)))))
466
(funcall join-operator source-field
467
(apply #'spocq.e:service location-value group-graph-pattern args))))))
468
;; if not a variable location, delegate to the base method as a constant
471
(:method (join-operator (location t) group-graph-pattern source-field &rest args)
472
"The default method allows both symbol and iri constant locations.
473
just propagate solutions"
474
(let* ((service-variables (expression-projection-variables group-graph-pattern))
475
(source-variables (expression-projection-variables source-field))
476
(sip-variables (intersection service-variables source-variables)))
477
(log-debug "service-generic-join: (location t) ~s" location)
478
(log-debug "service-generic-join: pattern variables ~s" service-variables)
479
(log-debug "service-generic-join: source variables: ~s" source-variables)
480
(log-debug "service-generic-join: sip variabes: ~s" sip-variables)
483
(apply #'service-sip-generic-join-generator join-operator location
484
group-graph-pattern source-field
485
:sip-variables sip-variables
487
;; no sip: fall-back on standard service
488
(apply #'spocq.e:service location group-graph-pattern args)))))
492
(defun run-service (location sparql-expression &rest args &key repository-id agent)
493
(declare (ignore repository-id agent))
494
(apply #'run-sparql `(spocq.a:|service| ,location ,sparql-expression)
497
(run-service <http://localhost/james/test2>
498
'(spocq.a:|select| (spocq.a:|bgp| (spocq.a:|triple| ?::s ?::p ?::o)) (?::s ?::p ?::o))
499
:repository-id "james/test"
500
:agent (system-agent))
503
;; (setq *authorize-service-access* nil)
505
;;; Registry of authorization results, created with an EQUALP test.
;;; The authorization methods in this file read it with GETHASH under composite
;;; keys built from a repository id and an agent-derived authorization key.
;;; NOTE(review): MAKE-REGISTRY is defined elsewhere; its exact semantics are not visible here.
(defparameter *authorizations* (make-registry :test #'equalp))
507
(defgeneric authorize-service-request (client-repository service-location &key if-unauthorized)
508
(:documentation "Authorize cross-repository access for a service request.
509
Require some one of (for mysql metadata)
510
- the agent is an administrator
511
- the agent account is the same as the target account
512
- the base repository account is the same as the target account
513
- the agent is authorized to read the target repository
514
- the agent is an authorized read collaborator
515
- the base repository account is authorized to read the target repository
516
- the base repository account is an authorized read collaborator
518
- the agent account has read access
519
- the base repository account has read access
520
If the tests succeed, return T, otherwise signal a request error.")
522
(:method ((task task) (service-location service-repository) &key if-unauthorized)
523
(declare (ignore if-unauthorized))
526
t ; based on mysql authorizations, remote requests are allowed based on configuration
530
(:method ((task task) (service-location repository) &key (if-unauthorized :error))
531
(let* ((agent (task-agent task))
532
(agent-name (agent-name agent))
533
(agent-location (agent-location agent))
534
(task-repository (task-repository task))
535
(task-account (repository-account task-repository))
536
(task-account-name (account-name task-account))
537
(location-id (repository-id service-location))
538
(location-name (repository-name service-location))
539
(location-account-name (account-name (repository-account service-location)))
540
(authorization-key (cons (or agent-name (when agent-location (list agent-location))) t))
541
(view (task-request-location task)))
542
(cond ((equal agent-name location-account-name)
543
(log-info "service authorization as owner ~s -> ~s" agent-name location-name)
545
((administrator-p agent)
546
(log-info "service authorization as admin ~s -> ~s" agent-name location-name)
548
((equal task-account-name location-account-name)
549
(log-info "service authorization as intra-account ~s -> ~s" task-account-name location-name)
551
((authorize-query-request agent service-location :if-unauthorized nil))
552
#+(or) ;; mysql -based version superseded by acl graph
553
((multiple-value-bind (authorization known-p) (gethash (cons location-id authorization-key) *authorizations*)
558
;; if the target repository permits read
559
(let* ((query-command
560
(if (authenticated-agent-p agent)
561
(format nil "mysql -h ~a -u root ~a -BNe \"select repositories.id from repositories, accounts where repositories.name = '~a' and accounts.name = '~a' and repositories.account_id = accounts.id and (repositories.privacy_setting = 5 or repositories.privacy_setting = 3);\""
562
*mysql-host* *mysql-database*
563
location-name location-account-name)
564
(format nil "mysql -h ~a -u root ~a -BNe \"select repositories.id from repositories, accounts where repositories.name = '~a' and accounts.name = '~a' and repositories.account_id = accounts.id and (repositories.privacy_setting = 5);\""
565
*mysql-host* *mysql-database*
566
location-name location-account-name)))
567
(process (run-program "/bin/sh" (list "-c" query-command)
568
:input nil :output :stream))
569
(id (unwind-protect (read-line (sb-ext:process-output process) nil)
570
(sb-ext:process-close process))))
571
(when (and (stringp id) (ignore-errors (parse-integer id)))
573
;; if collaboration permits the base repository account
574
(let* ((query-command
575
(format nil "mysql -h ~a -u root ~a -BNe \"select collaborations.id from collaborations where collaborations.account_id = (select accounts.id from accounts where accounts.name = '~a') and collaborations.repository_id = (select repositories.id from repositories, accounts where repositories.name = '~a' and accounts.name = '~a' and repositories.account_id = accounts.id) and collaborations.read = 1;\""
576
*mysql-host* *mysql-database*
577
task-account-name location-name location-account-name))
578
(process (run-program "/bin/sh" (list "-c" query-command)
579
:input nil :output :stream))
580
(id (unwind-protect (read-line (run-program-output process) nil)
581
(run-program-close process))))
582
(when (and (stringp id) (ignore-errors (parse-integer id)))
584
;; test authorization via the agent - which can fallback to the
585
;; account or origin repository
586
(access-authorized-p service-location agent |acl|:|Read|)))
587
(setf (gethash (cons location-id authorization-key) *authorizations*) authorization))
589
(log-info "service authorization ~s -> t" (cons (cons location-id authorization-key) t))
591
((access-authorized-p service-location agent |acl|:|Read|))
592
((access-authorized-p service-location task-account |acl|:|Read|))
594
(access-authorized-p service-location
596
(string (make-view :repository task-repository :name view))
601
(log-info "service authorization ~s -> nil" (cons (cons location-id authorization-key) t))
602
(ecase if-unauthorized
604
(spocq.e:task-authorization-error :task task :operation 'spocq.a:|service| :location service-location))
608
(defgeneric authorize-query-request (context repository &key read-only-p if-unauthorized)
609
(:method ((task task) (repository repository) &rest args &key (read-only-p (operation-read-only-p task)) if-unauthorized)
610
(declare (dynamic-extent args) (ignore if-unauthorized))
611
(apply #'authorize-query-request (task-agent task) repository
612
:read-only-p read-only-p
614
(:method ((agent agent) (repository repository) &key (read-only-p nil) (if-unauthorized :error))
615
(let* ((agent-name (agent-name agent))
616
(repository-id (repository-id repository))
617
(repository-name (repository-name repository))
618
(repository-account-name (account-name (repository-account repository)))
619
(location (agent-location agent))
620
(authorization-key (cons (or agent-name (when location (list location))) read-only-p)))
621
(cond ((equal agent-name repository-account-name)
622
(log-info "authorization as owner ~s" agent-name)
624
((administrator-p agent)
625
(log-info "authorization as admin ~s" agent-name)
627
((multiple-value-bind (authorization known-p) (gethash (list repository-id authorization-key read-only-p) *authorizations*)
629
(log-notice "authorize-query-request: ~s(~s)@~s ~s ~s"
630
agent-name (authenticated-agent-p agent) location
635
;; see if the repository permits the agent to read
636
(let* ((query-command
637
(if (authenticated-agent-p agent)
638
(format nil "mysql -h ~a -u root ~a -BNe \"select repositories.id from repositories, accounts where repositories.name = '~a' and accounts.name = '~a' and repositories.account_id = accounts.id and (repositories.privacy_setting = 5 or repositories.privacy_setting = 3);\""
639
*mysql-host* *mysql-database*
640
repository-name repository-account-name)
641
(format nil "mysql -h ~a -u root ~a -BNe \"select repositories.id from repositories, accounts where repositories.name = '~a' and accounts.name = '~a' and repositories.account_id = accounts.id and (repositories.privacy_setting = 5);\""
642
*mysql-host* *mysql-database*
643
repository-name repository-account-name)))
644
(process (sb-ext:run-program "/bin/sh" (list "-c" query-command)
645
:input nil :output :stream))
646
(id (unwind-protect (read-line (sb-ext:process-output process) nil)
647
(sb-ext:process-close process))))
648
(when (and (stringp id) (ignore-errors (parse-integer id)))
650
;; see if collaboration permits
651
(and (authenticated-agent-p agent)
652
(let* ((query-command
654
(format nil "mysql -h ~a -u root ~a -BNe \"select collaborations.id from collaborations where collaborations.account_id = (select accounts.id from accounts where accounts.name = '~a') and collaborations.repository_id = (select repositories.id from repositories, accounts where repositories.name = '~a' and accounts.name = '~a' and repositories.account_id = accounts.id) and collaborations.read = 1;\""
655
*mysql-host* *mysql-database*
656
agent-name repository-name repository-account-name)
657
(format nil "mysql -h ~a -u root ~a -BNe \"select collaborations.id from, collaborations where collaborations.account_id = (select accounts.id from accounts where accounts.name = '~a') and collaborations.repository_id = (select repositories.id from repositories, accounts where repositories.name = '~a' and accounts.name = '~a' and repositories.account_id = accounts.id) and collaborations.write = 1;\""
658
*mysql-host* *mysql-database*
659
agent-name repository-name repository-account-name)))
660
(process (sb-ext:run-program "/bin/sh" (list "-c" query-command)
661
:input nil :output :stream))
662
(id (unwind-protect (read-line (sb-ext:process-output process) nil)
663
(sb-ext:process-close process))))
664
(when (and (stringp id) (ignore-errors (parse-integer id)))
666
;; see if location permits
668
(located-user-p agent)
669
(let* ((query-command
670
(format nil "mysql -h ~a -u root ~a -BNe \"select repositories.id from repositories, accounts where repositories.name = '~a' and accounts.name = '~a' and repositories.account_id = accounts.id and repositories.permissable_ip_addresses regexp('.*~a.*');\""
671
*mysql-host* *mysql-database*
672
repository-name repository-account-name
674
(process (sb-ext:run-program "/bin/sh" (list "-c" query-command)
675
:input nil :output :stream))
676
(id (unwind-protect (read-line (sb-ext:process-output process) nil)
677
(sb-ext:process-close process))))
678
(when (and (stringp id) (ignore-errors (parse-integer id)))
680
(setf (gethash (cons repository-id authorization-key) *authorizations*) authorization))
682
;; read-only-p is redundant here
683
(log-info "authorization ~s -> t" (cons (cons repository-id authorization-key) read-only-p))
686
(log-info "authorization ~s -> nil" (cons (cons repository-id authorization-key) read-only-p))
687
(ecase if-unauthorized
689
(error "Access to repository is not authorized: ~s -> ~s." agent repository))
695
(defun service-location-join-generator (join-operator location query-expression base-field-generator &rest args)
696
"Delegate a service request with a propagated location to a join operation supplying it two generators:
697
- one yields the solutions from the base field - after each has provided a service location;
698
- the other yields the union of the solutions from the iterated service operations - if necessary extended with the location."
699
(assert (variable-p location) ()
700
"Invalid service join location variable: ~a" location)
701
(let* ((base-dimensions (solution-generator-dimensions base-field-generator))
702
(query-dimensions (expression-dimensions query-expression))
703
(extended-service-dimensions (union-dimensions (list location) query-dimensions))
704
(base-channel (make-channel :name (list 'spocq.a:|service| (task-id *query*))
705
:dimensions base-dimensions))
706
(location-sip-channel (make-solution-channel :name (list 'spocq.a:|service| (task-id *query*))
707
:dimensions (list location)))
708
(result-channel (make-channel :name (list 'spocq.a:|service| (task-id *query*))
709
:dimensions extended-service-dimensions))
710
;; the base field wrapper involves an operator which accepts the base channel and the sip channel to the service operator,
711
;; reads base solutions and for each one, writes the location to the service channel and the original solution
712
;; to the wrapped base channel
713
(base-field-generator-wrapped (make-solution-generator :operator 'spocq.a:|copy|
714
:dimensions base-dimensions
715
:expression (list #'run-service-location-join-base
719
location-sip-channel)
720
:channel base-channel
721
:constituents (list base-field-generator)))
722
;; the service field generator involves an operator which accepts a channel from which to read locations
723
;; and for each one, initiates the service operation and transcribes solutions
724
(service-field-generator (make-solution-generator :operator 'spocq.a::|service|
725
:dimensions extended-service-dimensions
726
:channel result-channel
727
:expression (list #'run-service-location-join-service
733
;; delegate to join the two fields
734
(funcall join-operator base-field-generator-wrapped service-field-generator)))
737
(defun run-service-location-join-base (destination location-variable base-generator location-sip-channel)
  "Run the base side of a service join in which the service location is a variable.
 Pass the expression of BASE-GENERATOR to query-run-in-thread for *query*, then read the
 generator's channel page by page. For each page:
 - put each location term which has not been seen before onto LOCATION-SIP-CHANNEL
   as a single-element solution;
 - put a copy of the complete page to DESTINATION.
 When the base channel is exhausted, complete LOCATION-SIP-CHANNEL and then DESTINATION.
 Signal an error if LOCATION-VARIABLE is not among the dimensions of BASE-GENERATOR."
  (let* ((base-dimensions (solution-generator-dimensions base-generator))
         (base-channel (solution-generator-channel base-generator))
         ;; records the location terms which have already been propagated
         (location-cache (make-hash-table :test 'eql))
         ;; fail with a descriptive message instead of passing a nil index to aref;
         ;; this mirrors the checks made in run-service-location-join-service
         (location-index (or (position location-variable base-dimensions)
                             (error "run-service-location-join-base: location variable not in base dimensions: ~a ~a"
                                    location-variable base-dimensions))))
    (query-run-in-thread *query* (solution-generator-expression base-generator))
    (do-pages (solutions base-channel)
      ;; propagate each distinct location once only
      (loop for index from 0 below (array-dimension solutions 0)
            for location-term = (aref solutions index location-index)
            unless (gethash location-term location-cache)
              do (progn (setf (gethash location-term location-cache) location-term)
                        (channel-put location-sip-channel (list location-term))))
      ;; the destination receives a fresh copy of the page rather than the source page itself
      (let ((result-page (make-page (array-dimension solutions 0) (array-dimension solutions 1))))
        (copy-page solutions result-page)
        (put-field-page destination result-page)))
    ;; signal completion first to the service side, then to the join
    (complete-field location-sip-channel)
    (complete-field destination)))
760
(defun run-service-location-join-service (destination location-variable location-sip-channel query-expression args)
761
"Implement the service request thread for a service clause in which the locations are passed
762
from a joined field. If the location is in the service bgp, then pass that result as-is.
763
Otherwise extend it to include the location."
765
(loop for (location-term) = (channel-get location-sip-channel)
766
until (null location-term)
767
for location-object = (term-number-object location-term)
768
do (let* ((service-generator (apply #'spocq.e::service location-object query-expression args))
769
(service-dimensions (solution-generator-dimensions service-generator))
770
(service-source (solution-generator-channel service-generator))
771
(location-index (position location-variable service-dimensions)))
772
(query-run-in-thread *query* (solution-generator-expression service-generator))
774
;; just copy the pages
775
(do-pages (solutions service-source)
776
(check-query-status *query*)
777
(let* ((result-page (make-page (array-dimension solutions 0) (array-dimension solutions 1))))
778
(copy-page solutions result-page)
779
(put-field-page destination result-page)))
780
;; otherwise, extend the page with the respective location
781
(let* ((destination-dimensions (channel-dimensions destination))
782
(destination-width (channel-page-width destination))
783
(dimension-map (make-array (length service-dimensions)
784
:initial-contents (loop for dimension in service-dimensions
785
collect (or (position dimension destination-dimensions)
786
(error "service != destination: ~a ~a"
787
dimension destination-dimensions)))))
788
(destination-location-index (or (position location-variable destination-dimensions)
789
(error "service location != destination: ~a ~a"
790
location-variable destination-dimensions))))
791
(do-pages (solutions service-source)
792
(check-query-status *query*)
793
(let* ((length (array-dimension solutions 0))
794
(width (array-dimension solutions 1))
795
(result-page (make-page length destination-width)))
796
(assert (= width (1- destination-width)) ()
797
"run-service-location-join-service: service page width mismatch: ~a, ~a: ~a: ~a"
798
width destination-width
799
location-variable service-generator)
800
(loop for solution-index below length
802
(loop for service-index below width
803
do (setf (aref result-page solution-index (aref dimension-map service-index))
804
(aref solutions solution-index service-index)))
805
(setf (aref result-page solution-index destination-location-index)
807
(put-field-page destination result-page)))))))
808
(complete-field destination))
811
(defun service-generator (repository base-dimensions query-text &key (verbose t))
812
"Construct a generator which will delegate request/response for a remote query
813
to an external thread.
814
In the event of error, unless verbose, suppress it and proceed with a table solution field.
815
When verbose, decline to handle the error and delegate it to the context."
817
(handler-bind ((error (lambda (c)
818
(log-warn "external service failure: ~a: ~a" repository c)
820
(return-from service-generator
821
(singleton-generator base-dimensions))))))
822
(multiple-value-bind (result-solutions result-dimensions)
823
(request-sparql repository query-text)
824
(if (and (null (set-difference result-dimensions base-dimensions))
825
(null (set-difference base-dimensions result-dimensions)))
826
(typecase *transaction*
829
(let ((result-field (make-matrix-field :dimensions result-dimensions)))
830
(set-solution-field-solutions result-field result-solutions)
831
(complete-field-data result-field)
834
(let ((result-channel (make-channel :name (list 'spocq.a:|service| (task-id *query*))
835
:dimensions base-dimensions)))
836
(make-solution-generator :operator 'spocq.a:|service|
837
:dimensions base-dimensions
838
:expression (list #'run-service-thread result-channel result-solutions result-dimensions)
839
:channel result-channel
841
(null-generator base-dimensions)))))
844
(defgeneric run-service-thread (destination result-solutions &optional result-dimensions)
  (:documentation "Write the solutions of a service response to DESTINATION and complete it.
 A list of solutions is converted - with term-number-field when RESULT-DIMENSIONS equal the
 destination dimensions, otherwise with transaction-intern-shuffeled-field - and the
 result is passed back to this function.
 An array of solutions is written to DESTINATION as a single page.")

  (:method (destination (result-solutions list) &optional (result-dimensions (channel-dimensions destination)))
    ;; choose the conversion according to whether the response dimensions match the destination
    (let* ((destination-dimensions (channel-dimensions destination))
           (field (cond ((equal result-dimensions destination-dimensions)
                         (term-number-field result-solutions))
                        (t
                         (transaction-intern-shuffeled-field *transaction*
                                                             result-solutions result-dimensions
                                                             destination-dimensions)))))
      (run-service-thread destination field)))

  (:method (destination (solutions array) &optional (result-dimensions (channel-dimensions destination)))
    (declare (type array solutions))
    (incf-stat *algebra-operations*)
    (assert-argument-types run-service-thread
      (destination (or channel function)))
    (trace-algebra run-service-thread solutions)
    (let ((solution-count (array-dimension solutions 0))
          (solution-width (array-dimension solutions 1)))
      ;; the width is checked for a non-empty page only
      (unless (zerop solution-count)
        (assert (= solution-width (length (channel-dimensions destination))) ()
                "Inconsistent solutions / dimensions: ~s ~s" (array-dimensions solutions) result-dimensions))
      (new-field-page destination solution-count solution-width)
      (put-field-page destination solutions)
      (incf-stat *solutions-processed* solution-count)
      (incf-stat *solutions-constructed* solution-count)
      (complete-field destination))))
871
;;; Pathname of the curl executable; the run-curl helpers in this file pass it to run-program.
(defparameter *curl-pathname* "/usr/bin/curl")
872
;;; (defparameter *curl-pathname* "/development/source/library/cat-sparql-results.sh")
874
;;; Each method returns either nil or a two-element list ("--user" "<credentials>").
;;; An explicit token takes precedence over an explicit password.
(defgeneric agent-curl-authentication (authority &key auth-token agent-token agent-password
875
agent timeout arguments)
876
(:documentation "Given an authority agent, return the list of curl arguments for its auth data.")
877
;; default method: no authority object is available, only the keyword arguments are used.
;; AGENT-TOKEN defaults to AUTH-TOKEN.
(:method ((authority t) &key auth-token (agent-token auth-token) agent-password agent timeout arguments)
878
(declare (ignore agent timeout arguments))
879
;; as-password yields ":<key>", that is, no user name
(flet ((as-password (key)
880
(when key `("--user" ,(concatenate 'string ":" key))))
;; the header of the second local function is not visible in this listing - presumably as-token,
;; which is called below; its body yields "<key>:".
;; NOTE(review): the authority method below places the token in the second position (":<key>")
;; whereas this one places it first - confirm which form is intended here.
882
(when key `("--user" ,(concatenate 'string key ":")))))
883
(or (as-token agent-token)
884
(as-password agent-password))))
885
;; a string is resolved with the authority function and the call is repeated
(:method ((authority string) &rest args)
886
(declare (dynamic-extent args))
887
(apply #'agent-curl-authentication (authority authority) args))
888
(:method ((authority authority) &key auth-token (agent-token auth-token) agent-password
889
agent timeout arguments)
890
(declare (ignore agent timeout arguments)) ; just to allow pass-through from service operators
891
;; as-password yields "<agent-name>:<key>", with an empty name if the authority has none
(flet ((as-password (key)
892
(when key `("--user" ,(concatenate 'string (or (agent-name authority) "") ":" key))))
894
;; as of release 03.2018 the key is in the second position
895
(when key `("--user" ,(concatenate 'string ":" key)))))
896
(or (as-token agent-token)
897
(as-password agent-password)
898
;; if no auth information is provided, attempt to locate it respective the authority identifier
899
(as-token (agent-token authority))
900
(as-password (agent-password authority))))))
901
;;; (agent-curl-authentication *create-authority-id*)
903
;;; Each method returns either nil or a two-element list (:basic-authentication "<credentials>").
(defgeneric agent-request-authentication (authority &key auth-token agent-token agent-password)
904
(:documentation "Given an authority agent, return the tbnl property list with arguments for its auth data.")
905
;; a string is resolved with the authority function and the call is repeated
(:method ((authority string) &rest args)
906
(declare (dynamic-extent args))
907
(apply #'agent-request-authentication (authority authority) args))
;; the keyword defaults are taken from the authority itself: AUTH-TOKEN from its agent-token,
;; AGENT-TOKEN from AUTH-TOKEN and AGENT-PASSWORD from its agent-password
908
(:method ((authority authority) &key (auth-token (agent-token authority)) (agent-token auth-token)
909
(agent-password (agent-password authority)))
910
;; as-password yields "<agent-name>:<key>", with an empty name if the authority has none
(flet ((as-password (key)
911
(when key `(:basic-authentication ,(concatenate 'string (or (agent-name authority) "") ":" key))))
;; the header of the second local function is not visible in this listing - presumably as-token,
;; which is called below
913
;; as of release 03.2018 the key is in the second position
914
(when key `(:basic-authentication ,(concatenate 'string ":" key)))))
;; a token takes precedence over a password
915
(or (as-token agent-token)
916
(as-password agent-password)))))
919
;;; remote service request
921
;;; "-w" "%{http_code}\\n"
922
;;; is not useful as the code follows the response
924
;;; Construct a request url string from BASE-URL and ARGUMENTS.
;;; ARGUMENTS is an association list of (name . value) entries. Each entry is emitted as
;;; name=value and the entries are separated by "&".
;;; - a value which is not a string is first printed with prin1-to-string;
;;; - values are passed through url-encode unless URLENCODE is nil; names are emitted as given;
;;; - a name for which variable-p holds is emitted with a "$" prefix;
;;; - the entries are introduced by "&" if BASE-URL already contains a "?", otherwise by "?";
;;;   if ARGUMENTS is empty, nothing is added.
(defun encode-query-url (base-url arguments &key (urlencode t))
925
(format nil "~a~@[~a~]~{~a=~a~^&~}"
926
;; do not add the extension as some urls do without one
;; (the argument for the first ~a directive is not visible in this listing - presumably base-url)
928
(when arguments (if (position #\? base-url) "&" "?"))
929
;; yields a flat list of alternating names and values for the ~{...~} directive
(loop for (keyword . value) in arguments
930
for string-value = (typecase value (string value) (t (prin1-to-string value)))
931
collect (if (variable-p keyword) (concatenate 'string "\$" (string keyword)) keyword)
932
collect (if urlencode (url-encode string-value) string-value))))
933
;;; (encode-query-url "http://example.org" '((a . 1) (?::|b| . "http://dydra.com")))
936
(defgeneric run-external-view-request (method iri &key
937
auth-token agent-token agent-password agent
938
arguments timeout retry)
939
(:documentation "Execute a remote query based on a view given its IRI and optional
940
authentication and query arguments.
941
parse the result and return it as a symbolic field")
943
(:method :before ((method t) (iri-string string) &rest args)
944
(declare (dynamic-extent args))
945
(log-notice "run-external-view-request: ~s ~s . ~s" method iri-string args))
947
(:method ((method (eql :curl-csv)) (iri-string string) &rest args
948
&key auth-token agent-token agent-password
949
(agent (authority (parse-url-authority iri-string)))
951
(retry *external-request-retry*)
952
(timeout *external-request-timeout*))
953
(declare (dynamic-extent args)
954
(ignore auth-token agent-token agent-password))
955
(let* ((accept (load-time-value (concatenate 'string "Accept: " (mime:mime-type-namestring mime:text/csv)))))
956
(labels ((parse-dimensions (line)
957
(mapcar #'(lambda (name) (intern name *variable-package*))
958
(split-string line ",")))
959
(parse-sparql-query-solution (line)
960
(mapcar #'parse-csv-term
961
(spocq.i::parse-csv line :start-name 'odcsv::|recordfields| :separator #\,)))
962
(read-csv-line (stream)
963
(let ((line (read-line stream nil nil)))
965
(string-trim #(#\return #\newline #\space) line))))
966
(run-curl (arguments)
968
(let ((process (run-program *curl-pathname* arguments :input nil :output :stream :wait nil
969
;; needs the proxy environment variables
972
(unless process (error "run-external-view-request: run-curl failed."))
973
(case (run-program-exit-code process)
974
((0 nil) (return-from run-curl process))
976
(t (error "run-external-view-request: run-curl failed: ~s" (run-program-exit-code process))))))
977
(error "run-external-view-request: run-curl retries exhausted: ~s" arguments)))
978
(let* ((timeout (etypecase timeout
980
(number (write-to-string (ceiling timeout)))
982
(concrete-url (encode-query-url iri-string arguments :urlencode t))
983
(process (run-curl (append (apply #'agent-curl-authentication agent args)
984
`("-k" "-L" "--silent" "-H" ,accept)
985
(when timeout `("--max-time" ,timeout))
986
(list concrete-url))))
987
(result-stream (run-program-output process)))
989
(let* ((header (read-csv-line result-stream))
990
(dimensions (parse-dimensions header))
992
;; (print dimensions)
993
;; (print (cons :dimensisons dimensions))
994
(values (loop for line = (read-csv-line result-stream)
996
collect (parse-sparql-query-solution line)
1000
(run-program-close process))))))
1002
(:method :around ((method t) (iri-string string) &key
1003
auth-token agent-token agent-password agent
1004
arguments timeout retry)
1005
(declare (ignore agent auth-token agent-token agent-password timeout retry))
1007
(let* ((authority (parse-url-authority iri-string))
1008
(key (list iri-string arguments))
1009
(hash (make-sha1-digest (write-to-string key)))
1010
(cache-directory (make-pathname :directory `(:relative ,@*cache-path* ,authority)))
1011
(cache-pathname (make-pathname :directory (pathname-directory cache-directory)
1012
:name (format nil "~32,'0x" hash))))
1013
(log-trace "cache entry for ~s: ~s" key cache-pathname)
1014
; (format *trace-output* "~&cache entry for ~s: ~s~%" key cache-pathname)
1015
(if (probe-file cache-pathname)
1016
(with-open-file (stream cache-pathname :direction :input)
1017
(let ((*package* (find-package :spocq.i)))
1018
(apply #'values (read stream))))
1019
(let ((results (multiple-value-list (call-next-method))))
1020
(when (probe-file cache-directory)
1021
(with-open-file (stream cache-pathname :direction :output :if-does-not-exist :create)
1022
(let ((*package* (find-package :spocq.i)))
1023
(write results :stream stream))))
1024
(apply #'values results))))
1025
(call-next-method))))
1027
;;; with a stub script which returns a static file
1028
;;; (run-external-view-request :curl-csv "http://example.org" :arguments '((a . 1) (?::|b| . "http://dydra.com")))
1030
(defun request-sparql (iri query-text)
1031
"Given an IRI and a query string, send a request to the remote service and return the parsed response.
 Delegates to run-external-service-request with the method *external-service-request-method*.
 Callers in this file bind two values from the result: the solutions and their dimensions."
1032
(run-external-service-request *external-service-request-method* iri query-text))
1035
(defgeneric run-external-service-request (method iri query-text &key
1036
auth-token agent-token agent-password agent
1038
(:documentation "Execute a remote query given the IRI and the QUERY-TEXT,
1039
parse the result and return it as a symbolic field.
1040
Given the external program limitations, this supports neither views nor authentication.")
1042
(:method :before ((method t) (iri-string string) (query-text string) &rest args)
1043
(declare (dynamic-extent args))
1044
(log-notice "run-external-service-request: ~s ~s: ~s . ~s" method iri-string query-text args))
1046
(:method ((method t) (iri spocq:iri) query-text &rest args)
1047
(declare (dynamic-extent args))
1048
(apply #'run-external-service-request method (spocq:iri-lexical-form iri) query-text args))
1050
(:method ((method t) (repository service-repository) query-text &rest args)
1051
(declare (dynamic-extent args))
1052
;; (apply #'run-external-service-request method (repository-id repository) query-text args))
1053
(apply #'run-external-service-request method (repository-external-name repository) query-text args))
1055
(:method ((method (eql :curl)) (repository t) query-text &rest args)
1056
(declare (dynamic-extent args))
1057
(apply #'run-external-service-request :curl-csv repository query-text args))
1059
;; nb. no password authentication
1060
(:method ((method (eql :sparql-query)) (iri-string string) (query-text string) &key
1061
agent-token (auth-token agent-token) agent-password agent
1063
(timeout *external-request-timeout*))
1064
(declare (ignore agent-password agent))
1065
(flet ((parse-dimensions (line)
1066
(mapcar #'(lambda (name) (intern name *variable-package*))
1067
(split-string line #(#\space #.(code-char 9474)))))
1068
(parse-sparql-query-solution (line)
1069
(flet ((parse-sparql-term (string)
1070
(or (ignore-errors (parse-term string))
1071
(parse-term (concatenate 'string "\"" (string-trim #(#\space) string) "\"")))))
1072
(mapcar #'parse-sparql-term (split-string line #.(code-char 9474))))))
1073
(let* ((timeout (etypecase timeout (null nil) (number timeout) (string (read-from-string timeout))))
1075
(encode-query-url iri-string (if auth-token (acons "auth_token" auth-token arguments) arguments)
1077
(process (if timeout
1078
(bt:with-timeout (timeout)
1079
(run-program *sparql-query-pathname* (list concrete-iri query-text)
1080
:input nil :output :stream))
1081
(run-program *sparql-query-pathname* (list concrete-iri query-text)
1082
:input nil :output :stream)))
1083
(result-stream (run-program-output process))
1084
(header (progn (read-line result-stream) ; skip decoration
1085
(read-line result-stream)))
1086
(dimensions (parse-dimensions header)))
1087
(read-line result-stream) ; skip decoration
1088
(values (loop for line = (read-line result-stream nil nil)
1089
until (or (null line) (not (eql (char line 0) #.(code-char 9474))))
1090
collect (parse-sparql-query-solution line)
1091
finally (run-program-close process))
1094
;; nb. no password authentication
1095
(:method ((method (eql :roqet)) (iri-string string) (query-text string) &key
1096
agent-token (auth-token agent-token) agent-password agent
1098
(timeout *external-request-timeout*) (retry *external-request-retry*))
1099
(declare (ignore agent-password agent))
1100
(labels ((parse-dimensions (line)
1101
(mapcar #'(lambda (name) (intern name *variable-package*))
1102
(split-string line #(#\space #\|))))
1103
(parse-positions (line)
1105
for char across line
1108
(parse-sparql-query-solution (line positions)
1109
(flet ((parse-column (start end)
1110
(cond ((= end (+ start 3))
1112
((string-equal "uri" line :start2 (+ start 2) :end2 (min (length line) (+ start 5)))
1113
(let ((start (+ start 6))
1114
(end (position #\> line :end (- end 1) :from-end t)))
1115
(intern-iri (subseq line start end))))
1116
((string-equal "blank" line :start2 (+ start 2) :end2 (min (length line) (+ start 7)))
1117
(let ((start (+ start 8))
1119
(intern-blank-node (string-trim #(#\space) (subseq line start end)))))
1120
((string-equal "string" line :start2 (+ start 2) :end2 (min (length line) (+ start 8)))
1121
(parse-roqet-term (subseq line (+ start 9) (- end 2))))
1123
(error "invalid term: ~s: (~s,~s)" line start end)))))
1124
(loop for start in positions
1125
for end in (rest positions)
1126
collect (parse-column start end))))
1127
(parse-roqet-term (term)
1129
(read-roqet-line (result-stream &rest args)
1130
(let ((line (apply #'read-line result-stream args)))
1132
(run-roqet (arguments)
1134
(let ((process (run-program (roqet-pathname) arguments :input :stream :output :stream :wait nil)))
1135
(unless process (error "run-external-service-request: run-roqet failed."))
1136
(case (run-program-exit-code process)
1138
(case (peek-char nil (run-program-output process) nil nil)
1139
(#\- (return-from run-roqet process))
1140
(nil (error "run-external-service-request: run-roqet failed: ~s" (run-program-exit-code process)))
1141
(t (run-program-close process)))) ;; retry
1142
((35 (run-program-close process)) ) ;; retry
1143
(t (error "run-external-service-request: run-roqet failed: ~s" (run-program-exit-code process))))))
1144
(error "run-external-service-request: run-roqet retries exhausted: ~s" arguments)))
1145
(let* ((timeout (etypecase timeout (null nil) (number timeout) (string (read-from-string timeout))))
1147
(encode-query-url iri-string (if auth-token (acons "auth_token" auth-token arguments) arguments)
1149
(process (or (if timeout
1150
(bt:with-timeout (timeout)
1151
(run-roqet (list "-p" concrete-iri "-e" query-text "-r" "table" "-q")))
1152
(run-roqet (list "-p" concrete-iri "-e" query-text "-r" "table" "-q")))
1153
(error "run-external-service-request: run-roqet timeout."))))
1155
(if (typep (run-program-exit-code process) '(integer 1))
1156
(error "run-external-service-request: run-program failed: ~s" (run-program-exit-code process))
1157
(let* ((result-stream (run-program-output process))
1158
(header (progn (read-roqet-line result-stream) ; skip decoration
1159
(read-roqet-line result-stream)))
1160
(positions (parse-positions header))
1161
(dimensions (parse-dimensions header)))
1162
(when dimensions ; no results yield a degenerate table
1163
(read-line result-stream) ; skip decoration
1164
(values (loop for line = (read-roqet-line result-stream nil nil)
1165
until (or (null line)
1166
(not (and (>= (length line) 3)
1167
(eql (char line 0) #\|)
1168
(eql (char line 1) #\space)
1169
(eql (char line (1- (length line))) #\|))))
1170
collect (parse-sparql-query-solution line positions))
1172
(run-program-close process)))))
1174
(:method ((method (eql :curl-csv)) (iri-string string) (query-text string) &rest args &key
1175
auth-token agent-token agent-password
1176
(agent (authority (parse-url-authority iri-string)))
1178
(timeout *external-request-timeout*) (retry *external-request-retry*)
1180
"invoke curl with the query text as its input stream, specify csv,json response type, parse it and return
1181
the interned field as a list"
1182
(declare (ignore auth-token agent-token agent-password))
1183
(let* ((accept (load-time-value (concatenate 'string "Accept: "
1184
(string-downcase (type-of mime:text/csv)) ;; d2rq permits lower case only
1186
(string-downcase (type-of mime:application/sparql-results+json)))))
1187
(content (load-time-value (concatenate 'string "Content-Type: "
1188
(string-downcase (type-of mime:application/sparql-query))))))
1189
(labels ((run-curl (arguments)
1191
(let ((process (run-program *curl-pathname*
1193
:input (when (eq http-method :post) :stream)
1194
:output :stream :wait nil)))
1195
(unless process (error "run-external-service-request: run-curl failed."))
1196
(case (run-program-exit-code process)
1197
((0 nil) (if (peek-char t (run-program-output process) nil nil)
1198
(return-from run-curl process)
1199
(run-program-close process)))
1200
((35 28) (run-program-close process)) ;;; retry
1201
(t (error "run-external-service-request: run-curl failed: ~s" (run-program-exit-code process))))))
1202
(error "run-external-service-request: run-curl retries exhausted: ~s" arguments))
1203
(read-content-type (stream)
1204
(let ((type (rest (assoc :content-type (read-http-headers stream :content-type)))))
1205
(when type (mime:mime-type type)))))
1206
(let* ((concrete-url (encode-query-url iri-string
1207
(acons "query" (substitute-if #\space #'(lambda (c) (or (eql c #\return) (eql c #\linefeed)))
1211
(timeout (etypecase timeout
1213
(real (write-to-string (ceiling timeout)))
1215
(dotimes (y retry ;; retry http return code
1216
(error "run-external-service-request: http status of retries exhausted: ~s" concrete-url))
1217
(let* ((process (run-curl (append (apply #'agent-curl-authentication agent args)
1218
`("-L" "-X" "GET" ;; "POST"
1220
"-k" "--silent" "-H" ,accept)
1221
(when (eq http-method :post) `( "-H" ,content "--data-binary" "@-"))
1222
(when timeout `("--max-time" ,timeout))
1223
(list concrete-url))))
1224
(query-stream (when (and process (eq http-method :post)) (run-program-input process)))
1225
(result-stream (when process (run-program-output process))))
1227
(write-string query-text query-stream)
1228
(close query-stream))
1230
(let ((start-character (peek-char nil result-stream)))
1231
(case start-character
1233
(let ((status (second (split-string (read-line result-stream) #(#\space)))))
1234
(when (eql (elt status 0) #\2)
1235
(return (receive-solutions result-stream (or (read-content-type result-stream)
1237
(#\{ (return (receive-solutions result-stream mime:application/sparql-results+json)))
1238
(t (return (receive-solutions result-stream mime:text/csv)))))
1240
(close result-stream)
1241
(run-program-close process))))))))))
1243
(defgeneric receive-solutions (source media-type)
1244
(:documentation "wrap receive-message with transformations necessary to yield two values
1245
- a list of solutions
1246
- a dimension list")
1247
;; sparql json results are parsed directly with parse-json-sparql-results
(:method ((source t) (content-type mime:application/sparql-results+json))
1248
(parse-json-sparql-results source))
1249
;; every other media type is delegated to receive-message
(:method ((source t) (content-type t))
1250
(receive-message source content-type)))
1253
;;; OPERATOR is applied to the terms of each solution as its arguments.
;;; ARGS are passed through to run-external-service-request.
(defgeneric map-external-service-response (operator service-location query-text &rest args)
1255
"perform the external service request, collect the results, invoke the continuation for
1256
each _external_ solution in turn")
1258
;; the request is always made with the :curl-csv method; the complete result is collected
;; before the operator is applied; returns the solutions and the dimensions as two values
(:method (operator (iri-string string) (query-text string) &rest args)
1259
(multiple-value-bind (solutions dimensions)
1260
(apply #'run-external-service-request :curl-csv iri-string query-text args)
1261
(loop for solution in solutions
1262
do (apply operator solution))
1263
(values solutions dimensions)))
1265
;; a service repository is designated by its external name
(:method (operator (repository service-repository) (query-text string) &rest args)
1266
(apply #'map-external-service-response operator (repository-external-name repository) query-text args))
1268
;; an iri is designated by its lexical form
(:method (operator (iri spocq:iri) (query-text string) &rest args)
1269
(apply #'map-external-service-response operator (spocq:iri-lexical-form iri) query-text args)))
1271
;;; (map-external-service-response #'(lambda (&rest solution) (print solution)) (service-repository "https://www.dydra.com/test/test/sparql") "select ?s ?p ?o where {?s ?p ?o}")
1275
;;; sip service support
1276
;;; join v/s leftjoin sip abstracted over the concrete join operator
1278
(defun service-sip-join-generator (join-operator location query-expression base-field-generator &rest args)
1279
"Generate the join results with solutions from the first constituent passed
1280
both to the second argument and to the concrete join operator"
1281
(declare (dynamic-extent args))
;; NOTE(review): five positional arguments are spread here, while service-sip-generic-join-generator
;; declares four required parameters (join-operator location query-expression base-field-generator).
;; As written, #'spocq.e:join would be bound as its join-operator and the remaining arguments
;; would each shift by one position - confirm against the intended call protocol.
1282
(apply #'service-sip-generic-join-generator #'spocq.e:join
1283
join-operator location query-expression base-field-generator
1286
(defun service-sip-leftjoin-generator (join-operator location query-expression base-field-generator &rest args)
1287
"Generate the optional join results with solutions from the first constituent
1288
passed both to the second argument and to the concrete leftjoin operator"
1289
(declare (dynamic-extent args))
;; NOTE(review): five positional arguments are spread here, while service-sip-generic-join-generator
;; declares four required parameters (join-operator location query-expression base-field-generator).
;; As written, #'spocq.e:leftjoin would be bound as its join-operator and the remaining arguments
;; would each shift by one position - confirm against the intended call protocol.
1290
(apply #'service-sip-generic-join-generator #'spocq.e:leftjoin
1291
join-operator location query-expression base-field-generator
1294
(defun service-sip-generic-join-generator (join-operator location query-expression base-field-generator &rest args
1295
&key sip-variables &allow-other-keys)
1296
"Delegate a service request with a propagated location to a join operation supplying it two generators:
1297
- one yields the solutions from the base field - after each has provided a service location;
1298
- the other yields the union of the solutions from the iterated service operations - if necessary extended with the location."
1299
;; allow both a variable location and a constant
1300
(let* ((base-dimensions (solution-generator-dimensions base-field-generator))
1301
(query-dimensions (expression-dimensions query-expression))
1302
;; when the location is a variable, it is added to the query dimensions to form the
;; dimensions of the service field
;; NOTE(review): the alternative branch (original line 1304) is not visible in this listing
(extended-service-dimensions (if (variable-p location)
1303
(union-dimensions (list location) query-dimensions)
1305
;; NOTE(review): the base, sip and service channels below are all given the same :name
;; list, (spocq.a:|service| <task-id>); they differ in constructor and/or :dimensions
(base-channel (make-channel :name (list 'spocq.a:|service| (task-id *query*))
1306
:dimensions base-dimensions))
1307
;; SORT is destructive, so a fresh copy of the caller's sip-variables list is ordered
(sip-dimensions (sort (copy-list sip-variables) #'string-lessp))
1308
(sip-channel (make-solution-channel :name (list 'spocq.a:|service| (task-id *query*))
1309
:dimensions sip-dimensions))
1310
(service-channel (make-channel :name (list 'spocq.a:|service| (task-id *query*))
1311
:dimensions extended-service-dimensions))
1312
;; the base field wrapper involves an operator which accepts the base channel and the sip channel to the service operator,
1313
;; reads base solutions and for each one, writes the location to the service channel and the original solution
1314
;; to the wrapped base channel
1315
;; NOTE(review): parts of the :expression list (original lines 1318 and 1320) are not visible in this listing
(base-field-generator-wrapped (make-solution-generator :operator 'spocq.a:|copy|
1316
:dimensions base-dimensions
1317
:expression (list #'run-service-sip-join-base
1319
base-field-generator
1321
:channel base-channel
1322
:constituents (list base-field-generator)))
1323
;; the service field generator involves an operator which accepts a channel from which to read locations
1324
;; and for each one, initiates the service operation and transcribes solutions
1325
;; NOTE(review): the remainder of the :expression list (original lines 1329-1333) is not visible in this listing
(service-field-generator (make-solution-generator :operator 'spocq.a::|service|
1326
:dimensions extended-service-dimensions
1327
:channel service-channel
1328
:expression (list #'run-service-sip-join-service
1334
;; delegate to join the two fields and return that generator
1335
(trace-algebra service-sip-join-generator :base base-field-generator-wrapped :service service-field-generator)
1336
;; the base field generator is primary as the operator may be a leftjoin
1337
;; the value of this call is the value of the function
(funcall join-operator base-field-generator-wrapped service-field-generator)))
1339
(defun run-service-sip-join-base (destination base-generator sip-channel)
1340
"Initiate the source processing, use a projection operator to extract source solutions,
1341
map them to the sip dimensions and emit distinct solutions to the sip channel before
1342
passing the complete solution page to the destination."
1344
(let* ((base-channel (solution-generator-channel base-generator))
1345
;; records the projected solutions already sent to the sip channel; the EQUAL test
;; makes structurally equal projections count as the same key
(sip-cache (make-hash-table :test 'equal))
1346
;; built from the sip channel dimensions and the base channel dimensions; called below
;; with a page and a row index to produce the sip solution for that row
(projection (compute-unary-extractor (channel-dimensions sip-channel)
1347
(channel-dimensions base-channel))))
1348
;; start the base generator's expression before reading pages from its channel
(query-run-in-thread *query* (solution-generator-expression base-generator))
1349
;; (print (cons :rssjb.sip-channel sip-channel))
1350
(do-pages (solutions base-channel)
1351
;; (print (cons :rssjb (field-term-objects solutions)))
1352
;; visit every row of the page (dimension 0 is the row count)
(loop for index from 0 below (array-dimension solutions 0)
1353
for sip-solution = (funcall projection solutions index)
1354
;; skip projections which were already emitted
unless (gethash sip-solution sip-cache)
1355
do (progn (setf (gethash sip-solution sip-cache) t)
1356
;; emit the term-number solution sequence
1357
(channel-put sip-channel sip-solution)
1358
;; (print (cons :rssjb (loop for term in sip-solution collect (rlmdb:term-number-value term))))
1360
;; forward a copy of the page, with the same two dimensions, to the destination
(let ((result-page (make-page (array-dimension solutions 0) (array-dimension solutions 1))))
1361
(copy-page solutions result-page)
1362
(put-field-page destination result-page)
1363
;; (print (cons :rssjb result-page))
1365
;; once the base pages are exhausted, mark both outputs complete
(complete-field sip-channel)
1366
(complete-field destination)))
1369
(defun run-service-sip-join-service (destination location sip-channel query-expression args)
1370
"Implement the service request thread for a service clause in which the location and other
1371
values are passed from a joined field.
1372
For each solution received from the base source, combine it with any dynamic bindings,
1373
arrange to execute the service operation with those bindings and emit the results to the destination.
1374
The intended precedence is
1375
1. constants in the query expression
1376
2. dynamic request arguments, as they appear in dynamic bindings
1380
(let* ((sip-dimensions (channel-dimensions sip-channel))
1381
;; NOTE(review): the end of the docstring above (original lines 1377-1379) is not visible in this listing
;; iff the location is a variable
1382
;; POSITION yields NIL when the location is not among the sip dimensions
(location-index (position location sip-dimensions)))
1383
;; read sip solutions until the channel yields NIL
(loop for solution = (channel-get sip-channel)
1384
until (null solution)
1385
;; for each solution, construct the equivalent dynamic bindings from the passed solution
1386
;; and any dynamic bindings. provide those to the concrete service call,
1387
;; for it to (somehow) incorporate in the query.
1388
;; NOTE(review): the else branch of this IF (original line 1390) is not visible in this listing
for location-object = (if location-index
1389
(term-number-object (elt solution location-index))
1391
;; pass bindings which combine the dynamic bindings with step-specific bindings
1392
;; such that the query dynamic bindings take precedence
1393
;; the result is a list of (dimension . object) pairs; a dimension for which both the
;; dynamic and the solution term number are +NULL-TERM-ID+ contributes no pair
for bindings = (loop for dimension in sip-dimensions for term-number in solution
1394
for dynamic-term-number = (query-binding-term-number dimension)
1395
;; NOTE(review): this test uses = while the one below uses EQL against the same
;; constant; = requires numbers -- confirm query-binding-term-number always returns one
if (not (= +NULL-TERM-ID+ dynamic-term-number))
1396
collect (cons dimension (term-number-object dynamic-term-number))
1397
else if (not (eql +NULL-TERM-ID+ term-number))
1398
collect (cons dimension (term-number-object term-number)))
1399
;; ARGS is spread as the trailing arguments of the step call
do (apply #'run-sip-service-step location-object destination query-expression bindings args)))
1400
;; the destination is completed here, after all steps have run
(complete-field destination))
1403
(defmethod run-sip-service-step :around (location destination query-expression bindings &rest args)
  "Tracing wrapper for RUN-SIP-SERVICE-STEP.
Prints the call arguments and the list of methods applicable to the four required
arguments, then continues with the next method and returns its values."
  (print (list location destination query-expression bindings args))
  (print (compute-applicable-methods #'run-sip-service-step
                                     (list location destination query-expression bindings)))
  ;; An :around method replaces the rest of the effective method unless it calls
  ;; CALL-NEXT-METHOD. Without this call no primary method would run and the
  ;; caller would receive the value of PRINT instead of the step's result.
  (call-next-method))
1408
(defgeneric run-sip-service-step (location destination query-expression bindings &key verbose user-id query-text sip-variables)
1409
(:documentation "Given a location, a query expression - as an sse and as a sparql string,
1410
-and- bindings per iteration,
1411
distinguish local from remote requests and either evaluate the sse pattern or delegate the request
1412
to a remote processor.")
1414
;; IRI location: resolve the IRI to a repository and a revision and re-dispatch on the repository
(:method ((service-location spocq:iri) destination query-expression bindings &rest args &key (verbose t) user-id query-text sip-variables)
1415
(declare (dynamic-extent args))
1416
(declare (ignore user-id query-text sip-variables))
1417
(flet ((call-with-repository ()
1418
(let ((repository (service-repository service-location))
1419
(revision (service-repository-revision service-location)))
1420
(apply #'run-sip-service-step repository destination query-expression bindings
1421
;; fall back to the "HEAD" revision when the location yields none
:revision-id (or revision "HEAD")
1424
;; NOTE(review): original lines 1422-1423 are not visible in this listing; the helper is
;; called once directly and once under the HANDLER-BIND below -- presumably selected
;; by VERBOSE, confirm against the full source
(call-with-repository)
1425
;; on any ERROR: log a warning and leave the method with a null generator
(handler-bind ((error (lambda (c)
1426
(log-warn "service failure: ~a: ~a" service-location c)
1428
;; NOTE(review): the STRING method returns (singleton-generator ()) at this point instead
(return-from run-sip-service-step
1429
(null-generator (expression-dimensions query-expression))
1430
;; (singleton-generator ())
1431
;; (singleton-generator (expression-dimensions query-expression))
1433
(call-with-repository)))))
1435
;; string location: resolve the string to a repository and a revision and re-dispatch on the repository
(:method ((service-location string) destination query-expression bindings &rest args &key (verbose t) user-id query-text sip-variables)
1436
(declare (dynamic-extent args))
1437
(declare (ignore user-id query-text sip-variables))
1438
(flet ((call-with-repository ()
1439
(let ((repository (service-repository service-location))
1440
(revision (service-repository-revision service-location)))
1441
(apply #'run-sip-service-step repository destination query-expression bindings
1442
;; fall back to the "HEAD" revision when the location yields none
:revision-id (or revision "HEAD")
1445
;; NOTE(review): original lines 1443-1444 are not visible in this listing; the helper is
;; called once directly and once under the HANDLER-BIND below -- presumably selected
;; by VERBOSE, confirm against the full source
(call-with-repository)
1446
;; on any ERROR: log a warning and leave the method with a singleton generator
(handler-bind ((error (lambda (c)
1447
(log-warn "service failure: ~a: ~a" service-location c)
1449
;; NOTE(review): the spocq:iri method returns
;; (null-generator (expression-dimensions query-expression)) at this point instead
(return-from run-sip-service-step
1450
(singleton-generator ())
1451
;; (singleton-generator (expression-dimensions query-expression))
1453
(call-with-repository)))))
1455
(:method ((service-repository service-repository) destination (query-expression cons) bindings &rest args)
1456
"For external service processing, compute a generator which will start an external process to run
1457
the remote query and intern the result solution stream.
1458
use a query text rewritten to reflect the bindings."
1459
(declare (ignore args))
1460
;; authorization is checked before any request is issued
(authorize-service-request *task* service-repository)
1461
;; NOTE(review): the remaining arguments of the two calls below (original lines 1464-1466)
;; are not visible in this listing
(multiple-value-bind (solutions result-dimensions result-count)
1462
(request-sparql service-repository
1463
(compute-endpoint-sparql-expression service-repository
1467
;; the page width is taken from the length of the first solution
(new-field-page destination result-count (length (first solutions)))
1468
;; when the response dimensions equal the destination's, the solutions are converted
;; directly; otherwise they are interned given both dimension lists
;; (presumably reordered to the destination's dimensions -- confirm)
(put-field-page destination
1469
(if (equal result-dimensions (channel-dimensions destination))
1470
(term-number-field solutions)
1471
(transaction-intern-shuffeled-field *transaction*
1472
solutions result-dimensions
1473
(channel-dimensions destination)))))))
1475
(:method ((service-repository repository-revision) destination (query-expression cons) bindings &rest args)
1476
;; delegate indirectly, through the revision, to its repository, with the revision id as an argument.
1477
;; this will cause the revision to be reconstituted when the query is initialized, but leaves
1478
;; the methods otherwise as they are
1479
(declare (dynamic-extent args))
1480
;; NOTE(review): the tail of this APPLY form is not visible in this listing
(apply #'run-sip-service-step (repository-revision-reference service-repository) destination query-expression bindings
1481
:revision-id (repository-revision-id service-repository)
1484
(:method ((service-repository repository) destination (query-expression cons) bindings &key (verbose t) query-text sip-variables
1485
(user-id (task-user-id *task*))
1486
(revision-id "HEAD")
1487
(timeout *external-request-timeout*))
1488
"For internal service processing compile the query, if necessary, and return its generator.
1489
In the event of error, unless verbose, suppress it and proceed with a table solution field.
1490
When verbose, decline to handle the error and delegate it to the context."
1491
(declare (ignore sip-variables))
1492
(authorize-service-request *task* service-repository)
1493
;;; this wreaks havoc with variable-directed analyses at the remote end
1494
;;; (setf query-expression (sublis bindings query-expression))
1495
;; for internal use, always perform substitution
1496
(setf query-expression (bind-sparql-expression query-expression bindings))
1497
(flet ((execute-task ()
1498
(let* ((metadata (instance-metadata *task*))
1499
;; a separate query instance is created for the service request
;; NOTE(review): some initargs (original lines 1504 and 1507) are not visible in this listing
(service-task (make-query :sse-expression query-expression
1500
:id (make-service-task-id)
1501
;; the binding pairs are split into a cons of the list of their FIRSTs and the list of their RESTs
:dynamic-bindings (cons (mapcar #'first bindings) (mapcar #'rest bindings))
1502
:repository service-repository
1503
:revision-id revision-id
1505
:sparql-expression query-text
1506
:agent (task-agent *task*)
1508
;; the current task's metadata, when present, is cloned rather than shared
:metadata (when metadata (clone-instance metadata))
1509
:response-content-type (task-response-content-type *task*))))
1510
(with-task-environment (:task service-task)
1511
;;(update-repository-revision-id (task-revision service-task))
1512
(initialize-task service-task)
1513
;; return the generator wrapped to initiate a new query context
1514
(flet ((retask (&rest args)
1515
; interposes an extra thread
1516
(query-run-in-thread service-task args)
1517
'spocq.a:|service|))
1518
;; RETASK is pushed onto the front of the service task's result generator expression
(push #'retask (solution-generator-expression (task-result-generator service-task)))
1519
(log-debug "query: wrapped for retask ~a -> ~a" *task* service-task)
1520
;;;!!! reduction requires immediate execution and materialization...
1521
(let* ((service-generator (task-result-generator service-task))
1522
(service-source (solution-generator-channel service-generator)))
1523
;; just copy the pages
1524
; (print (list* :rsss.startdopages (repository-id service-repository) (solution-generator-dimensions service-generator)))
1525
(query-run-in-thread *query* (task-result-generator service-task))
1526
(do-pages (solutions service-source)
1527
;; (print (cons :rsss (field-term-objects solutions)))
1528
;; the status of the enclosing query is checked once per page
(check-query-status *query*)
1529
(let* ((solution-count (array-dimension solutions 0))
1530
(result-page (make-page solution-count (array-dimension solutions 1))))
1531
(copy-page solutions result-page)
1532
(put-field-page destination result-page)
1533
;; both statistics are advanced by the row count of the page
(incf-stat *solutions-processed* solution-count)
1534
(incf-stat *solutions-constructed* solution-count)))
1535
;; (print :rsss.enddopages)
1536
;; no, do not complete it. that is for the caller
1537
;; (complete-field destination)
1539
;; on any ERROR: log a warning and leave the method with a singleton generator
;; NOTE(review): original lines 1538, 1541, 1545 and 1547-1548 are not visible in this listing,
;; including where VERBOSE is consulted and the body run under the timeout
(handler-bind ((error (lambda (c)
1540
(log-warn "service: request failure: ~a: ~a" service-repository c)
1542
(return-from run-sip-service-step
1543
;;(singleton-generator ())
1544
(singleton-generator (expression-dimensions query-expression))
1546
(bt:with-timeout (timeout)
1549
;; this should not arise in the standard case as the macro expands to the join form given a variable...
1550
;; symbol location: no request is made and nothing is written to the destination;
;; the method returns a null generator over the location combined with the query dimensions
(:method ((location symbol) destination query-expression (bindings t) &rest args)
1551
(declare (ignore args))
1552
(null-generator (union-dimensions (list location) (expression-dimensions query-expression)))))
1554
#| degenerate roqet results
1556
(mapcar #'(lambda (name) (intern name *variable-package*))
1557
(split-string line #(#\space #\|))))
1560
for char across line
1565
(sb-ext:RUN-PROGRAM "/usr/local/bin/roqet"
1566
'("-p" "http://dbpedia.org/sparql" "-e"
1567
"select * where { ?film <http://purl.org/dc/elements/1.1/subject> <http://dbpedia.org/resource/Category:French_films> } "
1569
:INPUT NIL :OUTPUT :STREAM :WAIT NIL))
1574
;; from dev.dydra.com
1575
;; (sb-ext:run-program "/development/bin/sparql-query" '("http://1jStOy40J9151En2MsYQ:@wulff.dev/jhacker/functions-strlang03/sparql" "SELECT * WHERE { ?s ?p ?o } LIMIT 10") :input nil :output :stream)
1576
;; (request-sparql "http://dydra.com/jhacker/foaf/sparql" "SELECT count(*) WHERE { ?s ?p ?o }")
1577
;; (request-sparql "http://dydra.com/jhacker/foaf/sparql" "SELECT ?s ?p ?o WHERE { ?s ?p ?o }")
1578
;; (request-sparql "http://dydra.com/jhacker/bnode-coreference-dawg-bnode-coref-001/sparql" "SELECT ?s ?p ?o WHERE { ?s ?p ?o }")
1580
;; roqet -p "http://dydra.com/jhacker/foaf/sparql" -e 'SELECT count(*) WHERE { ?s ?p ?o }' -r table