Coverage report: /development/source/library/org/datagraph/spocq-shard/src/algebra/operators/bindings.lisp

KindCoveredAll%
expression136506 26.9
branch420 20.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 (:documentation "This file defines the BINDINGS operator for the 'org.datagraph.spocq' RDF engine."
6
 
7
  (copyright
8
   "Copyright 2010 [james anderson](mailto:james.anderson@setf.de) All Rights Reserved."))
9
 
10
 
11
 (defmacro spocq.a:|bindings| (solutions variables)
12
   "( ( solutionSequence variable* ) solutionField )
13
 A BINDINGS form combines a sequence of solutions vith variables as dimensions to
14
 produce an unordered solution field, which can be serve as an argument to other query forms.
15
 The solution sequence may comprise constant values only.
16
 This implement the VALUES expression."
17
 
18
   (macroexpand-bindings solutions variables))
19
 
20
 
21
 (defun alphabetize-bindings (variables solutions)
22
   (let* ((dimensions (sort (copy-list variables) #'string-lessp))
23
          (positions (loop for dim in dimensions collect (position dim variables)))
24
          (ordered-solutions
25
           (loop for solution in solutions
26
             collect (loop for pos in positions
27
                       collect (nth pos solution)))))
28
     (values dimensions ordered-solutions)))
29
 
30
 (defun macroexpand-bindings (solutions variables)
31
   (multiple-value-bind (dimensions ordered-solutions)
32
                        (alphabetize-bindings variables solutions)
33
     `(spocq.e:bindings ',ordered-solutions ',dimensions)))
34
 
35
 
36
 (defgeneric spocq.e:bindings (solutions variables)
37
   (:documentation "Transform a bindings specification into a solution field source for combination with other fields.
38
  The spec allows no input source and requires constant values.")
39
 
40
   (:method ((solutions list) (variables list))
41
     (incf-stat *algebra-operations*)
42
     (trace-algebra spocq.e:bindings solutions variables)
43
     (spocq.e:stream-bindings solutions variables))
44
 
45
    (:method ((solutions function) variables)
46
     (spocq.e:bindings (funcall solutions) variables)))
47
 
48
 
49
 (defun spocq.e:stream-bindings (solutions dimensions)
50
   "Construct a solution source based on the given constant solution field and
51
  return the corresponding generator.
52
  Allow for a field, named by dimensions as an argument to the query run-time instance"
53
 
54
   ;; page length and channel depth are not significant, as there is just one solution page
55
   (let ((result-channel (make-channel :name (list 'spocq.a:|bindings| (task-id *query*))
56
                                       :dimensions dimensions
57
                                       :size *channel-sliced-size-limit*))
58
         (actual-solutions (or (query-values-argument *query* dimensions) solutions)))
59
     (labels ((run-bindings-thread (result-channel solutions dimensions)
60
                (let ((*thread-operations* (cons (list 'spocq.a:|bindings| solutions dimensions)
61
                                                 *thread-operations*)))
62
                    (process-bindings result-channel solutions dimensions)
63
                    'spocq.a:|bindings|)))
64
       ;; return the binding function to the combination operator
65
       (unless (eq solutions actual-solutions)
66
         (log-debug "bindings: dynamic field substitution: ~s ~s"
67
                    dimensions actual-solutions))
68
       (make-solution-generator :operator 'spocq.a:|bindings|
69
                                :dimensions dimensions
70
                                :expression (list #'run-bindings-thread result-channel
71
                                                  actual-solutions
72
                                                  dimensions)
73
                                :channel result-channel
74
                                :constituents ()))))
75
 
76
 
77
 (defgeneric process-bindings (continuation solutions dimensions)
78
   (:documentation
79
    "Construct a field and emit it to the destination.
80
     If an array is supplied, it contains interned term numbers, which a list of solutions
81
     is treated a term objects and interned to  the array form.")
82
 
83
   (:method (continuation (solutions cons) (dimensions list))
84
     (process-bindings continuation
85
                       (term-number-field solutions :field-width (length dimensions))
86
                       dimensions))
87
 
88
   (:method (continuation (solutions null) (dimensions list))
89
     (process-bindings continuation (make-array (list 0 (length dimensions))) dimensions))
90
 
91
   (:method ((destination array-page-channel) (solutions array) (dimensions list))
92
     (declare (type array solutions))
93
     (assert-argument-types process-bindings
94
       (destination (or channel function)))
95
 
96
     (when (> (array-dimension solutions 0) 0)
97
       (assert (= (array-dimension solutions 1) (length dimensions)) ()
98
               "Inconsistent solutions / dimensions: ~s ~s" (array-dimensions solutions) dimensions))
99
     (new-field-page destination (array-dimension solutions 0) (array-dimension solutions 1))
100
     (put-field-page destination solutions)
101
     (incf-stat *solutions-processed* (array-dimension solutions 0))
102
     (incf-stat *solutions-constructed* (array-dimension solutions 0))
103
     (complete-field destination)))
104
 
105
 
106
 ;;; csv-bindings
107
 ;;;
108
 
109
 (defmacro spocq.a::|csv-bindings| (source &optional variables)
110
   `(spocq.e::generate-csv-bindings ',source ',variables))
111
 
112
 (defun spocq.e::generate-csv-bindings (source variables)
113
   (let* ((dimensions (sort (copy-list variables) #'string-lessp))
114
          (result-channel (make-channel :name (list 'spocq.a::|csv-bindings| (task-id *query*))
115
                                        :dimensions dimensions
116
                                        :size *channel-size-limit*)))
117
     (labels ((run-csv-bindings-thread (result-channel source variables)
118
                (let ((*thread-operations* (cons (list 'spocq.a::|csv-bindings| source dimensions)
119
                                                 *thread-operations*)))
120
                  (process-csv-bindings result-channel source variables)
121
                  'spocq.a::|csv-bindings|)))
122
       ;; return the binding function to the combination operator
123
       (make-solution-generator :operator 'spocq.a:|bindings|
124
                                :dimensions dimensions
125
                                :expression (list #'run-csv-bindings-thread result-channel
126
                                                  source
127
                                                  variables)
128
                                :channel result-channel
129
                                :constituents ()))))
130
 
131
 (defgeneric process-csv-bindings (destinaton source variables)
132
   (:documentation "Given a csv source and a variable list,
133
    parse the source and emit a field with columns schuffled to the equivalent dimensions.
134
    Require that the csv stream conain an initial header and that its cardinality
135
    match that of the provided variables.
136
 
137
    Accept as sources a pathname for cached imports and a remote located, both of which
138
    are resolved to a stream.")
139
 
140
   (:method ((destination t) (source pathname) (variables list))
141
     (with-open-file (stream source :direction :input)
142
       (process-csv-bindings destination stream variables)))
143
 
144
   (:method ((destination t) (location spocq:url) (variables list))
145
     (flet ((process-location (stream)
146
              (process-csv-bindings destination stream variables)))
147
       (declare (dynamic-extent #'process-location))
148
       (hunchentoot::call-with-open-request-stream #'process-location location :accept mime:text/csv)))
149
 
150
   (:method ((destination array-page-channel) (source stream) (variables list))
151
     "use variables to map the columns based on the file header.
152
     reiterate the logic in parse-csv, but per line."
153
     (let* ((*parse-csv.initial-line* (read-line source)))
154
       (multiple-value-bind (field-names separator)
155
                            (parse-csv-header *parse-csv.initial-line*)
156
         (let ((header-variables (mapcar #'csv-variable-name field-names)))
157
           (assert (= (length header-variables) (length variables)) ()
158
                   "process-csv-bindings: header does not match query '~{~a~^, ~}’ ~{~a~^, ~}’" header-variables variables)
159
           (let* ((dimensions (channel-dimensions destination))
160
                  (schuffle? (not (equal variables dimensions)))
161
                  (solutions ())
162
                  (solution-count 0)
163
                  (page-size (channel-size destination)))
164
             (labels ((parse-csv-solution (line)
165
                        (mapcar #'(lambda (term-string) (or (parse-csv-term term-string) 
166
                                                            (load-time-value (spocq:make-unbound-variable nil))))
167
                                (parse-csv line :start-name 'odcsv::|recordfields| :separator separator)))
168
                      (read-csv-line (stream)
169
                        (let ((line (read-line stream nil nil)))
170
                          (when line
171
                            (string-trim #(#\return #\newline #\space) line))))
172
                      (emit-page (solutions)
173
                        (let ((new-page
174
                               (if schuffle?
175
                                   (transaction-intern-shuffeled-field *transaction* (nreverse solutions) variables dimensions)
176
                                   (term-number-field (nreverse solutions)
177
                                                      :field-width (length dimensions)))))
178
                          ;; prepare for a ne wpage and then substitute the local one
179
                          (new-field-page destination (array-dimension new-page 0) (array-dimension new-page 1))
180
                          (put-field-page destination new-page)
181
                          (incf-stat *solutions-processed* (array-dimension new-page 0))
182
                          (incf-stat *solutions-constructed* (array-dimension new-page 0))
183
                          (setf solutions '()
184
                                solution-count 0))))
185
               (loop for csv-line = (read-csv-line source)
186
                 while csv-line
187
                 unless (zerop (length csv-line))
188
                 do (let ((solution (parse-csv-solution csv-line)))
189
                      (push solution solutions)
190
                      (when (>= (incf solution-count) page-size)
191
                        (emit-page solutions))))
192
               (when (plusp solution-count)
193
                 (emit-page solutions)))
194
             (complete-field destination)))))))
195
 
196
 
197
 ;;; json-bindings
198
 ;;;
199
 
200
 (defmacro spocq.a::|json-bindings| (source &optional variables)
201
   `(spocq.e::generate-json-bindings ',source ',variables))
202
 
203
 (defun spocq.e::generate-json-bindings (source variables)
204
   (let* ((dimensions (sort (copy-list variables) #'string-lessp))
205
          (result-channel (make-channel :name (list 'spocq.a::|json-bindings| (task-id *query*))
206
                                        :dimensions dimensions
207
                                        :size *channel-size-limit*)))
208
     (labels ((run-json-bindings-thread (result-channel source variables)
209
                (let ((*thread-operations* (cons (list 'spocq.a::|json-bindings| source dimensions)
210
                                                 *thread-operations*)))
211
                  (process-json-bindings result-channel source variables)
212
                  'spocq.a::|json-bindings|)))
213
       ;; return the binding function to the combination operator
214
       (make-solution-generator :operator 'spocq.a:|bindings|
215
                                :dimensions dimensions
216
                                :expression (list #'run-json-bindings-thread result-channel
217
                                                  source
218
                                                  variables)
219
                                :channel result-channel
220
                                :constituents ()))))
221
 
222
 (defgeneric process-json-bindings (destinaton source variables)
223
   (:documentation "Given a json source and a variable list,
224
    parse the source and emit a field with columns schuffled to the equivalent dimensions.
225
    Require that the csv stream conain an initial header and that its cardinality
226
    match that of the provided variables.
227
 
228
    Accept as sources a pathname for cached imports and a remote located, both of which
229
    are resolved to a stream.")
230
 
231
   (:method ((destination t) (source pathname) (variables list))
232
     (with-open-file (stream source :direction :input)
233
       (process-json-bindings destination stream variables)))
234
 
235
   (:method ((destination t) (location spocq:url) (variables list))
236
     (flet ((process-location (stream)
237
              (process-json-bindings destination stream variables)))
238
       (declare (dynamic-extent #'process-location))
239
       (hunchentoot::call-with-open-request-stream #'process-location location :accept mime:text/csv)))
240
 
241
   (:method ((destination array-page-channel) (source stream) (variables list))
242
     "use variables to map the columns based on the file header."
243
     (let* ((dimensions (channel-dimensions destination)))
244
       (labels ((emit-page (new-page)
245
                  ;; prepare for a new page and then substitute the local one
246
                  (new-field-page destination (array-dimension new-page 0) (array-dimension new-page 1))
247
                  (put-field-page destination new-page)
248
                  (incf-stat *solutions-processed* (array-dimension new-page 0))
249
                  (incf-stat *solutions-constructed* (array-dimension new-page 0))))
250
         (declare (dynamic-extent #'emit-page))
251
         (call-with-json-field #'emit-page source dimensions :page-size (channel-size destination))
252
         (complete-field destination)))))
253
 
254
 (defmethod call-with-json-field (op (source stream) (dimensions list) &key (page-size *channel-size-limit*))
255
   (let* ((solutions ())
256
          (solution-count 0)
257
          (unbound (spocq:make-unbound-variable nil))
258
          (buffer (make-array 1024 :element-type 'character :adjustable t :fill-pointer 0)))
259
     (labels ((parse-json-object (buffer)
260
                (parse-json buffer :start-name 'dsj::|Object|))
261
              (read-json-object (stream)
262
                (setf (fill-pointer buffer) 0)
263
                (peek-char #\{ stream)
264
                (loop for char = (read-char stream)
265
                  do (vector-push-extend char buffer)
266
                  until (eql char #\}))
267
                (parse-json-object buffer))
268
              (emit-page (solutions)
269
                (let ((new-page (term-number-field (nreverse solutions)
270
                                                   :field-width (length dimensions))))
271
                  (funcall op new-page)
272
                  (setf solutions '()
273
                        solution-count 0)))
274
              (object-field-value (object variable)
275
                (loop for (name . value) in object
276
                  for end = (or (position-if-not #'alphanumericp name) (length name))
277
                  when (string-equal variable name :end2 end)
278
                  return value))
279
              (translate-json-object (object)
280
                (loop for variable in dimensions
281
                  collect (or (object-field-value object variable) unbound))))
282
       (loop for object = (read-json-object source)
283
         while object
284
         do (let ((solution (translate-json-object object)))
285
              (push solution solutions)
286
              (when (>= (incf solution-count) page-size)
287
                (emit-page solutions))
288
              ;; skip to next or eof
289
              (case (peek-char t source t nil)
290
                ((#\] nil) (return))
291
                (#\, (read-char source)))))
292
       (when (plusp solution-count)
293
             (emit-page solutions)))))
294
 
295
 
296
 
297
 
298
 #+(or)
299
 (
300
 (defgeneric generate-*aeql-view (source &key stream default separator)
301
   (:method ((source string) &rest args)
302
     (with-input-from-string (stream source)
303
       (apply #'generate-*aeql-view stream args)))
304
   (:method ((source pathname) &rest args)
305
     (with-open-file (stream source :direction :input)
306
       (apply #'generate-*aeql-view stream args)))
307
 
308
   (:method ((source stream) &key (stream *standard-output*) (default "http://dydra.com") (separator #\,))
309
     (format stream "##
310
 prefix : <~a>
311
 prefix dydra: <http://dydra.com/sparql-functions#>
312
 prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
313
 prefix xsd: <http://www.w3.org/2001/XMLSchema>
314
 construct {
315
 "  default)
316
     (let* ((header (read-line source))
317
            (columns (split-string header (vector separator #\newline #\return #\")))
318
            (names (loop for column in columns
319
                     collect (csv-variable-name column))))
320
       (loop for name in names
321
         do (format stream "?uuid :~a ?~a~%" name name))
322
       (format stream "      }
323
 where {
324
  { select ~{?~a ~}
325
    where {} }
326
  bind(uuid() as ?uuid)
327
 }
328
         " names))))
329
 (with-open-file (query #p"/tmp/Accounts2.rq" :direction :output :if-exists :supersede :if-does-not-exist :create)
330
   (generate-*aeql-view #p"/root/imports/aeb/Accounts2.csv" :separator #\, :stream query :default "http://dydra.com/aeb/z#"))
331
 (with-open-file (query #p"/tmp/Contacts.rq" :direction :output :if-exists :supersede :if-does-not-exist :create)
332
   (generate-*aeql-view #p"/root/imports/aeb/Contacts.csv" :separator #\; :stream query :default "http://dydra.com/aeb/z#"))
333
 (with-open-file (query #p"/tmp/Contact.rq" :direction :output :if-exists :supersede :if-does-not-exist :create)
334
   (generate-*aeql-view #p"/root/imports/aeb/CustomerAccounts_20201021112424/Contact_20201021092425.csv" :separator #\, :stream query :default "http://dydra.com/aeb/sf#"))
335
 (with-open-file (query #p"/tmp/CustomerAccount.rq" :direction :output :if-exists :supersede :if-does-not-exist :create)
336
   (generate-*aeql-view #p"/root/imports/aeb/CustomerAccounts_20201021112424/CustomerAccount_20201021092425.csv" :separator #\, :stream query :default "http://dydra.com/aeb/sf#"))
337
 (with-open-file (query #p"/tmp/PaymentMethod.rq" :direction :output :if-exists :supersede :if-does-not-exist :create)
338
   (generate-*aeql-view #p"/root/imports/aeb/CustomerAccounts_20201021112424/PaymentMethod_20201021092425.csv" :separator #\, :stream query :default "http://dydra.com/aeb/sf#"))
339
 
340
 
341
  (with-input-from-string (stream "[{\"asdf\": 1}, {\"asdf\": 2, \"qwer\": 3} ]")
342
    (call-with-json-field #'print stream '(?::asdf ?::qwer)))
343
 
344
 (with-input-from-string (stream "combuchadata.json")
345
    (call-with-json-field #'print stream '(?::asdf ?::qwer)))
346
 
347
 (parse-sparql "
348
 select *
349
 where {
350
   values (?a ?s) {(1 2) ('a' 's')}
351
 }")
352
 
353
 (parse-sparql "
354
 select *
355
 where {
356
   values (?a ?s) {(1 2) ('a' 's')}
357
 }")
358
 
359
 (parse-sparql "
360
 construct {?b a ?a}
361
 where {
362
   
363
   bind( ?b as ?a)
364
 }")
365
 
366
 ;; the intended use is to employ construct queries
367
 (spocq.i::pipe-query
368
  '(ORG.DATAGRAPH.SPOCQ.ALGEBRA:|construct|
369
  (ORG.DATAGRAPH.SPOCQ.ALGEBRA:|extend|
370
   (spocq.a::|csv-bindings| #p"/tmp/b.csv" (?::|b|))
371
   ?::|a| ?::|b|)
372
  ((ORG.DATAGRAPH.SPOCQ.ALGEBRA:|triple| ?::|b|
373
                                         |http://www.w3.org/1999/02/22-rdf-syntax-ns#|:|type|
374
                                         ?::|a|)))
375
  *standard-output*
376
  :repository-id "james/test"
377
  :response-content-type mime:application/n-quads)
378
 
379
 
380
 ;;; as a select is also possible, but does not contribute to importing
381
 (spocq.i::pipe-query
382
  '(ORG.DATAGRAPH.SPOCQ.ALGEBRA:|select|
383
  (ORG.DATAGRAPH.SPOCQ.ALGEBRA:|extend|
384
   ;; (ORG.DATAGRAPH.SPOCQ.ALGEBRA:|table| ORG.DATAGRAPH.SPOCQ.ALGEBRA:|unit|)
385
   (spocq.a::|csv-bindings| #p"/tmp/b.csv" (?::|a|))
386
   ?::|b| ?::|a|)
387
  (?::|b|)) *standard-output*
388
  :repository-id "james/test"
389
  :response-content-type mime:application/sparql-results+json)
390
 
391
 "construct {?buri a ?auri}
392
 where {
393
  { select ?a ?b where {} }
394
  bind( uri(?a) as ?auri)
395
  bind( uri(?b) as ?buri)
396
 }
397
 "
398
 
399
 (spocq.i::pipe-query
400
  '(ORG.DATAGRAPH.SPOCQ.ALGEBRA:|construct|
401
  (ORG.DATAGRAPH.SPOCQ.ALGEBRA:|extend|
402
   (ORG.DATAGRAPH.SPOCQ.ALGEBRA:|extend|
403
    (spocq.a::|csv-bindings| #p"/tmp/b.csv" (?::|a| ?::|b|))
404
    ?::|auri| (ORG.DATAGRAPH.SPOCQ.ALGEBRA:|uri| ?::|a|))
405
   ?::|buri| (ORG.DATAGRAPH.SPOCQ.ALGEBRA:|uri| ?::|b|))
406
  ((ORG.DATAGRAPH.SPOCQ.ALGEBRA:|triple| ?::|buri|
407
                                         |http://www.w3.org/1999/02/22-rdf-syntax-ns#|:|type|
408
                                         ?::|auri|)))
409
  *standard-output*
410
  :repository-id "james/test"
411
  :response-content-type mime:application/sparql-results+json)
412
 
413
 
414
 )����