Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/bindings.lisp
| Kind | Covered | All | % |
| expression | 136 | 506 | 26.9 |
| branch | 4 | 20 | 20.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
5
(:documentation "This file defines the BINDINGS operator for the 'org.datagraph.spocq' RDF engine."
8
"Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved."))
11
(defmacro spocq.a:|bindings| (solutions variables)
  "( ( solutionSequence variable* ) solutionField )
A BINDINGS form combines a sequence of solutions with variables as dimensions to
produce an unordered solution field, which can serve as an argument to other query forms.
The solution sequence may comprise constant values only.
This implements the VALUES expression.

The expansion is computed by MACROEXPAND-BINDINGS: the dimensions are sorted
alphabetically, each solution's values are permuted to match, and both are passed
as quoted constants to SPOCQ.E:BINDINGS."
  (macroexpand-bindings solutions variables))
21
(defun alphabetize-bindings (variables solutions)
  "Reorder a VALUES specification so that its dimensions are in alphabetical order.
VARIABLES lists the dimension variables as written; SOLUTIONS is a list of rows,
each giving values positionally aligned with VARIABLES.
Returns two values:
 - the variables sorted with STRING-LESSP, and
 - a fresh list of rows whose values are permuted to follow that sorted order.
VARIABLES itself is not modified (the sort operates on a copy)."
  (let* ((dimensions (sort (copy-list variables) #'string-lessp))
         ;; for each sorted dimension, the column it occupied in the original rows
         (positions (loop for dim in dimensions
                          collect (position dim variables)))
         (ordered-solutions
           (loop for solution in solutions
                 collect (loop for pos in positions
                               collect (nth pos solution)))))
    (values dimensions ordered-solutions)))
30
(defun macroexpand-bindings (solutions variables)
  "Build the expansion of a BINDINGS form.
The dimensions are put into alphabetical order and the solution rows permuted to
match (via ALPHABETIZE-BINDINGS). The result is a call form for SPOCQ.E:BINDINGS
whose two arguments are the reordered rows and the sorted dimensions, both quoted."
  (multiple-value-bind (sorted-dimensions reordered-rows)
      (alphabetize-bindings variables solutions)
    (list 'spocq.e:bindings
          (list 'quote reordered-rows)
          (list 'quote sorted-dimensions))))
36
(defgeneric spocq.e:bindings (solutions variables)
  (:documentation "Turn a BINDINGS specification into a solution-field source that can be
combined with other fields. No input source is involved: the solutions must be constant
values. A function argument is called and the call is redispatched on its result.")

  ;; A function stands in for a deferred solution list: call it, then dispatch again.
  (:method ((solutions function) (variables t))
    (spocq.e:bindings (funcall solutions) variables))

  ;; A literal list of rows: count and trace the operation, then stream the rows.
  (:method ((solutions list) (variables list))
    (incf-stat *algebra-operations*)
    (trace-algebra spocq.e:bindings solutions variables)
    (spocq.e:stream-bindings solutions variables)))
49
;;; Build the solution generator for a constant BINDINGS (VALUES) field.
;;; If the running query supplies a field for these DIMENSIONS
;;; (QUERY-VALUES-ARGUMENT *QUERY* DIMENSIONS), that field replaces the
;;; compiled-in SOLUTIONS, and the substitution is logged at debug level.
;;; RUN-BINDINGS-THREAD, placed at the head of the generator's :EXPRESSION
;;; list, emits the field to RESULT-CHANNEL via PROCESS-BINDINGS.
;;; NOTE(review): the remaining arguments of the :EXPRESSION list and the
;;; closing forms of this definition are absent from this extract; confirm
;;; them against the source file.
(defun spocq.e:stream-bindings (solutions dimensions)
50
"Construct a solution source based on the given constant solution field and
51
return the corresponding generator.
52
Allow for a field, named by dimensions as an argument to the query run-time instance"
54
;; page length and channel depth are not significant, as there is just one solution page
55
(let ((result-channel (make-channel :name (list 'spocq.a:|bindings| (task-id *query*))
56
:dimensions dimensions
57
:size *channel-sliced-size-limit*))
58
(actual-solutions (or (query-values-argument *query* dimensions) solutions)))
59
;; RUN-BINDINGS-THREAD pushes a (bindings SOLUTIONS DIMENSIONS) entry onto
;; *THREAD-OPERATIONS* for the extent of the call and returns the operator name.
(labels ((run-bindings-thread (result-channel solutions dimensions)
60
(let ((*thread-operations* (cons (list 'spocq.a:|bindings| solutions dimensions)
61
*thread-operations*)))
62
(process-bindings result-channel solutions dimensions)
63
'spocq.a:|bindings|)))
64
;; return the binding function to the combination operator
65
(unless (eq solutions actual-solutions)
66
(log-debug "bindings: dynamic field substitution: ~s ~s"
67
dimensions actual-solutions))
68
(make-solution-generator :operator 'spocq.a:|bindings|
69
:dimensions dimensions
70
:expression (list #'run-bindings-thread result-channel
73
:channel result-channel
77
(defgeneric process-bindings (continuation solutions dimensions)
  (:documentation "Construct a field and emit it to the destination.
If an array is supplied, it contains interned term numbers; a list of solutions
is treated as term objects, which are interned into the array form.")

  ;; A list of rows: intern the terms into a term-number array whose width is the
  ;; number of dimensions, then dispatch again on that array.
  (:method (continuation (solutions cons) (dimensions list))
    (process-bindings continuation
                      (term-number-field solutions :field-width (length dimensions))
                      dimensions))

  ;; No rows: dispatch on an empty array with zero rows and one column per dimension.
  (:method (continuation (solutions null) (dimensions list))
    (process-bindings continuation (make-array (list 0 (length dimensions))) dimensions))

  ;; A term-number array bound for a page channel: emit it as a single page,
  ;; count its rows in the statistics, and mark the field complete.
  (:method ((destination array-page-channel) (solutions array) (dimensions list))
    (declare (type array solutions))
    (assert-argument-types process-bindings
                           (destination (or channel function)))
    ;; the width check is applied only to non-empty fields
    (when (> (array-dimension solutions 0) 0)
      (assert (= (array-dimension solutions 1) (length dimensions)) ()
              "Inconsistent solutions / dimensions: ~s ~s" (array-dimensions solutions) dimensions))
    (new-field-page destination (array-dimension solutions 0) (array-dimension solutions 1))
    (put-field-page destination solutions)
    (incf-stat *solutions-processed* (array-dimension solutions 0))
    (incf-stat *solutions-constructed* (array-dimension solutions 0))
    (complete-field destination)))
109
(defmacro spocq.a::|csv-bindings| (source &optional variables)
  ;; Expand into a run-time call that receives SOURCE and VARIABLES unevaluated,
  ;; each wrapped in QUOTE.
  (list 'spocq.e::generate-csv-bindings
        (list 'quote source)
        (list 'quote variables)))
112
;;; Return a solution generator for a field read from the CSV SOURCE.
;;; The generator's dimensions are VARIABLES sorted with STRING-LESSP.
;;; RUN-CSV-BINDINGS-THREAD parses SOURCE and emits its rows to RESULT-CHANNEL
;;; via PROCESS-CSV-BINDINGS, which receives the unsorted VARIABLES.
;;; NOTE(review): the generator is registered with :OPERATOR 'spocq.a:|bindings|
;;; rather than 'spocq.a::|csv-bindings|; confirm that this is intended.
;;; NOTE(review): the remaining :EXPRESSION arguments and the closing forms of
;;; this definition are absent from this extract.
(defun spocq.e::generate-csv-bindings (source variables)
113
(let* ((dimensions (sort (copy-list variables) #'string-lessp))
114
(result-channel (make-channel :name (list 'spocq.a::|csv-bindings| (task-id *query*))
115
:dimensions dimensions
116
:size *channel-size-limit*)))
117
(labels ((run-csv-bindings-thread (result-channel source variables)
118
(let ((*thread-operations* (cons (list 'spocq.a::|csv-bindings| source dimensions)
119
*thread-operations*)))
120
(process-csv-bindings result-channel source variables)
121
'spocq.a::|csv-bindings|)))
122
;; return the binding function to the combination operator
123
(make-solution-generator :operator 'spocq.a:|bindings|
124
:dimensions dimensions
125
:expression (list #'run-csv-bindings-thread result-channel
128
:channel result-channel
131
;;; Read a CSV field from a pathname, a URL, or a stream, emit it to DESTINATION
;;; page by page, and then complete the field.
;;; NOTE(review): several lines of the stream method are absent from this extract,
;;; among them presumably the bindings of SOLUTIONS and SOLUTION-COUNT and the head
;;; of EMIT-PAGE. Confirm the page-reset logic against the source file.
(defgeneric process-csv-bindings (destinaton source variables)
132
(:documentation "Given a csv source and a variable list,
133
parse the source and emit a field with columns shuffled to the equivalent dimensions.
134
Require that the csv stream contain an initial header and that its cardinality
135
match that of the provided variables.
137
Accept as sources a pathname for cached imports and a remote location, both of which
138
are resolved to a stream.")
140
;; A pathname source is opened for input and its stream processed.
(:method ((destination t) (source pathname) (variables list))
141
(with-open-file (stream source :direction :input)
142
(process-csv-bindings destination stream variables)))
144
;; A URL source is requested with an Accept type of text/csv and the response
;; stream processed.
(:method ((destination t) (location spocq:url) (variables list))
145
(flet ((process-location (stream)
146
(process-csv-bindings destination stream variables)))
147
(declare (dynamic-extent #'process-location))
148
(hunchentoot::call-with-open-request-stream #'process-location location :accept mime:text/csv)))
150
;; Stream source:
;; - The first line is the header. Its column names, mapped through
;;   CSV-VARIABLE-NAME, must be as many as VARIABLES.
;; - SCHUFFLE? records whether VARIABLES differ from the channel's dimensions.
;; - Each non-empty data line becomes one solution. A cell for which
;;   PARSE-CSV-TERM yields NIL becomes an unbound-variable marker.
;; - A page is emitted whenever PAGE-SIZE (the channel size) solutions have
;;   accumulated, and once more for any remainder.
;; NOTE(review): the assertion message pairs an ASCII ' with a typographic ’.
(:method ((destination array-page-channel) (source stream) (variables list))
151
"use variables to map the columns based on the file header.
152
reiterate the logic in parse-csv, but per line."
153
(let* ((*parse-csv.initial-line* (read-line source)))
154
(multiple-value-bind (field-names separator)
155
(parse-csv-header *parse-csv.initial-line*)
156
(let ((header-variables (mapcar #'csv-variable-name field-names)))
157
(assert (= (length header-variables) (length variables)) ()
158
"process-csv-bindings: header does not match query '~{~a~^, ~}’ ~{~a~^, ~}’" header-variables variables)
159
(let* ((dimensions (channel-dimensions destination))
160
(schuffle? (not (equal variables dimensions)))
163
(page-size (channel-size destination)))
164
(labels ((parse-csv-solution (line)
165
(mapcar #'(lambda (term-string) (or (parse-csv-term term-string)
166
(load-time-value (spocq:make-unbound-variable nil))))
167
(parse-csv line :start-name 'odcsv::|recordfields| :separator separator)))
168
(read-csv-line (stream)
169
(let ((line (read-line stream nil nil)))
171
(string-trim #(#\return #\newline #\space) line))))
172
(emit-page (solutions)
175
(transaction-intern-shuffeled-field *transaction* (nreverse solutions) variables dimensions)
176
(term-number-field (nreverse solutions)
177
:field-width (length dimensions)))))
178
;; prepare for a new page and then substitute the local one
179
(new-field-page destination (array-dimension new-page 0) (array-dimension new-page 1))
180
(put-field-page destination new-page)
181
(incf-stat *solutions-processed* (array-dimension new-page 0))
182
(incf-stat *solutions-constructed* (array-dimension new-page 0))
185
(loop for csv-line = (read-csv-line source)
187
unless (zerop (length csv-line))
188
do (let ((solution (parse-csv-solution csv-line)))
189
(push solution solutions)
190
(when (>= (incf solution-count) page-size)
191
(emit-page solutions))))
192
(when (plusp solution-count)
193
(emit-page solutions)))
194
(complete-field destination)))))))
200
(defmacro spocq.a::|json-bindings| (source &optional variables)
  ;; Expand into a run-time call that receives SOURCE and VARIABLES unevaluated,
  ;; each wrapped in QUOTE.
  (list 'spocq.e::generate-json-bindings
        (list 'quote source)
        (list 'quote variables)))
203
;;; Return a solution generator for a field read from the JSON SOURCE.
;;; The generator's dimensions are VARIABLES sorted with STRING-LESSP.
;;; RUN-JSON-BINDINGS-THREAD parses SOURCE and emits its solutions to
;;; RESULT-CHANNEL via PROCESS-JSON-BINDINGS.
;;; NOTE(review): the generator is registered with :OPERATOR 'spocq.a:|bindings|
;;; rather than 'spocq.a::|json-bindings|; confirm that this is intended.
;;; NOTE(review): the remaining :EXPRESSION arguments and the closing forms of
;;; this definition are absent from this extract.
(defun spocq.e::generate-json-bindings (source variables)
204
(let* ((dimensions (sort (copy-list variables) #'string-lessp))
205
(result-channel (make-channel :name (list 'spocq.a::|json-bindings| (task-id *query*))
206
:dimensions dimensions
207
:size *channel-size-limit*)))
208
(labels ((run-json-bindings-thread (result-channel source variables)
209
(let ((*thread-operations* (cons (list 'spocq.a::|json-bindings| source dimensions)
210
*thread-operations*)))
211
(process-json-bindings result-channel source variables)
212
'spocq.a::|json-bindings|)))
213
;; return the binding function to the combination operator
214
(make-solution-generator :operator 'spocq.a:|bindings|
215
:dimensions dimensions
216
:expression (list #'run-json-bindings-thread result-channel
219
:channel result-channel
222
(defgeneric process-json-bindings (destinaton source variables)
  (:documentation "Given a JSON source and a variable list, parse the source and emit a field
whose columns follow the destination's dimensions.
Each JSON object read from the source yields one solution; object members are matched
to dimensions by name (see CALL-WITH-JSON-FIELD), and dimensions without a matching
member are left unbound. Unlike the CSV variant, no header line is required.

Accept as sources a pathname for cached imports and a remote location, both of which
are resolved to a stream.")

  ;; A pathname source is opened for input and its stream processed.
  (:method ((destination t) (source pathname) (variables list))
    (with-open-file (stream source :direction :input)
      (process-json-bindings destination stream variables)))

  ;; A URL source is requested as JSON and the response stream processed.
  ;; (The Accept type used to be text/csv, copied from the CSV variant; that asks
  ;; the server for a representation this parser cannot read.)
  (:method ((destination t) (location spocq:url) (variables list))
    (flet ((process-location (stream)
             (process-json-bindings destination stream variables)))
      (declare (dynamic-extent #'process-location))
      (hunchentoot::call-with-open-request-stream #'process-location location
                                                  :accept mime:application/json)))

  (:method ((destination array-page-channel) (source stream) (variables list))
    "Read JSON objects from SOURCE and emit them to DESTINATION in pages sized by the
channel size, with columns ordered by the channel's dimensions. VARIABLES is not
consulted: the channel's dimensions determine the columns."
    (let* ((dimensions (channel-dimensions destination)))
      (labels ((emit-page (new-page)
                 ;; prepare for a new page and then substitute the local one
                 (new-field-page destination (array-dimension new-page 0) (array-dimension new-page 1))
                 (put-field-page destination new-page)
                 (incf-stat *solutions-processed* (array-dimension new-page 0))
                 (incf-stat *solutions-constructed* (array-dimension new-page 0))))
        (declare (dynamic-extent #'emit-page))
        (call-with-json-field #'emit-page source dimensions :page-size (channel-size destination))
        (complete-field destination)))))
254
;;; Read a sequence of comma-separated JSON objects from SOURCE and convert each one
;;; into a solution whose values follow DIMENSIONS. OP is called with a term-number
;;; field (built by TERM-NUMBER-FIELD) each time PAGE-SIZE solutions have accumulated,
;;; and once more for any remainder. PAGE-SIZE defaults to *CHANNEL-SIZE-LIMIT*.
;;; - Each object is collected character by character from the next #\{ up to the
;;;   first #\} and then parsed with PARSE-JSON. An object containing a nested object,
;;;   or a string containing #\}, is therefore cut at the first #\}.
;;; - A member matches a dimension when its name, up to its first non-alphanumeric
;;;   character, is STRING-EQUAL to the variable. A dimension with no matching member
;;;   gets an unbound-variable marker.
;;; NOTE(review): PEEK-CHAR is called with EOF-ERROR-P true, so input that ends without
;;; the expected terminator would signal END-OF-FILE; confirm.
;;; NOTE(review): several lines are absent from this extract: the SOLUTION-COUNT binding,
;;; the tail of EMIT-PAGE, the result clause of OBJECT-FIELD-VALUE, and the remaining
;;; CASE clause(s).
(defmethod call-with-json-field (op (source stream) (dimensions list) &key (page-size *channel-size-limit*))
255
(let* ((solutions ())
257
(unbound (spocq:make-unbound-variable nil))
258
(buffer (make-array 1024 :element-type 'character :adjustable t :fill-pointer 0)))
259
(labels ((parse-json-object (buffer)
260
(parse-json buffer :start-name 'dsj::|Object|))
261
(read-json-object (stream)
262
(setf (fill-pointer buffer) 0)
263
(peek-char #\{ stream)
264
(loop for char = (read-char stream)
265
do (vector-push-extend char buffer)
266
until (eql char #\}))
267
(parse-json-object buffer))
268
(emit-page (solutions)
269
(let ((new-page (term-number-field (nreverse solutions)
270
:field-width (length dimensions))))
271
(funcall op new-page)
274
(object-field-value (object variable)
275
(loop for (name . value) in object
276
for end = (or (position-if-not #'alphanumericp name) (length name))
277
when (string-equal variable name :end2 end)
279
(translate-json-object (object)
280
(loop for variable in dimensions
281
collect (or (object-field-value object variable) unbound))))
282
(loop for object = (read-json-object source)
284
do (let ((solution (translate-json-object object)))
285
(push solution solutions)
286
(when (>= (incf solution-count) page-size)
287
(emit-page solutions))
288
;; skip to next or eof
289
(case (peek-char t source t nil)
291
(#\, (read-char source)))))
292
(when (plusp solution-count)
293
(emit-page solutions)))))
300
;;; Write SPARQL text derived from the header line of a delimited text SOURCE.
;;; - A string SOURCE is read as the text itself (not as a file name).
;;; - A pathname SOURCE is opened for input.
;;; Both are passed on as streams, together with the keyword arguments.
;;; The stream method:
;;; - splits the header on SEPARATOR (default #\,), newline, return and double quote;
;;; - maps each column through CSV-VARIABLE-NAME;
;;; - writes one "?uuid :NAME ?NAME" line per column to STREAM (default
;;;   *STANDARD-OUTPUT*).
;;; NOTE(review): the FORMAT forms around the prefix and BIND text below are only
;;; partially present in this extract, and no visible line uses DEFAULT.
;;; NOTE(review): the xsd prefix IRI below lacks the trailing #; the XML Schema
;;; datatype namespace is <http://www.w3.org/2001/XMLSchema#>.
(defgeneric generate-*aeql-view (source &key stream default separator)
301
(:method ((source string) &rest args)
302
(with-input-from-string (stream source)
303
(apply #'generate-*aeql-view stream args)))
304
(:method ((source pathname) &rest args)
305
(with-open-file (stream source :direction :input)
306
(apply #'generate-*aeql-view stream args)))
308
(:method ((source stream) &key (stream *standard-output*) (default "http://dydra.com") (separator #\,))
311
prefix dydra: <http://dydra.com/sparql-functions#>
312
prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
313
prefix xsd: <http://www.w3.org/2001/XMLSchema>
316
(let* ((header (read-line source))
317
(columns (split-string header (vector separator #\newline #\return #\")))
318
(names (loop for column in columns
319
collect (csv-variable-name column))))
320
(loop for name in names
321
do (format stream "?uuid :~a ?~a~%" name name))
326
bind(uuid() as ?uuid)
329
;;; Ad-hoc invocations: for each of several CSV imports under /root/imports/aeb/,
;;; generate a SPARQL view with GENERATE-*AEQL-VIEW and write it to a .rq file in
;;; /tmp, superseding any existing file.
;;; NOTE(review): these forms write files when evaluated. Confirm that they are
;;; excluded from normal loading, e.g. by a read-time conditional outside this extract.
(with-open-file (query #p"/tmp/Accounts2.rq" :direction :output :if-exists :supersede :if-does-not-exist :create)
330
(generate-*aeql-view #p"/root/imports/aeb/Accounts2.csv" :separator #\, :stream query :default "http://dydra.com/aeb/z#"))
331
(with-open-file (query #p"/tmp/Contacts.rq" :direction :output :if-exists :supersede :if-does-not-exist :create)
332
(generate-*aeql-view #p"/root/imports/aeb/Contacts.csv" :separator #\; :stream query :default "http://dydra.com/aeb/z#"))
333
(with-open-file (query #p"/tmp/Contact.rq" :direction :output :if-exists :supersede :if-does-not-exist :create)
334
(generate-*aeql-view #p"/root/imports/aeb/CustomerAccounts_20201021112424/Contact_20201021092425.csv" :separator #\, :stream query :default "http://dydra.com/aeb/sf#"))
335
(with-open-file (query #p"/tmp/CustomerAccount.rq" :direction :output :if-exists :supersede :if-does-not-exist :create)
336
(generate-*aeql-view #p"/root/imports/aeb/CustomerAccounts_20201021112424/CustomerAccount_20201021092425.csv" :separator #\, :stream query :default "http://dydra.com/aeb/sf#"))
337
(with-open-file (query #p"/tmp/PaymentMethod.rq" :direction :output :if-exists :supersede :if-does-not-exist :create)
338
(generate-*aeql-view #p"/root/imports/aeb/CustomerAccounts_20201021112424/PaymentMethod_20201021092425.csv" :separator #\, :stream query :default "http://dydra.com/aeb/sf#"))
341
;;; Ad-hoc checks of CALL-WITH-JSON-FIELD: print each page of the field built from
;;; a JSON array of objects, given inline or read from a file.
(with-input-from-string (stream "[{\"asdf\": 1}, {\"asdf\": 2, \"qwer\": 3} ]")
  (call-with-json-field #'print stream '(?::asdf ?::qwer)))

;; "combuchadata.json" names a file. WITH-INPUT-FROM-STRING would have fed the
;; name itself to the JSON reader, which finds no #\{ and hits end of input,
;; so open the file instead.
(with-open-file (stream #p"combuchadata.json" :direction :input)
  (call-with-json-field #'print stream '(?::asdf ?::qwer)))
350
values (?a ?s) {(1 2) ('a' 's')}
356
values (?a ?s) {(1 2) ('a' 's')}
366
;; the intended use is to employ construct queries
;; Each example below evaluates an algebra form over CSV-BINDINGS read from
;; #p"/tmp/b.csv" against the repository "james/test". The CONSTRUCT forms
;; produce triples from the CSV columns; the SELECT form only returns them.
;; NOTE(review): the calls enclosing these quoted forms are absent from this extract.
;; NOTE(review): the last CONSTRUCT example requests
;; mime:application/sparql-results+json, whereas the first uses
;; mime:application/n-quads; confirm which is intended.
368
'(ORG.DATAGRAPH.SPOCQ.ALGEBRA:|construct|
369
(ORG.DATAGRAPH.SPOCQ.ALGEBRA:|extend|
370
(spocq.a::|csv-bindings| #p"/tmp/b.csv" (?::|b|))
372
((ORG.DATAGRAPH.SPOCQ.ALGEBRA:|triple| ?::|b|
373
|http://www.w3.org/1999/02/22-rdf-syntax-ns#|:|type|
376
:repository-id "james/test"
377
:response-content-type mime:application/n-quads)
380
;;; as a select is also possible, but does not contribute to importing
382
'(ORG.DATAGRAPH.SPOCQ.ALGEBRA:|select|
383
(ORG.DATAGRAPH.SPOCQ.ALGEBRA:|extend|
384
;; (ORG.DATAGRAPH.SPOCQ.ALGEBRA:|table| ORG.DATAGRAPH.SPOCQ.ALGEBRA:|unit|)
385
(spocq.a::|csv-bindings| #p"/tmp/b.csv" (?::|a|))
387
(?::|b|)) *standard-output*
388
:repository-id "james/test"
389
:response-content-type mime:application/sparql-results+json)
391
"construct {?buri a ?auri}
393
{ select ?a ?b where {} }
394
bind( uri(?a) as ?auri)
395
bind( uri(?b) as ?buri)
400
'(ORG.DATAGRAPH.SPOCQ.ALGEBRA:|construct|
401
(ORG.DATAGRAPH.SPOCQ.ALGEBRA:|extend|
402
(ORG.DATAGRAPH.SPOCQ.ALGEBRA:|extend|
403
(spocq.a::|csv-bindings| #p"/tmp/b.csv" (?::|a| ?::|b|))
404
?::|auri| (ORG.DATAGRAPH.SPOCQ.ALGEBRA:|uri| ?::|a|))
405
?::|buri| (ORG.DATAGRAPH.SPOCQ.ALGEBRA:|uri| ?::|b|))
406
((ORG.DATAGRAPH.SPOCQ.ALGEBRA:|triple| ?::|buri|
407
|http://www.w3.org/1999/02/22-rdf-syntax-ns#|:|type|
410
:repository-id "james/test"
411
:response-content-type mime:application/sparql-results+json)