Skip to content

Commit

Permalink
Merge pull request #39 from antoinetran/fix_issue35_kubectllogsfollow…
Browse files Browse the repository at this point in the history
…_rebased

Fix #35 'kubectl logs -f' implemented
  • Loading branch information
dciangot authored Dec 13, 2024
2 parents 1bea02e + 5880378 commit 813e44c
Show file tree
Hide file tree
Showing 7 changed files with 365 additions and 115 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
envs.sh
bin
bin
# Eclipse IDE
.project
.settings
4 changes: 2 additions & 2 deletions pkg/slurm/Create.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) {
// singularity run will honor the entrypoint/command (if exist) in container image, while exec will override entrypoint.
// Thus if pod command (equivalent to container entrypoint) exist, we do exec, and other case we do run
singularityCommand := ""
if container.Command != nil && len(container.Command) != 0 {
if len(container.Command) != 0 {
singularityCommand = "exec"
} else {
singularityCommand = "run"
Expand Down Expand Up @@ -116,7 +116,7 @@ func (h *SidecarHandler) SubmitHandler(w http.ResponseWriter, r *http.Request) {
os.RemoveAll(filesPath)
return
}

// prepareEnvs creates a file in the working directory, that must exist. This is created at prepareMounts.
envs := prepareEnvs(spanCtx, h.Config, data, container)

Expand Down
6 changes: 5 additions & 1 deletion pkg/slurm/Delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ func (h *SidecarHandler) StopHandler(w http.ResponseWriter, r *http.Request) {
defer span.End()
defer commonIL.SetDurationSpan(start, span)

log.G(h.Ctx).Info("Slurm Sidecar: received Stop call")
// For debugging purpose, when we have many kubectl logs, we can differentiate each one.
sessionContext := GetSessionContext(r)
sessionContextMessage := GetSessionContextMessage(sessionContext)

log.G(h.Ctx).Info(sessionContextMessage, "Slurm Sidecar: received Stop call")
statusCode := http.StatusOK

bodyBytes, err := io.ReadAll(r.Body)
Expand Down
235 changes: 203 additions & 32 deletions pkg/slurm/GetLogs.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package slurm

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"os"
"strconv"
"strings"
"time"

Expand All @@ -17,6 +23,131 @@ import (
trace "go.opentelemetry.io/otel/trace"
)

// Logs in follow mode (get logs until the death of the container) with "kubectl -f".
func (h *SidecarHandler) GetLogsFollowMode(
spanCtx context.Context,
podUid string,
w http.ResponseWriter,
r *http.Request,
path string,
req commonIL.LogStruct,
containerOutputPath string,
containerOutput []byte,
sessionContext string,
) error {
// Follow until this file exist, that indicates the end of container, thus the end of following.
containerStatusPath := path + "/" + req.ContainerName + ".status"
// Get the offset of what we read.
containerOutputLastOffset := len(containerOutput)
sessionContextMessage := GetSessionContextMessage(sessionContext)
log.G(h.Ctx).Debug(sessionContextMessage, "Check container status", containerStatusPath, " with current length/offset: ", containerOutputLastOffset)

var containerOutputFd *os.File
var err error
for {
containerOutputFd, err = os.Open(containerOutputPath)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
// Case the file does not exist yet, we loop until it exist.
notFoundMsg := sessionContextMessage + "Cannot open in follow mode the container logs " + containerOutputPath + " because it does not exist yet, sleeping before retrying..."
log.G(h.Ctx).Debug(notFoundMsg)
// Warning: if we don't write anything to body before 30s, there will be a timeout in VK to API DoReq(), thus we send an informational message that the log is not ready.
w.Write([]byte(notFoundMsg))
// Flush otherwise it will be as if nothing was written.
if f, ok := w.(http.Flusher); ok {
log.G(h.Ctx).Debug(sessionContextMessage, "wrote file not found yet, now flushing this message...")
f.Flush()
} else {
log.G(h.Ctx).Error(sessionContextMessage, "wrote file not found but could not flush because server does not support Flusher.")
}
time.Sleep(4 * time.Second)
continue
} else {
// Case unknown error.
errWithContext := fmt.Errorf(sessionContextMessage+"could not open file to follow logs at %s error type: %s error: %w", containerOutputPath, fmt.Sprintf("%#v", err), err)
log.G(h.Ctx).Error(errWithContext)
w.Write([]byte(errWithContext.Error()))
return err
}
}
// File exist.
log.G(h.Ctx).Debug(sessionContextMessage, "opened for follow mode the container logs ", containerOutputPath)
break
}
defer containerOutputFd.Close()

// We follow only from after what is already read.
_, err = containerOutputFd.Seek(int64(containerOutputLastOffset), 0)
if err != nil {
errWithContext := fmt.Errorf(sessionContextMessage+"error during Seek() of GetLogsFollowMode() in GetLogsHandler of file %s offset %d type: %s %w", containerOutputPath, containerOutputLastOffset, fmt.Sprintf("%#v", err), err)
w.Write([]byte(errWithContext.Error()))
return errWithContext
}

containerOutputReader := bufio.NewReader(containerOutputFd)

bufferBytes := make([]byte, 4096)

// Looping until we get end of job.
// TODO: handle the Ctrl+C of kubectl logs.
var isContainerDead bool = false
for {
n, errRead := containerOutputReader.Read(bufferBytes)
if errRead != nil && errRead != io.EOF {
// Error during read.
h.logErrorVerbose(sessionContextMessage+"error doing Read() of GetLogsFollowMode", h.Ctx, w, errRead)
return errRead
}
// Write ASAP what we could read of it.
_, err = w.Write(bufferBytes[:n])
if err != nil {
h.logErrorVerbose(sessionContextMessage+"error doing Write() of GetLogsFollowMode", h.Ctx, w, err)
return err
}

// Flush otherwise it will take time to appear in kubectl logs.
if f, ok := w.(http.Flusher); ok {
log.G(h.Ctx).Debug(sessionContextMessage, "wrote some logs, now flushing...")
f.Flush()
} else {
log.G(h.Ctx).Error(sessionContextMessage, "wrote some logs but could not flush because server does not support Flusher.")
}

if errRead != nil {
if errRead == io.EOF {
// Nothing more to read, but in follow mode, is the container still alive?
if isContainerDead {
// Container already marked as dead, and we tried to get logs one last time. Exiting the loop.
log.G(h.Ctx).Info(sessionContextMessage, "Container was found dead and no more logs are found at this step, exiting following mode...")
break
}
// Checking if container is dead (meaning the job ID is not in context anymore, OR if the status file exist).
if !checkIfJidExists(spanCtx, (h.JIDs), podUid) {
// The JID disappeared, so the container is dead, probably from a POD delete request. Trying to get the latest log one last time.
// Because the moment we found this, there might be some more logs to read.
isContainerDead = true
log.G(h.Ctx).Info(sessionContextMessage, "Container is found dead thanks to missing JID, reading last logs...")
} else if _, err := os.Stat(containerStatusPath); errors.Is(err, os.ErrNotExist) {
// The status file of the container does not exist, so the container is still alive. Continuing to follow logs.
// Sleep because otherwise it can be a stress to file system to always read it when it has nothing.
log.G(h.Ctx).Debug(sessionContextMessage, "EOF of container logs, sleeping 4s before retrying...")
time.Sleep(4 * time.Second)
} else {
// The status file exist, so the container is dead. Trying to get the latest log one last time.
// Because the moment we found the status file, there might be some more logs to read.
isContainerDead = true
log.G(h.Ctx).Info(sessionContextMessage, "Container is found dead thanks to status file, reading last logs...")
}
continue
} else {
// Unreachable code.
}
}
}
// No error, err = nil
return nil
}

// GetLogsHandler reads Jobs' output file to return what's logged inside.
// What's returned is based on the provided parameters (Tail/LimitBytes/Timestamps/etc)
func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request) {
Expand All @@ -28,22 +159,23 @@ func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request)
defer span.End()
defer commonIL.SetDurationSpan(start, span)

log.G(h.Ctx).Info("Docker Sidecar: received GetLogs call")
// For debugging purpose, when we have many kubectl logs, we can differentiate each one.
sessionContext := GetSessionContext(r)
sessionContextMessage := GetSessionContextMessage(sessionContext)

log.G(h.Ctx).Info(sessionContextMessage, "Docker Sidecar: received GetLogs call")
var req commonIL.LogStruct
statusCode := http.StatusOK
currentTime := time.Now()

bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
statusCode = http.StatusInternalServerError
h.handleError(spanCtx, w, statusCode, err)
h.logErrorVerbose(sessionContextMessage+"error during ReadAll() in GetLogsHandler request body", spanCtx, w, err)
return
}

