Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ func (c *Client) handleStartJobRequest(p *agentv1.StartJobRequest) error {
Port: int(j.MysqlBackup.Port),
Socket: j.MysqlBackup.Socket,
}
job = jobs.NewMySQLBackupJob(p.JobId, timeout, j.MysqlBackup.Name, dbConnCfg, locationConfig, j.MysqlBackup.Folder)
job = jobs.NewMySQLBackupJob(p.JobId, timeout, j.MysqlBackup.Name, dbConnCfg, locationConfig, j.MysqlBackup.Folder, j.MysqlBackup.Compression)

case *agentv1.StartJobRequest_MysqlRestoreBackup:
var locationConfig jobs.BackupLocationConfig
Expand All @@ -622,7 +622,7 @@ func (c *Client) handleStartJobRequest(p *agentv1.StartJobRequest) error {
return errors.Errorf("unknown location config: %T", j.MysqlRestoreBackup.LocationConfig)
}

job = jobs.NewMySQLRestoreJob(p.JobId, timeout, j.MysqlRestoreBackup.Name, locationConfig, j.MysqlRestoreBackup.Folder)
job = jobs.NewMySQLRestoreJob(p.JobId, timeout, j.MysqlRestoreBackup.Name, locationConfig, j.MysqlRestoreBackup.Folder, j.MysqlRestoreBackup.Compression)

case *agentv1.StartJobRequest_MongodbBackup:
var locationConfig jobs.BackupLocationConfig
Expand Down Expand Up @@ -651,7 +651,7 @@ func (c *Client) handleStartJobRequest(p *agentv1.StartJobRequest) error {
}

job, err = jobs.NewMongoDBBackupJob(p.JobId, timeout, j.MongodbBackup.Name, dsn, locationConfig,
j.MongodbBackup.EnablePitr, j.MongodbBackup.DataModel, j.MongodbBackup.Folder)
j.MongodbBackup.EnablePitr, j.MongodbBackup.DataModel, j.MongodbBackup.Folder, j.MongodbBackup.Compression)
if err != nil {
return err
}
Expand Down Expand Up @@ -684,7 +684,7 @@ func (c *Client) handleStartJobRequest(p *agentv1.StartJobRequest) error {

job = jobs.NewMongoDBRestoreJob(p.JobId, timeout, j.MongodbRestoreBackup.Name,
j.MongodbRestoreBackup.PitrTimestamp.AsTime(), dsn, locationConfig,
c.supervisor, j.MongodbRestoreBackup.Folder, j.MongodbRestoreBackup.PbmMetadata.Name)
c.supervisor, j.MongodbRestoreBackup.Folder, j.MongodbRestoreBackup.PbmMetadata.Name, j.MongodbRestoreBackup.Compression)
default:
return errors.Errorf("unknown job type: %T", j)
}
Expand Down
23 changes: 23 additions & 0 deletions agent/runner/jobs/mongodb_backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type MongoDBBackupJob struct {
dataModel backuppb.DataModel
jobLogger *pbmJobLogger
folder string
compression backuppb.BackupCompression
}

// NewMongoDBBackupJob creates new Job for MongoDB backup.
Expand All @@ -62,6 +63,7 @@ func NewMongoDBBackupJob(
pitr bool,
dataModel backuppb.DataModel,
folder string,
compression backuppb.BackupCompression,
) (*MongoDBBackupJob, error) {
if dataModel != backuppb.DataModel_DATA_MODEL_PHYSICAL && dataModel != backuppb.DataModel_DATA_MODEL_LOGICAL {
return nil, errors.Errorf("'%s' is not a supported data model for MongoDB backups", dataModel)
Expand All @@ -81,6 +83,7 @@ func NewMongoDBBackupJob(
dataModel: dataModel,
jobLogger: newPbmJobLogger(id, pbmBackupJob, dsn),
folder: folder,
compression: compression,
}, nil
}

Expand Down Expand Up @@ -216,6 +219,26 @@ func (j *MongoDBBackupJob) startBackup(ctx context.Context) (*pbmBackup, error)
return nil, errors.Errorf("'%s' is not a supported data model for backups", j.dataModel)
}

switch j.compression {
case backuppb.BackupCompression_BACKUP_COMPRESSION_DEFAULT:
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This specifies PBM's configuration options; by default, no specific compression method is set.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks to me like it's defaulting to gzip though.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an empty case block, allowing use of PBM's default option to compress with s2; I can explicitly specify that if it's preferred.

case backuppb.BackupCompression_BACKUP_COMPRESSION_GZIP:
pbmArgs = append(pbmArgs, "--compression=gzip")
case backuppb.BackupCompression_BACKUP_COMPRESSION_SNAPPY:
pbmArgs = append(pbmArgs, "--compression=snappy")
case backuppb.BackupCompression_BACKUP_COMPRESSION_LZ4:
pbmArgs = append(pbmArgs, "--compression=lz4")
case backuppb.BackupCompression_BACKUP_COMPRESSION_S2:
pbmArgs = append(pbmArgs, "--compression=s2")
Comment on lines +230 to +231
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we sure pbm supports this compression method?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case backuppb.BackupCompression_BACKUP_COMPRESSION_PGZIP:
pbmArgs = append(pbmArgs, "--compression=pgzip")
case backuppb.BackupCompression_BACKUP_COMPRESSION_ZSTD:
pbmArgs = append(pbmArgs, "--compression=zstd")
case backuppb.BackupCompression_BACKUP_COMPRESSION_NONE:
pbmArgs = append(pbmArgs, "--compression=none")
default:
return nil, errors.Errorf("unknown compression: %s", j.compression)
}

