Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/encoding/ssf-text.lisp

KindCoveredAll%
expression01042 0.0
branch070 0.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 ;;; ssf reader/serializer
6
 
7
 
8
 (defun unescape-sse-string (string)
9
   (let ((from (position #\\ string)))
10
       (if from
11
         (let ((buffer (make-string (length string) :element-type 'character))
12
               (end (length string))
13
               (to from)
14
               (char #\null))
15
           (replace buffer string :end1 from :end2 from)
16
           (flet ((unicode-char (length)
17
                    (incf from)
18
                    (prog1 (code-char (parse-integer string :start from :end (+ from length) :radix 16))
19
                      (incf from (1- length)))))
20
             (loop (setf (char buffer to)
21
                         (case (setf char (char string (incf from)))
22
                           (#\t #\tab)
23
                           (#\b #\backspace)
24
                           (#\n #\linefeed)
25
                           (#\r #\return)
26
                           (#\f #\formfeed)
27
                           (#\u (unicode-char 4))
28
                           (#\U (unicode-char 8))
29
                           (t char)))
30
                   (loop (incf to)
31
                         (incf from)
32
                         (when (>= from end) (return))
33
                         (case (setf char (char string from))
34
                           (#\\ (return))
35
                           (t (setf (char buffer to) char))))
36
                   (when (>= from end) (return)))
37
             (subseq buffer 0 to)))
38
         string)))
39
 ;;; (unescape-sse-string "aa\\\"\\'\\u0020\\U00000040")
40
   
41
 
42
 ;;; reading
43
 
44
 (defgeneric read-ssf (stream)
45
   (:method ((stream stream))
46
     (let ((*readtable* *sse-readtable*)
47
           (*package* (find-package :spocq.a)))
48
       (read stream)))
49
   (:method ((source pathname))
50
     (with-open-file (stream source :direction :input)
51
       (read-ssf stream)))
52
   (:method ((source string))
53
     (with-input-from-string (stream source)
54
       (read-ssf stream))))
55
 
56
 (defun read-blank-node (stream char)
57
   (labels ((delimiter-p (c) (find c #(#\space #\tab #\return #\linefeed #\page #\nul #\))))
58
            (non-delimiter-p (c) (not (delimiter-p c))))
59
     (declare (dynamic-extent #'delimiter-p #'non-delimiter-p))
60
     (setf char (read-char stream))
61
     (cond ((delimiter-p char)
62
            'spocq.a::_)
63
           ((eql char #\:)
64
            (let* ((symbol-name (read-buffer stream #'non-delimiter-p)))
65
              (intern-blank-node symbol-name)))
66
           (t
67
            (error "invalid blank node syntax: ~s" char)))))
68
 
69
 
70
 (defun read-literal (stream char)
71
   (let ((escaped nil))
72
     (flet ((non-delimiter-p (c)
73
              (cond ((shiftf escaped nil) t)
74
                    ((eql c #\\) (setf escaped t))
75
                    ((eql c char) nil)
76
                    (t t))))
77
       (declare (dynamic-extent #'non-delimiter-p))
78
       (let ((literal-string (unescape-sse-string (read-buffer stream #'non-delimiter-p))))
79
         (read-char stream)
80
         (case (peek-char nil stream nil nil)
81
           (#\^
82
            (read-char stream)
83
            (read-char stream)
84
            (assert (eql #\< (read-char stream)) () "invalid rdf literal syntax.")
85
            (let ((type (read-buffer stream #'(lambda (c) (not (eql c #\>))))))
86
              (read-char stream)
87
              (intern-literal literal-string type)))
88
           (#\@
89
            (read-char stream)
90
            (let ((tag (read stream)))
91
              (assert (symbolp tag) () "invalid rdf literal syntax.")
92
              (when (zerop (length literal-string)) (setf literal-string (make-string 0)))
93
              (intern-plain-literal literal-string tag)))
94
           (t
95
            literal-string))))))
96
 
97
 
98
 (defun read-or (stream char)
99
   (declare (ignore char))
100
   (assert (eql #\| (read-char stream)) () "invalid || syntax")
101
   'spocq.a:\|\|)
102
 
103
 (defun read-vector (stream char)
104
   (declare (ignore char))
105
   (let ((vector (make-array 4 :adjustable t :fill-pointer 0)))
106
     (loop (when (eql (peek-char t stream) #\]) (return vector))
107
           (vector-push-extend (read stream t nil t) vector))))
108
 
109
 (defun read-character-error (stream char)
110
   (error "Extra ~c in input from ~a." char stream))
111
 
112
 
113
 ;;; writing
114
 
115
 (defgeneric write-ssf (object stream)
116
   (:method ((object t) (stream stream))
117
     (let ((*readtable* *sse-readtable*)
118
           (*package* (find-package :spocq.a)))
119
       (encode-ssf-object object stream)))
120
   (:method (object (source pathname))
121
     (with-open-file (stream source :direction :output :if-does-not-exist :create :if-exists :supersede)
122
       (write-ssf object stream)))
123
   (:method (object (destination string))
124
     (setf (fill-pointer destination) 0)
125
     (with-output-to-string (stream destination)
126
       (write-ssf object stream))))
127
 
128
 
129
 (defun cl-user::format-ssf-object (stream object &optional colon at var)
130
   (declare (ignore colon at var))
131
   (encode-ssf-object object stream))
132
 
133
 (defun cl-user::format-sse-string (stream string &optional colon at var)
134
   (declare (ignore colon at var))
135
   (loop for char across string
136
         do (case char
137
              (#\" (write-string "\\\"" stream))
138
              (#\' (write-string "\\'" stream))
139
              (#\\ (write-string "\\\\" stream))
140
              (#\tab (write-string "\\t" stream))
141
              (#\backspace (write-string "\\b" stream))
142
              (#\linefeed (write-string "\\n" stream))
143
              (#\return (write-string "\\r" stream))
144
              (#\formfeed (write-string "\\f" stream))
145
              (t
146
               (write-char char stream)))))
147
 
148
 (defgeneric cl-user::format-sse-iri-namestring (stream iri &optional option arg)
149
   (:method (stream (object string) &optional option arg)
150
     (declare (ignore option arg))
151
     (write-string object stream))
152
 
153
   (:method (stream (object spocq:iri) &optional option arg)
154
     (declare (ignore option arg))
155
     (write-string (spocq:iri-lexical-form object) stream))
156
   
157
   (:method (stream (object symbol) &optional option arg)
158
     (declare (ignore option arg))
159
     (write-string (symbol-uri-namestring object) stream)))
160
 
161
 ;;; leaves non-iso-8861 as is for the stream encoding to handle
162
 ;;; (format nil ">~/format-sse-string/<" (unescape-sse-string "aa\\\"\\'\\u2020\\U00000040"))
163
 
164
 
165
 (defgeneric encode-ssf-form (operator arguments stream)
166
   (:documentation
167
    "Allow for sse forms with variant structures")
168
   (:method ((operator (eql :group-by)) (arguments list) stream)
169
     (format stream "(group-by~{ ~/format-ssf-object/~})" arguments))
170
   (:method ((operator t) (arguments list) stream)
171
     (write-char #\( stream)
172
     (encode-ssf-object operator stream)
173
     (loop (cond ((null arguments) (return))
174
                 ((consp arguments)
175
                  (write-char #\space stream)
176
                  (encode-ssf-object (pop arguments) stream))
177
                 (t
178
                  (write-string " . " stream)
179
                  (encode-ssf-object arguments stream)
180
                  (return))))
181
     (write-char #\) stream)))
182
 
183
 (defgeneric encode-ssf-object (object stream)
184
   (:method ((object function) stream)
185
     (funcall object stream))
186
 
187
   (:method ((object null) stream)
188
     (write-string *encode-ssf-object-nil-designator* stream))
189
 
190
   (:method ((object cons) stream)
191
     (encode-ssf-form (first object) (rest object) stream))
192
 
193
   (:method ((object vector) stream)
194
     (write-char #\[ stream)
195
     (dotimes (i (length object))
196
       (unless (zerop i) (write-char #\space stream))
197
       (encode-ssf-object (aref object i) stream))
198
     (write-char #\] stream))
199
 
200
   (:method ((object spocq:iri) stream)
201
     (format stream "<~/format-sse-iri-namestring/>" object))
202
 
203
   (:method ((object triple) stream)
204
    (format stream "(~a ~a ~a ~a)"
205
            3 ;; 'spocq.a:|triple|
206
            (triple-subject object)
207
            (triple-predicate object)
208
            (triple-object object)))
209
 
210
   (:method ((object spocq:date) stream)
211
     (write-char #\" stream)
212
     (write-string (term-lexical-form object) stream)
213
     (write-string "\"^^<http://www.w3.org/2001/XMLSchema#date>" stream))
214
 
215
   (:method ((object spocq:date-time) stream)
216
     (write-char #\" stream)
217
     (write-string (term-lexical-form object) stream)
218
     (write-string "\"^^<http://www.w3.org/2001/XMLSchema#dateTime>" stream))
219
 
220
   (:method ((object spocq:time) stream)
221
     (write-char #\" stream)
222
     (write-string (term-lexical-form object) stream)
223
     (write-string "\"^^<http://www.w3.org/2001/XMLSchema#time>" stream))
224
 
225
   (:method ((object spocq:day-time-duration) stream)
226
     (write-char #\" stream)
227
     (write-string (term-lexical-form object) stream)
228
     (write-string "\"^^<http://www.w3.org/2001/XMLSchema#dayTimeDuration>" stream))
229
 
230
   (:method ((object spocq:year-month-duration) stream)
231
     (write-char #\" stream)
232
     (write-string (term-lexical-form object) stream)
233
     (write-string "\"^^<http://www.w3.org/2001/XMLSchema#yearMonthDuration>" stream))
234
 
235
   (:method ((object spocq::gregorian) stream)
236
     (write-char #\" stream)
237
     (write-string (term-lexical-form object) stream)
238
     (format stream "\"^^<~a>" (iri-lexical-form (spocq::literal-datatype-uri object))))
239
 
240
   (:method ((object spocq:blank-node) stream)
241
     (format stream "_:~@[~a_~]~a"
242
             (blank-node-prefix)
243
             (spocq:blank-node-label object)))
244
 
245
   (:method ((object symbol) stream)
246
     (typecase object
247
       (keyword (format stream "~(~s~)" object))
248
       (t
249
        (let ((uri-namestring (get-symbol-uri-namestring object)))
250
          (cond (uri-namestring
251
                 (format stream "<~/format-sse-iri-namestring/>" uri-namestring))
252
                ((eq (symbol-package object) *variable-package*)
253
                 (if (undistinguished-variable-p object)
254
                     (format stream "_:node~a" (subseq (symbol-name object) 1))
255
                     (format stream "?~a" object)))
256
                ((eq object +null-term+)
257
                 (write-string *encode-ssf-object-nil-designator* stream))
258
                (t
259
                 (write-string (symbol-name object) stream)))))))
260
 
261
   (:method ((object spocq:plain-literal) stream)
262
     (format stream "\"~/format-sse-string/\"@~a" (spocq:literal-lexical-form object) (spocq:plain-literal-language-tag object)))
263
 
264
   (:method ((object string) stream)
265
     (format stream "\"~/format-sse-string/\"" object))
266
 
267
   (:method ((object spocq:unbound-variable) stream)
268
     (write-string *encode-ssf-object-nil-designator* stream))
269
 
270
   (:method ((object spocq:atomic-typed-literal) stream)
271
    (format stream "\"~/format-sse-string/\"^^<~/format-sse-iri-namestring/>"
272
             (spocq:literal-lexical-form object)
273
             (spocq:literal-datatype-uri object)))
274
 
275
   (:method ((object spocq:unsupported-typed-literal) stream)
276
     (format stream "\"~/format-sse-string/\"^^<~/format-sse-iri-namestring/>"
277
             (spocq:literal-lexical-form object)
278
             (spocq:unsupported-typed-literal-datatype-uri object)))
279
   
280
   (:method ((object integer) stream)
281
     (if (typep object *expand-literal-values*)
282
       (format stream "\"~a\"^^<~a>"
283
               object
284
               (if *encode-object-subtypes*
285
                 (typecase object
286
                   ((signed-byte 16) "http://www.w3.org/2001/XMLSchema#short")
287
                   (t "http://www.w3.org/2001/XMLSchema#integer"))
288
                 "http://www.w3.org/2001/XMLSchema#integer"))
289
       (princ object stream)))
290
 
291
   (:method ((object double-float) stream)
292
    (if (or (eql object dsu.codecs:double-float-nan)
293
            (eql object dsu.codecs:double-float-positive-infinity)
294
            (eql object dsu.codecs:double-float-negative-infinity))
295
      (error "Invalid float value: ~a" object)
296
      (let ((*read-default-float-format* 'double-float))
297
        (if (typep object *expand-literal-values*)
298
          (format stream "\"~f\"^^<http://www.w3.org/2001/XMLSchema#double>" object)
299
          (format stream "~f" object)))))
300
 
301
   (:method ((object single-float) stream)
302
    (if (or (eq object nan) (eq object +inf) (eq object -inf))
303
      (error "Invalid float value: ~a" object)
304
      (let ((*read-default-float-format* 'single-float))
305
        (if (typep object *expand-literal-values*)
306
          (format stream "\"~f\"^^<http://www.w3.org/2001/XMLSchema#float>" object)
307
          (format stream "~f" object)))))
308
 
309
   (:method ((object rational) stream)
310
     (let ((*read-default-float-format* 'single-float))
311
       (if (typep object *expand-literal-values*)
312
         (format stream "\"~f\"^^<http://www.w3.org/2001/XMLSchema#decimal>" (float object 1.0s0))
313
         (format stream "~f" (float object 1.0s0)))))
314
 
315
   (:method ((object agp) stream)
316
     "Emit a delegation specification for an AGP."
317
     (encode-ssf-object (agp-form object) stream))
318
 
319
   (:method ((object boolean-generator) stream)
320
     (let* ((channel (boolean-generator-channel object)))
321
       (encode-ssf-object (if (get-field-page channel) spocq.a:|true| spocq.a:|false|) stream))
322
     (incf-stat *statements-returned*))
323
 
324
   (:method ((object solution-generator) stream)
325
     (let* ((dimensions (solution-generator-dimensions object))
326
            (channel (solution-generator-channel object))
327
            (field-width (length dimensions))
328
            (count 0))
329
       (write-char #\( stream)
330
       (encode-ssf-object dimensions stream)
331
       (terpri stream)
332
       (do-pages (page channel)
333
         (if (= field-width (array-dimension page 1))
334
           (encode-ssf-object page stream)
335
           (log-warn "field width mismatch: ~s : ~s."
336
                     dimensions (array-dimension page 1)))
337
         (incf count (array-dimension page 0)))
338
       (write-char #\) stream)
339
       (incf-stat *statements-returned* count)))
340
 
341
   (:method ((object array) stream)
342
     ;; emit just the values from the field.
343
     (flet ((term-aspect-encoder (term-type term-literal term-language-tag term-datatype)
344
              (encode-ssf-term-aspects term-type term-literal term-language-tag term-datatype stream)))
345
       (declare (dynamic-extent #'term-aspect-encoder))
346
       (let ((term-deconstructor (repository-term-deconstructor *transaction*)))
347
         (let ((solution-count (array-dimension object 0))
348
               (variable-count (array-dimension object 1)))
349
           (dotimes (page-index solution-count)
350
             (write-string "(" stream)
351
             (loop for value-index from 0 below variable-count
352
                   for term-id = (aref object page-index value-index)
353
                   do (progn (unless (= value-index 0) (write-char #\space stream))
354
                             (funcall term-deconstructor #'term-aspect-encoder *transaction* term-id)))
355
             (write-char #\) stream)
356
             (terpri stream)))))))
357
     
358
 
359
 
360
 (defun encode-ssf-term-id (term-number stream)
361
   (flet ((term-aspect-encoder (term-type term-literal term-language-tag term-datatype)
362
            (encode-ssf-term-aspects term-type term-literal term-language-tag term-datatype stream)))
363
     (declare (dynamic-extent #'term-aspect-encoder))
364
     (repository-call-with-numbered-term-aspects #'term-aspect-encoder *transaction* term-number)))
365
 
366
 (defun encode-ssf-term (%term stream)
367
   (encode-ssf-term-aspects (cffi:foreign-slot-value %term 'rdfcache::term 'rdfcache::type)
368
                            (cffi:foreign-slot-value %term 'rdfcache::term 'rdfcache::value)
369
                            (cffi:foreign-slot-value %term 'rdfcache::term 'rdfcache::language)
370
                            (cffi:foreign-slot-value %term 'rdfcache::term 'rdfcache::datatype)
371
                            stream))
372
 
373
 (defun encode-ssf-term-aspects (term-type term-literal term-language-tag term-datatype stream)
374
   (ecase term-type
375
     (:none
376
      (write-string *encode-ssf-object-nil-designator* stream))
377
     (:node                            ; encode a blank node
378
      (format stream "_:~@[~a_~]" (blank-node-prefix))
379
      (stream-write-external-utf8-string stream term-literal))
380
     (:literal                         ; encode a typed or language-tagged literal
381
      (write-char #\" stream)
382
      (stream-write-external-utf8-string stream term-literal)
383
      (write-char #\" stream)
384
      (cond ((not (cffi:null-pointer-p term-datatype))
385
             (write-string "^^<" stream)
386
             (stream-write-external-utf8-string stream term-datatype)
387
             (write-char #\> stream))
388
            ((not (cffi:null-pointer-p term-language-tag))
389
             (write-char #\@ stream)
390
             (stream-write-external-utf8-string stream term-language-tag))))
391
     (:uri                             ; encode a uri
392
      (write-char #\< stream)
393
      (stream-write-external-utf8-string stream term-literal)
394
      (write-char #\> stream))))
395
 
396
 
397
 ;;; configure readtable
398
 
399
 (set-macro-character #\_ 'read-blank-node t *sse-readtable*)
400
 (set-macro-character #\? 'read-variable t *sse-readtable*)
401
 (set-macro-character #\] 'read-character-error nil *sse-readtable*)
402
 (set-macro-character #\[ 'read-vector t *sse-readtable*)
403
 (set-macro-character #\< 'read-iri-or-blank-node t *sse-readtable*)
404
 (set-macro-character #\" 'read-literal nil *sse-readtable*)
405
 (set-macro-character #\| 'read-or t *sse-readtable*)
406
 
407
 
408
 (:documentation (receive-message send-error-message send-request-message send-response-message)
409
   "The content-type application/sparql-query+sse indicates the message is to be coded
410
  as ssf/utf8 .")
411
 
412
 
413
 (defmethod receive-message ((stream amqp:channel) (content-type mime:application/sparql-query+sse) &key)
414
   "Given a STREAM with application/sparql-query+sse CONTENT-TYPE, decode as SSF/utf8"
415
 
416
   (log-debug "receive-message : (~s ~s)" stream
417
              (when (typep stream 'amqp:channel) (amqp.u:channel-content-type stream)))
418
   (flet ((coerce-to-iri (value)
419
            (when (stringp value) (intern-iri value))))
420
     (let* ((*package* *spocq-reader-package*)
421
            (basic (amqp:channel.basic stream))
422
            (operation (intern (string-upcase (amqp.u:basic-header basic :operation)) :keyword))
423
            (repository (or (amqp.u:basic-header basic :repository)
424
                            (amqp.u:basic-header basic :repository_id)))
425
            (base-iri (or (coerce-to-iri (amqp.u:basic-header basic :base)) (base-iri)))
426
            (task-id (or (amqp.u:basic-header basic :task_id)
427
                         (amqp.u:basic-header basic :query_id)))
428
            (user-id (amqp.u:basic-header basic :user_id))
429
            (default-graph-uris (or (amqp.u:basic-header basic :default_graph_uri)
430
                                    (amqp.u:basic-header basic :default-graph-uri)))
431
            (named-graph-uris (or (amqp.u:basic-header basic :named_graph_uri)
432
                                  (amqp.u:basic-header basic :named-graph-uri)))
433
            (routing-key (amqp.u:basic-header basic :routing_key))
434
            (trace-routing-key (amqp.u:basic-header basic :trace_routing_key))
435
            (accept (amqp.u:basic-header basic :accept))
436
            (limit (amqp.u:basic-header basic :limit))
437
            (offset (amqp.u:basic-header basic :offset)))
438
       (flet ((guarded-op (function)
439
                (handler-case (funcall function)
440
                  (error (condition)
441
                         (log-error "invalid sparql message body: ~a" condition)
442
                         (spocq.e::message-syntax-error :expression nil
443
                                                        :condition condition
444
                                                        :task-id task-id
445
                                                        :user-id user-id
446
                                                        :operation operation
447
                                                        :repository-id repository
448
                                                        :request-exchange nil
449
                                                        :request-routing-key routing-key
450
                                                        :accept accept)))))
451
         (let ((expression (guarded-op #'(lambda () (read-ssf stream)))))
452
           (if operation
453
             (values (list :repository repository :task-id task-id
454
                           :user-id user-id
455
                           :sse-expression expression
456
                           :graphs (list default-graph-uris named-graph-uris)
457
                           :routing-key routing-key
458
                           :trace-routing-key trace-routing-key
459
                           :accept accept
460
                           :limit limit
461
                           :offset offset
462
                           :base-iri base-iri)
463
                     operation)
464
             (spocq.e::message-syntax-error :expression expression
465
                                            :byte-offset 0
466
                                            :line-offset 0
467
                                            :task-id task-id
468
                                            :operation (or operation (amqp.u:basic-header basic :operation))
469
                                            :repository-id repository
470
                                            :request-exchange nil
471
                                            :request-routing-key routing-key
472
                                            :accept accept)))))))
473
 
474
 (defmethod receive-message ((stream stream) (content-type mime:application/sparql-query+sse) &key)
475
   "Given a STREAM with application/sparql-query+sse CONTENT-TYPE, decode as SSF/utf8"
476
   
477
   (log-debug "receive-message : (~s ~s)" stream
478
              (when (typep stream 'amqp:channel) (amqp.u:channel-content-type stream)))
479
   (let* ((*package* *spocq-reader-package*))
480
     (flet ((guarded-op (function)
481
              (handler-case (funcall function)
482
                (error (condition)
483
                       (log-error "invalid sparql message body: ~a" condition)
484
                       (spocq.e::message-syntax-error :expression nil
485
                                                      :condition condition)))))
486
       (guarded-op #'(lambda () (read-ssf stream))))))
487
 
488
 
489
 (defmethod receive-message ((stream amqp:channel) (content-type mime:application/bgp+sse) &key)
490
   "Given a STREAM with application/bgp+sse CONTENT-TYPE, decode a bgp/agp as SSF/utf8. Return the
491
  BGP form and the control parameters
492
  STREAM : AMQP:CHANNEL
493
  CONTENT:TYPE : mime:application/bgp+sse
494
  VALUES : operation : string
495
           repository : string
496
           task-id : string
497
           bgp : the agp form
498
           attributes : alist specifying response and control information
499
  "
500
 
501
   (log-debug "receive-message : (~s ~s)" stream (amqp.u:channel-content-type stream))
502
   (let* ((*package* *spocq-reader-package*)
503
          (basic (amqp:channel.basic stream))
504
          (bgp-form nil)
505
          (operation (find-symbol (string (amqp.u:basic-header basic :operation)) :spocq))
506
          (repository (or (amqp.u:basic-header basic :repository)
507
                          (amqp.u:basic-header basic :repository_id)))
508
          (task-id (or (amqp.u:basic-header basic :task_id)
509
                       (amqp.u:basic-header basic :query_id)))
510
          (default-graph-uris (amqp.u:basic-header-list basic :default-graph-uri))
511
          (named-graph-uris (amqp.u:basic-header-list basic :named-graph-uri))
512
          (routing-key (amqp.u:basic-header basic :routing_key))
513
          (trace-routing-key (amqp.u:basic-header basic :trace_routing_key))
514
          (accept (amqp.u:basic-header basic :accept))
515
          (limit (amqp.u:basic-header basic :limit))
516
          (offset (amqp.u:basic-header basic :offset)))
517
     (handler-case (setf bgp-form (read-ssf stream))
518
       (error (condition)
519
              (log-error "invalid bgp+sse message body: ~a" condition)
520
              (spocq.e::message-syntax-error :expression nil
521
                                             :task-id task-id
522
                                             :operation operation
523
                                             :repository-id repository
524
                                             :request-exchange nil
525
                                             :request-routing-key routing-key
526
                                             :accept accept)))
527
     (if (and operation (find-class operation nil(subtypep operation 'bgp-match) bgp-form)
528
       (values operation repository task-id bgp-form
529
               `(:graphs ,(list (mapcar #'intern-iri default-graph-uris)
530
                                  (mapcar #'intern-iri named-graph-uris))
531
                           :request-routing-key  ,routing-key
532
                           :trace-routing-key  ,trace-routing-key
533
                           ,@(when accept `(:accept  ,accept))
534
                           ,@(when limit `(:limit  ,limit))
535
                           ,@(when offset `(:offset ,offset))))
536
       (spocq.e::message-syntax-error :expression (list operation bgp-form)))))
537
 
538
 
539
 #+(or)
540
 (defmethod receive-message ((stream t) (content-type mime:application/sparql-results+sse-stream) &key)
541
   "Given a STREAM with application/sparql-result+sse-stream CONTENT-TYPE, locate the respective query and
542
  BGP node, decode the stream as a SSF/utf8 into a sequence of solutions and push them into the query's
543
  reduction graph."
544
   (flet ((basic-header (headers keyword)
545
            (loop for (key value) on headers by #'cddr
546
                  when (string-equal key keyword)
547
                  do (return value))))
548
     (let* ((headers (amqp:basic-headers (amqp:channel.basic stream)))
549
            (operation (find-symbol (string (basic-header headers :operation)) :spocq))
550
            (bgp-id (basic-header headers :bgp_id))
551
            (*query* (find-query bgp-id)))
552
       
553
       (assert (and operation bgp-id) ()
554
               "Invalid message: stream: ~s, content-type: ~a, headers: ~s."
555
               stream content-type headers)
556
       (if *query*
557
         (flet ((read-next-solution-as-ssf () (read-ssf stream)))
558
           (declare (dynamic-extent #'read-next-solution-as-ssf))
559
           (propagate-query-solution-field *query* bgp-id #'read-next-solution-as-ssf))
560
         (log-warn "query ?, none found for store response: headers: ~s" headers))
561
       bgp-id)))
562
     
563
 
564
 
565
 
566
 (defmethod send-account-message ((message-body t) (stream t) (content-type mime:application/sse))
567
   "Given a MESSAGE, and a STREAM with text/ssf CONTENT-TYPE, encode as an undecorated SSF/utf8 form.
568
  Any header information - eg. the operation, is out-of-band."
569
   (let ((*expand-literal-values* nil))          ; body should be a p-list with just integer values
570
     (write-ssf message-body stream)))
571
 
572
 
573
 (defmethod send-error-message ((body t) (stream t) (content-type mime:application/sparql-query+sse))
574
   "Given a MESSAGE, and a STREAM with text/ssf CONTENT-TYPE, encode as SSF/utf8"
575
   (write-ssf (vector :|error| body) stream))
576
 
577
 
578
 (defmethod send-request-message (operation (message-body cons) (stream t) (content-type mime:application/sparql-query+sse))
579
   "Given a MESSAGE, and a STREAM with text/ssf CONTENT-TYPE, encode as SSF/utf8"
580
   (write-ssf message-body stream))
581
 
582
 
583
 (defmethod send-response-message (operation (message-body cons) (stream t) (content-type mime:application/sparql-query+sse))
584
   "Given a MESSAGE, and a STREAM with text/ssf CONTENT-TYPE, encode as SSF/utf8"
585
   (when *algebra-trace-output*
586
     (setf stream (make-broadcast-stream *algebra-trace-output* stream)))
587
   (let ((*package* *spocq-reader-package*))
588
     (write-ssf message-body stream)))
589
 
590
 
591
 
592
 #|
593
 (read-ssf #P"yoda:Development:Source:production:Library:org:datagraph:spocq:src:test:rspec:tests:data:OpenWorld:open-cmp-02.ssf")
594
 
595
 (read-ssf " (|| (= ?date (<http://www.w3.org/2001/XMLSchema#dateTime> \"2004-01-01T00:00:00Z\")))" )
596
 
597
 (dolist (pathname (directory #P"P-LIBRARY:org;datagraph;spocq;src;test;rspec;tests;**;*.ssf"))
598
   (handler-case (read-ssf pathname)
599
     (error (c)
600
            (format *trace-output* "~%~s : ~a" pathname c))))
601
 
602
 |#
603
 #+(or)
604
 (read-ssf "(reply spocq bgpfield (\"jhacker/basic-data-1.ttl\" \"9ee6e600-d8a7-012d-6898-12313918161a\"
605
                                   (((? p) \"http://example.org/ns#p\" (? v) \"d:x ns:p\")
606
                                    ((? p) \"http://example.org/x/p\" (? v) \"x:x x:p\"))
607
                                   ((bgp_id . (B276A580-F679-11DF-8828-12313A0075A4)))))")