Skip to content
This repository was archived by the owner on Nov 19, 2020. It is now read-only.

Commit 596ac5e

Browse files
committed
plugin: wip
1 parent ea95a86 commit 596ac5e

File tree

14 files changed

+991
-5
lines changed

14 files changed

+991
-5
lines changed

cmd/plugin/main.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package main
2+
3+
import (
4+
log "github.com/hashicorp/go-hclog"
5+
6+
"github.com/hashicorp/nomad/plugins"
7+
"github.com/jet/damon/plugin"
8+
)
9+
10+
func main() {
11+
// Serve the plugin
12+
plugins.Serve(factory)
13+
}
14+
15+
// factory returns a new instance of the LXC driver plugin
16+
func factory(log log.Logger) interface{} {
17+
return plugin.NewDriverPlugin(log)
18+
}

container/container.go

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package container
33
import (
44
"context"
55
"fmt"
6+
"os"
67
"os/exec"
78
"runtime"
89
"time"
@@ -38,8 +39,10 @@ const MinimumCPUMHz = 100
3839
type Container struct {
3940
Name string
4041
StartTime time.Time
42+
PID int
4143
Logger Logger
4244
doneCh chan struct{}
45+
token *win32.Token
4346
job *win32.JobObject
4447
proc *win32.Process
4548
result Result
@@ -117,7 +120,7 @@ func RunContained(cmd *exec.Cmd, cfg *Config) (*Container, error) {
117120
}
118121
container.Logger = logger
119122
if cfg.RestrictedToken {
120-
cfg.Logger.Logln("creating restricted token")
123+
logger.Logln("creating restricted token")
121124
rt, err := token.CreateRestrictedToken(win32.TokenRestrictions{
122125
DisableMaxPrivilege: true,
123126
LUAToken: true,
@@ -132,7 +135,7 @@ func RunContained(cmd *exec.Cmd, cfg *Config) (*Container, error) {
132135
token = rt
133136
}
134137
defer logger.CloseLogError(token, "couldn't closed process token")
135-
138+
container.token = token
136139
proc, err := win32.StartProcess(cmd, win32.AccessToken(token), win32.Suspended)
137140
if err != nil {
138141
return nil, errors.Wrapf(err, "unable to start process")
@@ -142,6 +145,7 @@ func RunContained(cmd *exec.Cmd, cfg *Config) (*Container, error) {
142145
return nil, err
143146
}
144147
container.proc = proc
148+
container.PID = int(container.proc.Pid())
145149
eli := &win32.ExtendedLimitInformation{
146150
KillOnJobClose: true,
147151
}
@@ -183,6 +187,7 @@ func RunContained(cmd *exec.Cmd, cfg *Config) (*Container, error) {
183187
logger.CloseLogError(job, "failed to close JobObject")
184188
return nil, errors.Wrapf(err, "container: Could not resume process main thread")
185189
}
190+
container.StartTime = time.Now()
186191
container.doneCh = make(chan struct{})
187192
go (&container).wait()
188193
return &container, nil
@@ -328,6 +333,42 @@ func (c *Container) Done() <-chan struct{} {
328333
return c.doneCh
329334
}
330335

336+
func (c *Container) Signal(sig os.Signal) error {
337+
return c.proc.Signal(sig)
338+
}
339+
340+
// Task is a program run in the context of a Container's job object
341+
// that is not the main program
342+
343+
func (c *Container) Exec(cfg TaskConfig) (*Task, error) {
344+
if len(cfg.Command) == 0 {
345+
return nil, fmt.Errorf("exec requires at least 1 argument")
346+
}
347+
name := cfg.Command[0]
348+
var args []string
349+
if len(cfg.Command) > 1 {
350+
args = cfg.Command[1:]
351+
}
352+
ec := exec.Command(name, args...)
353+
ec.Env = cfg.EnvList
354+
ec.Stderr = cfg.Stderr
355+
ec.Stdout = cfg.Stdout
356+
ec.Dir = cfg.Dir
357+
proc, err := win32.StartProcess(ec, win32.AccessToken(c.token), win32.Suspended)
358+
if err != nil {
359+
return nil, err
360+
}
361+
if err := c.job.Assign(proc); err != nil {
362+
c.Logger.Error(proc.Kill(), "unable to kill exec process")
363+
return nil, err
364+
}
365+
if err := proc.Resume(); err != nil {
366+
c.Logger.Error(proc.Kill(), "unable to kill exec process")
367+
return nil, err
368+
}
369+
return &Task{osProcess: proc}, nil
370+
}
371+
331372
func (c *Container) killOnError(err error) error {
332373
if err != nil {
333374
c.Logger.Error(c.proc.Kill(), "unable to kill child process")

container/task.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package container
2+
3+
import (
4+
"context"
5+
"io"
6+
7+
"github.com/jet/damon/win32"
8+
)
9+
10+
type TaskConfig struct {
11+
Command []string
12+
Dir string
13+
EnvList []string
14+
Stdout io.Writer
15+
Stderr io.Writer
16+
}
17+
18+
type Task struct {
19+
osProcess *win32.Process
20+
}
21+
22+
func (t *Task) Wait(ctx context.Context) (int, error) {
23+
exitCh := make(chan int, 1)
24+
errCh := make(chan error, 1)
25+
go func() {
26+
defer close(exitCh)
27+
defer close(errCh)
28+
res, err := t.osProcess.Wait()
29+
if err != nil {
30+
errCh <- err
31+
return
32+
}
33+
if res.Err != nil {
34+
errCh <- res.Err
35+
return
36+
}
37+
exitCh <- res.ExitStatus
38+
}()
39+
select {
40+
case err := <-errCh:
41+
t.osProcess.Kill()
42+
return -1, err
43+
case res := <-exitCh:
44+
return res, nil
45+
case <-ctx.Done():
46+
t.osProcess.Kill()
47+
return -1, ctx.Err()
48+
}
49+
}

make.ps1

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@ Param(
1616
[Parameter(ParameterSetName='Build')]
1717
[switch]$Build,
1818

19-
[Parameter(ParameterSetName='Build')]
20-
[string]$OutFile = "damon.exe"
19+
[Parameter(ParameterSetName = 'Build')]
20+
[string]$OutFile = "damon.exe",
21+
22+
[Parameter(ParameterSetName = 'Build')]
23+
[string]$PluginOutFile = "damon-plugin.exe"
2124
)
2225

2326
$GOLANG_LINT_VERSION="1.10.2"
@@ -78,6 +81,7 @@ if($Build) {
7881

7982
Write-Host $gcflags
8083
go.exe build -o $OutFile -ldflags="$ldflags" ./cmd/standalone
84+
go.exe build -o $PluginOutFile -ldflags="$ldflags" ./cmd/plugin
8185
exit $LASTEXITCODE
8286
}
8387

metrics/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ func (c *CPUCollector) Sample(m CPUMeasurement) CPUSample {
444444
c.lock.Unlock()
445445

446446
// total cpu time = total time * num cores
447-
ttime := (m.TotalTime - t0) * time.Duration(c.Cores)
447+
ttime := (m.TotalTime - t0)
448448
tmhz := c.MHzPerCore * float64(c.Cores)
449449

450450
kperc := float64(m.KernelTime-k0) / float64(ttime)

plugin/damon.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package plugin
2+
3+
import (
4+
// "fmt"
5+
6+
"fmt"
7+
"io"
8+
"os/exec"
9+
"strings"
10+
11+
log "github.com/hashicorp/go-hclog"
12+
"github.com/hashicorp/nomad/client/lib/fifo"
13+
"github.com/jet/damon/container"
14+
15+
"github.com/hashicorp/nomad/plugins/drivers"
16+
)
17+
18+
type damonExec struct {
19+
cmd *exec.Cmd
20+
ccfg *container.Config
21+
taskConfig TaskConfig
22+
cfg *drivers.TaskConfig
23+
stdout io.WriteCloser
24+
stderr io.WriteCloser
25+
logger log.Logger
26+
}
27+
28+
type hcLogWrapper struct {
29+
Logger log.Logger
30+
}
31+
32+
func (l hcLogWrapper) Logln(v ...interface{}) {
33+
if l.Logger == nil {
34+
return
35+
}
36+
l.Logger.Debug(fmt.Sprint(v...))
37+
}
38+
func (l hcLogWrapper) Error(err error, msg string) {
39+
if l.Logger == nil {
40+
return
41+
}
42+
l.Logger.Error(msg, "error", err)
43+
}
44+
45+
func getCPUMHz(cfg *drivers.TaskConfig, taskConfig TaskConfig, logger log.Logger) int {
46+
if taskConfig.CPULimit > 0 {
47+
return taskConfig.CPULimit
48+
}
49+
return int(cfg.Resources.NomadResources.Cpu.CpuShares)
50+
}
51+
52+
func getMemoryMB(cfg *drivers.TaskConfig, taskConfig TaskConfig, logger log.Logger) int {
53+
if taskConfig.MemoryLimit > 0 {
54+
return taskConfig.MemoryLimit
55+
}
56+
return int(cfg.Resources.NomadResources.Memory.MemoryMB)
57+
}
58+
59+
func newDamonExec(cfg *drivers.TaskConfig, taskConfig TaskConfig, logger log.Logger) (*damonExec, error) {
60+
var d damonExec
61+
d.ccfg = &container.Config{
62+
Name: cfg.ID,
63+
EnforceCPU: taskConfig.EnforceCPULimit,
64+
CPUMHzLimit: getCPUMHz(cfg, taskConfig, logger),
65+
EnforceMemory: taskConfig.EnforceMemoryLimit,
66+
MemoryMBLimit: getMemoryMB(cfg, taskConfig, logger),
67+
RestrictedToken: taskConfig.RestrictedToken,
68+
CPUHardCap: true,
69+
Logger: hcLogWrapper{Logger: logger},
70+
}
71+
d.cfg = cfg
72+
d.taskConfig = taskConfig
73+
d.cmd = exec.Command(taskConfig.Command, taskConfig.Args...)
74+
d.cmd.Dir = cfg.TaskDir().Dir
75+
d.cmd.Env = cfg.EnvList()
76+
d.logger = logger
77+
return &d, nil
78+
}
79+
80+
func (d *damonExec) startContainer(commandCfg *drivers.TaskConfig) (*taskHandle, error) {
81+
d.logger.Debug("running executable", "task_id", commandCfg.ID, "command", d.cmd.Path, "args", strings.Join(d.cmd.Args, " "))
82+
stdout, err := d.Stdout()
83+
if err != nil {
84+
return nil, err
85+
}
86+
stderr, err := d.Stderr()
87+
if err != nil {
88+
return nil, err
89+
}
90+
cmd := d.cmd
91+
cmd.Stdout = stdout
92+
cmd.Stderr = stderr
93+
c, err := container.RunContained(d.cmd, d.ccfg)
94+
if err != nil {
95+
defer d.Close()
96+
return nil, err
97+
}
98+
return &taskHandle{
99+
container: c,
100+
pid: c.PID,
101+
logger: d.logger,
102+
taskConfig: commandCfg,
103+
startedAt: c.StartTime,
104+
procState: drivers.TaskStateRunning,
105+
}, nil
106+
}
107+
108+
func (d *damonExec) Stdout() (io.Writer, error) {
109+
if d.stdout == nil {
110+
if d.cfg.StdoutPath == "" {
111+
return DevNull, nil
112+
}
113+
stdout, err := fifo.OpenWriter(d.cfg.StdoutPath)
114+
if err != nil {
115+
return nil, fmt.Errorf("failed to open stdout fifo '%s': %v", d.cfg.StdoutPath, err)
116+
}
117+
d.logger.Trace("stdout fifo opened", "path", "task_id", d.cfg.ID, d.cfg.StderrPath)
118+
d.stdout = stdout
119+
}
120+
return d.stdout, nil
121+
}
122+
123+
func (d *damonExec) Stderr() (io.Writer, error) {
124+
if d.stderr == nil {
125+
if d.cfg.StderrPath == "" {
126+
return DevNull, nil
127+
}
128+
stderr, err := fifo.OpenWriter(d.cfg.StderrPath)
129+
if err != nil {
130+
return nil, fmt.Errorf("failed to open stderr fifo '%s': %v", d.cfg.StderrPath, err)
131+
}
132+
d.logger.Trace("stderr fifo opened", "path", "task_id", d.cfg.ID, d.cfg.StderrPath)
133+
d.stderr = stderr
134+
}
135+
return d.stderr, nil
136+
}
137+
138+
func (d *damonExec) Close() {
139+
d.logger.Trace("damon closed", "task_id", d.cfg.ID)
140+
if d.stdout != nil {
141+
d.logger.Trace("stdout fifo closed", "task_id", d.cfg.ID)
142+
d.stdout.Close()
143+
}
144+
if d.stderr != nil {
145+
d.logger.Trace("stderr fifo closed", "task_id", d.cfg.ID)
146+
d.stderr.Close()
147+
}
148+
}

plugin/dev_null.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package plugin
2+
3+
type devNull int
4+
5+
// DevNull is a no-op io device
6+
// It implements:
7+
// - io.Reader
8+
// - io.Writer
9+
// - io.Closer (implicitly: io.ReadCloser & io.WriteCloser)
10+
// - io.Seeker (implicitly: io.ReadSeeker & io.WriteSeeker)
11+
// - io.ReaderAt
12+
// - io.WriterAt
13+
// - io.StringWriter
14+
const DevNull = devNull(0)
15+
16+
// Read implements a no-op io.Reader
17+
func (n devNull) Read(p []byte) (int, error) {
18+
return 0, nil
19+
}
20+
21+
// ReadAt implements a no-op io.ReaderAt
22+
func (n devNull) ReadAt(p []byte, off int64) (int, error) {
23+
return 0, nil
24+
}
25+
26+
//Write implements a discarding io.Writer
27+
func (n devNull) Write(p []byte) (int, error) {
28+
return len(p), nil
29+
}
30+
31+
//WriteAt implements a discarding io.WriterAt
32+
func (n devNull) WriteAt(p []byte, off int64) (int, error) {
33+
return len(p), nil
34+
}
35+
36+
// WriteString implements a discarding io.StringWriter
37+
func (n devNull) WriteString(s string) (int, error) {
38+
return len(s), nil
39+
}
40+
41+
// Close implements a no-op io.Closer
42+
func (n devNull) Close() error {
43+
return nil
44+
}
45+
46+
// Seek implements a no-op io.Seeker
47+
func (n devNull) Seek(offset, whence int) (int64, error) {
48+
return 0, nil
49+
}

0 commit comments

Comments
 (0)