Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/grpc/mpi/v1/command.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/grpc/mpi/v1/common.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/grpc/mpi/v1/files.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions internal/command/command_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,18 +228,36 @@ func (cp *CommandPlugin) processDataPlaneResponse(ctx context.Context, msg *bus.
}

// processConnectionReset reacts to a connection reset message by replacing the
// plugin's connection with the one carried in msg.Data and restarting the
// command service subscription.
//
// Messages whose Data is not a grpc.GrpcConnectionInterface are ignored after
// the initial debug log. Otherwise, while holding subscribeMutex until the
// function returns, it:
//  1. cancels the current subscription if a cancel function is set,
//  2. closes the existing connection (a close error is logged, not fatal),
//  3. stores the new connection and hands its command service client to the
//     command service, returning early if that update fails,
//  4. starts Subscribe in a new goroutine with a fresh cancellable context
//     and records its cancel function in cp.subscribeCancel.
func (cp *CommandPlugin) processConnectionReset(ctx context.Context, msg *bus.Message) {
	slog.DebugContext(ctx, "Command plugin received connection reset message")

	replacement, isConnection := msg.Data.(grpc.GrpcConnectionInterface)
	if !isConnection {
		return
	}

	slog.DebugContext(ctx, "Canceling Subscribe after connection reset")
	labelledCtx := cp.config.NewContextWithLabels(ctx)

	// The deferred unlock runs on every return path below, including the
	// early return when the client update fails.
	cp.subscribeMutex.Lock()
	defer cp.subscribeMutex.Unlock()

	if cancelPrevious := cp.subscribeCancel; cancelPrevious != nil {
		cancelPrevious()
		slog.DebugContext(labelledCtx, "Successfully canceled subscribe after connection reset")
	}

	// A failure to close the old connection is only logged; the swap proceeds.
	if closeErr := cp.conn.Close(ctx); closeErr != nil {
		slog.ErrorContext(ctx, "Command plugin: unable to close connection", "error", closeErr)
	}

	cp.conn = replacement
	if updateErr := cp.commandService.UpdateClient(ctx, cp.conn.CommandServiceClient()); updateErr != nil {
		slog.ErrorContext(ctx, "Failed to reset connection", "error", updateErr)
		return
	}

	slog.DebugContext(labelledCtx, "Starting new subscribe after connection reset")
	subscribeCtx, cancelSubscribe := context.WithCancel(labelledCtx)
	cp.subscribeCancel = cancelSubscribe
	go cp.commandService.Subscribe(subscribeCtx)

	slog.DebugContext(ctx, "Command service client reset successfully")
}
Expand Down
47 changes: 37 additions & 10 deletions internal/file/file_manager_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type (
reader *bufio.Reader,
chunkID uint32,
) (mpi.FileDataChunk_Content, error)
WriteManifestFile(updatedFiles map[string]*model.ManifestFile,
WriteManifestFile(ctx context.Context, updatedFiles map[string]*model.ManifestFile,
manifestDir, manifestPath string) (writeError error)
}

Expand All @@ -68,6 +68,7 @@ type (
fileToUpdate *mpi.File,
) error
SetIsConnected(isConnected bool)
UpdateClient(ctx context.Context, fileServiceClient mpi.FileServiceClient)
}

fileManagerServiceInterface interface {
Expand All @@ -85,6 +86,7 @@ type (
) (map[string]*model.FileCache, map[string][]byte, error)
IsConnected() bool
SetIsConnected(isConnected bool)
ResetClient(ctx context.Context, fileServiceClient mpi.FileServiceClient)
}
)

Expand All @@ -101,6 +103,7 @@ type FileManagerService struct {
currentFilesOnDisk map[string]*mpi.File // key is file path
previousManifestFiles map[string]*model.ManifestFile
manifestFilePath string
rollbackManifest bool
filesMutex sync.RWMutex
}

Expand All @@ -116,10 +119,16 @@ func NewFileManagerService(fileServiceClient mpi.FileServiceClient, agentConfig
currentFilesOnDisk: make(map[string]*mpi.File),
previousManifestFiles: make(map[string]*model.ManifestFile),
manifestFilePath: agentConfig.ManifestDir + "/manifest.json",
rollbackManifest: true,
manifestLock: manifestLock,
}
}

// ResetClient passes fileServiceClient to the file service operator's
// UpdateClient and then logs a debug message.
func (fms *FileManagerService) ResetClient(ctx context.Context, fileServiceClient mpi.FileServiceClient) {
	operator := fms.fileServiceOperator
	operator.UpdateClient(ctx, fileServiceClient)

	slog.DebugContext(ctx, "File manager service reset client successfully")
}

