Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions .github/workflows/commit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,78 @@ jobs:

- run: ./etc/s3_mock.py go test -v ./replica_client_test.go -integration s3

minio-integration-test:
name: Run MinIO Integration Tests
runs-on: ubuntu-latest
needs: build
steps:
- uses: actions/checkout@v4

- uses: actions/setup-go@v5
with:
go-version-file: "go.mod"

- name: Start MinIO Server
run: |
docker run -d \
--name minio-test \
-p 9000:9000 \
-p 9001:9001 \
-e MINIO_ROOT_USER=minioadmin \
-e MINIO_ROOT_PASSWORD=minioadmin \
quay.io/minio/minio server /data --console-address ":9001"

# Wait for MinIO to be ready
echo "Waiting for MinIO server to be ready..."
for i in {1..30}; do
if docker exec minio-test mc alias set local http://localhost:9000 minioadmin minioadmin 2>/dev/null; then
echo "MinIO server is ready"
break
fi
echo "Waiting for MinIO server... ($i/30)"
sleep 1
done

# Create test bucket
docker exec minio-test mc mb local/testbucket

- run: go install ./cmd/litestream

- name: Test Query Parameter Support
run: |
# Create test database
sqlite3 /tmp/test.db "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT); INSERT INTO users (name) VALUES ('Alice'), ('Bob');"

# Test replicate with query parameters
export AWS_ACCESS_KEY_ID=minioadmin
export AWS_SECRET_ACCESS_KEY=minioadmin

# Run replication for 5 seconds
timeout 5 litestream replicate /tmp/test.db \
"s3://testbucket/test.db?endpoint=localhost:9000&forcePathStyle=true" || true

# Verify files were uploaded
docker exec minio-test mc ls local/testbucket/test.db/

# Test restore with query parameters
rm -f /tmp/restored.db
litestream restore -o /tmp/restored.db \
"s3://testbucket/test.db?endpoint=localhost:9000&forcePathStyle=true"

# Verify restored data
if [ "$(sqlite3 /tmp/restored.db 'SELECT COUNT(*) FROM users;')" != "2" ]; then
echo "ERROR: Restored database does not have expected data"
exit 1
fi

echo "MinIO integration test with query parameters passed!"

- name: Cleanup
if: always()
run: |
docker stop minio-test || true
docker rm minio-test || true

nats-docker-test:
name: Run NATS Integration Tests
runs-on: ubuntu-latest
Expand Down
48 changes: 40 additions & 8 deletions cmd/litestream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re
return nil, err
}
case "s3":
if r.Client, err = newS3ReplicaClientFromConfig(c, r); err != nil {
if r.Client, err = NewS3ReplicaClientFromConfig(c, r); err != nil {
return nil, err
}
case "gs":
Expand Down Expand Up @@ -676,8 +676,9 @@ func newFileReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_
return client, nil
}

// newS3ReplicaClientFromConfig returns a new instance of s3.ReplicaClient built from config.
func newS3ReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ *s3.ReplicaClient, err error) {
// NewS3ReplicaClientFromConfig returns a new instance of s3.ReplicaClient built from config.
// Exported for testing.
func NewS3ReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ *s3.ReplicaClient, err error) {
// Ensure URL & constituent parts are not both specified.
if c.URL != "" && c.Path != "" {
return nil, fmt.Errorf("cannot specify url & path for s3 replica")
Expand All @@ -697,12 +698,35 @@ func newS3ReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ *s

// Apply settings from URL, if specified.
if c.URL != "" {
_, host, upath, err := ParseReplicaURL(c.URL)
_, host, upath, query, err := ParseReplicaURLWithQuery(c.URL)
if err != nil {
return nil, err
}
ubucket, uregion, uendpoint, uforcePathStyle := s3.ParseHost(host)

// Override with query parameters if provided
if qEndpoint := query.Get("endpoint"); qEndpoint != "" {
// Ensure endpoint has a scheme
if !strings.HasPrefix(qEndpoint, "http://") && !strings.HasPrefix(qEndpoint, "https://") {
// Default to http for non-TLS endpoints (common for local/dev)
qEndpoint = "http://" + qEndpoint
}
uendpoint = qEndpoint
// Default to path style for custom endpoints unless explicitly set to false
if query.Get("forcePathStyle") != "false" {
uforcePathStyle = true
}
}
if qRegion := query.Get("region"); qRegion != "" {
uregion = qRegion
}
if qForcePathStyle := query.Get("forcePathStyle"); qForcePathStyle != "" {
uforcePathStyle = qForcePathStyle == "true"
}
if qSkipVerify := query.Get("skipVerify"); qSkipVerify != "" {
skipVerify = qSkipVerify == "true"
}

// Only apply URL parts to field that have not been overridden.
if configPath == "" {
configPath = upath
Expand Down Expand Up @@ -962,21 +986,29 @@ func applyLitestreamEnv() {

// ParseReplicaURL parses a replica URL.
func ParseReplicaURL(s string) (scheme, host, urlpath string, err error) {
scheme, host, urlpath, _, err = ParseReplicaURLWithQuery(s)
return scheme, host, urlpath, err
}

// ParseReplicaURLWithQuery parses a replica URL and returns query parameters.
func ParseReplicaURLWithQuery(s string) (scheme, host, urlpath string, query url.Values, err error) {
u, err := url.Parse(s)
if err != nil {
return "", "", "", err
return "", "", "", nil, err
}

switch u.Scheme {
case "file":
scheme, u.Scheme = u.Scheme, ""
return scheme, "", path.Clean(u.String()), nil
// Remove query params from path for file URLs
u.RawQuery = ""
return scheme, "", path.Clean(u.String()), nil, nil

case "":
return u.Scheme, u.Host, u.Path, fmt.Errorf("replica url scheme required: %s", s)
return u.Scheme, u.Host, u.Path, nil, fmt.Errorf("replica url scheme required: %s", s)

default:
return u.Scheme, u.Host, strings.TrimPrefix(path.Clean(u.Path), "/"), nil
return u.Scheme, u.Host, strings.TrimPrefix(path.Clean(u.Path), "/"), u.Query(), nil
}
}

Expand Down
Loading