From f56dab06765ff5c9fec217b993a789431103698a Mon Sep 17 00:00:00 2001 From: David Gageot Date: Sat, 26 Dec 2015 13:36:02 +0100 Subject: [PATCH 1/7] Make test 10s faster Signed-off-by: David Gageot --- .../drivers/plugin/localbinary/plugin.go | 9 +++++++-- .../drivers/plugin/localbinary/plugin_test.go | 20 ++++++++++--------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/libmachine/drivers/plugin/localbinary/plugin.go b/libmachine/drivers/plugin/localbinary/plugin.go index 81c91e02b1..4c0685eec9 100644 --- a/libmachine/drivers/plugin/localbinary/plugin.go +++ b/libmachine/drivers/plugin/localbinary/plugin.go @@ -73,6 +73,7 @@ type Plugin struct { MachineName string addrCh chan string stopCh chan bool + timeout time.Duration } type Executor struct { @@ -230,13 +231,17 @@ func (lbp *Plugin) Serve() error { func (lbp *Plugin) Address() (string, error) { if lbp.Addr == "" { + if lbp.timeout == 0 { + lbp.timeout = defaultTimeout + } + select { case lbp.Addr = <-lbp.addrCh: log.Debugf("Plugin server listening at address %s", lbp.Addr) close(lbp.addrCh) return lbp.Addr, nil - case <-time.After(defaultTimeout): - return "", fmt.Errorf("Failed to dial the plugin server in %s", defaultTimeout) + case <-time.After(lbp.timeout): + return "", fmt.Errorf("Failed to dial the plugin server in %s", lbp.timeout) } } return lbp.Addr, nil diff --git a/libmachine/drivers/plugin/localbinary/plugin_test.go b/libmachine/drivers/plugin/localbinary/plugin_test.go index 87323ed9ac..8d5dae09a7 100644 --- a/libmachine/drivers/plugin/localbinary/plugin_test.go +++ b/libmachine/drivers/plugin/localbinary/plugin_test.go @@ -10,6 +10,7 @@ import ( "os" "github.com/docker/machine/libmachine/log" + "github.com/stretchr/testify/assert" ) type FakeExecutor struct { @@ -56,15 +57,16 @@ func TestLocalBinaryPluginAddressTimeout(t *testing.T) { if testing.Short() { t.Skip("Skipping timeout test") } - lbp := &Plugin{} - lbp.addrCh = make(chan string, 1) - go func() { - _, err := lbp.Address() - if err == nil { - t.Fatalf("Expected to get a timeout error, instead got %s", err) - } - }() - time.Sleep(defaultTimeout + 1) + + lbp := &Plugin{ + addrCh: make(chan string, 1), + timeout: 1 * time.Second, + } + + addr, err := lbp.Address() + + assert.Empty(t, addr) + assert.EqualError(t, err, "Failed to dial the plugin server in 1s") } func TestLocalBinaryPluginClose(t *testing.T) { From 0cd8312e72b439fb711b226d7f1922d41db064cc Mon Sep 17 00:00:00 2001 From: David Gageot Date: Sat, 26 Dec 2015 13:38:35 +0100 Subject: [PATCH 2/7] Remove dead code Signed-off-by: David Gageot --- libmachine/drivers/rpc/client_driver.go | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/libmachine/drivers/rpc/client_driver.go b/libmachine/drivers/rpc/client_driver.go index 54b3f35332..3de425e69e 100644 --- a/libmachine/drivers/rpc/client_driver.go +++ b/libmachine/drivers/rpc/client_driver.go @@ -66,8 +66,6 @@ const ( RestartMethod = `.Restart` KillMethod = `.Kill` UpgradeMethod = `.Upgrade` - LocalArtifactPathMethod = `.LocalArtifactPath` - GlobalArtifactPathMethod = `.GlobalArtifactPath` ) func (ic *InternalClient) Call(serviceMethod string, args interface{}, reply interface{}) error { @@ -346,25 +344,6 @@ func (c *RPCClientDriver) Kill() error { return c.Client.Call(KillMethod, struct{}{}, nil) } -func (c *RPCClientDriver) LocalArtifactPath(file string) string { - var path string - - if err := c.Client.Call(LocalArtifactPathMethod, file, &path); err != nil { - log.Warnf("Error attempting call to get LocalArtifactPath: %s", err) - } - - return path -} - -func (c *RPCClientDriver) GlobalArtifactPath() string { - globalArtifactPath, err := c.rpcStringCall(GlobalArtifactPathMethod) - if err != nil { - log.Warnf("Error attempting call to get GlobalArtifactPath: %s", err) - } - - return globalArtifactPath -} - func (c *RPCClientDriver) Upgrade() error { return c.Client.Call(UpgradeMethod, struct{}{}, nil) } From 34f6e201657628a091bd4faa7a788ca1ee05be85 Mon Sep 17 00:00:00 2001 From: David Gageot Date: Sat, 26 Dec 2015 13:39:06 +0100 Subject: [PATCH 3/7] Print an error when a driver cannot be closed Signed-off-by: David Gageot --- libmachine/drivers/rpc/client_driver.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/libmachine/drivers/rpc/client_driver.go b/libmachine/drivers/rpc/client_driver.go index 3de425e69e..d04c91ab34 100644 --- a/libmachine/drivers/rpc/client_driver.go +++ b/libmachine/drivers/rpc/client_driver.go @@ -88,13 +88,14 @@ func NewInternalClient(rpcclient *rpc.Client) *InternalClient { func CloseDrivers() { openedDriversLock.Lock() + defer openedDriversLock.Unlock() for _, openedDriver := range openedDrivers { - openedDriver.close() + if err := openedDriver.close(); err != nil { + log.Warnf("Error closing a plugin driver: %s", err) + } } openedDrivers = []*RPCClientDriver{} - - openedDriversLock.Unlock() } func NewRPCClientDriver(driverName string, rawDriver []byte) (*RPCClientDriver, error) { From 232c0ea37e443098103233112631340ac7c4202b Mon Sep 17 00:00:00 2001 From: Nathan LeClaire Date: Fri, 18 Dec 2015 18:54:09 -0800 Subject: [PATCH 4/7] Fix truncated plugin binary logs Signed-off-by: Nathan LeClaire --- .../drivers/plugin/localbinary/plugin.go | 29 ++++++++++--------- libmachine/drivers/plugin/register_driver.go | 3 ++ libmachine/drivers/rpc/client_driver.go | 12 ++++---- 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/libmachine/drivers/plugin/localbinary/plugin.go b/libmachine/drivers/plugin/localbinary/plugin.go index 4c0685eec9..03f2a9251d 100644 --- a/libmachine/drivers/plugin/localbinary/plugin.go +++ b/libmachine/drivers/plugin/localbinary/plugin.go @@ -79,6 +79,7 @@ type Plugin struct { type Executor struct { pluginStdout, pluginStderr io.ReadCloser DriverName string + cmd *exec.Cmd binaryPath string } @@ -124,14 +125,14 @@ func (lbe *Executor) Start() (*bufio.Scanner, *bufio.Scanner, error) { log.Debugf("Launching plugin server for driver %s", lbe.DriverName) - cmd := exec.Command(lbe.binaryPath) + lbe.cmd = exec.Command(lbe.binaryPath) - lbe.pluginStdout, err = cmd.StdoutPipe() + lbe.pluginStdout, err = lbe.cmd.StdoutPipe() if err != nil { return nil, nil, fmt.Errorf("Error getting cmd stdout pipe: %s", err) } - lbe.pluginStderr, err = cmd.StderrPipe() + lbe.pluginStderr, err = lbe.cmd.StderrPipe() if err != nil { return nil, nil, fmt.Errorf("Error getting cmd stderr pipe: %s", err) } @@ -142,7 +143,7 @@ func (lbe *Executor) Start() (*bufio.Scanner, *bufio.Scanner, error) { os.Setenv(PluginEnvKey, PluginEnvVal) os.Setenv(PluginEnvDriverName, lbe.DriverName) - if err := cmd.Start(); err != nil { + if err := lbe.cmd.Start(); err != nil { return nil, nil, fmt.Errorf("Error starting plugin binary: %s", err) } @@ -150,34 +151,34 @@ func (lbe *Executor) Start() (*bufio.Scanner, *bufio.Scanner, error) { } func (lbe *Executor) Close() error { + if err := lbe.cmd.Wait(); err != nil { + return fmt.Errorf("Error waiting for binary close: %s", err) + } + if err := lbe.pluginStdout.Close(); err != nil { - return err + return fmt.Errorf("Error closing plugin stdout: %s", err) } if err := lbe.pluginStderr.Close(); err != nil { - return err + return fmt.Errorf("Error closing plugin stderr: %s", err) } return nil } func stream(scanner *bufio.Scanner, streamOutCh chan<- string, stopCh <-chan bool) { - lines := make(chan string) - go func() { - for scanner.Scan() { - lines <- scanner.Text() - } - }() for { select { case <-stopCh: close(streamOutCh) return - case line := <-lines: - streamOutCh <- strings.Trim(line, "\n") + default: + scanner.Scan() + line := scanner.Text() if err := scanner.Err(); err != nil { log.Warnf("Scanning stream: %s", err) } + streamOutCh <- strings.Trim(line, "\n") } } } diff --git a/libmachine/drivers/plugin/register_driver.go b/libmachine/drivers/plugin/register_driver.go index 23ebe3ae96..27e0dbc33d 100644 --- a/libmachine/drivers/plugin/register_driver.go +++ b/libmachine/drivers/plugin/register_driver.go @@ -30,6 +30,7 @@ Please use this plugin through the main 'docker-machine' binary. } log.SetDebug(true) + os.Setenv("MACHINE_DEBUG", "1") rpcd := rpcdriver.NewRPCServerDriver(d) rpc.RegisterName(rpcdriver.RPCServiceNameV0, rpcd) @@ -50,10 +51,12 @@ Please use this plugin through the main 'docker-machine' binary. for { select { case <-rpcd.CloseCh: + log.Debug("Closing plugin on server side") os.Exit(0) case <-rpcd.HeartbeatCh: continue case <-time.After(heartbeatTimeout): + // TODO: Add heartbeat retry logic os.Exit(1) } } diff --git a/libmachine/drivers/rpc/client_driver.go b/libmachine/drivers/rpc/client_driver.go index d04c91ab34..1c06876f7e 100644 --- a/libmachine/drivers/rpc/client_driver.go +++ b/libmachine/drivers/rpc/client_driver.go @@ -189,12 +189,6 @@ func (c *RPCClientDriver) close() error { c.heartbeatDoneCh <- true close(c.heartbeatDoneCh) - log.Debug("Making call to close connection to plugin binary") - - if err := c.plugin.Close(); err != nil { - return err - } - log.Debug("Making call to close driver server") if err := c.Client.Call(CloseMethod, struct{}{}, nil); err != nil { @@ -203,6 +197,12 @@ func (c *RPCClientDriver) close() error { log.Debug("Successfully made call to close driver server") + log.Debug("Making call to close connection to plugin binary") + + if err := c.plugin.Close(); err != nil { + return err + } + return nil } From 501c4f34602971713a80230704f8af970923bfd3 Mon Sep 17 00:00:00 2001 From: David Gageot Date: Mon, 28 Dec 2015 11:32:31 +0100 Subject: [PATCH 5/7] Read only if there is something to read Signed-off-by: David Gageot --- libmachine/drivers/plugin/localbinary/plugin.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/libmachine/drivers/plugin/localbinary/plugin.go b/libmachine/drivers/plugin/localbinary/plugin.go index 03f2a9251d..bad58ac120 100644 --- a/libmachine/drivers/plugin/localbinary/plugin.go +++ b/libmachine/drivers/plugin/localbinary/plugin.go @@ -173,12 +173,13 @@ func stream(scanner *bufio.Scanner, streamOutCh chan<- string, stopCh <-chan boo close(streamOutCh) return default: - scanner.Scan() - line := scanner.Text() - if err := scanner.Err(); err != nil { - log.Warnf("Scanning stream: %s", err) + if scanner.Scan() { + line := scanner.Text() + if err := scanner.Err(); err != nil { + log.Warnf("Scanning stream: %s", err) + } + streamOutCh <- strings.Trim(line, "\n") } - streamOutCh <- strings.Trim(line, "\n") } } } From b185bb3dbd4d5b06e5c254310ffd5486b7e39922 Mon Sep 17 00:00:00 2001 From: David Gageot Date: Mon, 28 Dec 2015 11:33:38 +0100 Subject: [PATCH 6/7] Keep on heartbeating after failure Signed-off-by: David Gageot --- libmachine/drivers/rpc/client_driver.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/libmachine/drivers/rpc/client_driver.go b/libmachine/drivers/rpc/client_driver.go index 1c06876f7e..03db4042b5 100644 --- a/libmachine/drivers/rpc/client_driver.go +++ b/libmachine/drivers/rpc/client_driver.go @@ -158,8 +158,6 @@ func NewRPCClientDriver(driverName string, rawDriver []byte) (*RPCClientDriver, case <-time.After(heartbeatInterval): if err := c.Client.Call(HeartbeatMethod, struct{}{}, nil); err != nil { log.Warnf("Error attempting heartbeat call to plugin server: %s", err) - c.close() - return } } } From c180a7962bf50f9b8387eaf73507289d9b3923bc Mon Sep 17 00:00:00 2001 From: David Gageot Date: Mon, 28 Dec 2015 13:11:28 +0100 Subject: [PATCH 7/7] Fix race condition. cmdWait will do the closing Signed-off-by: David Gageot --- .../drivers/plugin/localbinary/plugin.go | 48 ++++++------------- 1 file changed, 14 insertions(+), 34 deletions(-) diff --git a/libmachine/drivers/plugin/localbinary/plugin.go b/libmachine/drivers/plugin/localbinary/plugin.go index bad58ac120..2b16fc3bd4 100644 --- a/libmachine/drivers/plugin/localbinary/plugin.go +++ b/libmachine/drivers/plugin/localbinary/plugin.go @@ -32,12 +32,11 @@ const ( type PluginStreamer interface { // Return a channel for receiving the output of the stream line by - // line, and a channel for stopping the stream when we are finished - // reading from it. + // line. // // It happens to be the case that we do this all inside of the main // plugin struct today, but that may not be the case forever. - AttachStream(*bufio.Scanner) (<-chan string, chan<- bool) + AttachStream(*bufio.Scanner) <-chan string } type PluginServer interface { @@ -155,40 +154,23 @@ func (lbe *Executor) Close() error { return fmt.Errorf("Error waiting for binary close: %s", err) } - if err := lbe.pluginStdout.Close(); err != nil { - return fmt.Errorf("Error closing plugin stdout: %s", err) - } - - if err := lbe.pluginStderr.Close(); err != nil { - return fmt.Errorf("Error closing plugin stderr: %s", err) - } - return nil } -func stream(scanner *bufio.Scanner, streamOutCh chan<- string, stopCh <-chan bool) { - for { - select { - case <-stopCh: - close(streamOutCh) - return - default: - if scanner.Scan() { - line := scanner.Text() - if err := scanner.Err(); err != nil { - log.Warnf("Scanning stream: %s", err) - } - streamOutCh <- strings.Trim(line, "\n") - } +func stream(scanner *bufio.Scanner, streamOutCh chan<- string) { + for scanner.Scan() { + line := scanner.Text() + if err := scanner.Err(); err != nil { + log.Warnf("Scanning stream: %s", err) } + streamOutCh <- strings.Trim(line, "\n") } } -func (lbp *Plugin) AttachStream(scanner *bufio.Scanner) (<-chan string, chan<- bool) { +func (lbp *Plugin) AttachStream(scanner *bufio.Scanner) <-chan string { streamOutCh := make(chan string) - stopCh := make(chan bool) - go stream(scanner, streamOutCh, stopCh) - return streamOutCh, stopCh + go stream(scanner, streamOutCh) + return streamOutCh } func (lbp *Plugin) execServer() error { @@ -207,8 +189,8 @@ func (lbp *Plugin) execServer() error { lbp.addrCh <- strings.TrimSpace(addr) - stdOutCh, stopStdoutCh := lbp.AttachStream(outScanner) - stdErrCh, stopStderrCh := lbp.AttachStream(errScanner) + stdOutCh := lbp.AttachStream(outScanner) + stdErrCh := lbp.AttachStream(errScanner) for { select { @@ -216,9 +198,7 @@ func (lbp *Plugin) execServer() error { log.Infof(pluginOut, lbp.MachineName, out) case err := <-stdErrCh: log.Debugf(pluginErr, lbp.MachineName, err) - case _ = <-lbp.stopCh: - stopStdoutCh <- true - stopStderrCh <- true + case <-lbp.stopCh: if err := lbp.Executor.Close(); err != nil { return fmt.Errorf("Error closing local plugin binary: %s", err) }