Coverage report: /development/source/library/org/datagraph/spocq-shard/src/core/start.lisp

KindCoveredAll%
expression4511145 39.4
branch58156 37.2
Key
Not instrumented
Conditionalized out
Executed
Not executed
 
Both branches taken
One branch taken
Neither branch taken
1
 ;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
2
 
3
 (in-package :org.datagraph.spocq.implementation)
4
 
5
 (:documentation "start up"
6
   "")
7
 
8
 
9
 ;;;
10
 ;;; signals
11
 
12
 ;;; for info on exit code see among others
13
 ;;; http://manpages.ubuntu.com/manpages/zesty/man5/systemd.service.5.html
14
 
15
 (defun exit-lisp (&optional (return-code 0))
16
   #+digitool
17
   (declare (ignore return-code))
18
   #+clozure
19
   (ccl:quit return-code)
20
   #+digitool
21
   (ccl:quit)
22
   #+lispworks
23
   (lispworks:quit :status return-code :confirm nil :return nil :ignore-errors-p t)
24
   #+sbcl
25
   (funcall (intern (if (equal (lisp-implementation-version) "1.0.46") "QUIT" "EXIT") :sb-ext)
26
            (if (equal (lisp-implementation-version) "1.0.46") :unix-status :code)
27
            return-code))
28
 
29
 #+clozure (eval-when (:compile-toplevel)
30
             (error "need the signal handler definition operators"))
31
 
32
 (defvar *signal-state* nil)
33
 
34
 (defun sigterm-handler (signal code context)
35
   (declare (ignore signal code context))
36
   (case *run-state*
37
     (:terminate                         ; if repeated exit immediately
38
      (log-notice "sigterm-handler: repeated SIGTERM ~a." (iso-time))
39
      (exit-lisp 130))
40
     (t
41
      (log-notice "sigterm-handler: initial SIGTERM ~a." (iso-time))
42
      (setq *signal-state* :sigterm)
43
      (setq *run-state* :terminate))))
44
 
45
 (defun sighup-handler (signal code context)
46
   (declare (ignore signal code context))
47
   (setq *run-state* :restart))
48
 
49
 (defun enable-interrupt (signal handler)
50
   #+digitool
51
   (declare (ignore signal handler))
52
   #+lispworks
53
   (system:set-signal-handler (ecase signal
54
                                (:sigterm SYSTEM::UNIX-SIGTERM))
55
                              handler)
56
   #+sbcl
57
   (sb-unix::enable-interrupt (ecase signal
58
                                (:sigterm sb-unix::sigterm))
59
                              handler))
60
 
61
 ;; no longer used as it creates a config requirement which is troublesome
62
 ;; on reboot for virtual hosts
63
 (defun write-pidfile (&optional (pathname *pidfile-pathname*))
64
   (unless (pathname-name pathname)
65
     (let* ((executable (getarg 0))
66
            (name (pathname-name executable)))
67
       (setf pathname (make-pathname :name name :type "pid" :defaults pathname))))
68
   (setq *pidfile-pathname* pathname)
69
   (with-open-file (pidfile-stream pathname :direction :output :if-exists nil :if-does-not-exist :create)
70
     (format pidfile-stream "~d~%" (getpid))
71
     pathname))
72
 
73
 
74
 (defun maybe-exit-on-error (&optional (code 70))
75
   (cond ((and *exit-on-errors*
76
               (not (and (streamp *terminal-io*)
77
                         (interactive-stream-p *terminal-io*))))
78
          ;;; if exiting because a thread is hung, this does not work. (stop-threads)
79
          (log-info "Stop: exit-on-errors ~a." (iso-time))
80
          (exit-lisp code))
81
         (t
82
           (log-info "continue: exit-on-errors ~a: ~a."
83
                     (iso-time)
84
                     (list *exit-on-errors* *terminal-io* (streamp *terminal-io*)
85
                            (interactive-stream-p *terminal-io*))))))
86
 
