Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/message-communication.lisp

KindCoveredAll%
expression23704 3.3
branch020 0.0
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 ;;;
6
 ;;; message communication interface operations
7
 
8
 (defgeneric receive-message (stream content-type &rest args)
9
   (:documentation "Receive a message through a STREAM with the given CONTENT-TYPE
10
     STREAM : stream : capable of supporting read operations (read-byte v/s read-char) respective
11
       the content type
12
     CONTEXT-TYPE : mime:*/* : in practice application/octet-stream or application/sparql+sse
13
     VALUES : symbol : the request/response operator
14
              list : a plist of the parsed message properties
15
 
16
     The operator applies generically across all message types - requests, responses, and errors.
17
     In particular, a store request may yield ether a response or an error.
18
     Header fields are validated, if present, and the 'operator' and body are returned.")
19
   (:argument-precedence-order content-type stream)
20
 
21
   (:method ((stream t) (content-type t) &rest args)
22
     (declare (ignore args))
23
     (error "Invalid stream and/or content type for receive-message: (~s ~s)." stream content-type))
24
 
25
   (:method :after ((stream t) (content-type t) &rest args)
26
     (declare (ignore args))
27
     ;; record the read byte count for accounting
28
     (incf *bytes-read* (or (stream-file-position stream) 0)))
29
 
30
   (:method ((stream stream) (content-type t) &rest args)
31
     "Given a stream and no over-riding content type, read the stream content and delegate to
32
      the string method.
33
      If the string is zero-length, signal en end of file"
34
     (declare (dynamic-extent args))
35
     (apply #'receive-message (read-stream stream :eof-p t) content-type args)))
36
 
37
 
38
 
39
 
40
 (defgeneric send-account-message (message-body stream content-type)
41
   (:documentation "Encode a message with accounting iinformation as per content-type.")
42
   (:argument-precedence-order content-type stream message-body))
43
 
44
 
45
 (defgeneric send-error-message (body stream content-type)
46
   (:documentation "Encode an error response body as per content-type.")
47
   (:argument-precedence-order content-type stream body))
48
 
49
 
50
 (defgeneric send-request-message (operation message-body stream content-type)
51
   (:documentation "Encode a request message (operation x body) as per content-type.")
52
   (:argument-precedence-order content-type stream message-body operation)
53
 
54
   (:method :after ((operation t) (message-body t) (stream t) (content-type t))
55
     ;; record the read byte count for accounting
56
     (incf *bytes-written* (or (stream-file-position stream) 0))))
57
 
58
 (defparameter *send-response-message.compute-applicable-methods* nil)
59
 (defparameter *send-response-message.sparql-results-content-type* mime:application/sparql-results+json
60
   "Specify the default concrete encoding tu use if given an abstract type.")
61
 
62
 (defgeneric send-response-message (operation message-body stream content-type)
63
   (:documentation "Encode a response message (operation x body) as per accept content-type.
64
     Use, in addition a presentation type to determine whether/how to wrap the response
65
     content in a presentation document.")
66
   (:argument-precedence-order message-body content-type stream operation)
67
 
68
   (:method :around ((operation t) (message-body task) (stream t) (content-type t))
69
     (when *send-response-message.compute-applicable-methods*
70
       (print (compute-applicable-methods #'send-response-message
71
                                          (list operation message-body stream content-type))))
72
     (with-accounting
73
         (call-next-method)
74
       (generate-accounting-note :encode)))
75
 
76
   (:method :before ((operation t) (message-body t) (stream t) (content-type t))
77
     ;; flush any headers either 
78
     (force-output stream))
79
 
80
   (:method :after ((operation t) (message-body task) (stream t) (content-type t))
81
     ;; record the read byte count for accounting
82
     (incf *bytes-written* (or (stream-file-position stream) 0)))
83
 
84
   (:method ((operation t) (message t) (stream stream) (content-type mime:application/sparql-results))
85
   "Supply a default encoding if given a generic sparql-results response content type"
86
     (unless (eq (type-of content-type)
87
                 (type-of *send-response-message.sparql-results-content-type*))
88
       (log-warn "send-response-message: content type not recognized (~s). defaulting to sparql-results+json: ~s"
89
                 content-type
90
                 *send-response-message.sparql-results-content-type*))
91
     (send-response-message operation message stream *send-response-message.sparql-results-content-type*))
92
 
93
   (:method ((operation t) (message-body task) (stream t) (content-type t))
94
     "As the default for any task, send the root generator"
95
     (send-response-message operation (task-result-generator message-body) stream content-type))
96
 
97
   (:method ((operation t) (object persistent-object) (stream t) (content-type t))
98
     (send-response-message operation
99
                            (cons *construct-dimensions*
100
                                  (encode-presentation-graph object))
101
                            stream
102
                            content-type))
103
 
104
    (:method ((operation t) (query query) (channel amqp:channel) (content-type t))
105
     (flet ((send-body (stream content-type)
106
              (call-next-method operation query stream content-type)))
107
       (declare (dynamic-extent #'send-body))
108
       (let ((basic (amqp:channel.basic channel)))
109
         (setf (amqp:basic-timestamp basic) (- (get-universal-time) amqp:*timestamp-epoch*)
110
               (amqp:basic-reply-to basic) ""
111
               (amqp:basic-correlation-id basic) (task-id query)
112
               (amqp:basic-headers basic) (list :|operation| "response"
113
                                                :|repository| (repository-id query)
114
                                                :|task_id| (task-id query)
115
                                                :|user_id| (task-user-id query))))
116
       (unless (eq (amqp.u:channel-content-type channel) (task-response-content-type query))
117
         (setf (amqp.u:channel-content-type channel) (task-response-content-type query)))
118
       (amqp:publish channel :body #'send-body
119
                     :exchange (task-request-exchange query)
120
                     :routing-key (task-request-reply-routing-key query)
121
                     :headers (list :|operation| "response"
122
                                    :|repository| (repository-id query)
123
                                    :|task_id| (task-id query)
124
                                    :|user_id| (task-user-id query)
125
                                    :transfer-encoding "chunked"))))
126
   )
127
 
128
 
129
 ;;;
130
 ;;; request/response operators
131
 
132
 (defgeneric send-error-response (task stream condition &key detail)
133
 
134
   (:method ((task t) (channel t) (condition t) &key &allow-other-keys)
135
     (log-warn "invalid task for error response: ~s: ~a." task condition))
136
 
137
   (:method ((task task) (channel t) (condition condition)
138
             &key (detail (format nil "task '~a' signaled:~%~a" (task-id task) condition)))
139
     (send-error-response task channel (symbol-name (type-of condition)) :detail detail))
140
   
141
   (:method ((task task) (channel amqp:channel) (condition string)
142
             &key (detail (format nil "task ~s signaled:~%~a" (task-id task) condition)))
143
     (labels ((send-body (stream content-type)
144
                (send-error-message (vector :|user| 100
145
                                            condition
146
                                            detail
147
                                            nil)
148
                                    stream
149
                                    content-type)))
150
       (declare (dynamic-extent #'send-body))
151
       (log-info "query: ~s. error: ~a" (task-id task) condition)
152
       (let ((basic (amqp:channel.basic channel)))
153
         (setf (amqp:basic-timestamp basic) (- (get-universal-time) amqp:*timestamp-epoch*)
154
               (amqp:basic-reply-to basic) ""
155
               (amqp:basic-correlation-id basic) (task-id task)))
156
       (unless (eq (amqp.u:channel-content-type channel) (task-response-content-type task))
157
         (setf (amqp.u:channel-content-type channel) (task-response-content-type task)))
158
       (let ((*package* (find-package :spocq.i)))
159
         (setf (amqp.u:channel-content-type channel) mime:application/sparql-query+sse)
160
         (amqp:publish channel :body #'send-body
161
                       :exchange (task-request-exchange task)
162
                       :routing-key (task-request-error-routing-key task)
163
                       :headers (list :|operation| "error"
164
                                      :|condition| condition
165
                                      :|task_id| (task-id task)
166
                                      :|user_id| (task-user-id task)
167
                                      :package (string :spocq.i))))))
168
   
169
   (:method ((task task) (stream stream) (condition string)
170
             &key (detail (format nil "task '~a' signaled:~%~a" (task-id task) condition)))
171
     (send-error-message (vector :|user| 100
172
                                 condition
173
                                 detail
174
                                 nil)
175
                         stream
176
                         *request-content-type*)))
177
 
178
 
179
 (defgeneric receive-query-request (stream content-type &rest args)
180
   (:method ((stream stream) (content-type t) &rest args)
181
     (apply #'receive-message stream content-type args)))
182
 
183
 
184
 (defgeneric send-algebra-response (query stream result)
185
   (:method ((query query) (channel amqp:channel) result)
186
     (flet ((send-body (stream content-type)
187
              (send-response-message :|response|
188
                                     result
189
                                     stream
190
                                     content-type)))
191
       (declare (dynamic-extent #'send-body))
192
       (let ((basic (amqp:channel.basic channel)))
193
         (setf (amqp:basic-timestamp basic) (- (get-universal-time) amqp:*timestamp-epoch*)
194
               (amqp:basic-reply-to basic) ""
195
               (amqp:basic-correlation-id basic) (task-id query)
196
               (amqp:basic-headers basic) (list :|operation| "response"
197
                                                :|repository| (repository-id query)
198
                                                :|task_id| (task-id query)
199
                                                :|user_id| (task-user-id query))))
200
       (setf (amqp.u:channel-content-type channel) mime:application/sparql-query+sse)
201
       (amqp:publish channel :body #'send-body
202
                     :exchange (task-request-exchange query)
203
                     :routing-key (task-request-reply-routing-key query)
204
                     :headers (list :|operation| "response"
205
                                    :|repository| (repository-id query)
206
                                    :|task_id| (task-id query)
207
                                    :|user_id| (task-user-id query)
208
                                    :transfer-encoding "chunked"))))
209
   
210
   (:method ((query query) (stream stream) result)
211
     (send-response-message :|response|
212
                            result
213
                            stream
214
                            (task-response-content-type query))))
215
 
216
 
217
 (defgeneric send-query-response (query stream result)
218
   #+(or)
219
   (:method ((query query) (channel amqp:channel) result)
220
     (flet ((send-body (stream content-type)
221
              (send-response-message :|response|
222
                                     result
223
                                     stream
224
                                     content-type)))
225
       (declare (dynamic-extent #'send-body))
226
       (let ((basic (amqp:channel.basic channel)))
227
         (setf (amqp:basic-timestamp basic) (- (get-universal-time) amqp:*timestamp-epoch*)
228
               (amqp:basic-reply-to basic) ""
229
               (amqp:basic-correlation-id basic) (task-id query)
230
               (amqp:basic-headers basic) (list :|operation| "response"
231
                                                :|repository| (repository-id query)
232
                                                :|task_id| (task-id query)
233
                                                :|user_id| (task-user-id query))))
234
       (unless (eq (amqp.u:channel-content-type channel) (task-response-content-type query))
235
         (setf (amqp.u:channel-content-type channel) (task-response-content-type query)))
236
       (amqp:publish channel :body #'send-body
237
                     :exchange (task-request-exchange query)
238
                     :routing-key (task-request-reply-routing-key query)
239
                     :headers (list :|operation| "response"
240
                                    :|repository| (repository-id query)
241
                                    :|task_id| (task-id query)
242
                                    :|user_id| (task-user-id query)
243
                                    :transfer-encoding "chunked"))))
244
   
245
   (:method ((query query) (stream stream) result)
246
     (send-response-message :|response|
247
                            result
248
                            stream
249
                            (task-response-content-type query)))
250
 
251
   (:method ((task task) (stream null) (result t))
252
     "Given a null destination, just return the task"
253
     task))
254
 
255
 
256
 (defgeneric send-trace-response (query stream data)
257
   (:documentation "Encode a query trace tree as the response")
258
 
259
   (:method ((query query) (channel amqp:channel) data)
260
     (flet ((send-body (stream content-type)
261
              (send-response-message :|response|
262
                                     data
263
                                     stream
264
                                     content-type)))
265
       (declare (dynamic-extent #'send-body))
266
       (let ((basic (amqp:channel.basic channel)))
267
         (setf (amqp:basic-timestamp basic) (- (get-universal-time) amqp:*timestamp-epoch*)
268
               (amqp:basic-reply-to basic) ""
269
               (amqp:basic-correlation-id basic) (task-id query)
270
               (amqp:basic-headers basic) (list :|operation|"response"
271
                                                :|repository| (repository-id query)
272
                                                :|task_id| (task-id query)
273
                                                :|user_id| (task-user-id query))))
274
       (unless (eq (amqp.u:channel-content-type channel) (task-response-content-type query))
275
         (setf (amqp.u:channel-content-type channel) (task-response-content-type query)))
276
       (amqp:publish channel :body #'send-body
277
                     :exchange (task-request-exchange query)
278
                     :routing-key (task-request-reply-routing-key query)
279
                     :headers (list :|operation| "response"
280
                                    :|repository| (repository-id query)
281
                                    :|task_id| (task-id query)
282
                                    :|user_id| (task-user-id query)
283
                                    :transfer-encoding "chunked"))))
284
   
285
   (:method ((query query) (stream stream) data)
286
     (send-response-message :|response|
287
                            data
288
                            stream
289
                            (task-response-content-type query))))
290
 
291
 
292
 (defgeneric send-store-request (query stream variable form &key repository)
293
   (:documentation "Send an individual graph pattern request to a store.
294
  Use a common exchange and route it as per the repository. By default assume the repository from the query.
295
  Send properties to specify
296
  - bgp_id : an unique id to identify the bgp in the response
297
  - graphs : the graphs specified as datasets in the original sparql query
298
  - routing_key : an unique id to route the response through the store exchange back to the requesing engine.")
299
   #+(or) ; just for tracing
300
   (:method ((query t) (destination t) (variable t) form )
301
     (assert (typep form '(or (cons (eql spocq.a:|bgp|)) (cons (eql spocq.a:|graph|)))) ()
302
             "Invalid store request: ~s ~s." variable form))
303
 
304
   (:method ((query query) (channel amqp:channel) variable form &key (repository (repository-id query)))
305
     (flet ((send-body (stream content-type)
306
              content-type
307
              (send-request-message :|bgpmatch|
308
                                    form
309
                                    stream
310
                                    content-type)))
311
       (declare (dynamic-extent #'send-body))
312
       ;; publish the store bgp request to the channel's default exchange - which have been configured
313
       ;; at process start, and use as the routing key the repository id presented with the query.
314
       ;; 2020-10-26 no timestamps to store
315
       (let ((basic (amqp:channel.basic channel)))
316
         (setf (amqp:basic-timestamp basic) 0
317
               (amqp:basic-reply-to basic) (task-store-routing-key query)
318
               (amqp:basic-correlation-id basic) (task-id query)
319
               (amqp:basic-headers basic) `(:|operation| "bgpmatch"
320
                                                         :|repository| ,repository
321
                                                         :|task_id| ,(string (task-id query))
322
                                                         :|user_id| ,(task-user-id query)
323
                                                         :|bgp_id|  ,(string variable)
324
                                                         :|graphs|  ,(task-dataset-graphs query)
325
                                                         :|routing_key| ,(task-store-routing-key query))))
326
       (unless (eq (amqp.u:channel-content-type channel) *store-content-type*)
327
         #+(or)
328
         (format *trace-output* "~&~s:for send  ~s -> ~s~%"
329
                 channel (amqp.u:channel-content-type channel) *store-content-type*)
330
         (setf (amqp.u:channel-content-type channel) *store-content-type*))
331
       (amqp:publish channel :body #'send-body
332
                     :exchange *store-exchange*
333
                     ;; :routing-key (task-store-bgp-routing-key query)
334
                     :routing-key (concatenate 'string "bgp." (string variable))
335
                     :headers  (list :|operation| "bgpmatch"
336
                                     :|repository| repository
337
                                     :|task_id| (string (task-id query))
338
                                     :|user_id| (task-user-id query)
339
                                     :|bgp_id|  (string variable)
340
                                     :|graphs|  (task-dataset-graphs query)
341
                                     :|routing_key| (task-store-routing-key query)
342
                                     ))
343
       t))
344
   
345
   (:method ((query query) (stream stream) variable form &key repository)
346
     (declare (ignore repository))
347
     (send-request-message :|bgpmatch|
348
                           form
349
                           stream
350
                           *store-content-type*)))
351
 
352
 (defgeneric receive-store-request (stream content-type)
353
   (:method ((stream stream) (content-type t))
354
     (multiple-value-bind (operation repository task-id bgp-form properties)
355
                          (receive-message stream content-type)
356
       (destructuring-bind (&key (request-routing-key repository)
357
                                 (accept content-type)
358
                                 (request-exchange (amqp:exchange-exchange *store-exchange*))
359
                                 &allow-other-keys)
360
                           properties
361
         (apply #'make-instance operation
362
                :repository repository
363
                :task-id task-id
364
                :expression bgp-form
365
                :accept accept
366
                :request-exchange request-exchange
367
                :request-routing-key request-routing-key
368
                properties)))))
369
 
370
 
371
 (defgeneric receive-store-response (stream content-type)
372
   (:method ((stream stream) (content-type t))
373
     (multiple-value-bind (operation body)
374
                          (receive-message stream content-type)
375
       (cond ((string-equal operation :|bgpfield|)
376
              (make-instance 'store-reply :operation operation :term body))
377
             ((string-equal operation :|error|)
378
              (make-instance 'error-task :operation 'spocq:|error| :term body))
379
             (t
380
              (spocq.e::message-syntax-error :expression (list operation body)))))))
381
 
382
 
383
 (defgeneric send-store-response (query stream result)
384
   (:method ((task bgp-match) (channel amqp:channel) result)
385
     (flet ((send-body (stream content-type)
386
              (send-response-message  :|bgpfield|
387
                                      result
388
                                      stream
389
                                      content-type)))
390
       (declare (dynamic-extent #'send-body))
391
       (let ((basic (amqp:channel.basic channel)))
392
         (setf (amqp:basic-timestamp basic) (- (get-universal-time) amqp:*timestamp-epoch*)
393
               (amqp:basic-reply-to basic) ""
394
               (amqp:basic-correlation-id basic) (task-id task)
395
               (amqp:basic-headers basic) `(:|operation| :|bgpfield|
396
                                            :|repository| ,(repository-id task)
397
                                            :|task_id| ,(task-id task)
398
                                            :|user_id| ,(task-user-id task)
399
                                            :|bgp_id|  ,(task-bgp-id task))))
400
       (unless (eq (amqp.u:channel-content-type channel) (task-response-content-type task))
401
         (setf (amqp.u:channel-content-type channel) (task-response-content-type task)))
402
       (amqp:publish channel :body #'send-body
403
                     :exchange (task-request-exchange task)
404
                     :routing-key (task-request-routing-key task)
405
                     :headers (list :|operation| "bgpfield"
406
                                    :|repository| (repository-id task)
407
                                    :|task_id| (task-id task)
408
                                    :|user_id| (task-user-id task)
409
                                    :|bgp_id|  (task-bgp-id task)))))
410
   
411
   (:method ((task bgp-match) (stream stream) result)
412
     (send-response-message :|bgpfield|
413
                            result
414
                            stream
415
                            *store-content-type*)))
416
 
417
 
418
 (defgeneric receive-account-note (stream content-type)
419
   #+(or)                                ; as stub pattern only
420
   (:method ((stream stream) (content-type t))
421
     (multiple-value-bind (operation body)
422
                          (receive-message stream content-type)
423
       (cond ((string-equal operation :|account|)
424
              (let* ((basic (amqp:channel.basic stream))
425
                     (timestamp (amqp:basic-timestamp basic)))
426
                (concatenate 'list body (list timestamp))))
427
             (t
428
              (spocq.e::message-syntax-error :expression (list operation body)))))))
429
 
430
 
431
 (defgeneric send-account-note (query state accounting-io)
432
   (:documentation "Generate an accounting record for task resource usage in the present
433
     state and update it to the new one.")
434
   
435
   (:method ((task t) (state t) (io t))
436
     (log-warn "invalid task for accounting note: ~s." task))
437
 
438
   (:method ((task task) new-state (io null)))
439
   
440
   (:method ((task task) new-state (io amqp:channel))
441
     "Capture the usage since the last mark, send the values and the state to the accounting queue."
442
     (let ((basic (amqp:channel.basic io))
443
           (*package* (find-package :spocq.i))
444
           (now (get-universal-time)))
445
       (flet ((send-progress-note (stream content-type)
446
                (let ((message (list* :|state| (task-state task) (accounting-properties)))) 
447
                  (log-debug "send-account-note: query: ~a note: ~a" (task-id task) message)
448
                  (send-account-message message stream content-type)))
449
              (send-completion-note (stream content-type)
450
                (send-account-message `(:|state|  ,new-state)
451
                                      stream
452
                                      content-type)))
453
         (declare (dynamic-extent #'send-progress-note #'send-completion-note))
454
         (unless (eq (amqp.u:channel-content-type io) mime:application/sse)
455
           (setf (amqp.u:channel-content-type io) mime:application/sse))
456
         
457
         (setf (amqp:basic-timestamp basic) (- now amqp:*timestamp-epoch*)
458
               (amqp:basic-reply-to basic) ""
459
               (amqp:basic-type basic) "account"
460
               (amqp:basic-correlation-id basic) (task-id task)
461
               (amqp:basic-message-id basic) (format nil "~a.~/format-iso-time/" *thread-name* now)
462
               (amqp:basic-app-id basic) *service-name*
463
               (amqp:basic-headers basic) `(:|operation| :|account|
464
                                                         :|repository| ,(repository-id task)
465
                                                         :|task_id| ,(task-id task)
466
                                                         :|user_id| ,(task-user-id task)))
467
         (amqp:publish io :body #'send-progress-note
468
                       :exchange *accounting-exchange*
469
                       :routing-key (task-id task)
470
                       :headers (list :|operation| "account"
471
                                      :|repository| (repository-id task)
472
                                      :|task_id| (task-id task)
473
                                      :|user_id| (task-user-id task)
474
                                      :package (string :spocq.i)))
475
         (case new-state
476
           ((:terminate :complete)     ; indicate completion as its own note
477
            (amqp:publish io :body #'send-completion-note
478
                          :exchange *accounting-exchange*
479
                          :routing-key (task-id task)
480
                          :headers (list :|operation| "account"
481
                                         :|repository| (repository-id task)
482
                                         :|task_id| (task-id task)
483
                                         :|user_id| (task-user-id task)
484
                                         :package (string :spocq.i))))
485
           (t )))))
486
 
487
   (:method ((task task) (properties list) (io amqp:channel))
488
     (let ((basic (amqp:channel.basic io))
489
           (*package* (find-package :spocq.i))
490
           (now (get-universal-time)))
491
       (flet ((send-progress-note (stream content-type)
492
                (log-debug "query: ~a accounting: ~a" (task-id task) properties)
493
                (send-account-message properties stream content-type)))
494
         (declare (dynamic-extent #'send-progress-note))
495
         (unless (eq (amqp.u:channel-content-type io) mime:application/sse)
496
           (setf (amqp.u:channel-content-type io) mime:application/sse))
497
         
498
         (setf (amqp:basic-timestamp basic) (- now amqp:*timestamp-epoch*)
499
               (amqp:basic-reply-to basic) ""
500
               (amqp:basic-type basic) "account"
501
               (amqp:basic-correlation-id basic) (task-id task)
502
               (amqp:basic-message-id basic) (format nil "~a.~/format-iso-time/" *thread-name* now)
503
               (amqp:basic-app-id basic) *service-name*
504
               (amqp:basic-headers basic) `(:|operation| :|account|
505
                                                         :|repository| ,(repository-id task)
506
                                                         :|task_id| ,(task-id task)
507
                                                         :|user_id| ,(task-user-id task)))
508
         (amqp:publish io :body #'send-progress-note
509
                       :exchange *accounting-exchange*
510
                       :routing-key (task-id task)
511
                       :headers (list :|operation| "account"
512
                                      :|repository| (repository-id task)
513
                                      :|task_id| (task-id task)
514
                                      :|user_id| (task-user-id task)
515
                                      :package (string :spocq.i)))))))
516
 
517
 
518
 #|
519
 (trace receive-query-request send-query-response receive-store-request send-store-response
520
        send-store-request receive-store-response
521
        )
522
 
523
 ;; nb. the sse extensions are defined for streams only
524
 
525
 (let ((stream (make-instance 'DE.SETF.UTILITY.IMPLEMENTATION::VECTOR-IO-STREAM)))
526
   (send-request-message :|bgpmatch|
527
                         `("jhacker/foaf" "bfe63b60-ca75-012d-8bf1-123139180561"
528
                           (ORG.DATAGRAPH.SPOCQ.ALGEBRA:|bgp|
529
                            (ORG.DATAGRAPH.SPOCQ.ALGEBRA:|triple|
530
                             ,(PURI:URI "http://example.org/ns#x") ?::|p| -18))
531
                           ((:|bgp_id| . #:D3509580-E847-11DF-A031-12313A0075A4) (:|graphs|)
532
                            (:|routing_key| . "ip-10-251-122-82.spocq.8490")))
533
                         stream
534
                         mime:application/octet-stream)
535
   (de.setf.utility.implementation::stream-position stream 0)
536
   (receive-message stream mime:application/octet-stream))
537
 
538
 (let ((stream (make-instance 'DE.SETF.UTILITY.IMPLEMENTATION::VECTOR-IO-STREAM)))
539
   (send-term #(:|cast| :|spocq| :|bgpmatch| (3/2))
540
              stream)
541
   (de.setf.utility.implementation::stream-position stream 0)
542
   (receive-message stream mime:application/octet-stream))
543
 
544
 (let ((stream (make-instance 'DE.SETF.UTILITY.IMPLEMENTATION::VECTOR-IO-STREAM)))
545
   (send-term '(_::test "asdf" |http://www.w3.org/2001/XMLSchema|:|dateTime|) stream)
546
   (de.setf.utility.implementation::stream-position stream 0)
547
   (receive-message stream mime:application/octet-stream))
548
 
549
 (let ((stream (make-instance 'DE.SETF.UTILITY.IMPLEMENTATION::VECTOR-IO-STREAM)))
550
   (send-term (vector (find-symbol "^" :keyword)
551
                (vector "10"
552
                        "http://www.w3.org/2001/XMLSchema#string")) stream)
553
   (princ (with-output-to-string (out)
554
            (map nil #'(lambda (byte)
555
                         (cond ((alphanumericp (code-char byte)) (write-char (code-char byte)  out))
556
                               (t (format out "\\x~2,'0x" byte))))
557
                 (DE.SETF.UTILITY.IMPLEMENTATION::vector-stream-vector stream))))
558
   (de.setf.utility.implementation::stream-position stream 0)
559
   (receive-message stream mime:application/octet-stream))
560
 
561
 (let ((stream (make-instance 'DE.SETF.UTILITY.IMPLEMENTATION::VECTOR-IO-STREAM)))
562
   (send-term (find-symbol "t" :spocq.a) stream)
563
   (princ (with-output-to-string (out)
564
            (map nil #'(lambda (byte)
565
                         (cond ((alphanumericp (code-char byte)) (write-char (code-char byte)  out))
566
                               (t (format out "\\x~2,'0x" byte))))
567
                 (DE.SETF.UTILITY.IMPLEMENTATION::vector-stream-vector stream))))
568
   (de.setf.utility.implementation::stream-position stream 0)
569
   (receive-message stream mime:application/octet-stream))
570
 
571
 (let ((stream (make-instance 'DE.SETF.UTILITY.IMPLEMENTATION::VECTOR-IO-STREAM)))
572
   (send-term (vector (find-symbol "^" :keyword)
573
                      "Hello, world!"
574
                      "http://www.w3.org/2001/XMLSchema#string") stream)
575
   (princ (with-output-to-string (out)
576
            (map nil #'(lambda (byte)
577
                         (cond ((and (/= 131 byte) (graphic-char-p (code-char byte)))
578
                                (write-char (code-char byte)  out))
579
                               (t (format out "\\x~2,'0x" byte))))
580
                 (DE.SETF.UTILITY.IMPLEMENTATION::vector-stream-vector stream))))
581
   (de.setf.utility.implementation::stream-position stream 0)
582
   (receive-message stream mime:application/octet-stream))
583
 
584
 (let ((stream (make-instance 'DE.SETF.UTILITY.IMPLEMENTATION::VECTOR-IO-STREAM)))
585
   (flet ((tsr (value)
586
            (format t "~%----------")
587
            (de.setf.utility.implementation::stream-position stream 0)
588
            (send-term (print value) stream)
589
            (print (de.setf.utility.implementation::get-vector-stream-vector stream))
590
            (de.setf.utility.implementation::stream-position stream 0)
591
            (print (receive-message stream mime:application/octet-stream))))
592
     (tsr '((1 . 2)))
593
     (tsr '(1 2))
594
 
595
     (tsr #(:|cast| :|spocq| :|bgpmatch|
596
             ("jhacker/foaf" "d96b4f60-cad7-012d-b773-123139183153"
597
              (ORG.DATAGRAPH.SPOCQ.ALGEBRA:|bgp|
598
               (ORG.DATAGRAPH.SPOCQ.ALGEBRA:|triple|
599
                ?::|x| #.(PURI:URI "http://example.org/ns#x") ?::|v|))
600
              ((:|bgp_id| . #:EC8A2E00-E8A9-11DF-A00B-12313A0075A4) (:|graphs|)
601
               (:|routing_key| . "ip-10-251-122-82.spocq.14717")))))))
602
 
603
 (with-open-file (stream "/tmp/bert.bin" :direction :output :if-exists :supersede
604
                         :if-does-not-exist :create :element-type '(unsigned-byte 8))
605
   (map nil #'(lambda (b) (write-byte b stream))
606
        #(131 104 4 100 0 4 99 97 115 116 100 0 5 115 112 111 99 113 100 0 8 98 103 112 109 97 116 99 104 108 0 0 0 4 104 2 100 0 1 34 109 0 0 0 12 106 104 97 99 107 101 114 47 102 111 97 102 104 2 100 0 1 34 109 0 0 0 36 54 99 52 51 52 101 50 48 45 99 97 101 52 45 48 49 50 100 45 97 102 99 50 45 49 50 51 49 51 98 49 48 48 53 57 50 108 0 0 0 2 100 0 3 98 103 112 108 0 0 0 4 100 0 6 116 114 105 112 108 101 104 2 100 0 1 63 100 0 1 120 104 2 100 0 1 60 109 0 0 0 27 104 116 116 112 58 47 47 101 120 97 109 112 108 101 46 111 114 103 47 116 104 105 110 103 115 35 112 104 2 100 0 1 63 100 0 1 118 106 106 104 3 100 0 4 98 101 114 116 100 0 4 100 105 99 116 108 0 0 0 3 104 2 100 0 6 98 103 112 95 105 100 100 0 36 55 70 67 57 54 48 56 48 45 69 56 66 54 45 49 49 68 70 45 65 48 48 66 45 49 50 51 49 51 65 48 48 55 53 65 52 104 2 100 0 6 103 114 97 112 104 115 106 104 2 100 0 11 114 111 117 116 105 110 103 95 107 101 121 104 2 100 0 1 34 109 0 0 0 28 105 112 45 49 48 45 50 53 49 45 49 50 50 45 56 50 46 115 112 111 99 113 46 49 52 55 49 55 106 106)))
607
 
608
 |#