Skip to content

Commit 6daa410

Browse files
authored
fix(protocol/management): ensure driver load success (#168)
ensure this critical path is successful
1 parent c4c90f2 commit 6daa410

File tree

3 files changed

+97
-59
lines changed

3 files changed

+97
-59
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,16 @@ jobs:
2020
stable: [true]
2121
crystal:
2222
- latest
23+
- 1.14.1
2324
include:
2425
- stable: false
2526
crystal: nightly
2627
# Service containers to run with `container-job`
2728
services:
2829
sshtest:
29-
image: tutum/ubuntu:trusty
30+
image: testcontainers/sshd:1.2.0
3031
env:
31-
ROOT_PASS: somepassword
32+
PASSWORD: somepassword
3233
# Label used to access the service container
3334
redis:
3435
# Docker Hub image

shard.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name: placeos-driver
2-
version: 7.3.6
2+
version: 7.3.7
33

44
dependencies:
55
action-controller:

src/placeos-driver/protocol/management.cr

Lines changed: 93 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class PlaceOS::Driver::Protocol::Management
3030

3131
property on_redis : Proc(RedisAction, String, String, String?, Nil) = ->(_action : RedisAction, _hash_id : String, _key_name : String, _status_value : String?) {}
3232

33-
getter shutting_down : Channel(Nil) = Channel(Nil).new(1)
33+
@running : Bool = false
3434
getter? terminated : Bool = false
3535
getter proc : Process? = nil
3636
getter pid : Int64 = -1
@@ -178,6 +178,7 @@ class PlaceOS::Driver::Protocol::Management
178178

179179
# ameba:disable Metrics/CyclomaticComplexity
180180
private def process_events
181+
# by performing most processing on this fiber we avoid the need for locking
181182
until terminated?
182183
begin
183184
case (request = @events.receive).cmd
@@ -260,29 +261,31 @@ class PlaceOS::Driver::Protocol::Management
260261
return unless io = @io
261262

262263
modules.clear
263-
264-
channel = shutting_down
265-
spawn(same_thread: true) { ensure_shutdown(channel) }
264+
exited = Channel(Nil).new
265+
spawn(same_thread: true) { ensure_shutdown(exited) }
266266

267267
# The driver will shutdown the modules gracefully
268268
json = %({"id":"t","cmd":"terminate"})
269269
io.write_bytes json.bytesize
270270
io.write json.to_slice
271271
io.flush
272+
273+
select
274+
when exited.receive
275+
when timeout 10.seconds
276+
raise "timeout on shutdown"
277+
end
272278
rescue
273279
process = proc
274280
return unless process
275-
process.terminate graceful: false
281+
process.terminate(graceful: false) rescue nil
276282
end
277283

278284
private def ensure_shutdown(channel)
279-
select
280-
when channel.receive
281-
when timeout 10.seconds
282-
if (process = proc) && channel == @shutting_down
283-
process.terminate graceful: false
284-
end
285+
if process = proc
286+
process.wait
285287
end
288+
channel.send(nil)
286289
end
287290

288291
private def exec(module_id : String, payload : String, seq : UInt64, user_id : String?) : Nil
@@ -329,15 +332,27 @@ class PlaceOS::Driver::Protocol::Management
329332

330333
# Create the host driver process, then load modules that have been assigned.
331334
private def start_process : Nil
332-
return if @io || terminated?
333-
io = nil
334-
process = nil
335+
# 15 seconds for driver to start executing
336+
wait_driver_open = begin
337+
request_lock.synchronize do
338+
return if @running || terminated?
339+
@running = true
340+
end
335341

342+
Promise.new(UNIXSocket, 15.seconds)
343+
rescue error
344+
Log.error(exception: error) { {message: "driver pre-launch issue", driver_path: @driver_path} }
345+
request_lock.synchronize { @running = false }
346+
sleep 1.second
347+
launch_driver_failed
348+
return
349+
end
350+
351+
# Prepare driver IO
352+
unix_server = nil
336353
begin
337-
# Prepare driver IO
338354
unix_socket = File.tempname("pos", ".driver")
339355
unix_server = UNIXServer.new(unix_socket)
340-
wait_driver_open = Promise.new(UNIXSocket, 15.seconds)
341356

342357
# no need to keep the server open once the process has checked in
343358
spawn do
@@ -346,24 +361,39 @@ class PlaceOS::Driver::Protocol::Management
346361
# We want to be manually flushing our writes
347362
client.sync = false
348363
wait_driver_open.resolve client
349-
unix_server.close
350364
rescue error
351365
wait_driver_open.reject error
366+
ensure
367+
unix_server.close
352368
end
353369
end
354370

355371
@launch_count += 1
356372
@launch_time = Time.utc.to_unix
373+
rescue error
374+
# launch driver failed, we should attempt to restart it here
375+
Log.error(exception: error) { {message: "driver socket init issue", driver_path: @driver_path} }
376+
request_lock.synchronize { @running = false }
377+
unix_server.try(&.close) rescue nil
378+
sleep 1.second
379+
launch_driver_failed
380+
return
381+
end
357382

358-
fetch_proc = Promise.new(Process)
359-
spawn(same_thread: true) { launch_driver(fetch_proc, unix_socket) }
360-
@proc = process = fetch_proc.get
361-
@pid = process.pid
362-
363-
io = wait_driver_open.get
383+
io = begin
384+
spawn(same_thread: true) { launch_driver(unix_socket) }
385+
wait_driver_open.get
386+
rescue error
387+
Log.error(exception: error) { {message: "driver launch timeout", driver_path: @driver_path} }
388+
request_lock.synchronize { @running = false }
389+
sleep 1.second
390+
launch_driver_failed
391+
return
392+
end
364393

365-
# Start processing the output of the driver
366-
loaded = Promise.new(Nil)
394+
begin
395+
# Start processing the output of the driver (15 seconds for it to check-in)
396+
loaded = Promise.new(Nil, 15.seconds)
367397
spawn(same_thread: true) { process_comms(io, loaded) }
368398
loaded.get
369399

@@ -375,61 +405,68 @@ class PlaceOS::Driver::Protocol::Management
375405
io.flush
376406
end
377407

378-
# events can now write directly to the io, driver is running
379408
@io = io
380409
rescue error
381-
Log.error(exception: error) { {message: "failed to launch driver", driver_path: @driver_path} }
382-
383-
if io.nil?
384-
if process
385-
process.terminate graceful: false
386-
end
410+
Log.error(exception: error) { {message: "driver launch failed", driver_path: @driver_path} }
387411

388-
# attempt to relaunch
389-
sleep 5.seconds
390-
return if @io || terminated?
391-
spawn(same_thread: true) { relaunch("-1") }
412+
# try to force close any running instances
413+
request_lock.synchronize { @running = false }
414+
io.close rescue nil
415+
if process = @proc
416+
process.terminate(graceful: false) rescue nil
392417
end
418+
419+
# attempt to relaunch
420+
sleep 5.seconds
421+
launch_driver_failed
393422
end
394423
end
395424

396425
# launches the driver and manages the process
397-
private def launch_driver(fetch_proc, unix_socket) : Nil
426+
private def launch_driver(unix_socket) : Nil
427+
request_lock.synchronize { return unless @running }
428+
398429
Process.run(
399430
@driver_path,
400431
@on_edge ? {"-p", "-e", "-s", unix_socket} : {"-p", "-s", unix_socket},
401432
input: Process::Redirect::Close,
402433
output: Process::Redirect::Inherit,
403434
error: Process::Redirect::Inherit
404435
) do |process|
405-
fetch_proc.resolve process
436+
@proc = process
437+
@pid = process.pid
406438
end
407439

440+
# this executes once the driver has exited
408441
status = $?
409-
last_exit_code = status.exit_code.to_s
410-
411-
Log.warn { {message: "driver process exited with #{last_exit_code}", driver_path: @driver_path} } unless status.success?
412-
413-
if io = @io
414-
@pid = -1_i64
415-
@proc = nil
416-
io.close rescue nil
417-
@shutting_down.send nil
418-
@events.send(Request.new(last_exit_code, :exited))
419-
end
442+
request_lock.synchronize { @running = false }
443+
@pid = -1_i64
444+
@proc = nil
445+
@last_exit_code = exit_code = status.exit_code
446+
File.delete(unix_socket) rescue nil
447+
Log.info { {message: "driver process exited with #{exit_code}", driver_path: @driver_path} } unless status.success?
420448
rescue error
421449
Log.error(exception: error) { "error launching driver: #{@driver_path}" }
422-
fetch_proc.reject error
450+
ensure
451+
@io.try(&.close) rescue nil
452+
@io = nil
453+
sleep 200.milliseconds
454+
launch_driver_failed
423455
end
424456

425-
private def relaunch(last_exit_code : String) : Nil
426-
@io = nil
457+
private def launch_driver_failed
458+
running = request_lock.synchronize { @running }
459+
return if terminated? || running
460+
427461
@pid = -1_i64
428462
@proc = nil
429-
return if terminated?
430-
@last_exit_code = last_exit_code.to_i? || -1
463+
@io.try(&.close) rescue nil
464+
@io = nil
465+
@events.send(Request.new(@last_exit_code.to_s, :exited)) unless terminated?
466+
end
431467

432-
@shutting_down = Channel(Nil).new(1)
468+
private def relaunch(_last_exit_code : String) : Nil
469+
return if terminated?
433470
start_process unless modules.empty?
434471
end
435472

@@ -473,7 +510,7 @@ class PlaceOS::Driver::Protocol::Management
473510
end
474511
rescue error : IO::Error
475512
# Input stream closed. This should only occur on termination
476-
Log.debug(exception: error) { "comms closed for #{@driver_path}" } unless terminated?
513+
Log.warn(exception: error) { "comms closed for #{@driver_path}" } unless terminated?
477514
loaded.reject error
478515
ensure
479516
# Reject any pending request

0 commit comments

Comments
 (0)