87
 (defun complete-output (destination)
88
   "Iff the destination is standard or error output, write a null marker.
89
    In all cases finish output to ensure the reader has everything."
90
   (typecase destination
91
     (stream (fresh-line destination)
92
             ;; add a termination marker to separate results for persistent back-end processors
93
             (when (or (eq destination *error-output*) (eq destination *standard-output*))
94
               (write-char #\null destination))
95
             (finish-output destination))
96
     (t ))
97
   (log-debug "complete-output: ~s" destination))
98
 
99
 ;;;
100
 ;;; initial- and re-start
101
 
102
 (defun error-condition-channel ()
103
   (or *error-condition-channel*
104
       (setf *error-condition-channel* (make-pool :name "error conditions"))))
105
 (defun accounting-notes ()
106
   (or *accounting-notes*
107
       (setf *accounting-notes* (make-array 32 :adjustable t :fill-pointer 0))))
108
 
109
 (defun initialize-channels ()
110
   ;; create on-demand
111
   ;; (setq *accounting-notes* nil)
112
   ;; (setq *error-condition-channel*  nil)
113
   ;; use a multi-reader/multi-writer queue to carry tasks to threads for them to process per-task operations
114
   (setq *algebra-task-channel* (make-pool :name "algebra tasks"))
115
   (setq *service-channel* (make-pool :name "service"))
116
 
117
   ;; warm-up the type system, but do not instantiate
118
   (let ((tmp-channel (allocate-instance (find-class *make-channel.class*))))
119
     (assert-argument-types initialize-channels
120
                            (tmp-channel channel))))
121
 
122
 
123
 (defun initialize-spocq (&key (title (getarg "--title")) debugger)
124
   (case *run-state*
125
     ((nil :initialize)
126
      (setq *run-state* :initialize)
127
      (setq *start-timestamp* (iso-time))
128
      (enable-interrupt :sigterm #'sigterm-handler)
129
      ;; randomize clock component of uuids
130
      (setq uuid::*clock-seq* 0)
131
      #+sbcl
132
      (progn
133
   ;; prevent 
134
        (unless (or debugger
135
                    (and (streamp *terminal-io*) (interactive-stream-p *terminal-io*)))
136
          ;; avoid hanging with stack overflows &co
137
          (sb-ext:disable-debugger))
138
        (setq sb-impl::*default-external-format* :utf-8)
139
        (proclaim '(sb-ext:muffle-conditions sb-ext:compiler-note))
140
        ;; double th gc interval
141
        (setf (sb-ext:bytes-consed-between-gcs) 750000000
142
              (sb-ext::generation-number-of-gcs-before-promotion 0) 2)
143
        (sb-ext:gc ))
144
      (setq *service-name* (or (getarg "--service-name") *service-name*))
145
      (setq *configuration-pathname* (or (getarg "--spocqinit") *configuration-pathname*))
146
      (setq *log-pathname* (or (getarg "--spocqlog") *log-pathname*))
147
      (open-log :options '(:pid) :identity *service-name* :title title)
148
      (log-notice! "init: Build ~a/~a; Parser: ~s."
149
                   *build-revision* *build-timestamp* *query-parser*)
150
 
151
      ;;; process initialization has two phases
152
      ;;; first, load the installation configuration
153
      (unless (load-system-configuration *configuration-pathname*)
154
        (error "Cannot load system configuration: ~s." *configuration-pathname*))
155
      ;;; then apply any settings present on the process' command-line
156
      (unless (load-command-line-configuration)
157
        (error "Cannot load command-line configuration."))
158
      (initialize-built-in-repositories)
159
      (initialize-channels)
160
      ;;; load and initialize libraries
161
      (handler-bind ((warning #'muffle-warning))
162
        (unless (ignore-errors (ensure-directories-exist *log-pathname*))
163
          (error "Cannot locate log file: ~s." *log-pathname*))
164
        #+sbcl (pushnew 'log-memory-usage sb-ext:*after-gc-hooks*)
165
        #+lispworks (sys:set-automatic-gc-callback nil 'log-memory-usage)
166
      
167
        ;; bind vocabulary uri objects to their lexical and store designators
168
        (initialize-store *class.repository* *store-uri* *rdfcache-pathname*)
169
 
170
        #+use-monkeylib-bcrypt ;; this conflicts with dydra-ndk
171
        (when (find-package :bcrypt)
172
          (cffi:load-foreign-library (intern (symbol-name :libbcrypt) :bcrypt) :search-PATH #P"/opt/dydra/lib") )
173
        )
174
      
175
      ;; work around clos errors
176
      (ignore-errors (make-instance 'query :repository-id "system/system")))
177
 
178
     (:reinitialize
179
      (setq *run-state* :initialize)
180
      (open-log :options '(:pid) :identity *service-name* :title title)
181
      (log-notice! "reinit: Build ~a/~a; Parser: ~s."
182
                   *build-revision* *build-timestamp* *query-parser*)
183
      (unless (load-system-configuration *configuration-pathname*)
184
        (error "Cannot load configuration: ~s." *configuration-pathname*))
185
      ;; reset the processor instance
186
      (reinitialize-store *class.repository*)))
187
   ;; reset the processor instance
188
   (setq *request-processor* nil)
189
   *run-state*)
190
 
191
 
192
 (defun main ()
193
   (let ((result (main-task-loop (command-line-argument-list))))
194
     (typecase result
195
       (query                            ; normal completion, return the reqult and allow runtime to exit
196
        result)
197
       (sb-sys:interactive-interrupt
198
        nil)
199
       (condition                        ; some error occurred
200
        (exit-lisp 70))
201
       (t
202
        result))))
203
 
204
 
205
 (defun stop-threads ()
206
   ;; in case being done from a top-level, first kill the management thread
207
   ;; otherwise it will start killing threads as well
208
   (when (and *management-thread*
209
              (not (eq *management-thread* (bt:current-thread)))
210
              (bt:thread-alive-p *management-thread*))
211
     (bt:destroy-thread *management-thread*)
212
     #-sbcl(bt:join-thread *management-thread*)
213
     #+sbcl(sb-thread:join-thread *management-thread* :default nil))
214
   ;; disable further requests
215
   (setq *run-state* :terminate)
216
   ;; but - don't wait for them to stop
217
   ;; first cancel those which have the handler this makes them available
218
   (cancel-algebra-threads)
219
   ;; now terminate everything
220
   (dolist (thread (bt:all-threads))
221
     (unless ;; (or (eq (bt:current-thread) thread) (eq *management-thread* thread))
222
         (eq (bt:current-thread) thread)
223
       ;; allow for race conditions
224
       (multiple-value-bind (result condition)
225
                            (ignore-errors (progn
226
                                             (when (bt:thread-alive-p thread)
227
                                               (bt:destroy-thread thread))
228
                                             #-sbcl(bt:join-thread thread)
229
                                             #+sbcl(sb-thread:join-thread thread :default nil :timeout 1)))
230
         (declare (ignore result))
231
         (when condition 
232
           (log-debug "stop-threads: error in thread termination: ~a" condition))))))
233
 
234
 (defun stop ()
235
   (log-info "stopping ...")
236
   (when *algebra-task-channel*
237
     (loop for task in (channel-get-all *algebra-task-channel*)
238
       do (log-warn "stop with task pending operation: ~a" task)))
239
   (when *service-channel* (channel-get-all *service-channel*))
240
   (map-queries #'(lambda (q) (handler-case (terminate-task q)
241
                                (error (c)
242
                                       (warn "Error in termination: ~a" c)
243
                                       (setf (task-state q) :terminate)))))
244
   (stop-threads)
245
   (log-info "stopped."))
246
 
247
 (defun stop-idle-threads ()
248
   (dolist (thread (bt:all-threads))
249
     (unless (or (eq *management-thread* thread)
250
                 (eq thread (bt:current-thread))
251
                 (sb-thread:symbol-value-in-thread '*task* thread nil))
252
       (when (bt:thread-alive-p thread)
253
         ;; allow for race conditions
254
         (ignore-errors  (bt:destroy-thread thread)
255
                         #-sbcl(bt:join-thread thread)
256
                         #+sbcl(sb-thread:join-thread thread :default nil))))))
257
 
258
 (defun terminate ()
259
   "Shut the engine down by draining queries - waiting for processes to complete, and then exiting."
260
   (setq *run-state* :terminate)
261
   (loop (when (zerop (query-count)) (return))
262
         (sleep .1))
263
   (stop-threads)
264
   (log-info "terminated."))
265
 
266
 
267
 (defun run-toplevel (&rest args &key (toplevel-function *toplevel-function* tl-s))
268
   (when tl-s
269
     (setf args (copy-list args))
270
     (remf args :toplevel-function))
271
   (log-info "starting toplevel [~a]... (~s . ~s)." *thread-name* toplevel-function args)
272
   (handler-bind ((error (lambda (condition)
273
                           ;; (break "toplevel error: ~a" condition)
274
                           (if (and (streamp *terminal-io*(interactive-stream-p *terminal-io*))
275
                               (break "Error at toplevel: ~a." condition)
276
                               (log-stacktrace "Error at toplevel: ~a." condition))
277
                           #+sbcl
278
                           (ignore-errors
279
                            (write-log :error "abstract channel class precedence: ~s"
280
                                       (sb-mop:class-precedence-list (find-class 'amqp:channel)))
281
                            (write-log :error "concrete channel sub-classes' precedence: ~s"
282
                                       (mapcar #'sb-mop:class-precedence-list
283
                                               (sb-mop:class-direct-subclasses (find-class 'amqp:channel))))
284
                            (write-log :error "abstract connection class precedence: ~s"
285
                                       (sb-mop:class-precedence-list (find-class 'amqp:connection)))
286
                            (write-log :error "concrete channel sub-classes' precedence: ~s"
287
                                       (mapcar #'sb-mop:class-precedence-list
288
                                               (sb-mop:class-direct-subclasses (find-class 'amqp:connection)))))
289
                           (sleep 5)
290
                           (return-from run-toplevel nil))))
291
                 (apply toplevel-function args)))
292
 
293
 (defun run-toplevel-loop (&rest args)
294
   (log-info "Thread [~@[~a~]] run-toplevel-loop starting." *thread-name*)
295
   (loop while (apply #'run-toplevel args)
296
     do (case *run-state*
297
          (:restart
298
           (log-info "Thread re-loading configuration.")
299
           (unless (load-configuration *configuration-pathname*)
300
             (log-error "Cannot load configuration: ~s." *configuration-pathname*))
301
           (setq *run-state* :process))))
302
   (log-info "Thread [~@[~a~]] run-toplevel-loop returning." *thread-name*))
303
 
304
 (defun run-management-thread (&key &allow-other-keys)
305
   "The HTTP server management thread does no autonomous processing - that is handled by an admin
306
  response function. It handles just the statistics and termination."
307
 
308
   (log-info "starting management thread...")
309
   (setq spocq.i:*management-thread* (bt:current-thread))
310
   (setq spocq.i:*run-state* :run)
311
   (loop (unless (management-idle-handler) (return spocq.i:*run-state*))
312
         (sleep *management-thread-interval*)))
313
 
314
 (defun management-idle-handler ()
315
   "Consolidated implementation for standard management tasks.
316
  Return nil if state is terminate and no queries remain."
317
   ;; now handled in scripts explicitly, which means the channel will remain empty
318
   (publish-accounting-notes (get-accounting-notes) *accounting-destination*)
319
   (when (plusp (channel-count *error-condition-channel*))
320
     (let ((error-notes (channel-get-all *error-condition-channel*)))
321
       (log-error-notes-trig error-notes)))
322
   (when (and (zerop (logand (ash (get-internal-real-time) -10) #x1f))
323
              (zerop (hash-table-count *query-registry*)))
324
     ;; if no query is running, probe the store every +/- thirty seconds
325
     (handler-case
326
         ;;; eventually consumes lmdb readers?
327
         ;;; (with-open-repository (*system-repository-id* :normal-disposition :abort) t)
328
         (probe-transaction *system-repository-id*)
329
       (error (condition)
330
              (when *exit-on-errors*
331
                (setq *run-state* :terminate)
332
                (log-error "management-thread-step: termination due to condition: ~a" condition)
333
                (format *error-output* "management-thread-step: termination due to condition: ~a" condition)
334
                (spocq.i::exit-lisp 70)))))
335
   (handler-case (make-v1-uuid-string)
336
     (bt:timeout (c)
337
       (log-error "management-idle-handler: uuid timeout, terminating: ~a" c)
338
       (setf *run-state* :terminate)))
339
   (constrain-resources)
340
   (case *run-state*
341
     (:terminate
342
      ;; once there are no more active queries, indicate to terminate
343
      (when (zerop (query-count))
344
        (log-info "terminate: management thread terminating other threads.")
345
        (stop-threads)
346
        (log-info "terminate: management thread returning@ ~a tasks." *task-count*)
347
        nil))
348
     (t
349
      t)))
350
 
351
 (defun start-management-thread (&key (name "management"))
352
   (or *management-thread*
353
       (setq *management-thread*
354
             (bt:make-thread *toplevel-function*
355
                             :name name
356
                             :initial-bindings `((*thread-name* . ,name)
357
                                                 (*task* . nil)
358
                                                 (*class.request-processor* . ',*class.request-processor*))))))
359
 #+(or)
360
 (defun management-thread-toplevel ()
361
   (loop (management-thread-step (request-processor))
362
         (sleep *management-thread-interval*)))
363
 
364
 #+(or)
365
 (defgeneric management-thread-step (processor)
366
   (:method ((processor t))
367
     
368
     (case *run-state*
369
       (:terminate
370
        (stop-threads)
371
        (log-info "Stop: SIGTERM ~a." (iso-time))
372
        (exit-lisp 130))
373
       (t
374
        (constrain-resources)))))
375
 
376
 (defun run-processing-threads (&optional (thread-specifications *thread-specifications*))
377
   "create threads as per configuration plus a management thread
378
    if threads are specified, start each and then run the mangement loop.
379
    iff the session is interactive initiate a repl.
380
    otherwise just run the management thread loop."
381
   (loop for (name function . spec) in thread-specifications
382
     do (log-notice "initiate processing thread: ~s ~s" name function)
383
     do (let ((old (find name (bt:all-threads) :test #'equal :key #'bt:thread-name))
384
              (thread (make-processing-thread function
385
                                              :name name
386
                                              :initial-bindings (acons '*task* nil
387
                                                                       (acons '*thread-name* name spec)))))
388
          (cond (old
389
                 (bt:destroy-thread old)
390
                 (setf *processing-threads* (substitute thread old *processing-threads*)))
391
                (t
392
                 (push thread *processing-threads*)))))
393
   ;; always start a management thread to look for things to kill and reap.
394
   (unwind-protect
395
       (cond ((and (streamp *terminal-io*(interactive-stream-p *terminal-io*))
396
              ;; if interactive, put the management thread in the background
397
              (bt:make-thread #'(lambda () (run-toplevel-loop :toplevel-function *toplevel-function*))
398
                              :name "top-level"
399
                              :initial-bindings (acons '*task* nil nil))
400
              #+sbcl
401
              (SB-IMPL::TOPLEVEL-REPL nil)
402
              (log-warn "run-processing-threads: interactive toplevel returned"))
403
             (t
404
              (run-toplevel-loop :toplevel-function *toplevel-function*)
405
              (log-warn "run-processing-threads: toplevel loop returned")))
406
     (stop-threads)
407
     (log-warn "final threads ~s" (bt:all-threads))
408
     ;; (delete-file spocq.i:*pidfile-pathname*)
409
     (log-warn "run-processing-threads: returning")))
410
 ;;; accounting
411
 
412
 (defgeneric publish-accounting-notes (notes destination)
413
   (:method :before ((notes t) (destination t))
414
     (when notes
415
       (log-trace "publish-accounting-notes.before to (~s ~s)" (type-of notes) destination)))
416
   (:method :after ((notes t) (destination t))
417
     (when notes
418
       (log-trace "publish-accounting-notes.after to (~s ~s)" (type-of notes) destination)))
419
 
420
   (:method ((notes null) (destination t))
421
     )
422
   (:method ((notes t) (destination null))
423
     )
424
   (:method ((notes t) (destinations list))
425
     (dolist (destination destinations)
426
       (publish-accounting-notes notes destination)))
427
 
428
   (:method ((task task) (destination t))
429
     ;; publish, all collected notes
430
     ;; nb. 20151123: included in finalize-task and changed to no longer retain the notes
431
     (let ((notes (channel-get-all (task-statistics task))))
432
       ;; force completion as the task is provided
433
       (push (list task :|task_id| (task-id task)
434
                   :|state| (case (task-state task) (:terminated :terminated) (t :complete)))
435
             notes)
436
       (publish-accounting-notes notes destination)))
437
 
438
   (:method ((notes cons) (destination (eql '*accounting-io*)))
439
     (publish-accounting-notes notes :amqp))
440
 
441
   (:method ((notes cons) (destination (eql :accounting-io)))
442
     (publish-accounting-notes notes :amqp))
443
 
444
   (:method ((notes cons) (destination (eql :amqp)))
445
     (if *accounting-io*
446
       (loop for (task . properties) in notes
447
             do (send-account-note task properties *accounting-io*))
448
       (amqp:with-open-connection (accounting-connection :uri *broker-uri*)
449
         (amqp:with-open-channel (accounting-io accounting-connection
450
                                                :element-type '(unsigned-byte 8)
451
                                                :content-type *accounting-content-type*)
452
           (log-info "management processor accounting channel : ~s" accounting-io)
453
           (setf (amqp.u:channel-content-type accounting-io) *accounting-content-type*)
454
           (let ((accounting-exchange (amqp:exchange accounting-io :exchange *accounting-exchange*))
455
                 (accounting-queue (amqp:queue accounting-io :queue *accounting-queue*)))
456
             ;; instantiate the channels' basic and specify non-persistent delivery mode
457
             (amqp:basic accounting-io :delivery-mode 1)
458
             ;; removed :auto-delete t on rabbitmq transition to 0.9.1, but it shouldn't be present
459
             ;; anyway, since the exchanges should be permanent
460
             (amqp:declare accounting-exchange  :type "topic")
461
             (amqp:declare accounting-queue :auto-delete t)
462
             (let ((*accounting-io* accounting-io))
463
               ;; recurse with the channel bound
464
               (publish-accounting-notes notes :amqp)))))))
465
 
466
   (:method ((notes cons) (destination stream))
467
     (write-accounting-notes-csv notes destination))
468
 
469
   (:method ((notes cons) (destination (eql '*error-output*)))
470
     (write-accounting-notes-csv notes *error-output*))
471
 
472
   (:method ((notes cons) (destination (eql :error-output)))
473
     (format *error-output* "~&# accounting")
474
     (write-accounting-notes-csv notes *error-output*))
475
 
476
   (:method ((notes cons) (destination (eql '*standard-output*)))
477
     (write-accounting-notes-csv notes *standard-output*))
478
 
479
   (:method ((notes cons) (destination (eql :standard-output)))
480
     (write-accounting-notes-csv notes *standard-output*))
481
 
482
   (:method ((notes cons) (destination (eql :syslog)))
483
     (log-accounting-notes-trig notes))
484
 
485
   (:method ((notes cons) (destination (eql :store)))
486
     "Write the notes to the store through an api call."
487
     (bt:with-lock-held (*accounting-notes-lock*)
488
       (let ((correlated-notes (correlate-store-notes notes)))
489
         (loop for (task . properties) in correlated-notes
490
           for start-time = (or (task-start-time task) (get-universal-time))
491
           for elapsed-time = (- (get-internal-real-time) (task-start-real-time task))
492
           count (apply #'store-query-event task
493
                        :start-time start-time
494
                        :query-time elapsed-time
495
                        properties))))))
496
 
497
 (defgeneric write-accounting-notes (notes destination)
498
   (:documentation "write the notes (test . properties) to the destination")
499
   (:method ((correlated-notes cons) (destinations list))
500
     (dolist (destination destinations)
501
       (write-accounting-notes correlated-notes destination)))
502
   (:method ((correlated-notes cons) (destination (eql :syslog)))
503
     (destructuring-bind (task . properties) correlated-notes
504
       (when (member (getf properties :|state|) '(:complete :terminated))
505
         (log-notice! "[statistics] ~a { <urn:uuid:~a> :timestamp ~s^^<http://www.w3.org/2001/XMLSchema#dateTime>; :query_time ~d; ~{ :~a ~s~^;~} . }"
506
                      (repository-uri (task-repository task))
507
                      (task-id task)
508
                      (term-lexical-form (universal-time-date-time (or (task-start-time task) (get-universal-time))))
509
                      (- (get-internal-real-time) (task-start-real-time task))
510
                      properties))))
511
   (:method ((correlated-notes cons) (destination (eql :store)))
512
     (destructuring-bind (task . properties) correlated-notes
513
       (let* ((start-time (or (task-start-time task) (get-universal-time)))
514
              (elapsed-time (- (get-internal-real-time) (task-start-real-time task))))
515
         (apply #'store-query-event task
516
                :start-time start-time
517
                :query-time elapsed-time
518
                properties))))
519
   (:method ((correlated-notes cons) (destination stream))
520
     (pprint correlated-notes destination)))
521
 
522
 (defun correlate-task-accounting-notes (notes)
523
   (let ((cached-properties ()))
524
     (labels ((correlate-properties (state properties)
525
                (case state
526
                  ((:complete :terminated)
527
                   (setf (getf cached-properties ':|state|) state)))
528
                (loop for (property value) on properties by #'cddr
529
                  unless (member property '(:|task_id| :|state|))
530
                  do (typecase value
531
                       (null )
532
                       (number (unless (and (member property '(:|bytes_read| :|bytes_written|))
533
                                            (zerop value))
534
                                 (incf (getf cached-properties property 0) value)))
535
                       (string (when (plusp (length value))
536
                                 (setf (getf cached-properties property) value)))
537
                       (t (setf (getf cached-properties property) value)))
538
                  when (and (eq state :compile(eq property :|run_time|))
539
                  do (incf (getf cached-properties :|compile_run_time| 0) value)
540
                  when (and (eq state :parse(eq property :|run_time|))
541
                  do (setf (getf cached-properties :|parse_run_time| 0) value))))
542
         ;; perform two passes. the queue entries need not appear in semantic order
543
         ;; first correlate, then emit entries for completed and terminated tasks.
544
         (loop for (task task-id-key id state-key state . properties) in notes
545
               do (progn (assert (and (eq task-id-key :|task_id|(eq state-key :|state|)) ()
546
                                 "invalid accounting note: ~s" properties)
547
                         (when (and state id)
548
                           (setf id (task-id task))
549
                           (correlate-properties state
550
                                                 (list* :|agent_id| (agent-name (task-agent task))
551
                                                        :|agent_location| (agent-location (task-agent task))
552
                                                        :|signature| (query-signature task)
553
                                                        :|user_id| (task-user-id task)
554
                                                        ;;; do not include :|api_key| (task-api-key task)
555
                                                        properties)))))
556
       cached-properties)))
557
 
558
 
559
 ;;; (publish-accounting-notes '(("?" :id 1 :time 2) ("?" :id 3 :time 4)) '*error-output*)
560
 ;;; (publish-accounting-notes '(("?" :id 1 :time 2) ("?" :id 3 :time 4)) *accounting-destination*)
561
 
562
 (defun write-accounting-notes-csv (notes stream)
563
   (fresh-line stream)
564
   (let* ((longest (reduce #'(lambda (n1 n2) (if (> (length n1) (length n2)) n1 n2))
565
                           notes))
566
          (header (loop for label in (rest longest) by #'cddr
567
                    collect label)))
568
     (format stream "~&~{~a~^,~}~%" header)
569
     (loop for (nil . properties) in notes
570
           do (format stream "~{~a~^,~}~%"
571
                      (loop for value in (rest properties) by #'cddr
572
                        collect value)))))
573
 
574
 
575
 ;;; error text does not appear here.
576
 ;;; in the messaging configuration, the error messages follow a distinct data flow
577
 ;;; while in the nginx version, they are returned through their own stream
578
 ;;; requester ip number is unknown at this point and must be logged by the 
579
 ;;; front-end
580
 
581
 
582
 (defparameter *trig-statistics-cache* nil
583
   "Used by log-accounting-notes-trig to correlate accounting entries according to query id
584
  in order to produce statistics syslog entries. Intended to be used by a single thread. This
585
  is either an autonomous admin thread or single request threads.")
586
 (defparameter *store-statistics-cache* nil
587
   "Used by log-accounting-notes-trig to correlate accounting entries according to query id
588
  in order to produce statistics syslog entries. Intended to be used by a single thread. This
589
  is either an autonomous admin thread or single request threads.")
590
 
591
 (defun correlate-store-notes (notes &key (completion-states '(:complete :terminated)))
592
   "Given a list of the form
593
     ((<task> :|task_id| <uuid> :|state| <state> . <property-plist>) ... )
594
  correlate the values for each task by uuid.
595
   mark returned properties to prevent double entries"
596
   
597
   (let ((cache (or *store-statistics-cache*
598
                    (setq *store-statistics-cache* (make-hash-table :test 'equal :synchronized t
599
                                                              :weakness :key)))))
600
       (labels ((correlate-properties (id state task properties)
601
                  (let* ((cached-entry (gethash id cache))
602
                         (cached-properties (rest cached-entry)))
603
                    (unless (getf properties :logged) ;; complete already  seen
604
                      (case state
605
                        ((:complete :terminated)
606
                         (setf (getf cached-properties :|state|) state)))
607
                      (loop for (property value) on properties by #'cddr
608
                        unless (member property '(:|task_id| :|state|))
609
                        do (typecase value
610
                             (null )
611
                             (number (unless (and (member property '(:|bytes_read| :|bytes_written|))
612
                                                  (zerop value))
613
                                       (incf (getf cached-properties property 0) value)))
614
                             (string (when (plusp (length value))
615
                                       (setf (getf cached-properties property) value)))
616
                             (t (setf (getf cached-properties property) value)))
617
                        when (and (eq state :compile(eq property :|run_time|))
618
                        do (incf (getf cached-properties :|compile_run_time| 0) value)
619
                        when (and (eq state :parse(eq property :|run_time|))
620
                        do (setf (getf cached-properties :|parse_run_time|) value))
621
                      (setf (gethash id cache) (cons task cached-properties)))))
622
                #+(or)
623
                (when-time-lexical-form (time)
624
                  (when time
625
                    (term-lexical-form (universal-time-date-time time)))))
626
         ;; perform two passes. the queue entries need not appear in semantic order
627
         ;; first correlate, then emit entries for completed and terminated tasks.
628
         (loop for (task task-id-key id state-key state . properties) in notes
629
               do (progn (assert (and (eq task-id-key :|task_id|(eq state-key :|state|)) ()
630
                                 "invalid accounting note: ~s" properties)
631
                         (when (and state id)
632
                           (setf id (task-id task))
633
                           (correlate-properties id state task
634
                                                 (list* :|agent_id| (agent-name (task-agent task))
635
                                                        :|agent_location| (agent-location (task-agent task))
636
                                                        :|signature| (query-signature task)
637
                                                        :|user_id| (task-user-id task)
638
                                                        ;;; do not include :|api_key| (task-api-key task)
639
                                                        properties)))))
640
 
641
         (loop for id being each hash-key of cache using (hash-value value)
642
           for (task . properties) = value
643
           when (and (not (getf properties :logged))
644
                     (or (null completion-states)
645
                         (member (getf properties :|state|) completion-states)))
646
           collect value
647
           and do (setf (gethash id cache) (list* task :logged (get-universal-time) properties))))))
648
 
649
 
650
 (defun log-accounting-notes-trig (notes)
651
   "Given a list of the form
652
     ((<task> :|task_id| <uuid> :|state| <state> . <property-plist>) ... )
653
  correlate the values for each task by uuid and write the respective statistics as one syslog entry each."
654
   
655
   (let ((cache (or *trig-statistics-cache*
656
                    (setq *trig-statistics-cache* (make-hash-table :test 'equal :synchronized t
657
                                                                   :weakness :key)))))
658
       (labels ((correlate-properties (id state task properties)
659
                  (let* ((cached-entry (gethash id cache))
660
                         (cached-properties (rest cached-entry)))
661
                    (unless (getf properties :logged) ;; complete already  seen
662
                      (case state
663
                        ((:complete :terminated)
664
                         (setf (getf cached-properties ':|state|) state)))
665
                      (loop for (property value) on properties by #'cddr
666
                        unless (member property '(:|task_id| :|state|))
667
                        do (typecase value
668
                             (null )
669
                             (number (unless (and (member property '(:|bytes_read| :|bytes_written|))
670
                                                  (zerop value))
671
                                       (incf (getf cached-properties property 0) value)))
672
                             (string (when (plusp (length value))
673
                                       (setf (getf cached-properties property) value)))
674
                             (t (setf (getf cached-properties property) value)))
675
                        when (and (eq state :parse(eq property :|run_time|))
676
                        do (setf (getf cached-properties :|parse_run_time|) value))
677
                      (setf (gethash id cache) (cons task cached-properties)))))
678
                #+(or)
679
                (when-time-lexical-form (time)
680
                  (when time
681
                    (term-lexical-form (universal-time-date-time time)))))
682
         ;; perform two passes. the queue entries need not appear in semantic order
683
         ;; first correlate, then emit entries for completed and terminated tasks.
684
         (loop for (task task-id-key id state-key state . properties) in notes
685
               do (progn (assert (and (eq task-id-key :|task_id|(eq state-key :|state|)) ()
686
                                 "invalid accounting note: ~s" properties)
687
                         (when (and state id)
688
                           (setf id (task-id task))
689
                           (correlate-properties id state task
690
                                                 (list* :|agent_id| (agent-name (task-agent task))
691
                                                        :|agent_location| (task-agent-location task)
692
                                                        :|signature| (query-signature task)
693
                                                        :|user_id| (task-user-id task)
694
                                                        :|name| (or (task-name task) "sparql")
695
                                                        ;;; do not include :|api_key| (task-api-key task)
696
                                                        properties)))))
697
         (loop for id being each hash-key of cache using (hash-value value)
698
           for (task . properties) = value
699
           when (and (not (getf properties :logged))
700
                     (member (getf properties :|state|) '(:complete :terminated))
701
                     (not  (getf properties :logged)))
702
           do (log-notice! "[statistics] ~a { <urn:uuid:~a> :timestamp ~s^^<http://www.w3.org/2001/XMLSchema#dateTime>; :query_time ~d; ~{ :~a ~s~^;~} . }"
703
                            (repository-uri (task-repository task))
704
                            id
705
                            (term-lexical-form (universal-time-date-time (or (task-start-time task) (get-universal-time))))
706
                            (- (get-internal-real-time) (task-start-real-time task))
707
                            properties)
708
           and do (setf (gethash id cache) (list* task :logged (get-universal-time) properties))))))
709
 
710
 (defun log-error-notes-trig (notes)
711
   "log collected notes and return the last condition"
712
   (loop for (task . condition) in notes
713
         when (and (typep task 'query) (typep condition 'condition))
714
         do (log-notice! "[statistics] ~a { <urn:uuid:~a>  :timestamp ~s^^<http://www.w3.org/2001/XMLSchema#dateTime>; :error_type ~s; :state :TERMINATED~@[; :signature ~s~] . }"
715
                         (repository-uri (task-repository task))
716
                         (task-id task)
717
                         (term-lexical-form (universal-time-date-time (or (task-start-time task) (get-universal-time))))
718
                         (symbol-name (type-of condition))
719
                         (query-signature task))
720
         finally (return condition)))
721
 
722
 
723
 #+(or)
724
 (defun print-error-conditions (notes &optional (stream *error-output*))
725
   (format *error-output* "~&# error")
726
   (loop for (nil . condition) in notes
727
         do (format stream "~&~%~a" condition)
728
         finally (return condition)))
729
 
730
 (defgeneric print-error-conditions (notes destination)
731
   (:method ((notes t) (destination null))
732
     nil)
733
   (:method ((notes null) (destination t))
734
     nil)
735
   (:method ((notes t) (destination (eql :error-output)))
736
     (print-error-conditions notes *error-output*))
737
   (:method ((notes t) (destination (eql :standard-output)))
738
     (print-error-conditions notes *standard-output*))
739
   (:method ((notes list) (destination stream))
740
     ; (format stream "~&# error")
741
     (loop for (nil . condition) in notes
742
       do (format destination "~&~a~%" condition)
743
       finally (return condition))
744
     (complete-output destination))
745
   
746
   (:method ((notes list) (destination (eql :syslog)))
747
     (loop for (nil . condition) in notes
748
       do (log-error "~&~%~a" condition)
749
       finally (return condition)))
750
   (:method ((notes list) (destination (eql :store)))
751
     (loop for (query . condition) in notes
752
       for message = (format nil "~a" condition)
753
       do (dydra-ndk:report-query-error (or (task-id query) "<NULL-QUERY-UUID>")
754
                                        (typecase condition
755
                                          (string condition)
756
                                          (condition (string (type-of condition)))
757
                                          (t (write-to-string condition)))
758
                                        :string message)
759
       finally (return condition)))
760
 
761
   (:method ((task task) (destination t))
762
     (print-error-conditions (channel-get-all (task-errors task)) destination)))
763
 
764
 
765
 #+sbcl
766
 (defun make-message-stream ()
767
   (make-two-way-stream
768
    (sb-sys::make-fd-stream 0 :element-type '(unsigned-byte 8) :input t)
769
    (sb-sys::make-fd-stream 1 :element-type '(unsigned-byte 8) :output t)))
770
 
771
 #+mcl
772
 (defun make-message-stream ()
773
   (error "NYI: cannot construct message stream."))
774
 
775
 
776
 
777