if err := execPBMCommand(ctx, j.dsn, &result, pbmArgs...); err != nil {
return nil, err
}
Expand Down
108 changes: 103 additions & 5 deletions agent/runner/jobs/mongodb_backup_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ func TestNewMongoDBBackupJob(t *testing.T) {
testJobDuration := 1 * time.Second

tests := []struct {
name string
dataModel backuppb.DataModel
pitr bool
errMsg string
name string
dataModel backuppb.DataModel
pitr bool
errMsg string
compression backuppb.BackupCompression
}{
{
name: "logical backup model",
Expand All @@ -107,12 +108,36 @@ func TestNewMongoDBBackupJob(t *testing.T) {
dataModel: backuppb.DataModel_DATA_MODEL_PHYSICAL,
errMsg: "PITR is only supported for logical backups",
},
{
name: "logical backup with LZ4 compression",
dataModel: backuppb.DataModel_DATA_MODEL_LOGICAL,
errMsg: "",
compression: backuppb.BackupCompression_BACKUP_COMPRESSION_LZ4,
},
{
name: "physical backup with ZSTD compression",
dataModel: backuppb.DataModel_DATA_MODEL_LOGICAL,
errMsg: "",
compression: backuppb.BackupCompression_BACKUP_COMPRESSION_ZSTD,
},
{
name: "logical backup with PGZIP compression",
dataModel: backuppb.DataModel_DATA_MODEL_LOGICAL,
errMsg: "",
compression: backuppb.BackupCompression_BACKUP_COMPRESSION_PGZIP,
},
{
name: "physical backup with no compression",
dataModel: backuppb.DataModel_DATA_MODEL_LOGICAL,
errMsg: "",
compression: backuppb.BackupCompression_BACKUP_COMPRESSION_NONE,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
_, err := NewMongoDBBackupJob(t.Name(), testJobDuration, t.Name(), "", BackupLocationConfig{}, tc.pitr, tc.dataModel, "artifact_folder")
_, err := NewMongoDBBackupJob(t.Name(), testJobDuration, t.Name(), "", BackupLocationConfig{}, tc.pitr, tc.dataModel, "artifact_folder", tc.compression)
if tc.errMsg == "" {
assert.NoError(t, err)
} else {
Expand All @@ -121,3 +146,76 @@ func TestNewMongoDBBackupJob(t *testing.T) {
})
}
}

// TestMongoDBBackupJobCompression verifies that NewMongoDBBackupJob accepts
// every supported compression method and stores it on the constructed job.
func TestMongoDBBackupJobCompression(t *testing.T) {
	t.Parallel()
	testJobDuration := 1 * time.Second

	tests := []struct {
		name        string
		compression backuppb.BackupCompression
		shouldError bool
	}{
		{
			// DEFAULT defers the choice to PBM's own default (s2); the
			// constructor must accept it like any explicit method.
			name:        "Default compression",
			compression: backuppb.BackupCompression_BACKUP_COMPRESSION_DEFAULT,
			shouldError: false,
		},
		{
			name:        "GZIP compression",
			compression: backuppb.BackupCompression_BACKUP_COMPRESSION_GZIP,
			shouldError: false,
		},
		{
			name:        "Snappy compression",
			compression: backuppb.BackupCompression_BACKUP_COMPRESSION_SNAPPY,
			shouldError: false,
		},
		{
			name:        "LZ4 compression",
			compression: backuppb.BackupCompression_BACKUP_COMPRESSION_LZ4,
			shouldError: false,
		},
		{
			name:        "S2 compression",
			compression: backuppb.BackupCompression_BACKUP_COMPRESSION_S2,
			shouldError: false,
		},
		{
			name:        "PGZIP compression",
			compression: backuppb.BackupCompression_BACKUP_COMPRESSION_PGZIP,
			shouldError: false,
		},
		{
			name:        "ZSTD compression",
			compression: backuppb.BackupCompression_BACKUP_COMPRESSION_ZSTD,
			shouldError: false,
		},
		{
			name:        "None compression",
			compression: backuppb.BackupCompression_BACKUP_COMPRESSION_NONE,
			shouldError: false,
		},
	}

	for _, tc := range tests {
		tc := tc // capture range variable for parallel subtests (pre-Go 1.22 semantics)
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()
			// The constructor does not validate the compression value itself
			// (validation happens when the backup is started), so every
			// supported method must yield a job without an error.
			job, err := NewMongoDBBackupJob(
				t.Name(),
				testJobDuration,
				t.Name(),
				"",
				BackupLocationConfig{},
				false, // PITR disabled
				backuppb.DataModel_DATA_MODEL_LOGICAL,
				"artifact_folder",
				tc.compression)

			if tc.shouldError {
				assert.Error(t, err)
				assert.Nil(t, job)
			} else {
				assert.NoError(t, err)
				assert.NotNil(t, job)
				assert.Equal(t, tc.compression, job.compression)
			}
		})
	}
}
4 changes: 4 additions & 0 deletions agent/runner/jobs/mongodb_restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"

