Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/message-communication.lisp
| Kind | Covered | All | % |
| expression | 23 | 704 | 3.3 |
| branch | 0 | 20 | 0.0 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
6
;;; message communication interface operations
8
(defgeneric receive-message (stream content-type &rest args)
9
(:documentation "Receive a message through a STREAM with the given CONTENT-TYPE
10
STREAM : stream : capable of supporting read operations (read-byte v/s read-char) respective
12
CONTENT-TYPE : mime:*/* : in practice application/octet-stream or application/sparql+sse
13
VALUES : symbol : the request/response operator
14
list : a plist of the parsed message properties
16
The operator applies generically across all message types - requests, responses, and errors.
17
In particular, a store request may yield either a response or an error.
18
Header fields are validated, if present, and the 'operator' and body are returned.")
19
;; method specificity is ranked by the content type first, then by the stream
(:argument-precedence-order content-type stream)
21
;; fallback: no more specific method applies, so signal an error naming both arguments
(:method ((stream t) (content-type t) &rest args)
22
(declare (ignore args))
23
(error "Invalid stream and/or content type for receive-message: (~s ~s)." stream content-type))
25
;; runs after every primary method, whatever the argument types
(:method :after ((stream t) (content-type t) &rest args)
26
(declare (ignore args))
27
;; record the read byte count for accounting: add the position reported by
;; stream-file-position to *bytes-read*, or 0 when it reports nil
28
(incf *bytes-read* (or (stream-file-position stream) 0)))
30
;; for an actual stream: read its content with read-stream and re-dispatch
;; receive-message on whatever read-stream returns, passing ARGS through
(:method ((stream stream) (content-type t) &rest args)
31
"Given a stream and no over-riding content type, read the stream content and delegate to
33
If the string is zero-length, signal an end of file"
34
(declare (dynamic-extent args))
35
(apply #'receive-message (read-stream stream :eof-p t) content-type args)))
40
;;; Interface only: no methods are defined in this form.
(defgeneric send-account-message (message-body stream content-type)
41
(:documentation "Encode a message with accounting information as per content-type.")
42
;; method specificity is ranked by content-type, then stream, then message-body
(:argument-precedence-order content-type stream message-body))
45
;;; Interface only: no methods are defined in this form.
(defgeneric send-error-message (body stream content-type)
46
(:documentation "Encode an error response body as per content-type.")
47
;; method specificity is ranked by content-type, then stream, then body
(:argument-precedence-order content-type stream body))
50
(defgeneric send-request-message (operation message-body stream content-type)
51
(:documentation "Encode a request message (operation x body) as per content-type.")
52
;; method specificity is ranked by content-type, then stream, message-body and operation
(:argument-precedence-order content-type stream message-body operation)
54
;; runs after every primary method, whatever the argument types
(:method :after ((operation t) (message-body t) (stream t) (content-type t))
55
;; record the written byte count for accounting: add the position reported by
;; stream-file-position to *bytes-written*, or 0 when it reports nil
56
(incf *bytes-written* (or (stream-file-position stream) 0))))
58
;; debug switch: when true, the :around method of send-response-message prints the
;; result of compute-applicable-methods for its arguments
(defparameter *send-response-message.compute-applicable-methods* nil)
59
;; the concrete content type which send-response-message substitutes when it is
;; called with the generic mime:application/sparql-results type
(defparameter *send-response-message.sparql-results-content-type* mime:application/sparql-results+json
60
"Specify the default concrete encoding to use if given an abstract type.")
62
(defgeneric send-response-message (operation message-body stream content-type)
63
(:documentation "Encode a response message (operation x body) as per accept content-type.
64
Use, in addition, a presentation type to determine whether/how to wrap the response
65
content in a presentation document.")
66
(:argument-precedence-order message-body content-type stream operation)
68
(:method :around ((operation t) (message-body task) (stream t) (content-type t))
69
(when *send-response-message.compute-applicable-methods*
70
(print (compute-applicable-methods #'send-response-message
71
(list operation message-body stream content-type))))
74
(generate-accounting-note :encode)))
76
(:method :before ((operation t) (message-body t) (stream t) (content-type t))
77
;; flush any buffered headers before the body is encoded
78
(force-output stream))
80
(:method :after ((operation t) (message-body task) (stream t) (content-type t))
81
;; record the written byte count for accounting
82
(incf *bytes-written* (or (stream-file-position stream) 0)))
84
(:method ((operation t) (message t) (stream stream) (content-type mime:application/sparql-results))
85
"Supply a default encoding if given a generic sparql-results response content type"
86
(unless (eq (type-of content-type)
87
(type-of *send-response-message.sparql-results-content-type*))
88
(log-warn "send-response-message: content type not recognized (~s). defaulting to sparql-results+json: ~s"
90
*send-response-message.sparql-results-content-type*))
91
(send-response-message operation message stream *send-response-message.sparql-results-content-type*))
93
(:method ((operation t) (message-body task) (stream t) (content-type t))
94
"As the default for any task, send the root generator"
95
(send-response-message operation (task-result-generator message-body) stream content-type))
97
(:method ((operation t) (object persistent-object) (stream t) (content-type t))
98
(send-response-message operation
99
(cons *construct-dimensions*
100
(encode-presentation-graph object))
104
(:method ((operation t) (query query) (channel amqp:channel) (content-type t))
105
(flet ((send-body (stream content-type)
106
(call-next-method operation query stream content-type)))
107
(declare (dynamic-extent #'send-body))
108
(let ((basic (amqp:channel.basic channel)))
109
(setf (amqp:basic-timestamp basic) (- (get-universal-time) amqp:*timestamp-epoch*)
110
(amqp:basic-reply-to basic) ""
111
(amqp:basic-correlation-id basic) (task-id query)
112
(amqp:basic-headers basic) (list :|operation| "response"
113
:|repository| (repository-id query)
114
:|task_id| (task-id query)
115
:|user_id| (task-user-id query))))
116
(unless (eq (amqp.u:channel-content-type channel) (task-response-content-type query))
117
(setf (amqp.u:channel-content-type channel) (task-response-content-type query)))
118
(amqp:publish channel :body #'send-body
119
:exchange (task-request-exchange query)
120
:routing-key (task-request-reply-routing-key query)
121
:headers (list :|operation| "response"
122
:|repository| (repository-id query)
123
:|task_id| (task-id query)
124
:|user_id| (task-user-id query)
125
:transfer-encoding "chunked"))))
130
;;; request/response operators
132
(defgeneric send-error-response (task stream condition &key detail)
134
(:method ((task t) (channel t) (condition t) &key &allow-other-keys)
135
(log-warn "invalid task for error response: ~s: ~a." task condition))
137
(:method ((task task) (channel t) (condition condition)
138
&key (detail (format nil "task '~a' signaled:~%~a" (task-id task) condition)))
139
(send-error-response task channel (symbol-name (type-of condition)) :detail detail))
141
(:method ((task task) (channel amqp:channel) (condition string)
142
&key (detail (format nil "task ~s signaled:~%~a" (task-id task) condition)))
143
(labels ((send-body (stream content-type)
144
(send-error-message (vector :|user| 100
150
(declare (dynamic-extent #'send-body))
151
(log-info "query: ~s. error: ~a" (task-id task) condition)
152
(let ((basic (amqp:channel.basic channel)))
153
(setf (amqp:basic-timestamp basic) (- (get-universal-time) amqp:*timestamp-epoch*)
154
(amqp:basic-reply-to basic) ""
155
(amqp:basic-correlation-id basic) (task-id task)))
156
(unless (eq (amqp.u:channel-content-type channel) (task-response-content-type task))
157
(setf (amqp.u:channel-content-type channel) (task-response-content-type task)))
158
(let ((*package* (find-package :spocq.i)))
159
(setf (amqp.u:channel-content-type channel) mime:application/sparql-query+sse)
160
(amqp:publish channel :body #'send-body
161
:exchange (task-request-exchange task)
162
:routing-key (task-request-error-routing-key task)
163
:headers (list :|operation| "error"
164
:|condition| condition
165
:|task_id| (task-id task)
166
:|user_id| (task-user-id task)
167
:package (string :spocq.i))))))
169
(:method ((task task) (stream stream) (condition string)
170
&key (detail (format nil "task '~a' signaled:~%~a" (task-id task) condition)))
171
(send-error-message (vector :|user| 100
176
*request-content-type*)))
179
;;; Receive a query request from a stream.
;;; The only method here adds no processing of its own: it passes STREAM,
;;; CONTENT-TYPE and any further ARGS straight to receive-message and returns
;;; whatever that returns.
(defgeneric receive-query-request (stream content-type &rest args)
180
(:method ((stream stream) (content-type t) &rest args)
181
(apply #'receive-message stream content-type args)))
184
(defgeneric send-algebra-response (query stream result)
185
(:method ((query query) (channel amqp:channel) result)
186
(flet ((send-body (stream content-type)
187
(send-response-message :|response|
191
(declare (dynamic-extent #'send-body))
192
(let ((basic (amqp:channel.basic channel)))
193
(setf (amqp:basic-timestamp basic) (- (get-universal-time) amqp:*timestamp-epoch*)
194
(amqp:basic-reply-to basic) ""
195
(amqp:basic-correlation-id basic) (task-id query)
196
(amqp:basic-headers basic) (list :|operation| "response"
197
:|repository| (repository-id query)
198
:|task_id| (task-id query)
199
:|user_id| (task-user-id query))))
200
(setf (amqp.u:channel-content-type channel) mime:application/sparql-query+sse)
201
(amqp:publish channel :body #'send-body
202
:exchange (task-request-exchange query)
203
:routing-key (task-request-reply-routing-key query)
204
:headers (list :|operation| "response"
205
:|repository| (repository-id query)
206
:|task_id| (task-id query)
207
:|user_id| (task-user-id query)
208
:transfer-encoding "chunked"))))
210
(:method ((query query) (stream stream) result)
211
(send-response-message :|response|
214
(task-response-content-type query))))
217
(defgeneric send-query-response (query stream result)
219
(:method ((query query) (channel amqp:channel) result)
220
(flet ((send-body (stream content-type)
221
(send-response-message :|response|
225
(declare (dynamic-extent #'send-body))
226
(let ((basic (amqp:channel.basic channel)))
227
(setf (amqp:basic-timestamp basic) (- (get-universal-time) amqp:*timestamp-epoch*)
228
(amqp:basic-reply-to basic) ""
229
(amqp:basic-correlation-id basic) (task-id query)
230
(amqp:basic-headers basic) (list :|operation| "response"
231
:|repository| (repository-id query)
232
:|task_id| (task-id query)
233
:|user_id| (task-user-id query))))
234
(unless (eq (amqp.u:channel-content-type channel) (task-response-content-type query))
235
(setf (amqp.u:channel-content-type channel) (task-response-content-type query)))
236
(amqp:publish channel :body #'send-body
237
:exchange (task-request-exchange query)
238
:routing-key (task-request-reply-routing-key query)
239
:headers (list :|operation| "response"
240
:|repository| (repository-id query)
241
:|task_id| (task-id query)
242
:|user_id| (task-user-id query)
243
:transfer-encoding "chunked"))))
245
(:method ((query query) (stream stream) result)
246
(send-response-message :|response|
249
(task-response-content-type query)))
251
(:method ((task task) (stream null) (result t))
252
"Given a null destination, just return the task"
256
(defgeneric send-trace-response (query stream data)
257
(:documentation "Encode a query trace tree as the response")
259
(:method ((query query) (channel amqp:channel) data)
260
(flet ((send-body (stream content-type)
261
(send-response-message :|response|
265
(declare (dynamic-extent #'send-body))
266
(let ((basic (amqp:channel.basic channel)))
267
(setf (amqp:basic-timestamp basic) (- (get-universal-time) amqp:*timestamp-epoch*)
268
(amqp:basic-reply-to basic) ""
269
(amqp:basic-correlation-id basic) (task-id query)
270
(amqp:basic-headers basic) (list :|operation|"response"
271
:|repository| (repository-id query)
272
:|task_id| (task-id query)
273
:|user_id| (task-user-id query))))
274
(unless (eq (amqp.u:channel-content-type channel) (task-response-content-type query))
275
(setf (amqp.u:channel-content-type channel) (task-response-content-type query)))
276
(amqp:publish channel :body #'send-body
277
:exchange (task-request-exchange query)
278
:routing-key (task-request-reply-routing-key query)
279
:headers (list :|operation| "response"
280
:|repository| (repository-id query)
281
:|task_id| (task-id query)
282
:|user_id| (task-user-id query)
283
:transfer-encoding "chunked"))))
285
(:method ((query query) (stream stream) data)
286
(send-response-message :|response|
289
(task-response-content-type query))))
292
(defgeneric send-store-request (query stream variable form &key repository)
293
(:documentation "Send an individual graph pattern request to a store.
294
Use a common exchange and route it as per the repository. By default assume the repository from the query.
295
Send properties to specify
296
- bgp_id : a unique id to identify the bgp in the response
297
- graphs : the graphs specified as datasets in the original sparql query
298
- routing_key : a unique id to route the response through the store exchange back to the requesting engine.")
299
#+(or) ; just for tracing
300
(:method ((query t) (destination t) (variable t) form )
301
(assert (typep form '(or (cons (eql spocq.a:|bgp|)) (cons (eql spocq.a:|graph|)))) ()
302
"Invalid store request: ~s ~s." variable form))
304
(:method ((query query) (channel amqp:channel) variable form &key (repository (repository-id query)))
305
(flet ((send-body (stream content-type)
307
(send-request-message :|bgpmatch|
311
(declare (dynamic-extent #'send-body))
312
;; publish the store bgp request to the channel's default exchange - which has been configured
313
;; at process start, and use as the routing key the repository id presented with the query.
314
;; 2020-10-26 no timestamps to store
315
(let ((basic (amqp:channel.basic channel)))
316
(setf (amqp:basic-timestamp basic) 0
317
(amqp:basic-reply-to basic) (task-store-routing-key query)
318
(amqp:basic-correlation-id basic) (task-id query)
319
(amqp:basic-headers basic) `(:|operation| "bgpmatch"
320
:|repository| ,repository
321
:|task_id| ,(string (task-id query))
322
:|user_id| ,(task-user-id query)
323
:|bgp_id| ,(string variable)
324
:|graphs| ,(task-dataset-graphs query)
325
:|routing_key| ,(task-store-routing-key query))))
326
(unless (eq (amqp.u:channel-content-type channel) *store-content-type*)
328
(format *trace-output* "~&~s:for send ~s -> ~s~%"
329
channel (amqp.u:channel-content-type channel) *store-content-type*)
330
(setf (amqp.u:channel-content-type channel) *store-content-type*))
331
(amqp:publish channel :body #'send-body
332
:exchange *store-exchange*
333
;; :routing-key (task-store-bgp-routing-key query)
334
:routing-key (concatenate 'string "bgp." (string variable))
335
:headers (list :|operation| "bgpmatch"
336
:|repository| repository
337
:|task_id| (string (task-id query))
338
:|user_id| (task-user-id query)
339
:|bgp_id| (string variable)
340
:|graphs| (task-dataset-graphs query)
341
:|routing_key| (task-store-routing-key query)
345
(:method ((query query) (stream stream) variable form &key repository)
346
(declare (ignore repository))
347
(send-request-message :|bgpmatch|
350
*store-content-type*)))
352
(defgeneric receive-store-request (stream content-type)
353
(:method ((stream stream) (content-type t))
354
(multiple-value-bind (operation repository task-id bgp-form properties)
355
(receive-message stream content-type)
356
(destructuring-bind (&key (request-routing-key repository)
357
(accept content-type)
358
(request-exchange (amqp:exchange-exchange *store-exchange*))
361
(apply #'make-instance operation
362
:repository repository
366
:request-exchange request-exchange
367
:request-routing-key request-routing-key
371
(defgeneric receive-store-response (stream content-type)
372
(:method ((stream stream) (content-type t))
373
(multiple-value-bind (operation body)
374
(receive-message stream content-type)
375
(cond ((string-equal operation :|bgpfield|)
376
(make-instance 'store-reply :operation operation :term body))
377
((string-equal operation :|error|)
378
(make-instance 'error-task :operation 'spocq:|error| :term body))
380
(spocq.e::message-syntax-error :expression (list operation body)))))))
383
(defgeneric send-store-response (query stream result)
384
(:method ((task bgp-match) (channel amqp:channel) result)
385
(flet ((send-body (stream content-type)
386
(send-response-message :|bgpfield|
390
(declare (dynamic-extent #'send-body))
391
(let ((basic (amqp:channel.basic channel)))
392
(setf (amqp:basic-timestamp basic) (- (get-universal-time) amqp:*timestamp-epoch*)
393
(amqp:basic-reply-to basic) ""
394
(amqp:basic-correlation-id basic) (task-id task)
395
(amqp:basic-headers basic) `(:|operation| :|bgpfield|
396
:|repository| ,(repository-id task)
397
:|task_id| ,(task-id task)
398
:|user_id| ,(task-user-id task)
399
:|bgp_id| ,(task-bgp-id task))))
400
(unless (eq (amqp.u:channel-content-type channel) (task-response-content-type task))
401
(setf (amqp.u:channel-content-type channel) (task-response-content-type task)))
402
(amqp:publish channel :body #'send-body
403
:exchange (task-request-exchange task)
404
:routing-key (task-request-routing-key task)
405
:headers (list :|operation| "bgpfield"
406
:|repository| (repository-id task)
407
:|task_id| (task-id task)
408
:|user_id| (task-user-id task)
409
:|bgp_id| (task-bgp-id task)))))
411
(:method ((task bgp-match) (stream stream) result)
412
(send-response-message :|bgpfield|
415
*store-content-type*)))
418
(defgeneric receive-account-note (stream content-type)
419
#+(or) ; as stub pattern only
420
(:method ((stream stream) (content-type t))
421
(multiple-value-bind (operation body)
422
(receive-message stream content-type)
423
(cond ((string-equal operation :|account|)
424
(let* ((basic (amqp:channel.basic stream))
425
(timestamp (amqp:basic-timestamp basic)))
426
(concatenate 'list body (list timestamp))))
428
(spocq.e::message-syntax-error :expression (list operation body)))))))
431
(defgeneric send-account-note (query state accounting-io)
432
(:documentation "Generate an accounting record for task resource usage in the present
433
state and update it to the new one.")
435
(:method ((task t) (state t) (io t))
436
(log-warn "invalid task for accounting note: ~s." task))
438
(:method ((task task) new-state (io null)))
440
(:method ((task task) new-state (io amqp:channel))
441
"Capture the usage since the last mark, send the values and the state to the accounting queue."
442
(let ((basic (amqp:channel.basic io))
443
(*package* (find-package :spocq.i))
444
(now (get-universal-time)))
445
(flet ((send-progress-note (stream content-type)
446
(let ((message (list* :|state| (task-state task) (accounting-properties))))
447
(log-debug "send-account-note: query: ~a note: ~a" (task-id task) message)
448
(send-account-message message stream content-type)))
449
(send-completion-note (stream content-type)
450
(send-account-message `(:|state| ,new-state)
453
(declare (dynamic-extent #'send-progress-note #'send-completion-note))
454
(unless (eq (amqp.u:channel-content-type io) mime:application/sse)
455
(setf (amqp.u:channel-content-type io) mime:application/sse))
457
(setf (amqp:basic-timestamp basic) (- now amqp:*timestamp-epoch*)
458
(amqp:basic-reply-to basic) ""
459
(amqp:basic-type basic) "account"
460
(amqp:basic-correlation-id basic) (task-id task)
461
(amqp:basic-message-id basic) (format nil "~a.~/format-iso-time/" *thread-name* now)
462
(amqp:basic-app-id basic) *service-name*
463
(amqp:basic-headers basic) `(:|operation| :|account|
464
:|repository| ,(repository-id task)
465
:|task_id| ,(task-id task)
466
:|user_id| ,(task-user-id task)))
467
(amqp:publish io :body #'send-progress-note
468
:exchange *accounting-exchange*
469
:routing-key (task-id task)
470
:headers (list :|operation| "account"
471
:|repository| (repository-id task)
472
:|task_id| (task-id task)
473
:|user_id| (task-user-id task)
474
:package (string :spocq.i)))
476
((:terminate :complete) ; indicate completion as its own note
477
(amqp:publish io :body #'send-completion-note
478
:exchange *accounting-exchange*
479
:routing-key (task-id task)
480
:headers (list :|operation| "account"
481
:|repository| (repository-id task)
482
:|task_id| (task-id task)
483
:|user_id| (task-user-id task)
484
:package (string :spocq.i))))
487
(:method ((task task) (properties list) (io amqp:channel))
488
(let ((basic (amqp:channel.basic io))
489
(*package* (find-package :spocq.i))
490
(now (get-universal-time)))
491
(flet ((send-progress-note (stream content-type)
492
(log-debug "query: ~a accounting: ~a" (task-id task) properties)
493
(send-account-message properties stream content-type)))
494
(declare (dynamic-extent #'send-progress-note))
495
(unless (eq (amqp.u:channel-content-type io) mime:application/sse)
496
(setf (amqp.u:channel-content-type io) mime:application/sse))
498
(setf (amqp:basic-timestamp basic) (- now amqp:*timestamp-epoch*)
499
(amqp:basic-reply-to basic) ""
500
(amqp:basic-type basic) "account"
501
(amqp:basic-correlation-id basic) (task-id task)
502
(amqp:basic-message-id basic) (format nil "~a.~/format-iso-time/" *thread-name* now)
503
(amqp:basic-app-id basic) *service-name*
504
(amqp:basic-headers basic) `(:|operation| :|account|
505
:|repository| ,(repository-id task)
506
:|task_id| ,(task-id task)
507
:|user_id| ,(task-user-id task)))
508
(amqp:publish io :body #'send-progress-note
509
:exchange *accounting-exchange*
510
:routing-key (task-id task)
511
:headers (list :|operation| "account"
512
:|repository| (repository-id task)
513
:|task_id| (task-id task)
514
:|user_id| (task-user-id task)
515
:package (string :spocq.i)))))))
519
(trace receive-query-request send-query-response receive-store-request send-store-response
520
send-store-request receive-store-response
523
;; nb. the sse extensions are defined for streams only
525
(let ((stream (make-instance 'DE.SETF.UTILITY.IMPLEMENTATION::VECTOR-IO-STREAM)))
526
(send-request-message :|bgpmatch|
527
`("jhacker/foaf" "bfe63b60-ca75-012d-8bf1-123139180561"
528
(ORG.DATAGRAPH.SPOCQ.ALGEBRA:|bgp|
529
(ORG.DATAGRAPH.SPOCQ.ALGEBRA:|triple|
530
,(PURI:URI "http://example.org/ns#x") ?::|p| -18))
531
((:|bgp_id| . #:D3509580-E847-11DF-A031-12313A0075A4) (:|graphs|)
532
(:|routing_key| . "ip-10-251-122-82.spocq.8490")))
534
mime:application/octet-stream)
535
(de.setf.utility.implementation::stream-position stream 0)
536
(receive-message stream mime:application/octet-stream))
538
(let ((stream (make-instance 'DE.SETF.UTILITY.IMPLEMENTATION::VECTOR-IO-STREAM)))
539
(send-term #(:|cast| :|spocq| :|bgpmatch| (3/2))
541
(de.setf.utility.implementation::stream-position stream 0)
542
(receive-message stream mime:application/octet-stream))
544
(let ((stream (make-instance 'DE.SETF.UTILITY.IMPLEMENTATION::VECTOR-IO-STREAM)))
545
(send-term '(_::test "asdf" |http://www.w3.org/2001/XMLSchema|:|dateTime|) stream)
546
(de.setf.utility.implementation::stream-position stream 0)
547
(receive-message stream mime:application/octet-stream))
549
(let ((stream (make-instance 'DE.SETF.UTILITY.IMPLEMENTATION::VECTOR-IO-STREAM)))
550
(send-term (vector (find-symbol "^" :keyword)
552
"http://www.w3.org/2001/XMLSchema#string")) stream)
553
(princ (with-output-to-string (out)
554
(map nil #'(lambda (byte)
555
(cond ((alphanumericp (code-char byte)) (write-char (code-char byte) out))
556
(t (format out "\\x~2,'0x" byte))))
557
(DE.SETF.UTILITY.IMPLEMENTATION::vector-stream-vector stream))))
558
(de.setf.utility.implementation::stream-position stream 0)
559
(receive-message stream mime:application/octet-stream))
561
(let ((stream (make-instance 'DE.SETF.UTILITY.IMPLEMENTATION::VECTOR-IO-STREAM)))
562
(send-term (find-symbol "t" :spocq.a) stream)
563
(princ (with-output-to-string (out)
564
(map nil #'(lambda (byte)
565
(cond ((alphanumericp (code-char byte)) (write-char (code-char byte) out))
566
(t (format out "\\x~2,'0x" byte))))
567
(DE.SETF.UTILITY.IMPLEMENTATION::vector-stream-vector stream))))
568
(de.setf.utility.implementation::stream-position stream 0)
569
(receive-message stream mime:application/octet-stream))
571
(let ((stream (make-instance 'DE.SETF.UTILITY.IMPLEMENTATION::VECTOR-IO-STREAM)))
572
(send-term (vector (find-symbol "^" :keyword)
574
"http://www.w3.org/2001/XMLSchema#string") stream)
575
(princ (with-output-to-string (out)
576
(map nil #'(lambda (byte)
577
(cond ((and (/= 131 byte) (graphic-char-p (code-char byte)))
578
(write-char (code-char byte) out))
579
(t (format out "\\x~2,'0x" byte))))
580
(DE.SETF.UTILITY.IMPLEMENTATION::vector-stream-vector stream))))
581
(de.setf.utility.implementation::stream-position stream 0)
582
(receive-message stream mime:application/octet-stream))
584
(let ((stream (make-instance 'DE.SETF.UTILITY.IMPLEMENTATION::VECTOR-IO-STREAM)))
586
(format t "~%----------")
587
(de.setf.utility.implementation::stream-position stream 0)
588
(send-term (print value) stream)
589
(print (de.setf.utility.implementation::get-vector-stream-vector stream))
590
(de.setf.utility.implementation::stream-position stream 0)
591
(print (receive-message stream mime:application/octet-stream))))
595
(tsr #(:|cast| :|spocq| :|bgpmatch|
596
("jhacker/foaf" "d96b4f60-cad7-012d-b773-123139183153"
597
(ORG.DATAGRAPH.SPOCQ.ALGEBRA:|bgp|
598
(ORG.DATAGRAPH.SPOCQ.ALGEBRA:|triple|
599
?::|x| #.(PURI:URI "http://example.org/ns#x") ?::|v|))
600
((:|bgp_id| . #:EC8A2E00-E8A9-11DF-A00B-12313A0075A4) (:|graphs|)
601
(:|routing_key| . "ip-10-251-122-82.spocq.14717")))))))
603
(with-open-file (stream "/tmp/bert.bin" :direction :output :if-exists :supersede
604
:if-does-not-exist :create :element-type '(unsigned-byte 8))
605
(map nil #'(lambda (b) (write-byte b stream))
606
#(131 104 4 100 0 4 99 97 115 116 100 0 5 115 112 111 99 113 100 0 8 98 103 112 109 97 116 99 104 108 0 0 0 4 104 2 100 0 1 34 109 0 0 0 12 106 104 97 99 107 101 114 47 102 111 97 102 104 2 100 0 1 34 109 0 0 0 36 54 99 52 51 52 101 50 48 45 99 97 101 52 45 48 49 50 100 45 97 102 99 50 45 49 50 51 49 51 98 49 48 48 53 57 50 108 0 0 0 2 100 0 3 98 103 112 108 0 0 0 4 100 0 6 116 114 105 112 108 101 104 2 100 0 1 63 100 0 1 120 104 2 100 0 1 60 109 0 0 0 27 104 116 116 112 58 47 47 101 120 97 109 112 108 101 46 111 114 103 47 116 104 105 110 103 115 35 112 104 2 100 0 1 63 100 0 1 118 106 106 104 3 100 0 4 98 101 114 116 100 0 4 100 105 99 116 108 0 0 0 3 104 2 100 0 6 98 103 112 95 105 100 100 0 36 55 70 67 57 54 48 56 48 45 69 56 66 54 45 49 49 68 70 45 65 48 48 66 45 49 50 51 49 51 65 48 48 55 53 65 52 104 2 100 0 6 103 114 97 112 104 115 106 104 2 100 0 11 114 111 117 116 105 110 103 95 107 101 121 104 2 100 0 1 34 109 0 0 0 28 105 112 45 49 48 45 50 53 49 45 49 50 50 45 56 50 46 115 112 111 99 113 46 49 52 55 49 55 106 106)))