// IsConnected returns the connection state reported by the file service operator.
func (fms *FileManagerService) IsConnected() bool {
	connected := fms.fileServiceOperator.IsConnected()

	return connected
}
Expand All @@ -131,6 +140,7 @@ func (fms *FileManagerService) SetIsConnected(isConnected bool) {
func (fms *FileManagerService) ConfigApply(ctx context.Context,
configApplyRequest *mpi.ConfigApplyRequest,
) (status model.WriteStatus, err error) {
fms.rollbackManifest = true
fileOverview := configApplyRequest.GetOverview()

if fileOverview == nil {
Expand Down Expand Up @@ -161,6 +171,7 @@ func (fms *FileManagerService) ConfigApply(ctx context.Context,

fileErr := fms.executeFileActions(ctx)
if fileErr != nil {
fms.rollbackManifest = false
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When does this boolean get reset to true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good spot, I forgot that. Adding it now.

return model.RollbackRequired, fileErr
}
fileOverviewFiles := files.ConvertToMapOfFiles(fileOverview.GetFiles())
Expand All @@ -179,6 +190,7 @@ func (fms *FileManagerService) ClearCache() {
clear(fms.previousManifestFiles)
}

//nolint:revive // cognitive-complexity is 13 (max is 12); the loop is needed and can't be broken up
func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string) error {
slog.InfoContext(ctx, "Rolling back config for instance", "instance_id", instanceID)

Expand Down Expand Up @@ -212,10 +224,13 @@ func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string)
}
}

manifestFileErr := fms.fileOperator.WriteManifestFile(fms.previousManifestFiles,
fms.agentConfig.ManifestDir, fms.manifestFilePath)
if manifestFileErr != nil {
return manifestFileErr
if fms.rollbackManifest {
slog.DebugContext(ctx, "Rolling back manifest file", "manifest_previous", fms.previousManifestFiles)
manifestFileErr := fms.fileOperator.WriteManifestFile(ctx, fms.previousManifestFiles,
fms.agentConfig.ManifestDir, fms.manifestFilePath)
if manifestFileErr != nil {
return manifestFileErr
}
}

return nil
Expand Down Expand Up @@ -374,7 +389,7 @@ func (fms *FileManagerService) UpdateCurrentFilesOnDisk(
fms.currentFilesOnDisk[currentFile.GetFileMeta().GetName()] = currentFile
}

err := fms.UpdateManifestFile(currentFiles, referenced)
err := fms.UpdateManifestFile(ctx, currentFiles, referenced)
if err != nil {
return fmt.Errorf("failed to update manifest file: %w", err)
}
Expand All @@ -385,12 +400,24 @@ func (fms *FileManagerService) UpdateCurrentFilesOnDisk(
// seems to be a control flag, avoid control coupling
//
//nolint:revive // referenced is a required flag
func (fms *FileManagerService) UpdateManifestFile(currentFiles map[string]*mpi.File, referenced bool) (err error) {
slog.Debug("Updating manifest file", "current_files", currentFiles, "referenced", referenced)
func (fms *FileManagerService) UpdateManifestFile(ctx context.Context,
currentFiles map[string]*mpi.File, referenced bool,
) (err error) {
slog.DebugContext(ctx, "Updating manifest file", "current_files", currentFiles, "referenced", referenced)
currentManifestFiles, _, readError := fms.manifestFile()

// When agent is first started the manifest is updated when an NGINX instance is found, but the manifest file
// will be empty leading to previousManifestFiles being empty. This was causing issues if the first config
// apply failed leading to the manifest file being rolled back to an empty file.
// If the currentManifestFiles is empty then we can assume the Agent has just started and this is the first
// write of the Manifest file, so set previousManifestFiles to be the currentFiles.
if len(currentManifestFiles) == 0 {
currentManifestFiles = fms.convertToManifestFileMap(currentFiles, referenced)
}

fms.previousManifestFiles = currentManifestFiles
if readError != nil && !errors.Is(readError, os.ErrNotExist) {
slog.Debug("Error reading manifest file", "current_manifest_files",
slog.DebugContext(ctx, "Error reading manifest file", "current_manifest_files",
currentManifestFiles, "updated_files", currentFiles, "referenced", referenced)

return fmt.Errorf("unable to read manifest file: %w", readError)
Expand All @@ -416,7 +443,7 @@ func (fms *FileManagerService) UpdateManifestFile(currentFiles map[string]*mpi.F
updatedFiles = manifestFiles
}

return fms.fileOperator.WriteManifestFile(updatedFiles, fms.agentConfig.ManifestDir, fms.manifestFilePath)
return fms.fileOperator.WriteManifestFile(ctx, updatedFiles, fms.agentConfig.ManifestDir, fms.manifestFilePath)
}

func (fms *FileManagerService) manifestFile() (map[string]*model.ManifestFile, map[string]*mpi.File, error) {
Expand Down
Loading
Loading