err = json.Unmarshal(bodyBytes, &req)
if err != nil {
statusCode = http.StatusInternalServerError
h.handleError(spanCtx, w, statusCode, err)
h.logErrorVerbose(sessionContextMessage+"error during Unmarshal() in GetLogsHandler request body", spanCtx, w, err)
return
}

Expand All @@ -60,32 +192,26 @@ func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request)
)

path := h.Config.DataRootFolder + req.Namespace + "-" + req.PodUID
containerOutputPath := path + "/" + req.ContainerName + ".out"
var output []byte
if req.Opts.Timestamps {
h.handleError(spanCtx, w, statusCode, err)
h.logErrorVerbose(sessionContextMessage+"unsupported option req.Opts.Timestamps", spanCtx, w, err)
return
}
containerOutput, err := h.ReadLogs(containerOutputPath, span, spanCtx, w, sessionContextMessage)
if err != nil {
// Error already handled in waitAndReadLogs
return
}
jobOutput, err := h.ReadLogs(path+"/"+"job.out", span, spanCtx, w, sessionContextMessage)
if err != nil {
// Error already handled in waitAndReadLogs
return
} else {
log.G(h.Ctx).Info("Reading " + path + "/" + req.ContainerName + ".out")
containerOutput, err1 := os.ReadFile(path + "/" + req.ContainerName + ".out")
if err1 != nil {
log.G(h.Ctx).Error("Failed to read container logs.")
}
jobOutput, err2 := os.ReadFile(path + "/" + "job.out")
if err2 != nil {
log.G(h.Ctx).Error("Failed to read job logs.")
}

if err1 != nil && err2 != nil {
span.AddEvent("Error retrieving logs")
h.handleError(spanCtx, w, statusCode, err)
return
}

output = append(output, jobOutput...)
output = append(output, containerOutput...)

}

