Skip to content
This repository has been archived by the owner on Sep 26, 2021. It is now read-only.

Commit

Permalink
Merge pull request #2697 from dgageot/truncate-logs
Browse files Browse the repository at this point in the history
Fix truncated plugin binary logs
  • Loading branch information
dgageot committed Dec 28, 2015
2 parents 01cc4af + c180a79 commit 98a4905
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 82 deletions.
69 changes: 28 additions & 41 deletions libmachine/drivers/plugin/localbinary/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,11 @@ const (

type PluginStreamer interface {
// Return a channel for receiving the output of the stream line by
// line, and a channel for stopping the stream when we are finished
// reading from it.
// line.
//
// It happens to be the case that we do this all inside of the main
// plugin struct today, but that may not be the case forever.
AttachStream(*bufio.Scanner) (<-chan string, chan<- bool)
AttachStream(*bufio.Scanner) <-chan string
}

type PluginServer interface {
Expand Down Expand Up @@ -73,11 +72,13 @@ type Plugin struct {
MachineName string
addrCh chan string
stopCh chan bool
timeout time.Duration
}

type Executor struct {
pluginStdout, pluginStderr io.ReadCloser
DriverName string
cmd *exec.Cmd
binaryPath string
}

Expand Down Expand Up @@ -123,14 +124,14 @@ func (lbe *Executor) Start() (*bufio.Scanner, *bufio.Scanner, error) {

log.Debugf("Launching plugin server for driver %s", lbe.DriverName)

cmd := exec.Command(lbe.binaryPath)
lbe.cmd = exec.Command(lbe.binaryPath)

lbe.pluginStdout, err = cmd.StdoutPipe()
lbe.pluginStdout, err = lbe.cmd.StdoutPipe()
if err != nil {
return nil, nil, fmt.Errorf("Error getting cmd stdout pipe: %s", err)
}

lbe.pluginStderr, err = cmd.StderrPipe()
lbe.pluginStderr, err = lbe.cmd.StderrPipe()
if err != nil {
return nil, nil, fmt.Errorf("Error getting cmd stderr pipe: %s", err)
}
Expand All @@ -141,51 +142,35 @@ func (lbe *Executor) Start() (*bufio.Scanner, *bufio.Scanner, error) {
os.Setenv(PluginEnvKey, PluginEnvVal)
os.Setenv(PluginEnvDriverName, lbe.DriverName)

if err := cmd.Start(); err != nil {
if err := lbe.cmd.Start(); err != nil {
return nil, nil, fmt.Errorf("Error starting plugin binary: %s", err)
}

return outScanner, errScanner, nil
}

func (lbe *Executor) Close() error {
if err := lbe.pluginStdout.Close(); err != nil {
return err
}

if err := lbe.pluginStderr.Close(); err != nil {
return err
if err := lbe.cmd.Wait(); err != nil {
return fmt.Errorf("Error waiting for binary close: %s", err)
}

return nil
}

func stream(scanner *bufio.Scanner, streamOutCh chan<- string, stopCh <-chan bool) {
lines := make(chan string)
go func() {
for scanner.Scan() {
lines <- scanner.Text()
}
}()
for {
select {
case <-stopCh:
close(streamOutCh)
return
case line := <-lines:
streamOutCh <- strings.Trim(line, "\n")
if err := scanner.Err(); err != nil {
log.Warnf("Scanning stream: %s", err)
}
func stream(scanner *bufio.Scanner, streamOutCh chan<- string) {
for scanner.Scan() {
line := scanner.Text()
if err := scanner.Err(); err != nil {
log.Warnf("Scanning stream: %s", err)
}
streamOutCh <- strings.Trim(line, "\n")
}
}

func (lbp *Plugin) AttachStream(scanner *bufio.Scanner) (<-chan string, chan<- bool) {
func (lbp *Plugin) AttachStream(scanner *bufio.Scanner) <-chan string {
streamOutCh := make(chan string)
stopCh := make(chan bool)
go stream(scanner, streamOutCh, stopCh)
return streamOutCh, stopCh
go stream(scanner, streamOutCh)
return streamOutCh
}

func (lbp *Plugin) execServer() error {
Expand All @@ -204,18 +189,16 @@ func (lbp *Plugin) execServer() error {

lbp.addrCh <- strings.TrimSpace(addr)

stdOutCh, stopStdoutCh := lbp.AttachStream(outScanner)
stdErrCh, stopStderrCh := lbp.AttachStream(errScanner)
stdOutCh := lbp.AttachStream(outScanner)
stdErrCh := lbp.AttachStream(errScanner)

for {
select {
case out := <-stdOutCh:
log.Infof(pluginOut, lbp.MachineName, out)
case err := <-stdErrCh:
log.Debugf(pluginErr, lbp.MachineName, err)
case _ = <-lbp.stopCh:
stopStdoutCh <- true
stopStderrCh <- true
case <-lbp.stopCh:
if err := lbp.Executor.Close(); err != nil {
return fmt.Errorf("Error closing local plugin binary: %s", err)
}
Expand All @@ -230,13 +213,17 @@ func (lbp *Plugin) Serve() error {

func (lbp *Plugin) Address() (string, error) {
if lbp.Addr == "" {
if lbp.timeout == 0 {
lbp.timeout = defaultTimeout
}

select {
case lbp.Addr = <-lbp.addrCh:
log.Debugf("Plugin server listening at address %s", lbp.Addr)
close(lbp.addrCh)
return lbp.Addr, nil
case <-time.After(defaultTimeout):
return "", fmt.Errorf("Failed to dial the plugin server in %s", defaultTimeout)
case <-time.After(lbp.timeout):
return "", fmt.Errorf("Failed to dial the plugin server in %s", lbp.timeout)
}
}
return lbp.Addr, nil
Expand Down
20 changes: 11 additions & 9 deletions libmachine/drivers/plugin/localbinary/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"

"github.com/docker/machine/libmachine/log"
"github.com/stretchr/testify/assert"
)

type FakeExecutor struct {
Expand Down Expand Up @@ -56,15 +57,16 @@ func TestLocalBinaryPluginAddressTimeout(t *testing.T) {
if testing.Short() {
t.Skip("Skipping timeout test")
}
lbp := &Plugin{}
lbp.addrCh = make(chan string, 1)
go func() {
_, err := lbp.Address()
if err == nil {
t.Fatalf("Expected to get a timeout error, instead got %s", err)
}
}()
time.Sleep(defaultTimeout + 1)

lbp := &Plugin{
addrCh: make(chan string, 1),
timeout: 1 * time.Second,
}

addr, err := lbp.Address()

assert.Empty(t, addr)
assert.EqualError(t, err, "Failed to dial the plugin server in 1s")
}

func TestLocalBinaryPluginClose(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions libmachine/drivers/plugin/register_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Please use this plugin through the main 'docker-machine' binary.
}

log.SetDebug(true)
os.Setenv("MACHINE_DEBUG", "1")

rpcd := rpcdriver.NewRPCServerDriver(d)
rpc.RegisterName(rpcdriver.RPCServiceNameV0, rpcd)
Expand All @@ -50,10 +51,12 @@ Please use this plugin through the main 'docker-machine' binary.
for {
select {
case <-rpcd.CloseCh:
log.Debug("Closing plugin on server side")
os.Exit(0)
case <-rpcd.HeartbeatCh:
continue
case <-time.After(heartbeatTimeout):
// TODO: Add heartbeat retry logic
os.Exit(1)
}
}
Expand Down
42 changes: 10 additions & 32 deletions libmachine/drivers/rpc/client_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ const (
RestartMethod = `.Restart`
KillMethod = `.Kill`
UpgradeMethod = `.Upgrade`
LocalArtifactPathMethod = `.LocalArtifactPath`
GlobalArtifactPathMethod = `.GlobalArtifactPath`
)

func (ic *InternalClient) Call(serviceMethod string, args interface{}, reply interface{}) error {
Expand All @@ -90,13 +88,14 @@ func NewInternalClient(rpcclient *rpc.Client) *InternalClient {

func CloseDrivers() {
openedDriversLock.Lock()
defer openedDriversLock.Unlock()

for _, openedDriver := range openedDrivers {
openedDriver.close()
if err := openedDriver.close(); err != nil {
log.Warnf("Error closing a plugin driver: %s", err)
}
}
openedDrivers = []*RPCClientDriver{}

openedDriversLock.Unlock()
}

func NewRPCClientDriver(driverName string, rawDriver []byte) (*RPCClientDriver, error) {
Expand Down Expand Up @@ -159,8 +158,6 @@ func NewRPCClientDriver(driverName string, rawDriver []byte) (*RPCClientDriver,
case <-time.After(heartbeatInterval):
if err := c.Client.Call(HeartbeatMethod, struct{}{}, nil); err != nil {
log.Warnf("Error attempting heartbeat call to plugin server: %s", err)
c.close()
return
}
}
}
Expand Down Expand Up @@ -190,12 +187,6 @@ func (c *RPCClientDriver) close() error {
c.heartbeatDoneCh <- true
close(c.heartbeatDoneCh)

log.Debug("Making call to close connection to plugin binary")

if err := c.plugin.Close(); err != nil {
return err
}

log.Debug("Making call to close driver server")

if err := c.Client.Call(CloseMethod, struct{}{}, nil); err != nil {
Expand All @@ -204,6 +195,12 @@ func (c *RPCClientDriver) close() error {

log.Debug("Successfully made call to close driver server")

log.Debug("Making call to close connection to plugin binary")

if err := c.plugin.Close(); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -346,25 +343,6 @@ func (c *RPCClientDriver) Kill() error {
return c.Client.Call(KillMethod, struct{}{}, nil)
}

func (c *RPCClientDriver) LocalArtifactPath(file string) string {
var path string

if err := c.Client.Call(LocalArtifactPathMethod, file, &path); err != nil {
log.Warnf("Error attempting call to get LocalArtifactPath: %s", err)
}

return path
}

func (c *RPCClientDriver) GlobalArtifactPath() string {
globalArtifactPath, err := c.rpcStringCall(GlobalArtifactPathMethod)
if err != nil {
log.Warnf("Error attempting call to get GlobalArtifactPath: %s", err)
}

return globalArtifactPath
}

func (c *RPCClientDriver) Upgrade() error {
return c.Client.Call(UpgradeMethod, struct{}{}, nil)
}

0 comments on commit 98a4905

Please sign in to comment.