Coverage report: /development/source/library/org/datagraph/spocq-shard/src/spocq-server/start.lisp
| Kind | Covered | All | % |
| expression | 186 | 507 | 36.7 |
| branch | 16 | 42 | 38.1 |
Key
Not instrumented
Conditionalized out
Executed
Not executed
Both branches taken
One branch taken
Neither branch taken
1
;;; -*- Mode: lisp; Syntax: ansi-common-lisp; Base: 10; Package: org.datagraph.spocq.implementation; -*-
3
(in-package :org.datagraph.spocq.server.implementation)
5
(:documentation "Start up operator for spocq as an http server"
6
"Performs initialization actions and initiates threads for http api processing
10
- server administration
13
The HTTP server operates in one of three modes, depending on the *http-interface* configuration:
14
- :proxy : as a proxy on a specified port
15
- :fastcgi : as a fastcgi back end
16
- :scgi : as an scgi back end
18
The initialization process comprises
19
- initialize-spocq : configuration processing and store connection
20
- initialize-http : start respective http interface (w/ respective threads)
21
- initialize-admin : start admin thread or dedicate the initial thread to that,
22
depending on whether terminal io exists and is interactive.")
25
;;; the general start is
29
;;; in order to limit the response functions for just ldp
31
;;; (let ((*response-functions* '(spocq.si::linked-data-platform))) (main :init-name "init-http-ldp"))
33
;;; which configures the acceptor for ldp operations.
34
;;; this are incompatible with general rdf operations, as a given resource can have any name
36
;;; initial- and re-start
39
(defun main (&rest args &key (init-name (or (getarg "--spocqinit") "init-http")) &allow-other-keys)
40
"Provide the main entry point for a simple server:
41
- configure from --spocqinit
42
- initialize spocq runtime to start logs and establish connection to store
43
- run the standard http service"
44
(when (getarg "--spocqhelp") ;; --help is seen by sbcl
45
(format *trace-output* "~a :~{~% ~a~}~%" (first (spocq.i::command-line-argument-list))
46
(sort spocq.i::*getarg-options* #'string-lessp))
48
(setq spocq.i:*configuration-pathname*
49
(merge-pathnames init-name (make-pathname :directory '(:relative) :type "sxp")))
50
(handler-case (spocq.i:initialize-spocq)
52
(log-error "spocq.si:main: termination due to condition: ~a" condition)
53
(spocq.i::maybe-exit-on-error)))
54
;; avoid first initialization error
55
(handler-case (make-instance 'spocq.i::query :sse-expression () :id "" :repository-id "system/system")
56
(error (c) (log-warn "initial instantiation error: ~a" c))
57
(:no-error (result) (log-notice "initial instantiation: ~a" result)))
58
(apply #'run-server args)
59
(log-warn "main: exiting")
63
(defun run-server (&key (max-accept-count *http-accept-count-limit*)
64
(max-thread-count dydra:*service-request-count-limit*)
65
(request-class *service-request-class*)
66
(response-class *service-response-class*)
68
(host-name (dydra:server-host-name))
69
(host-package (or (find-package host-name)
70
(make-package host-name :use ())))
73
"Initiate the http server with a background admin process."
74
(declare (ignore init-name))
75
(spocq.i:enable-interrupt :sigterm #'spocq.i:sigterm-handler)
76
(unless spocq.i:*start-timestamp*
77
(setq spocq.i:*start-timestamp* (iso-time)))
78
(setq spocq.i:*response-header-types* nil) ; to be sure that no prefixes are sent out
79
#+sbcl(sb-ext:gc :full t)
81
(dydra:log-info "Start HTTP ~a." spocq.i:*start-timestamp*)
83
(import *response-functions* host-package)
84
(with-package-iterator (next host-package :internal)
85
(loop (multiple-value-bind (symbol-p symbol) (next)
86
(unless symbol-p (return))
87
(export symbol host-package))))
88
(setq *spocq-acceptor*
89
(make-instance 'spocq-acceptor
90
:name (format nil "~a@~a" "dydra.spocq" (spocq.i::host-name))
93
:request-class request-class
94
:response-class response-class
95
:taskmaster (make-instance *class.taskmaster*
96
:name (concatenate 'string host-name "-taskmaster")
97
:max-thread-count max-thread-count
98
:max-accept-count (when max-accept-count
99
(max max-accept-count (1+ (or max-thread-count 0)))))))
100
(setf (http:acceptor-dispatch-function *spocq-acceptor*) host-package)
101
(handler-case (http:start *spocq-acceptor*)
103
(dydra:log-warn "Unable to initiate service: ~a" c)
104
(spocq.i::maybe-exit-on-error)
105
(break "Unable to initiate service: ~a" c)))
106
(dydra:log-info "run: accepting on ~a. running..." *host-port*)
107
(spocq.i:run-processing-threads)
108
;; returns only when the server exits
109
(log-warn "run: returning")
112
;;; (setf (http:acceptor-dispatch-function *spocq-acceptor*) (find-package (dydra:host-name)))
114
(defun stop-server (&key (soft t))
116
(unless (tbnl::acceptor-shutdown-p *spocq-acceptor*)
117
(hunchentoot:stop *spocq-acceptor* :soft soft))
118
(error (c) "stop-server: ~a" c)))
120
(defun terminate-server ()
122
(setq spocq.i::*run-state* :terminate)
123
(spocq.i::stop-threads))
125
(defmethod http:handle-condition ((acceptor spocq-acceptor) (condition spocq.e:message-syntax-error))
126
(http:bad-request (format nil "~a" condition)))
128
(defmethod http:handle-condition ((acceptor spocq-acceptor) (condition spocq.e:request-error))
129
(http:bad-request (format nil "~a" condition)))
131
(defmethod http:handle-condition ((acceptor spocq-acceptor) (condition dydra-ndk::foreign-function-error))
132
"if an error occurs with the store, and it is not correctable, then check if errors lead to
134
(when spocq.i::*exit-on-errors*
135
(setq spocq.i::*run-state* :terminate)
136
(log-error "http:handle-condition: termination due to condition: ~a" condition)
137
(format *error-output* "~%~a" (hunchentoot::get-backtrace))
138
(spocq.i::exit-lisp 70)))
140
(defmethod http:handle-condition ((acceptor spocq-acceptor) (condition dydra-ndk:input-output-error))
141
"if an error occurs with the store, and it is not correctable, then check if errors lead to
143
if it is a write-transaction conflict, then treat it as a client error."
144
(if (search "already has" (dydra-ndk::foreign-function-error-message condition))
145
(http:bad-request (format nil "~a" condition))
146
(when spocq.i::*exit-on-errors*
147
(setq spocq.i::*run-state* :terminate)
148
(log-error "http:handle-condition: termination due to condition: ~a" condition)
149
(format *error-output* "~%~a" (hunchentoot::get-backtrace))
150
(spocq.i::exit-lisp 70))))
152
(defmethod http:handle-condition ((acceptor spocq-acceptor) (condition spocq.e:authorization-error))
153
(http:unauthorized (format nil "~a" condition)))
155
(defmethod http:handle-condition ((acceptor spocq-acceptor) (condition spocq.e:resource-not-found-error))
156
;; applies to top-level not-found errors
157
(http:not-found (format nil "~a" condition)))
159
(defmethod http:handle-condition ((acceptor spocq-acceptor) (condition dydra-ndk::unknown-pathname))
160
;; applies to internal not-found errors
161
(http:not-found (format nil "~a" condition)))
163
(defmethod http:handle-condition ((acceptor spocq-acceptor) (condition de.setf.utility:simple-encoding-error))
164
(http:bad-request (format nil "~a" condition)))
167
(defmethod http:handle-condition ((acceptor spocq-acceptor) (condition sb-int:simple-stream-error))
168
"iff a query is active terminate it, then just return to allow the http server unwind the conntection processing."
169
(when spocq.i::*task*
170
(log-error "http:handle-condition: task termination due to stream error: ~a: ~a" spocq.i::*task* condition)
171
(spocq.i::terminate-task spocq.i::*task*)))
173
(defmethod http:handle-condition ((acceptor spocq-acceptor) (condition usocket:connection-aborted-error))
174
"iff a query is active terminate it, then just return to allow the http server unwind the conntection processing."
175
(when spocq.i::*task*
176
(log-error "http:handle-condition: task termination due to stream error: ~a: ~a" spocq.i::*task* condition)
177
(spocq.i::terminate-task spocq.i::*task*)))
180
(defmethod http:respond-to-request ((acceptor spocq-acceptor) (request t) (reply t))
181
"Perform http dispatch, but
182
- check status before processing
183
- catch cancel restarts"
184
(cond ((or (eq spocq.i:*run-state* :terminate) (eq spocq.i::*signal-state* :sigterm))
185
(tbnl::stop acceptor :soft t)
186
;; iff there are no requests in progress, exit
187
(cond ((<= (tbnl:acceptor-requests-in-progress acceptor) 1) ;; this request counts
188
(log-warn "http:respond-to-request: terminate state with exit")
189
(unwind-protect (http:service-unavailable) ;; signal then exit on unwind
192
(log-warn "http:respond-to-request: unavailable pending request completion: ~a" (tbnl:acceptor-requests-in-progress acceptor))
193
(http:service-unavailable))))
195
;; track the total request processed by an acceptor and exit
196
;; if a limit is set and reached.
197
(incf (acceptor-request-count acceptor))
198
(log-notice "http:respond-to-request: acceptor x~a @~a/~a"
199
(tbnl:acceptor-requests-in-progress acceptor)
200
(acceptor-request-count acceptor)
201
(acceptor-request-count-limit acceptor))
202
(multiple-value-prog1
203
;; invoke the base method to delegate to the resource-specific function
205
(when (and (acceptor-request-count-limit acceptor)
206
(>= (acceptor-request-count acceptor) (acceptor-request-count-limit acceptor)))
207
(log-warn "http:respond-to-request: acceptor @~a -> terminate state" (acceptor-request-count-limit acceptor))
208
(tbnl::stop acceptor :soft t)
209
(setq spocq.i:*run-state* :terminate))))))
210
;;; linked-data-platform support
213
(let ((*response-functions* '(spocq.si::linked-data-platform)))
214
(main :init-name "init-http-ldp"
215
:request-class *class.ldp-http-request*
216
:response-class *class.ldp-http-response*)))
218
(defun main-ldp-loop ()
220
(loop (sleep 5) (when (eq spocq.i::*run-state* :terminate) (return))))
223
;;; triple-pattern-fragment support
225
(defun tpf-main (&optional (args ()))
226
"Run a query just once, with all arguments taken from the command line"
227
(handler-bind ((end-of-file
229
(declare (ignore condition))
230
(log-info "EOF: return ~a." (iso-time))
231
(return-from tpf-main nil)))
232
((or spocq.e::message-syntax-error spocq.e:request-error)
233
;; emit a parse error immediately and (optionally) exit
235
(complete-output *standard-output*)
236
(log-notice "responding to message syntax error: ~a" condition)
237
(print-error-conditions (list (cons nil condition)) *error-output*)
238
(complete-output *error-output*)
239
(maybe-exit-on-error 65)
240
;; flush anything else
241
(channel-get-all *error-condition-channel*)
242
(return-from tpf-main condition)))
245
;; (format *error-output* "run-query-loop-times: top-level error:~%~a" condition)
246
(log-stacktrace "Error running query: ~a -> ~a."
248
(complete-output *standard-output*)
251
(maybe-exit-on-error 70)
252
(return-from tpf-main condition))
253
(t (return-from tpf-main nil))))))
254
(let ((management-thread nil))
255
(labels ((main-sigterm-handler (signal code context)
256
(declare (ignore signal code context))
258
(log-info "Stop: SIGTERM ~a." (iso-time))
260
(management-thread-loop ()
261
(loop (when (eq *run-state* :terminate) (return))
262
(constrain-resources)
263
(sleep *management-thread-interval*))))
264
(setq *run-state* nil) ; :initialize)
265
(setq *start-timestamp* (iso-time))
266
(initialize-spocq :title (getarg "--title"))
267
(enable-interrupt :sigterm #'main-sigterm-handler)
268
(log-info "Start ~a." *start-timestamp*)
269
(setq management-thread (bt:make-thread #'management-thread-loop))
270
;; set the configuration once from the command-line
271
(with-command-line-configuration (args)
272
(when (find *repository-id* *disabled-repositories* :test #'string-equal)
273
(error 'spocq-runtime-error
275
:expression (format nil "The repository has been disabled: ~s." *repository-id*)))
276
;; generate a new task id if none was provided
278
(setq *task-id* (format nil "~a" (uuid:make-v1-uuid))))
279
(setq *run-state* :process)
280
;; the indivdual step implementation depends on a combination of the content and response media types
281
(pipe-task *standard-input* *standard-output* *request-content-type* *response-content-type*))
283
(log-info "Stop: return ~a." (iso-time))))))
286
(defmethod acceptor-log-access ((acceptor spocq.si::spocq-acceptor) &key return-code
288
"~:[-~@[ (~A)~]~;~:*~A~@[ (~A)~]~] :~A ~A [~A--~A] ~A \"~A ~A~@[?~A~] ~
289
~A\" ~D (~A) ~:[-~;~:*~D~] \"~:[-~;~:*~A~]\" \"~:[-~;~:*~A~]~@[ ~{~A~^ ~}~]\"~%"))
290
"Replace the default method for access logging.
291
As for the tbnl acceptor format, but do not emit literal authorization, allow for nginx alternatives
292
for the true ip address, add the request id."
294
(if (and (boundp 'http:*request*) http:*request*)
295
(http:log-notice format-control
297
(or (header-in* :x-forwarded-for)
298
(header-in* :X-Real-IP))
299
(acceptor-port acceptor)
300
(not (null (authorization)))
301
(iso-time (spocq.si::request-start-time http:*request*))
303
(spocq.si::request-id http:*request*)
309
(spocq.si::acceptor-request-count acceptor)
313
nil ;; placeholder for eventual additional properties
315
(http:log-notice "- - ~A - - ~A - - - - - ~A ~A - - -"
316
(acceptor-port acceptor)
319
(spocq.si::acceptor-request-count acceptor))))
320
;;; 2018-01-23T06:25:09.801657+01:00 [info] spocq-server[79013]: 127.0.0.1 (52.178.15.158) T [2018-01-23 06:25:09] "GET /nxp/plm/sparql?user_id=productsearch&query=select%20*%0Afrom%20%3Curn:dydra:all%3E%20%0Awhere%20%7B%20?s%20?p%20?o%20%7D%20limit%202 HTTP/1.1" 200 - "-" "Java/1.8.0_131"
325
(trace SPOCQ.I::CALL-WITH-OPEN-TRANSACTION spocq.i::transaction-open spocq.i::transaction-close)
326
(trace spocq.i::call-with-open-repository spocq.i::call-with-task-environment rdfcache:%make-transaction)
327
(trace ORG.DATAGRAPH.SPOCQ.IMPLEMENTATION::REPOSITORY-INSERT-FIELD ORG.DATAGRAPH.SPOCQ.IMPLEMENTATION::REPOSITORY-DELETE-FIELD
328
ORG.DATAGRAPH.SPOCQ.IMPLEMENTATION::commit-resource ORG.DATAGRAPH.SPOCQ.IMPLEMENTATION::read-resource
329
spocq.i::send-response-message
330
spocq.i::update-class-from-field
332
(trace (setf spocq.i::instance-store-revision) spocq.i::align-resource-to-store spocq.i::resolve-repository-revision-id)
333
(setq hunchentoot::*header-stream* *trace-output*)
334
(setq spocq.i::*encoding-trace-output* *trace-output*)
335
(defparameter *ldp-root* (spocq.i::ensure-instance '|ldp|:|BasicContainer| :identifier (intern-iri "http://ldp.stage.dydra.com/")))
339
;;; replace the idle handler
341
(in-package :spocq.i)
343
(defun server-idle-handler ()
344
"Consolidated implementation for standard management tasks.
345
Return nil if state is terminate and no queries remain."
346
;; now handled in scripts explicitly, which means the channel will remain empty
347
(publish-accounting-notes (get-accounting-notes) *accounting-destination*)
348
(when (plusp (channel-count (error-condition-channel)))
349
(let ((error-notes (channel-get-all (error-condition-channel))))
350
(log-error-notes-trig error-notes)))
351
(when (and (zerop (logand (ash (get-internal-real-time) -10) #x1f))
352
(zerop (hash-table-count *query-registry*)))
353
;; if no query is running, probe the store every +/- thirty seconds
355
;;; eventually consumes lmdb readers?
356
;;; (with-open-repository (*system-repository-id* :normal-disposition :abort) t)
357
(probe-transaction *system-repository-id*)
359
(when *exit-on-errors*
360
(setq *run-state* :terminate)
361
(log-error "management-thread-step: termination due to condition: ~a" condition)
362
(format *error-output* "management-thread-step: termination due to condition: ~a" condition)
363
(spocq.i::exit-lisp 70)))))
364
(constrain-resources)
367
;; once there are no more active queries, indicate to terminate
368
(cond ((and (zerop (query-count))
369
(<= (tbnl:acceptor-requests-in-progress spocq.si::*spocq-acceptor*) 1)
370
(if (boundp 'spocq.si::*asynchronous-task-entry*)
371
(null spocq.si::*asynchronous-task-entry*)
374
(log-info "terminate: management thread terminating other threads.")
376
(spocq.si::stop-server)
378
(log-info "terminate: management thread returning w/ ~a tasks." *task-count*)
382
(warn (log-info "terminate: management thread test: queries ~s, requests ~s"
384
(tbnl:acceptor-requests-in-progress spocq.si::*spocq-acceptor*)))
385
(log-info "terminate: management thread test: queries ~s, requests ~s"
387
(tbnl:acceptor-requests-in-progress spocq.si::*spocq-acceptor*))
392
(defun run-server-management-thread (&key &allow-other-keys)
393
"The HTTP server management thread does no autonomous processing - that is handled by an admin
394
response function. It handles just the statistics and termination."
396
(log-info "starting management thread...")
397
(setq spocq.i:*management-thread* (bt:current-thread))
398
(setq spocq.i:*run-state* :run)
399
(let ((last-ffi-operator nil))
400
(loop (unless (server-idle-handler) (return nil))
402
;; the observed behaviour was that the last operator remained transaction-abort even when other were blocked
403
;; indicating that the task cleanup thread was running, but not using a blocked function
404
(when (and (eq last-ffi-operator DYDRA-NDK::*LAST-FFI-OPERATOR*)
405
(not (equalp (rest DYDRA-NDK::*LAST-FFI-OPERATOR*) "top-level"))
406
(not (eq (first DYDRA-NDK::*LAST-FFI-OPERATOR*) 'RDFCACHE::%%TRANSACTION-ABORT)))
407
(log-warn "no ffi progress: ~s" last-ffi-operator))
408
(setf last-ffi-operator DYDRA-NDK::*LAST-FFI-OPERATOR*)
409
(sleep *management-thread-interval*))
410
(log-warn "run-server-management-thread: exiting")
413
(setq *toplevel-function* 'run-server-management-thread)