output = append(output, jobOutput...)
output = append(output, containerOutput...)

var returnedLogs string

if req.Opts.Tail != 0 {
Expand Down Expand Up @@ -140,12 +266,57 @@ func (h *SidecarHandler) GetLogsHandler(w http.ResponseWriter, r *http.Request)
}
}

commonIL.SetDurationSpan(start, span, commonIL.WithHTTPReturnCode(statusCode))
commonIL.SetDurationSpan(start, span, commonIL.WithHTTPReturnCode(http.StatusOK))

//w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set("Content-Type", "text/plain")

if statusCode != http.StatusOK {
w.Write([]byte("Some errors occurred while checking container status. Check Docker Sidecar's logs"))
log.G(h.Ctx).Info(sessionContextMessage, "writing response headers and OK status")
w.WriteHeader(http.StatusOK)

log.G(h.Ctx).Info(sessionContextMessage, "writing response body len: ", len(returnedLogs))
n, err := w.Write([]byte(returnedLogs))
log.G(h.Ctx).Info(sessionContextMessage, "written response body len: ", n)
if err != nil {
h.logErrorVerbose(sessionContextMessage+"error during Write() in GetLogsHandler, could write bytes: "+strconv.Itoa(n), spanCtx, w, err)
return
}

// Flush or else, it could be lost in the pipe.
if f, ok := w.(http.Flusher); ok {
log.G(h.Ctx).Debug(sessionContextMessage, "flushing after wrote response body bytes: "+strconv.Itoa(n))
f.Flush()
} else {
w.WriteHeader(statusCode)
w.Write([]byte(returnedLogs))
log.G(h.Ctx).Error(sessionContextMessage, "wrote response body but could not flush because server does not support Flusher.")
return
}

if req.Opts.Follow {
err := h.GetLogsFollowMode(spanCtx, req.PodUID, w, r, path, req, containerOutputPath, containerOutput, sessionContext)
if err != nil {
h.logErrorVerbose(sessionContextMessage+"follow mode error", spanCtx, w, err)
}
}
}

// Goal: read the file if it exist. If not, return empty.
// Important to wait because if we don't wait and return empty array, it will generates a JSON unmarshall error in InterLink VK.
// Fail for any error not related to file not existing (eg: permission error will raise an error).
// Already handle error.
func (h *SidecarHandler) ReadLogs(logsPath string, span trace.Span, ctx context.Context, w http.ResponseWriter, sessionContextMessage string) ([]byte, error) {
var output []byte
var err error
log.G(h.Ctx).Info(sessionContextMessage, "reading file ", logsPath)
output, err = os.ReadFile(logsPath)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
log.G(h.Ctx).Info(sessionContextMessage, "file ", logsPath, " not found.")
output = make([]byte, 0)
} else {
span.AddEvent("Error retrieving logs")
h.logErrorVerbose(sessionContextMessage+"error during ReadFile() of readLogs() in GetLogsHandler of file "+logsPath, ctx, w, err)
return nil, err
}
}
return output, nil
}
Loading

0 comments on commit 813e44c

Please sign in to comment.