Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: make it so that journald logs are ascribed to containers #247

Open
wants to merge 3 commits into
base: docker-1.13.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/docker/libnetwork/types"
agentexec "github.com/docker/swarmkit/agent/exec"
"github.com/opencontainers/runc/libcontainer/label"
specs "github.com/opencontainers/runtime-spec/specs-go"
)

const configFileName = "config.v2.json"
Expand Down Expand Up @@ -85,6 +86,7 @@ type CommonContainer struct {
HasBeenManuallyStopped bool // used for unless-stopped restart policy
MountPoints map[string]*volume.MountPoint
HostConfig *containertypes.HostConfig `json:"-"` // do not serialize the host config in the json, otherwise we'll make the container unportable
Spec *specs.Spec `json:"-"` // ditto
ExecCommands *exec.Store `json:"-"`
SecretStore agentexec.SecretGetter `json:"-"`
SecretReferences []*swarmtypes.SecretReference
Expand Down Expand Up @@ -313,7 +315,7 @@ func (container *Container) StartLogger(cfg containertypes.LogConfig) (logger.Lo
if err != nil {
return nil, fmt.Errorf("Failed to get logging factory: %v", err)
}
ctx := logger.Context{
cctx := logger.CommonContext{
Config: cfg.Config,
ContainerID: container.ID,
ContainerName: container.Name,
Expand All @@ -326,6 +328,7 @@ func (container *Container) StartLogger(cfg containertypes.LogConfig) (logger.Lo
ContainerLabels: container.Config.Labels,
DaemonName: "docker",
}
ctx := configurePlatformLogger(cctx, container)

// Set logging file for "json-logger"
if cfg.Type == jsonfilelog.Name {
Expand Down
14 changes: 14 additions & 0 deletions container/logger_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package container

import (
"github.com/docker/docker/daemon/logger"
)

// configurePlatformLogger takes a logger.CommonContext and adds any
// OS-specific information that is exclusive to a logger.Context.
func configurePlatformLogger(ctx logger.CommonContext, container *Container) logger.Context {
return logger.Context{
CommonContext: ctx,
ContainerCGroup: *container.Spec.Linux.CgroupsPath,
}
}
9 changes: 9 additions & 0 deletions container/logger_notlinux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// +build !linux

package container

// configurePlatformLogger takes a logger.CommonContext and adds any
// OS-specific information that is exclusive to a logger.Context.
func configurePlatformLogger(ctx logger.CommonContext, container *Container) logger.Context {
return logger.Context(ctx)
}
5 changes: 3 additions & 2 deletions daemon/logger/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
"time"
)

// Context provides enough information for a logging driver to do its function.
type Context struct {
// CommonContext provides almost enough information for a logging driver to do
// its function, but not anything that's OS-specific.
type CommonContext struct {
Config map[string]string
ContainerID string
ContainerName string
Expand Down
14 changes: 14 additions & 0 deletions daemon/logger/context_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package logger

// Context provides enough information for a logging driver to do its function.
type Context struct {
// These fields are shared across all definitions.
CommonContext
// Fields below this point are platform-specific.
ContainerCGroup string
}

// CGroup returns the name of the container's cgroup.
func (ctx *Context) CGroup() (string, error) {
return ctx.ContainerCGroup, nil
}
6 changes: 6 additions & 0 deletions daemon/logger/context_notlinux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// +build !linux

package logger

// Context provides enough information for a logging driver to do its function.
type Context CommonContext
179 changes: 157 additions & 22 deletions daemon/logger/journald/journald.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,61 @@
package journald

import (
"encoding/gob"
"flag"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"syscall"
"unicode"

"github.com/Sirupsen/logrus"
"github.com/coreos/go-systemd/journal"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/loggerutils"
"github.com/docker/docker/pkg/reexec"
"golang.org/x/sys/unix"
)

const name = "journald"
const handler = "journal-logger"

type journald struct {
// for reading
vars map[string]string // additional variables and values to send to the journal along with the log message
readers readerList
// for writing
writing sync.Mutex
cmd *exec.Cmd
pipe io.WriteCloser
encoder *gob.Encoder
}

type readerList struct {
mu sync.Mutex
readers map[*logger.LogWatcher]*logger.LogWatcher
}

// MessageWithVars describes the packet format that we use when forwarding log
// messages from the daemon to a helper process.
type MessageWithVars struct {
logger.Message
Vars map[string]string
}

func init() {
if err := logger.RegisterLogDriver(name, New); err != nil {
logrus.Fatal(err)
}
if err := logger.RegisterLogOptValidator(name, validateLogOpt); err != nil {
logrus.Fatal(err)
}
gob.Register(MessageWithVars{})
reexec.Register(handler, journalLoggerMain)
}

// sanitizeKeyMode returns the sanitized string so that it could be used in journald.
Expand Down Expand Up @@ -62,30 +88,48 @@ func New(ctx logger.Context) (logger.Logger, error) {
if !journal.Enabled() {
return nil, fmt.Errorf("journald is not enabled on this host")
}
// Strip a leading slash so that people can search for
// CONTAINER_NAME=foo rather than CONTAINER_NAME=/foo.
name := ctx.ContainerName
if name[0] == '/' {
name = name[1:]
}

// parse log tag
// parse the log tag
tag, err := loggerutils.ParseLogTag(ctx, loggerutils.DefaultTemplate)
if err != nil {
return nil, err
}

// build the set of values which we'll send to the journal every time
vars := map[string]string{
"CONTAINER_ID": ctx.ContainerID[:12],
"CONTAINER_ID_FULL": ctx.ContainerID,
"CONTAINER_NAME": name,
"CONTAINER_ID": ctx.ID(),
"CONTAINER_ID_FULL": ctx.FullID(),
"CONTAINER_NAME": ctx.Name(),
"CONTAINER_TAG": tag,
}
extraAttrs := ctx.ExtraAttributes(sanitizeKeyMod)
for k, v := range extraAttrs {
vars[k] = v
}
return &journald{vars: vars, readers: readerList{readers: make(map[*logger.LogWatcher]*logger.LogWatcher)}}, nil
// start the helper
cgroupSpec, err := ctx.CGroup()
if err != nil {
return nil, err
}
cmd := reexec.Command(handler, cgroupSpec)
cmd.Dir = "/"
pipe, err := cmd.StdinPipe()
if err != nil {
return nil, fmt.Errorf("error opening pipe to logging helper: %v", err)
}
err = cmd.Start()
if err != nil {
return nil, fmt.Errorf("error starting logging helper: %v", err)
}
encoder := gob.NewEncoder(pipe)
// gather up everything we need to hand back
j := &journald{
vars: vars,
readers: readerList{readers: make(map[*logger.LogWatcher]*logger.LogWatcher)},
cmd: cmd,
pipe: pipe,
encoder: encoder,
}
return j, nil
}

// We don't actually accept any options, but we have to supply a callback for
Expand All @@ -104,19 +148,110 @@ func validateLogOpt(cfg map[string]string) error {
}

func (s *journald) Log(msg *logger.Message) error {
vars := map[string]string{}
for k, v := range s.vars {
vars[k] = v
}
if msg.Partial {
vars["CONTAINER_PARTIAL_MESSAGE"] = "true"
}
if msg.Source == "stderr" {
return journal.Send(string(msg.Line), journal.PriErr, vars)
// build the message struct for the helper, and send it on down
message := MessageWithVars{
Message: *msg,
Vars: s.vars,
}
return journal.Send(string(msg.Line), journal.PriInfo, vars)
s.writing.Lock()
defer s.writing.Unlock()
return s.encoder.Encode(&message)
}

func (s *journald) Name() string {
return name
}

func (s *journald) closeWriter() {
s.pipe.Close()
if err := s.cmd.Wait(); err != nil {
eerr, ok := err.(*exec.ExitError)
if !ok {
logrus.Errorf("error waiting on log handler: %v", err)
return
}
status, ok := eerr.Sys().(syscall.WaitStatus)
if !ok {
logrus.Errorf("error waiting on log handler: %v", err)
return
}
if !status.Signaled() || (status.Signal() != syscall.SIGTERM && status.Signal() != syscall.SIGKILL) {
logrus.Errorf("error waiting on log handler: %v", err)
return
}
}
}

func loggerLog(f string, args ...interface{}) {
s := fmt.Sprintf(f, args...)
journal.Send(s, journal.PriInfo, nil)
fmt.Fprintln(os.Stderr, s)
}

func joinScope(scope string) error {
// This is... not ideal. But if we're here, we're just going to have
// to assume that we know how to compute the same path that runc is
// going to use, based on a value of the form "parent:docker:ID", where
// the "docker" is literal.
parts := strings.Split(scope, ":")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will only work when cgroupdriver is set to systemd. We also need to handle cgroupfs (which is the upstream default) if we plan to try and upstream this.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also look into using libcontainer cgroups library for doing this as it handles both the drivers.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing the derivation right using libcontainer makes sense, since it's already a dependency. I'll have another look at that. Though that does leave open a question - how often do we see a running journald when systemd isn't managing cgroups?

fs, err := os.Open("/sys/fs/cgroup")
if err != nil {
return err
}
defer fs.Close()
mountPoint := fs.Name()
controllers, err := fs.Readdirnames(-1)
if err != nil {
return err
}
for _, controller := range controllers {
scopeDir := filepath.Join(mountPoint, controller, parts[0], parts[1]+"-"+parts[2]+".scope")
procsFile := filepath.Join(scopeDir, "cgroup.procs")
f, err := os.OpenFile(procsFile, os.O_WRONLY, 0644)
if err != nil && !os.IsNotExist(err) {
return err
}
defer f.Close()
fmt.Fprintln(f, unix.Getpid())
}
return nil
}

func journalLoggerMain() {
flag.Parse()
args := flag.Args()
if len(args) < 0 {
loggerLog("should be invoked with the name of the container's scope")
return
}
joined := false
decoder := gob.NewDecoder(os.Stdin)
for {
var msg MessageWithVars
// wait for the next chunk of data to log
if err := decoder.Decode(&msg); err != nil {
if err == io.EOF {
break
}
loggerLog("error decoding message: %v", err)
continue
}
// if we haven't joined the container's scope yet, do that now
if !joined {
if err := joinScope(args[0]); err != nil {
loggerLog("error joining scope %q: %v", args[0], err)
}
joined = true
}
msg.Vars["CONTAINER_SOURCE"] = msg.Source
// add a note if this message is a partial message
if msg.Partial {
msg.Vars["CONTAINER_PARTIAL_MESSAGE"] = "true"
}
if msg.Source == "stderr" {
journal.Send(string(msg.Line), journal.PriErr, msg.Vars)
continue
}
journal.Send(string(msg.Line), journal.PriInfo, msg.Vars)
}
}
1 change: 1 addition & 0 deletions daemon/logger/journald/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ import (

func (s *journald) Close() error {
s.readers.mu.Lock()
s.closeWriter()
for reader := range s.readers.readers {
reader.Close()
}
Expand Down
1 change: 1 addition & 0 deletions daemon/logger/journald/read_unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
package journald

func (s *journald) Close() error {
s.closeWriter()
return nil
}
1 change: 1 addition & 0 deletions daemon/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint
if err != nil {
return err
}
container.Spec = spec

createOptions, err := daemon.getLibcontainerdCreateOptions(container)
if err != nil {
Expand Down