Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/start.lisp
| Kind | Covered | All | % |
| expression | 451 | 1145 | 39.4 |
| branch | 58 | 156 | 37.2 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.implementation)
5
(:documentation "start up"
12
;;; for info on exit code see among others
13
;;; http://manpages.ubuntu.com/manpages/zesty/man5/systemd.service.5.html
15
(defun exit-lisp (&optional (return-code 0))
17
(declare (ignore return-code))
19
(ccl:quit return-code)
23
(lispworks:quit :status return-code :confirm nil :return nil :ignore-errors-p t)
25
(funcall (intern (if (equal (lisp-implementation-version) "1.0.46") "QUIT" "EXIT") :sb-ext)
26
(if (equal (lisp-implementation-version) "1.0.46") :unix-status :code)
29
#+clozure (eval-when (:compile-toplevel)
30
(error "need the signal handler definition operators"))
32
(defvar *signal-state* nil)
34
(defun sigterm-handler (signal code context)
35
(declare (ignore signal code context))
37
(:terminate ; if repeated exit immediately
38
(log-notice "sigterm-handler: repeated SIGTERM ~a." (iso-time))
41
(log-notice "sigterm-handler: initial SIGTERM ~a." (iso-time))
42
(setq *signal-state* :sigterm)
43
(setq *run-state* :terminate))))
45
(defun sighup-handler (signal code context)
46
(declare (ignore signal code context))
47
(setq *run-state* :restart))
49
(defun enable-interrupt (signal handler)
51
(declare (ignore signal handler))
53
(system:set-signal-handler (ecase signal
54
(:sigterm SYSTEM::UNIX-SIGTERM))
57
(sb-unix::enable-interrupt (ecase signal
58
(:sigterm sb-unix::sigterm))
61
;; no longer used as it creates a config requirement which is troublesome
62
;; on reboot for virtual hosts
63
(defun write-pidfile (&optional (pathname *pidfile-pathname*))
64
(unless (pathname-name pathname)
65
(let* ((executable (getarg 0))
66
(name (pathname-name executable)))
67
(setf pathname (make-pathname :name name :type "pid" :defaults pathname))))
68
(setq *pidfile-pathname* pathname)
69
(with-open-file (pidfile-stream pathname :direction :output :if-exists nil :if-does-not-exist :create)
70
(format pidfile-stream "~d~%" (getpid))
74
(defun maybe-exit-on-error (&optional (code 70))
75
(cond ((and *exit-on-errors*
76
(not (and (streamp *terminal-io*)
77
(interactive-stream-p *terminal-io*))))
78
;;; if exiting because a thread is hung, this does not work. (stop-threads)
79
(log-info "Stop: exit-on-errors ~a." (iso-time))
82
(log-info "continue: exit-on-errors ~a: ~a."
84
(list *exit-on-errors* *terminal-io* (streamp *terminal-io*)
85
(interactive-stream-p *terminal-io*))))))
87
(defun complete-output (destination)
88
"Iff the destination is standard or error output, write a null marker.
89
In all cases finish output to ensure the reader has everything."
91
(stream (fresh-line destination)
92
;; add a termination marker to separate results for persistent back-end processors
93
(when (or (eq destination *error-output*) (eq destination *standard-output*))
94
(write-char #\null destination))
95
(finish-output destination))
97
(log-debug "complete-output: ~s" destination))
100
;;; initial- and re-start
102
(defun error-condition-channel ()
103
(or *error-condition-channel*
104
(setf *error-condition-channel* (make-pool :name "error conditions"))))
105
(defun accounting-notes ()
106
(or *accounting-notes*
107
(setf *accounting-notes* (make-array 32 :adjustable t :fill-pointer 0))))
109
(defun initialize-channels ()
111
;; (setq *accounting-notes* nil)
112
;; (setq *error-condition-channel* nil)
113
;; use a multi-reader/multi-writer queue to carry tasks to threads for them to process per-task operations
114
(setq *algebra-task-channel* (make-pool :name "algebra tasks"))
115
(setq *service-channel* (make-pool :name "service"))
117
;; warm-up the type system, but do not instantiate
118
(let ((tmp-channel (allocate-instance (find-class *make-channel.class*))))
119
(assert-argument-types initialize-channels
120
(tmp-channel channel))))
123
(defun initialize-spocq (&key (title (getarg "--title")) debugger)
126
(setq *run-state* :initialize)
127
(setq *start-timestamp* (iso-time))
128
(enable-interrupt :sigterm #'sigterm-handler)
129
;; randomize clock component of uuids
130
(setq uuid::*clock-seq* 0)
135
(and (streamp *terminal-io*) (interactive-stream-p *terminal-io*)))
136
;; avoid hanging with stack overflows &co
137
(sb-ext:disable-debugger))
138
(setq sb-impl::*default-external-format* :utf-8)
139
(proclaim '(sb-ext:muffle-conditions sb-ext:compiler-note))
140
;; double th gc interval
141
(setf (sb-ext:bytes-consed-between-gcs) 750000000
142
(sb-ext::generation-number-of-gcs-before-promotion 0) 2)
144
(setq *service-name* (or (getarg "--service-name") *service-name*))
145
(setq *configuration-pathname* (or (getarg "--spocqinit") *configuration-pathname*))
146
(setq *log-pathname* (or (getarg "--spocqlog") *log-pathname*))
147
(open-log :options '(:pid) :identity *service-name* :title title)
148
(log-notice! "init: Build ~a/~a; Parser: ~s."
149
*build-revision* *build-timestamp* *query-parser*)
151
;;; process initialization has two phases
152
;;; first, load the installation configuration
153
(unless (load-system-configuration *configuration-pathname*)
154
(error "Cannot load system configuration: ~s." *configuration-pathname*))
155
;;; then apply any settings present on the process' command-line
156
(unless (load-command-line-configuration)
157
(error "Cannot load command-line configuration."))
158
(initialize-built-in-repositories)
159
(initialize-channels)
160
;;; load and initialize libraries
161
(handler-bind ((warning #'muffle-warning))
162
(unless (ignore-errors (ensure-directories-exist *log-pathname*))
163
(error "Cannot locate log file: ~s." *log-pathname*))
164
#+sbcl (pushnew 'log-memory-usage sb-ext:*after-gc-hooks*)
165
#+lispworks (sys:set-automatic-gc-callback nil 'log-memory-usage)
167
;; bind vocabulary uri objects to their lexical and store designators
168
(initialize-store *class.repository* *store-uri* *rdfcache-pathname*)
170
#+use-monkeylib-bcrypt ;; this conflicts with dydra-ndk
171
(when (find-package :bcrypt)
172
(cffi:load-foreign-library (intern (symbol-name :libbcrypt) :bcrypt) :search-PATH #P"/opt/dydra/lib") )
175
;; work around clos errors
176
(ignore-errors (make-instance 'query :repository-id "system/system")))
179
(setq *run-state* :initialize)
180
(open-log :options '(:pid) :identity *service-name* :title title)
181
(log-notice! "reinit: Build ~a/~a; Parser: ~s."
182
*build-revision* *build-timestamp* *query-parser*)
183
(unless (load-system-configuration *configuration-pathname*)
184
(error "Cannot load configuration: ~s." *configuration-pathname*))
185
;; reset the processor instance
186
(reinitialize-store *class.repository*)))
187
;; reset the processor instance
188
(setq *request-processor* nil)
193
(let ((result (main-task-loop (command-line-argument-list))))
195
(query ; normal completion, return the reqult and allow runtime to exit
197
(sb-sys:interactive-interrupt
199
(condition ; some error occurred
205
(defun stop-threads ()
206
;; in case being done from a top-level, first kill the management thread
207
;; otherwise it will start killing threads as well
208
(when (and *management-thread*
209
(not (eq *management-thread* (bt:current-thread)))
210
(bt:thread-alive-p *management-thread*))
211
(bt:destroy-thread *management-thread*)
212
#-sbcl(bt:join-thread *management-thread*)
213
#+sbcl(sb-thread:join-thread *management-thread* :default nil))
214
;; disable further requests
215
(setq *run-state* :terminate)
216
;; but - don't wait for them to stop
217
;; first cancel those which have the handler this makes them available
218
(cancel-algebra-threads)
219
;; now terminate everything
220
(dolist (thread (bt:all-threads))
221
(unless ;; (or (eq (bt:current-thread) thread) (eq *management-thread* thread))
222
(eq (bt:current-thread) thread)
223
;; allow for race conditions
224
(multiple-value-bind (result condition)
225
(ignore-errors (progn
226
(when (bt:thread-alive-p thread)
227
(bt:destroy-thread thread))
228
#-sbcl(bt:join-thread thread)
229
#+sbcl(sb-thread:join-thread thread :default nil :timeout 1)))
230
(declare (ignore result))
232
(log-debug "stop-threads: error in thread termination: ~a" condition))))))
235
(log-info "stopping ...")
236
(when *algebra-task-channel*
237
(loop for task in (channel-get-all *algebra-task-channel*)
238
do (log-warn "stop with task pending operation: ~a" task)))
239
(when *service-channel* (channel-get-all *service-channel*))
240
(map-queries #'(lambda (q) (handler-case (terminate-task q)
242
(warn "Error in termination: ~a" c)
243
(setf (task-state q) :terminate)))))
245
(log-info "stopped."))
247
(defun stop-idle-threads ()
248
(dolist (thread (bt:all-threads))
249
(unless (or (eq *management-thread* thread)
250
(eq thread (bt:current-thread))
251
(sb-thread:symbol-value-in-thread '*task* thread nil))
252
(when (bt:thread-alive-p thread)
253
;; allow for race conditions
254
(ignore-errors (bt:destroy-thread thread)
255
#-sbcl(bt:join-thread thread)
256
#+sbcl(sb-thread:join-thread thread :default nil))))))
259
"Shut the engine down by draining queries - waiting for processes to complete, and then exiting."
260
(setq *run-state* :terminate)
261
(loop (when (zerop (query-count)) (return))
264
(log-info "terminated."))
267
(defun run-toplevel (&rest args &key (toplevel-function *toplevel-function* tl-s))
269
(setf args (copy-list args))
270
(remf args :toplevel-function))
271
(log-info "starting toplevel [~a]... (~s . ~s)." *thread-name* toplevel-function args)
272
(handler-bind ((error (lambda (condition)
273
;; (break "toplevel error: ~a" condition)
274
(if (and (streamp *terminal-io*) (interactive-stream-p *terminal-io*))
275
(break "Error at toplevel: ~a." condition)
276
(log-stacktrace "Error at toplevel: ~a." condition))
279
(write-log :error "abstract channel class precedence: ~s"
280
(sb-mop:class-precedence-list (find-class 'amqp:channel)))
281
(write-log :error "concrete channel sub-classes' precedence: ~s"
282
(mapcar #'sb-mop:class-precedence-list
283
(sb-mop:class-direct-subclasses (find-class 'amqp:channel))))
284
(write-log :error "abstract connection class precedence: ~s"
285
(sb-mop:class-precedence-list (find-class 'amqp:connection)))
286
(write-log :error "concrete channel sub-classes' precedence: ~s"
287
(mapcar #'sb-mop:class-precedence-list
288
(sb-mop:class-direct-subclasses (find-class 'amqp:connection)))))
290
(return-from run-toplevel nil))))
291
(apply toplevel-function args)))
293
(defun run-toplevel-loop (&rest args)
294
(log-info "Thread [~@[~a~]] run-toplevel-loop starting." *thread-name*)
295
(loop while (apply #'run-toplevel args)
298
(log-info "Thread re-loading configuration.")
299
(unless (load-configuration *configuration-pathname*)
300
(log-error "Cannot load configuration: ~s." *configuration-pathname*))
301
(setq *run-state* :process))))
302
(log-info "Thread [~@[~a~]] run-toplevel-loop returning." *thread-name*))
304
(defun run-management-thread (&key &allow-other-keys)
305
"The HTTP server management thread does no autonomous processing - that is handled by an admin
306
response function. It handles just the statistics and termination."
308
(log-info "starting management thread...")
309
(setq spocq.i:*management-thread* (bt:current-thread))
310
(setq spocq.i:*run-state* :run)
311
(loop (unless (management-idle-handler) (return spocq.i:*run-state*))
312
(sleep *management-thread-interval*)))
314
(defun management-idle-handler ()
315
"Consolidated implementation for standard management tasks.
316
Return nil if state is terminate and no queries remain."
317
;; now handled in scripts explicitly, which means the channel will remain empty
318
(publish-accounting-notes (get-accounting-notes) *accounting-destination*)
319
(when (plusp (channel-count *error-condition-channel*))
320
(let ((error-notes (channel-get-all *error-condition-channel*)))
321
(log-error-notes-trig error-notes)))
322
(when (and (zerop (logand (ash (get-internal-real-time) -10) #x1f))
323
(zerop (hash-table-count *query-registry*)))
324
;; if no query is running, probe the store every +/- thirty seconds
326
;;; eventually consumes lmdb readers?
327
;;; (with-open-repository (*system-repository-id* :normal-disposition :abort) t)
328
(probe-transaction *system-repository-id*)
330
(when *exit-on-errors*
331
(setq *run-state* :terminate)
332
(log-error "management-thread-step: termination due to condition: ~a" condition)
333
(format *error-output* "management-thread-step: termination due to condition: ~a" condition)
334
(spocq.i::exit-lisp 70)))))
335
(handler-case (make-v1-uuid-string)
337
(log-error "management-idle-handler: uuid timeout, terminating: ~a" c)
338
(setf *run-state* :terminate)))
339
(constrain-resources)
342
;; once there are no more active queries, indicate to terminate
343
(when (zerop (query-count))
344
(log-info "terminate: management thread terminating other threads.")
346
(log-info "terminate: management thread returning@ ~a tasks." *task-count*)
351
(defun start-management-thread (&key (name "management"))
352
(or *management-thread*
353
(setq *management-thread*
354
(bt:make-thread *toplevel-function*
356
:initial-bindings `((*thread-name* . ,name)
358
(*class.request-processor* . ',*class.request-processor*))))))
360
(defun management-thread-toplevel ()
361
(loop (management-thread-step (request-processor))
362
(sleep *management-thread-interval*)))
365
(defgeneric management-thread-step (processor)
366
(:method ((processor t))
371
(log-info "Stop: SIGTERM ~a." (iso-time))
374
(constrain-resources)))))
376
(defun run-processing-threads (&optional (thread-specifications *thread-specifications*))
377
"create threads as per configuration plus a management thread
378
if threads are specified, start each and then run the mangement loop.
379
iff the session is interactive initiate a repl.
380
otherwise just run the management thread loop."
381
(loop for (name function . spec) in thread-specifications
382
do (log-notice "initiate processing thread: ~s ~s" name function)
383
do (let ((old (find name (bt:all-threads) :test #'equal :key #'bt:thread-name))
384
(thread (make-processing-thread function
386
:initial-bindings (acons '*task* nil
387
(acons '*thread-name* name spec)))))
389
(bt:destroy-thread old)
390
(setf *processing-threads* (substitute thread old *processing-threads*)))
392
(push thread *processing-threads*)))))
393
;; always start a management thread to look for things to kill and reap.
395
(cond ((and (streamp *terminal-io*) (interactive-stream-p *terminal-io*))
396
;; if interactive, put the management thread in the background
397
(bt:make-thread #'(lambda () (run-toplevel-loop :toplevel-function *toplevel-function*))
399
:initial-bindings (acons '*task* nil nil))
401
(SB-IMPL::TOPLEVEL-REPL nil)
402
(log-warn "run-processing-threads: interactive toplevel returned"))
404
(run-toplevel-loop :toplevel-function *toplevel-function*)
405
(log-warn "run-processing-threads: toplevel loop returned")))
407
(log-warn "final threads ~s" (bt:all-threads))
408
;; (delete-file spocq.i:*pidfile-pathname*)
409
(log-warn "run-processing-threads: returning")))
412
(defgeneric publish-accounting-notes (notes destination)
413
(:method :before ((notes t) (destination t))
415
(log-trace "publish-accounting-notes.before to (~s ~s)" (type-of notes) destination)))
416
(:method :after ((notes t) (destination t))
418
(log-trace "publish-accounting-notes.after to (~s ~s)" (type-of notes) destination)))
420
(:method ((notes null) (destination t))
422
(:method ((notes t) (destination null))
424
(:method ((notes t) (destinations list))
425
(dolist (destination destinations)
426
(publish-accounting-notes notes destination)))
428
(:method ((task task) (destination t))
429
;; publish, all collected notes
430
;; nb. 20151123: included in finalize-task and changed to no longer retain the notes
431
(let ((notes (channel-get-all (task-statistics task))))
432
;; force completion as the task is provided
433
(push (list task :|task_id| (task-id task)
434
:|state| (case (task-state task) (:terminated :terminated) (t :complete)))
436
(publish-accounting-notes notes destination)))
438
(:method ((notes cons) (destination (eql '*accounting-io*)))
439
(publish-accounting-notes notes :amqp))
441
(:method ((notes cons) (destination (eql :accounting-io)))
442
(publish-accounting-notes notes :amqp))
444
(:method ((notes cons) (destination (eql :amqp)))
446
(loop for (task . properties) in notes
447
do (send-account-note task properties *accounting-io*))
448
(amqp:with-open-connection (accounting-connection :uri *broker-uri*)
449
(amqp:with-open-channel (accounting-io accounting-connection
450
:element-type '(unsigned-byte 8)
451
:content-type *accounting-content-type*)
452
(log-info "management processor accounting channel : ~s" accounting-io)
453
(setf (amqp.u:channel-content-type accounting-io) *accounting-content-type*)
454
(let ((accounting-exchange (amqp:exchange accounting-io :exchange *accounting-exchange*))
455
(accounting-queue (amqp:queue accounting-io :queue *accounting-queue*)))
456
;; instantiate the channels' basic and specify non-persistent delivery mode
457
(amqp:basic accounting-io :delivery-mode 1)
458
;; removed :auto-delete t on rabbitmq transition to 0.9.1, but it shouldn't be present
459
;; anyway, since the exchanges should be permanent
460
(amqp:declare accounting-exchange :type "topic")
461
(amqp:declare accounting-queue :auto-delete t)
462
(let ((*accounting-io* accounting-io))
463
;; recurse with the channel bound
464
(publish-accounting-notes notes :amqp)))))))
466
(:method ((notes cons) (destination stream))
467
(write-accounting-notes-csv notes destination))
469
(:method ((notes cons) (destination (eql '*error-output*)))
470
(write-accounting-notes-csv notes *error-output*))
472
(:method ((notes cons) (destination (eql :error-output)))
473
(format *error-output* "~&# accounting")
474
(write-accounting-notes-csv notes *error-output*))
476
(:method ((notes cons) (destination (eql '*standard-output*)))
477
(write-accounting-notes-csv notes *standard-output*))
479
(:method ((notes cons) (destination (eql :standard-output)))
480
(write-accounting-notes-csv notes *standard-output*))
482
(:method ((notes cons) (destination (eql :syslog)))
483
(log-accounting-notes-trig notes))
485
(:method ((notes cons) (destination (eql :store)))
486
"Write the notes to the store through an api call."
487
(bt:with-lock-held (*accounting-notes-lock*)
488
(let ((correlated-notes (correlate-store-notes notes)))
489
(loop for (task . properties) in correlated-notes
490
for start-time = (or (task-start-time task) (get-universal-time))
491
for elapsed-time = (- (get-internal-real-time) (task-start-real-time task))
492
count (apply #'store-query-event task
493
:start-time start-time
494
:query-time elapsed-time
497
(defgeneric write-accounting-notes (notes destination)
498
(:documentation "write the notes (test . properties) to the destination")
499
(:method ((correlated-notes cons) (destinations list))
500
(dolist (destination destinations)
501
(write-accounting-notes correlated-notes destination)))
502
(:method ((correlated-notes cons) (destination (eql :syslog)))
503
(destructuring-bind (task . properties) correlated-notes
504
(when (member (getf properties :|state|) '(:complete :terminated))
505
(log-notice! "[statistics] ~a { <urn:uuid:~a> :timestamp ~s^^<http://www.w3.org/2001/XMLSchema#dateTime>; :query_time ~d; ~{ :~a ~s~^;~} . }"
506
(repository-uri (task-repository task))
508
(term-lexical-form (universal-time-date-time (or (task-start-time task) (get-universal-time))))
509
(- (get-internal-real-time) (task-start-real-time task))
511
(:method ((correlated-notes cons) (destination (eql :store)))
512
(destructuring-bind (task . properties) correlated-notes
513
(let* ((start-time (or (task-start-time task) (get-universal-time)))
514
(elapsed-time (- (get-internal-real-time) (task-start-real-time task))))
515
(apply #'store-query-event task
516
:start-time start-time
517
:query-time elapsed-time
519
(:method ((correlated-notes cons) (destination stream))
520
(pprint correlated-notes destination)))
522
(defun correlate-task-accounting-notes (notes)
523
(let ((cached-properties ()))
524
(labels ((correlate-properties (state properties)
526
((:complete :terminated)
527
(setf (getf cached-properties ':|state|) state)))
528
(loop for (property value) on properties by #'cddr
529
unless (member property '(:|task_id| :|state|))
532
(number (unless (and (member property '(:|bytes_read| :|bytes_written|))
534
(incf (getf cached-properties property 0) value)))
535
(string (when (plusp (length value))
536
(setf (getf cached-properties property) value)))
537
(t (setf (getf cached-properties property) value)))
538
when (and (eq state :compile) (eq property :|run_time|))
539
do (incf (getf cached-properties :|compile_run_time| 0) value)
540
when (and (eq state :parse) (eq property :|run_time|))
541
do (setf (getf cached-properties :|parse_run_time| 0) value))))
542
;; perform two passes. the queue entries need not appear in semantic order
543
;; first correlate, then emit entries for completed and terminated tasks.
544
(loop for (task task-id-key id state-key state . properties) in notes
545
do (progn (assert (and (eq task-id-key :|task_id|) (eq state-key :|state|)) ()
546
"invalid accounting note: ~s" properties)
548
(setf id (task-id task))
549
(correlate-properties state
550
(list* :|agent_id| (agent-name (task-agent task))
551
:|agent_location| (agent-location (task-agent task))
552
:|signature| (query-signature task)
553
:|user_id| (task-user-id task)
554
;;; do not include :|api_key| (task-api-key task)
559
;;; (publish-accounting-notes '(("?" :id 1 :time 2) ("?" :id 3 :time 4)) '*error-output*)
560
;;; (publish-accounting-notes '(("?" :id 1 :time 2) ("?" :id 3 :time 4)) *accounting-destination*)
562
(defun write-accounting-notes-csv (notes stream)
564
(let* ((longest (reduce #'(lambda (n1 n2) (if (> (length n1) (length n2)) n1 n2))
566
(header (loop for label in (rest longest) by #'cddr
568
(format stream "~&~{~a~^,~}~%" header)
569
(loop for (nil . properties) in notes
570
do (format stream "~{~a~^,~}~%"
571
(loop for value in (rest properties) by #'cddr
575
;;; error text does not appear here.
576
;;; in the messaging configuration, the error messages follow a distinct data flow
577
;;; while in the nginx version, they are returned through their own stream
578
;;; requester ip number is unknown at this point and must be logged by the
582
(defparameter *trig-statistics-cache* nil
583
"Used by log-accounting-notes-trig to correlate accounting entries according to query id
584
in order to produce statistics syslog entries. Intended to be used by a single thread. This
585
is either an autonomous admin thread or single request threads.")
586
(defparameter *store-statistics-cache* nil
587
"Used by log-accounting-notes-trig to correlate accounting entries according to query id
588
in order to produce statistics syslog entries. Intended to be used by a single thread. This
589
is either an autonomous admin thread or single request threads.")
591
(defun correlate-store-notes (notes &key (completion-states '(:complete :terminated)))
592
"Given a list of the form
593
((<task> :|task_id| <uuid> :|state| <state> . <property-plist>) ... )
594
correlate the values for each task by uuid.
595
mark returned properties to prevent double entries"
597
(let ((cache (or *store-statistics-cache*
598
(setq *store-statistics-cache* (make-hash-table :test 'equal :synchronized t
600
(labels ((correlate-properties (id state task properties)
601
(let* ((cached-entry (gethash id cache))
602
(cached-properties (rest cached-entry)))
603
(unless (getf properties :logged) ;; complete already seen
605
((:complete :terminated)
606
(setf (getf cached-properties :|state|) state)))
607
(loop for (property value) on properties by #'cddr
608
unless (member property '(:|task_id| :|state|))
611
(number (unless (and (member property '(:|bytes_read| :|bytes_written|))
613
(incf (getf cached-properties property 0) value)))
614
(string (when (plusp (length value))
615
(setf (getf cached-properties property) value)))
616
(t (setf (getf cached-properties property) value)))
617
when (and (eq state :compile) (eq property :|run_time|))
618
do (incf (getf cached-properties :|compile_run_time| 0) value)
619
when (and (eq state :parse) (eq property :|run_time|))
620
do (setf (getf cached-properties :|parse_run_time|) value))
621
(setf (gethash id cache) (cons task cached-properties)))))
623
(when-time-lexical-form (time)
625
(term-lexical-form (universal-time-date-time time)))))
626
;; perform two passes. the queue entries need not appear in semantic order
627
;; first correlate, then emit entries for completed and terminated tasks.
628
(loop for (task task-id-key id state-key state . properties) in notes
629
do (progn (assert (and (eq task-id-key :|task_id|) (eq state-key :|state|)) ()
630
"invalid accounting note: ~s" properties)
632
(setf id (task-id task))
633
(correlate-properties id state task
634
(list* :|agent_id| (agent-name (task-agent task))
635
:|agent_location| (agent-location (task-agent task))
636
:|signature| (query-signature task)
637
:|user_id| (task-user-id task)
638
;;; do not include :|api_key| (task-api-key task)
641
(loop for id being each hash-key of cache using (hash-value value)
642
for (task . properties) = value
643
when (and (not (getf properties :logged))
644
(or (null completion-states)
645
(member (getf properties :|state|) completion-states)))
647
and do (setf (gethash id cache) (list* task :logged (get-universal-time) properties))))))
650
(defun log-accounting-notes-trig (notes)
651
"Given a list of the form
652
((<task> :|task_id| <uuid> :|state| <state> . <property-plist>) ... )
653
correlate the values for each task by uuid and write the respective statistics as one syslog entry each."
655
(let ((cache (or *trig-statistics-cache*
656
(setq *trig-statistics-cache* (make-hash-table :test 'equal :synchronized t
658
(labels ((correlate-properties (id state task properties)
659
(let* ((cached-entry (gethash id cache))
660
(cached-properties (rest cached-entry)))
661
(unless (getf properties :logged) ;; complete already seen
663
((:complete :terminated)
664
(setf (getf cached-properties ':|state|) state)))
665
(loop for (property value) on properties by #'cddr
666
unless (member property '(:|task_id| :|state|))
669
(number (unless (and (member property '(:|bytes_read| :|bytes_written|))
671
(incf (getf cached-properties property 0) value)))
672
(string (when (plusp (length value))
673
(setf (getf cached-properties property) value)))
674
(t (setf (getf cached-properties property) value)))
675
when (and (eq state :parse) (eq property :|run_time|))
676
do (setf (getf cached-properties :|parse_run_time|) value))
677
(setf (gethash id cache) (cons task cached-properties)))))
679
(when-time-lexical-form (time)
681
(term-lexical-form (universal-time-date-time time)))))
682
;; perform two passes. the queue entries need not appear in semantic order
683
;; first correlate, then emit entries for completed and terminated tasks.
684
(loop for (task task-id-key id state-key state . properties) in notes
685
do (progn (assert (and (eq task-id-key :|task_id|) (eq state-key :|state|)) ()
686
"invalid accounting note: ~s" properties)
688
(setf id (task-id task))
689
(correlate-properties id state task
690
(list* :|agent_id| (agent-name (task-agent task))
691
:|agent_location| (task-agent-location task)
692
:|signature| (query-signature task)
693
:|user_id| (task-user-id task)
694
:|name| (or (task-name task) "sparql")
695
;;; do not include :|api_key| (task-api-key task)
697
(loop for id being each hash-key of cache using (hash-value value)
698
for (task . properties) = value
699
when (and (not (getf properties :logged))
700
(member (getf properties :|state|) '(:complete :terminated))
701
(not (getf properties :logged)))
702
do (log-notice! "[statistics] ~a { <urn:uuid:~a> :timestamp ~s^^<http://www.w3.org/2001/XMLSchema#dateTime>; :query_time ~d; ~{ :~a ~s~^;~} . }"
703
(repository-uri (task-repository task))
705
(term-lexical-form (universal-time-date-time (or (task-start-time task) (get-universal-time))))
706
(- (get-internal-real-time) (task-start-real-time task))
708
and do (setf (gethash id cache) (list* task :logged (get-universal-time) properties))))))
710
(defun log-error-notes-trig (notes)
711
"log collected notes and return the last condition"
712
(loop for (task . condition) in notes
713
when (and (typep task 'query) (typep condition 'condition))
714
do (log-notice! "[statistics] ~a { <urn:uuid:~a> :timestamp ~s^^<http://www.w3.org/2001/XMLSchema#dateTime>; :error_type ~s; :state :TERMINATED~@[; :signature ~s~] . }"
715
(repository-uri (task-repository task))
717
(term-lexical-form (universal-time-date-time (or (task-start-time task) (get-universal-time))))
718
(symbol-name (type-of condition))
719
(query-signature task))
720
finally (return condition)))
724
(defun print-error-conditions (notes &optional (stream *error-output*))
725
(format *error-output* "~&# error")
726
(loop for (nil . condition) in notes
727
do (format stream "~&~%~a" condition)
728
finally (return condition)))
730
(defgeneric print-error-conditions (notes destination)
731
(:method ((notes t) (destination null))
733
(:method ((notes null) (destination t))
735
(:method ((notes t) (destination (eql :error-output)))
736
(print-error-conditions notes *error-output*))
737
(:method ((notes t) (destination (eql :standard-output)))
738
(print-error-conditions notes *standard-output*))
739
(:method ((notes list) (destination stream))
740
; (format stream "~&# error")
741
(loop for (nil . condition) in notes
742
do (format destination "~&~a~%" condition)
743
finally (return condition))
744
(complete-output destination))
746
(:method ((notes list) (destination (eql :syslog)))
747
(loop for (nil . condition) in notes
748
do (log-error "~&~%~a" condition)
749
finally (return condition)))
750
(:method ((notes list) (destination (eql :store)))
751
(loop for (query . condition) in notes
752
for message = (format nil "~a" condition)
753
do (dydra-ndk:report-query-error (or (task-id query) "<NULL-QUERY-UUID>")
756
(condition (string (type-of condition)))
757
(t (write-to-string condition)))
759
finally (return condition)))
761
(:method ((task task) (destination t))
762
(print-error-conditions (channel-get-all (task-errors task)) destination)))
766
(defun make-message-stream ()
768
(sb-sys::make-fd-stream 0 :element-type '(unsigned-byte 8) :input t)
769
(sb-sys::make-fd-stream 1 :element-type '(unsigned-byte 8) :output t)))
772
(defun make-message-stream ()
773
(error "NYI: cannot construct message stream."))