agentv1 "github.com/percona/pmm/api/agent/v1"
backupv1 "github.com/percona/pmm/api/backup/v1"
)

const (
Expand All @@ -48,6 +49,7 @@ type MongoDBRestoreJob struct {
jobLogger *pbmJobLogger
folder string
pbmBackupName string
compression backupv1.BackupCompression
}

// NewMongoDBRestoreJob creates new Job for MongoDB backup restore.
Expand All @@ -61,6 +63,7 @@ func NewMongoDBRestoreJob(
restarter agentsRestarter,
folder string,
pbmBackupName string,
compression backupv1.BackupCompression,
) *MongoDBRestoreJob {
return &MongoDBRestoreJob{
id: id,
Expand All @@ -74,6 +77,7 @@ func NewMongoDBRestoreJob(
jobLogger: newPbmJobLogger(id, pbmRestoreJob, dbConfig),
folder: folder,
pbmBackupName: pbmBackupName,
compression: compression,
}
}

Expand Down
33 changes: 29 additions & 4 deletions agent/runner/jobs/mysql_backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,19 @@ type MySQLBackupJob struct {
connConf DBConnConfig
locationConfig BackupLocationConfig
folder string
compression backuppb.BackupCompression
}

// NewMySQLBackupJob constructs new Job for MySQL backup.
func NewMySQLBackupJob(id string, timeout time.Duration, name string, connConf DBConnConfig, locationConfig BackupLocationConfig, folder string) *MySQLBackupJob {
func NewMySQLBackupJob(
id string,
timeout time.Duration,
name string,
connConf DBConnConfig,
locationConfig BackupLocationConfig,
folder string,
compression backuppb.BackupCompression,
) *MySQLBackupJob {
return &MySQLBackupJob{
id: id,
timeout: timeout,
Expand All @@ -58,6 +67,7 @@ func NewMySQLBackupJob(id string, timeout time.Duration, name string, connConf D
connConf: connConf,
locationConfig: locationConfig,
folder: folder,
compression: compression,
}
}

Expand Down Expand Up @@ -119,8 +129,10 @@ func (j *MySQLBackupJob) binariesInstalled() error {
return errors.Wrapf(err, "lookpath: %s", xtrabackupBin)
}

if _, err := exec.LookPath(qpressBin); err != nil {
return errors.Wrapf(err, "lookpath: %s", qpressBin)
if j.compression == backuppb.BackupCompression_BACKUP_COMPRESSION_QUICKLZ {
if _, err := exec.LookPath(qpressBin); err != nil {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensures we only check that qpress is installed if the user explicitly wants to use quicklz; the other compression options ship as part of xtrabackup, so there's no need to check for them explicitly.

return errors.Wrapf(err, "lookpath: %s", qpressBin)
}
}

if j.locationConfig.Type == S3BackupLocationType {
Expand Down Expand Up @@ -149,12 +161,25 @@ func (j *MySQLBackupJob) backup(ctx context.Context) (rerr error) {

xtrabackupCmd := exec.CommandContext(pipeCtx,
xtrabackupBin,
"--compress",
"--backup",
// Target dir is created, even though it's empty, because we are streaming it to cloud.
// https://jira.percona.com/browse/PXB-2602
"--target-dir="+tmpDir) // #nosec G204

switch j.compression {
case backuppb.BackupCompression_BACKUP_COMPRESSION_DEFAULT:
xtrabackupCmd.Args = append(xtrabackupCmd.Args, "--compress")
case backuppb.BackupCompression_BACKUP_COMPRESSION_QUICKLZ:
xtrabackupCmd.Args = append(xtrabackupCmd.Args, "--compress=quicklz")
case backuppb.BackupCompression_BACKUP_COMPRESSION_ZSTD:
xtrabackupCmd.Args = append(xtrabackupCmd.Args, "--compress=zstd")
case backuppb.BackupCompression_BACKUP_COMPRESSION_LZ4:
xtrabackupCmd.Args = append(xtrabackupCmd.Args, "--compress=lz4")
case backuppb.BackupCompression_BACKUP_COMPRESSION_NONE:
default:
return errors.Errorf("unknown compression: %s", j.compression)
}

if j.connConf.User != "" {
xtrabackupCmd.Args = append(xtrabackupCmd.Args, "--user="+j.connConf.User)
xtrabackupCmd.Args = append(xtrabackupCmd.Args, "--password="+j.connConf.Password)
Expand Down
Loading
Loading