diff --git a/.github/workflows/commit.yml b/.github/workflows/commit.yml
index 9827031b..bd2d5802 100644
--- a/.github/workflows/commit.yml
+++ b/.github/workflows/commit.yml
@@ -113,6 +113,78 @@ jobs:
 
       - run: ./etc/s3_mock.py go test -v ./replica_client_test.go -integration s3
 
+  minio-integration-test:
+    name: Run MinIO Integration Tests
+    runs-on: ubuntu-latest
+    needs: build
+    steps:
+      - uses: actions/checkout@v4
+
+      - uses: actions/setup-go@v5
+        with:
+          go-version-file: "go.mod"
+
+      - name: Start MinIO Server
+        run: |
+          docker run -d \
+            --name minio-test \
+            -p 9000:9000 \
+            -p 9001:9001 \
+            -e MINIO_ROOT_USER=minioadmin \
+            -e MINIO_ROOT_PASSWORD=minioadmin \
+            quay.io/minio/minio server /data --console-address ":9001"
+
+          # Wait for MinIO to be ready
+          echo "Waiting for MinIO server to be ready..."
+          for i in {1..30}; do
+            if docker exec minio-test mc alias set local http://localhost:9000 minioadmin minioadmin 2>/dev/null; then
+              echo "MinIO server is ready"
+              break
+            fi
+            echo "Waiting for MinIO server... ($i/30)"
+            sleep 1
+          done
+
+          # Create test bucket
+          docker exec minio-test mc mb local/testbucket
+
+      - run: go install ./cmd/litestream
+
+      - name: Test Query Parameter Support
+        run: |
+          # Create test database
+          sqlite3 /tmp/test.db "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT); INSERT INTO users (name) VALUES ('Alice'), ('Bob');"
+
+          # Test replicate with query parameters
+          export AWS_ACCESS_KEY_ID=minioadmin
+          export AWS_SECRET_ACCESS_KEY=minioadmin
+
+          # Run replication for 5 seconds
+          timeout 5 litestream replicate /tmp/test.db \
+            "s3://testbucket/test.db?endpoint=localhost:9000&forcePathStyle=true" || true
+
+          # Verify files were uploaded
+          docker exec minio-test mc ls local/testbucket/test.db/
+
+          # Test restore with query parameters
+          rm -f /tmp/restored.db
+          litestream restore -o /tmp/restored.db \
+            "s3://testbucket/test.db?endpoint=localhost:9000&forcePathStyle=true"
+
+          # Verify restored data
+          if [ "$(sqlite3 /tmp/restored.db 'SELECT COUNT(*) FROM users;')" != "2" ]; then
+            echo "ERROR: Restored database does not have expected data"
+            exit 1
+          fi
+
+          echo "MinIO integration test with query parameters passed!"
+
+      - name: Cleanup
+        if: always()
+        run: |
+          docker stop minio-test || true
+          docker rm minio-test || true
+
   nats-docker-test:
     name: Run NATS Integration Tests
     runs-on: ubuntu-latest
diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go
index 0121edf2..90573a4f 100644
--- a/cmd/litestream/main.go
+++ b/cmd/litestream/main.go
@@ -619,7 +619,7 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re
 			return nil, err
 		}
 	case "s3":
-		if r.Client, err = newS3ReplicaClientFromConfig(c, r); err != nil {
+		if r.Client, err = NewS3ReplicaClientFromConfig(c, r); err != nil {
 			return nil, err
 		}
 	case "gs":
@@ -676,8 +676,9 @@ func newFileReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_
 	return client, nil
 }
 
-// newS3ReplicaClientFromConfig returns a new instance of s3.ReplicaClient built from config.
-func newS3ReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ *s3.ReplicaClient, err error) {
+// NewS3ReplicaClientFromConfig returns a new instance of s3.ReplicaClient built from config.
+// Exported for testing.
+func NewS3ReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ *s3.ReplicaClient, err error) {
 	// Ensure URL & constituent parts are not both specified.
if c.URL != "" && c.Path != "" { return nil, fmt.Errorf("cannot specify url & path for s3 replica") @@ -697,12 +698,35 @@ func newS3ReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ *s // Apply settings from URL, if specified. if c.URL != "" { - _, host, upath, err := ParseReplicaURL(c.URL) + _, host, upath, query, err := ParseReplicaURLWithQuery(c.URL) if err != nil { return nil, err } ubucket, uregion, uendpoint, uforcePathStyle := s3.ParseHost(host) + // Override with query parameters if provided + if qEndpoint := query.Get("endpoint"); qEndpoint != "" { + // Ensure endpoint has a scheme + if !strings.HasPrefix(qEndpoint, "http://") && !strings.HasPrefix(qEndpoint, "https://") { + // Default to http for non-TLS endpoints (common for local/dev) + qEndpoint = "http://" + qEndpoint + } + uendpoint = qEndpoint + // Default to path style for custom endpoints unless explicitly set to false + if query.Get("forcePathStyle") != "false" { + uforcePathStyle = true + } + } + if qRegion := query.Get("region"); qRegion != "" { + uregion = qRegion + } + if qForcePathStyle := query.Get("forcePathStyle"); qForcePathStyle != "" { + uforcePathStyle = qForcePathStyle == "true" + } + if qSkipVerify := query.Get("skipVerify"); qSkipVerify != "" { + skipVerify = qSkipVerify == "true" + } + // Only apply URL parts to field that have not been overridden. if configPath == "" { configPath = upath @@ -962,21 +986,29 @@ func applyLitestreamEnv() { // ParseReplicaURL parses a replica URL. func ParseReplicaURL(s string) (scheme, host, urlpath string, err error) { + scheme, host, urlpath, _, err = ParseReplicaURLWithQuery(s) + return scheme, host, urlpath, err +} + +// ParseReplicaURLWithQuery parses a replica URL and returns query parameters. +func ParseReplicaURLWithQuery(s string) (scheme, host, urlpath string, query url.Values, err error) { u, err := url.Parse(s) if err != nil { - return "", "", "", err + return "", "", "", nil, err } switch u.Scheme { case "file": scheme, u.Scheme = u.Scheme, "" - return scheme, "", path.Clean(u.String()), nil + // Remove query params from path for file URLs + u.RawQuery = "" + return scheme, "", path.Clean(u.String()), nil, nil case "": - return u.Scheme, u.Host, u.Path, fmt.Errorf("replica url scheme required: %s", s) + return u.Scheme, u.Host, u.Path, nil, fmt.Errorf("replica url scheme required: %s", s) default: - return u.Scheme, u.Host, strings.TrimPrefix(path.Clean(u.Path), "/"), nil + return u.Scheme, u.Host, strings.TrimPrefix(path.Clean(u.Path), "/"), u.Query(), nil } } diff --git a/cmd/litestream/main_test.go b/cmd/litestream/main_test.go index dadbc323..5d0bc2f8 100644 --- a/cmd/litestream/main_test.go +++ b/cmd/litestream/main_test.go @@ -622,3 +622,243 @@ func TestConfig_DefaultValues(t *testing.T) { t.Errorf("expected default snapshot retention of 24h, got %v", *config.Snapshot.Retention) } } + +func TestParseReplicaURLWithQuery(t *testing.T) { + t.Run("S3WithEndpoint", func(t *testing.T) { + url := "s3://mybucket/path/to/db?endpoint=localhost:9000®ion=us-east-1&forcePathStyle=true" + scheme, host, path, query, err := main.ParseReplicaURLWithQuery(url) + if err != nil { + t.Fatal(err) + } + if scheme != "s3" { + t.Errorf("expected scheme 's3', got %q", scheme) + } + if host != "mybucket" { + t.Errorf("expected host 'mybucket', got %q", host) + } + if path != "path/to/db" { + t.Errorf("expected path 'path/to/db', got %q", path) + } + if query.Get("endpoint") != "localhost:9000" { + t.Errorf("expected endpoint 'localhost:9000', got %q", 
query.Get("endpoint")) + } + if query.Get("region") != "us-east-1" { + t.Errorf("expected region 'us-east-1', got %q", query.Get("region")) + } + if query.Get("forcePathStyle") != "true" { + t.Errorf("expected forcePathStyle 'true', got %q", query.Get("forcePathStyle")) + } + }) + + t.Run("S3WithoutQuery", func(t *testing.T) { + url := "s3://mybucket/path/to/db" + scheme, host, path, query, err := main.ParseReplicaURLWithQuery(url) + if err != nil { + t.Fatal(err) + } + if scheme != "s3" { + t.Errorf("expected scheme 's3', got %q", scheme) + } + if host != "mybucket" { + t.Errorf("expected host 'mybucket', got %q", host) + } + if path != "path/to/db" { + t.Errorf("expected path 'path/to/db', got %q", path) + } + if len(query) != 0 { + t.Errorf("expected no query parameters, got %v", query) + } + }) + + t.Run("FileURL", func(t *testing.T) { + url := "file:///path/to/db" + scheme, host, path, query, err := main.ParseReplicaURLWithQuery(url) + if err != nil { + t.Fatal(err) + } + if scheme != "file" { + t.Errorf("expected scheme 'file', got %q", scheme) + } + if host != "" { + t.Errorf("expected empty host, got %q", host) + } + if path != "/path/to/db" { + t.Errorf("expected path '/path/to/db', got %q", path) + } + if query != nil { + t.Errorf("expected nil query for file URL, got %v", query) + } + }) + + t.Run("BackwardCompatibility", func(t *testing.T) { + // Test that ParseReplicaURL still works as before + url := "s3://mybucket/path/to/db?endpoint=localhost:9000" + scheme, host, path, err := main.ParseReplicaURL(url) + if err != nil { + t.Fatal(err) + } + if scheme != "s3" { + t.Errorf("expected scheme 's3', got %q", scheme) + } + if host != "mybucket" { + t.Errorf("expected host 'mybucket', got %q", host) + } + if path != "path/to/db" { + t.Errorf("expected path 'path/to/db', got %q", path) + } + }) + + t.Run("S3TigrisExample", func(t *testing.T) { + url := "s3://mybucket/db?endpoint=fly.storage.tigris.dev®ion=auto" + scheme, host, path, query, err := main.ParseReplicaURLWithQuery(url) + if err != nil { + t.Fatal(err) + } + if scheme != "s3" { + t.Errorf("expected scheme 's3', got %q", scheme) + } + if host != "mybucket" { + t.Errorf("expected host 'mybucket', got %q", host) + } + if path != "db" { + t.Errorf("expected path 'db', got %q", path) + } + if query.Get("endpoint") != "fly.storage.tigris.dev" { + t.Errorf("expected endpoint 'fly.storage.tigris.dev', got %q", query.Get("endpoint")) + } + if query.Get("region") != "auto" { + t.Errorf("expected region 'auto', got %q", query.Get("region")) + } + }) + + t.Run("S3WithSkipVerify", func(t *testing.T) { + url := "s3://mybucket/db?endpoint=self-signed.local&skipVerify=true" + _, _, _, query, err := main.ParseReplicaURLWithQuery(url) + if err != nil { + t.Fatal(err) + } + if query.Get("skipVerify") != "true" { + t.Errorf("expected skipVerify 'true', got %q", query.Get("skipVerify")) + } + }) +} + +func TestNewS3ReplicaClientFromConfig(t *testing.T) { + t.Run("URLWithEndpointQuery", func(t *testing.T) { + config := &main.ReplicaConfig{ + URL: "s3://mybucket/path/to/db?endpoint=localhost:9000®ion=us-west-2&forcePathStyle=true&skipVerify=true", + } + + client, err := main.NewS3ReplicaClientFromConfig(config, nil) + if err != nil { + t.Fatal(err) + } + + if client.Bucket != "mybucket" { + t.Errorf("expected bucket 'mybucket', got %q", client.Bucket) + } + if client.Path != "path/to/db" { + t.Errorf("expected path 'path/to/db', got %q", client.Path) + } + if client.Endpoint != "http://localhost:9000" { + t.Errorf("expected endpoint 
'http://localhost:9000', got %q", client.Endpoint) + } + if client.Region != "us-west-2" { + t.Errorf("expected region 'us-west-2', got %q", client.Region) + } + if !client.ForcePathStyle { + t.Error("expected ForcePathStyle to be true") + } + if !client.SkipVerify { + t.Error("expected SkipVerify to be true") + } + }) + + t.Run("URLWithoutQuery", func(t *testing.T) { + config := &main.ReplicaConfig{ + URL: "s3://mybucket.s3.amazonaws.com/path/to/db", + } + + client, err := main.NewS3ReplicaClientFromConfig(config, nil) + if err != nil { + t.Fatal(err) + } + + if client.Bucket != "mybucket" { + t.Errorf("expected bucket 'mybucket', got %q", client.Bucket) + } + if client.Path != "path/to/db" { + t.Errorf("expected path 'path/to/db', got %q", client.Path) + } + // Should use default AWS settings + if client.Endpoint != "" { + t.Errorf("expected empty endpoint for AWS S3, got %q", client.Endpoint) + } + if client.ForcePathStyle { + t.Error("expected ForcePathStyle to be false for AWS S3") + } + }) + + t.Run("ConfigOverridesQuery", func(t *testing.T) { + config := &main.ReplicaConfig{ + URL: "s3://mybucket/path?endpoint=from-query®ion=us-east-1", + Endpoint: "from-config", + Region: "us-west-1", + } + + client, err := main.NewS3ReplicaClientFromConfig(config, nil) + if err != nil { + t.Fatal(err) + } + + // Config values should take precedence over query params + if client.Endpoint != "from-config" { + t.Errorf("expected endpoint from config 'from-config', got %q", client.Endpoint) + } + if client.Region != "us-west-1" { + t.Errorf("expected region from config 'us-west-1', got %q", client.Region) + } + }) + + t.Run("TigrisExample", func(t *testing.T) { + config := &main.ReplicaConfig{ + URL: "s3://my-tigris-bucket/db.sqlite?endpoint=fly.storage.tigris.dev®ion=auto", + } + + client, err := main.NewS3ReplicaClientFromConfig(config, nil) + if err != nil { + t.Fatal(err) + } + + if client.Bucket != "my-tigris-bucket" { + t.Errorf("expected bucket 'my-tigris-bucket', got %q", client.Bucket) + } + if client.Endpoint != "http://fly.storage.tigris.dev" { + t.Errorf("expected Tigris endpoint, got %q", client.Endpoint) + } + if client.Region != "auto" { + t.Errorf("expected region 'auto' for Tigris, got %q", client.Region) + } + if !client.ForcePathStyle { + t.Error("expected ForcePathStyle to be true for custom endpoint") + } + }) + + t.Run("HTTPSEndpoint", func(t *testing.T) { + config := &main.ReplicaConfig{ + URL: "s3://mybucket/path?endpoint=https://secure.storage.com®ion=us-east-1", + } + + client, err := main.NewS3ReplicaClientFromConfig(config, nil) + if err != nil { + t.Fatal(err) + } + + if client.Endpoint != "https://secure.storage.com" { + t.Errorf("expected endpoint 'https://secure.storage.com', got %q", client.Endpoint) + } + if !client.ForcePathStyle { + t.Error("expected ForcePathStyle to be true for custom endpoint") + } + }) +} diff --git a/s3/replica_client.go b/s3/replica_client.go index cb9a7d7d..fc3b54f2 100644 --- a/s3/replica_client.go +++ b/s3/replica_client.go @@ -594,7 +594,14 @@ func ParseHost(host string) (bucket, region, endpoint string, forcePathStyle boo } // Check common object storage providers - if a := digitaloceanRegex.FindStringSubmatch(host); len(a) > 1 { + // Check for AWS S3 URLs first + if a := awsS3Regex.FindStringSubmatch(host); len(a) > 1 { + bucket = a[1] + if len(a) > 2 && a[2] != "" { + region = a[2] + } + return bucket, region, "", false + } else if a := digitaloceanRegex.FindStringSubmatch(host); len(a) > 1 { region = a[2] return "", region, 
fmt.Sprintf("https://%s.digitaloceanspaces.com", region), false } else if a := backblazeRegex.FindStringSubmatch(host); len(a) > 1 { @@ -618,6 +625,7 @@ func ParseHost(host string) (bucket, region, endpoint string, forcePathStyle boo } var ( + awsS3Regex = regexp.MustCompile(`^(.+)\.s3(?:\.([^.]+))?\.amazonaws\.com$`) digitaloceanRegex = regexp.MustCompile(`^(?:(.+)\.)?([^.]+)\.digitaloceanspaces.com$`) backblazeRegex = regexp.MustCompile(`^(?:(.+)\.)?s3.([^.]+)\.backblazeb2.com$`) filebaseRegex = regexp.MustCompile(`^(?:(.+)\.)?s3.filebase.com$`)