From 6f231b0731bc0e598251ae4b91faba13faaf84f5 Mon Sep 17 00:00:00 2001 From: Jun Luo <4catcode@gmail.com> Date: Wed, 3 Sep 2025 11:03:48 +0800 Subject: [PATCH] support/datastore: add read-only http datastore. --- support/datastore/datastore.go | 2 + support/datastore/http.go | 220 ++++++++++++++++++ support/datastore/http_test.go | 392 +++++++++++++++++++++++++++++++++ 3 files changed, 614 insertions(+) create mode 100644 support/datastore/http.go create mode 100644 support/datastore/http_test.go diff --git a/support/datastore/datastore.go b/support/datastore/datastore.go index 7d07bffa44..202daea2a4 100644 --- a/support/datastore/datastore.go +++ b/support/datastore/datastore.go @@ -43,6 +43,8 @@ func NewDataStore(ctx context.Context, datastoreConfig DataStoreConfig) (DataSto return NewGCSDataStore(ctx, datastoreConfig) case "S3": return NewS3DataStore(ctx, datastoreConfig) + case "HTTP": + return NewHTTPDataStore(datastoreConfig) default: return nil, fmt.Errorf("invalid datastore type %v, not supported", datastoreConfig.Type) diff --git a/support/datastore/http.go b/support/datastore/http.go new file mode 100644 index 0000000000..0b4b627d47 --- /dev/null +++ b/support/datastore/http.go @@ -0,0 +1,220 @@ +package datastore + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "os" + "strconv" + "strings" + "time" + + "github.com/stellar/go/support/log" +) + +// HTTPDataStore implements DataStore for HTTP(S) endpoints. +// This is designed for read-only access to publicly available files. +type HTTPDataStore struct { + client *http.Client + baseURL string + headers map[string]string +} + +func NewHTTPDataStore(datastoreConfig DataStoreConfig) (DataStore, error) { + baseURL, ok := datastoreConfig.Params["base_url"] + if !ok { + return nil, errors.New("invalid HTTP config, no base_url") + } + + parsedURL, err := url.Parse(baseURL) + if err != nil { + return nil, fmt.Errorf("invalid base_url: %w", err) + } + if parsedURL.Scheme != "http" && parsedURL.Scheme != "https" { + return nil, errors.New("base_url must use http or https scheme") + } + + if !strings.HasSuffix(baseURL, "/") { + baseURL = baseURL + "/" + } + + timeout := 30 * time.Second + if timeoutStr, ok := datastoreConfig.Params["timeout"]; ok { + parsedTimeout, err := time.ParseDuration(timeoutStr) + if err != nil { + return nil, fmt.Errorf("invalid timeout: %w", err) + } + timeout = parsedTimeout + } + + headers := make(map[string]string) + for key, value := range datastoreConfig.Params { + if strings.HasPrefix(key, "header_") { + headerName := strings.TrimPrefix(key, "header_") + headers[headerName] = value + } + } + + client := &http.Client{ + Timeout: timeout, + } + + return &HTTPDataStore{ + client: client, + baseURL: baseURL, + headers: headers, + }, nil +} + +func (h *HTTPDataStore) buildURL(filePath string) string { + return h.baseURL + filePath +} + +func (h *HTTPDataStore) addHeaders(req *http.Request) { + for key, value := range h.headers { + req.Header.Set(key, value) + } +} + +func (h *HTTPDataStore) checkHTTPStatus(resp *http.Response, filePath string) error { + switch resp.StatusCode { + case http.StatusOK: + return nil + case http.StatusNotFound: + return os.ErrNotExist + default: + return fmt.Errorf("HTTP error %d for file %s", resp.StatusCode, filePath) + } +} + +func (h *HTTPDataStore) doHeadRequest(ctx context.Context, filePath string) (*http.Response, error) { + requestURL := h.buildURL(filePath) + req, err := http.NewRequestWithContext(ctx, "HEAD", requestURL, nil) + if err != nil { + return nil, fmt.Errorf("failed to create HEAD request for %s: %w", filePath, err) + } + h.addHeaders(req) + + resp, err := h.client.Do(req) + if err != nil { + return nil, fmt.Errorf("HEAD request failed for %s: %w", filePath, err) + } + + if err := h.checkHTTPStatus(resp, filePath); err != nil { + resp.Body.Close() + return nil, err + } + + return resp, nil +} + +// GetFileMetadata retrieves basic metadata for a file via HTTP HEAD request. +func (h *HTTPDataStore) GetFileMetadata(ctx context.Context, filePath string) (map[string]string, error) { + resp, err := h.doHeadRequest(ctx, filePath) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + metadata := make(map[string]string) + for key, values := range resp.Header { + if len(values) > 0 { + metadata[strings.ToLower(key)] = values[0] + } + } + + return metadata, nil +} + +// GetFileLastModified retrieves the last modified time from HTTP headers. +func (h *HTTPDataStore) GetFileLastModified(ctx context.Context, filePath string) (time.Time, error) { + metadata, err := h.GetFileMetadata(ctx, filePath) + if err != nil { + return time.Time{}, err + } + + if lastModified, ok := metadata["last-modified"]; ok { + return http.ParseTime(lastModified) + } + + return time.Time{}, errors.New("last-modified header not found") +} + +// GetFile downloads a file from the HTTP endpoint. +func (h *HTTPDataStore) GetFile(ctx context.Context, filePath string) (io.ReadCloser, error) { + requestURL := h.buildURL(filePath) + req, err := http.NewRequestWithContext(ctx, "GET", requestURL, nil) + if err != nil { + return nil, fmt.Errorf("failed to create GET request for %s: %w", filePath, err) + } + h.addHeaders(req) + + resp, err := h.client.Do(req) + if err != nil { + log.Debugf("Error retrieving file '%s': %v", filePath, err) + return nil, fmt.Errorf("GET request failed for %s: %w", filePath, err) + } + + if err := h.checkHTTPStatus(resp, filePath); err != nil { + resp.Body.Close() + return nil, err + } + + log.Debugf("File retrieved successfully: %s", filePath) + return resp.Body, nil +} + +// PutFile is not supported for HTTP datastore. +func (h *HTTPDataStore) PutFile(ctx context.Context, path string, in io.WriterTo, metaData map[string]string) error { + return errors.New("HTTP datastore is read-only, PutFile not supported") +} + +// PutFileIfNotExists is not supported for HTTP datastore. +func (h *HTTPDataStore) PutFileIfNotExists(ctx context.Context, path string, in io.WriterTo, metaData map[string]string) (bool, error) { + return false, errors.New("HTTP datastore is read-only, PutFileIfNotExists not supported") +} + +// Exists checks if a file exists by making a HEAD request. +func (h *HTTPDataStore) Exists(ctx context.Context, filePath string) (bool, error) { + resp, err := h.doHeadRequest(ctx, filePath) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return false, nil + } + return false, err + } + defer resp.Body.Close() + + return true, nil +} + +// Size retrieves the file size from Content-Length header. +func (h *HTTPDataStore) Size(ctx context.Context, filePath string) (int64, error) { + metadata, err := h.GetFileMetadata(ctx, filePath) + if err != nil { + return 0, err + } + + if contentLength, ok := metadata["content-length"]; ok { + size, err := strconv.ParseInt(contentLength, 10, 64) + if err != nil { + return 0, fmt.Errorf("invalid content-length header: %s", contentLength) + } + return size, nil + } + + return 0, errors.New("content-length header not found") +} + +// ListFilePaths is not supported for HTTP datastore. +func (h *HTTPDataStore) ListFilePaths(ctx context.Context, prefix string, limit int) ([]string, error) { + return nil, errors.New("HTTP datastore does not support listing files") +} + +// Close does nothing for HTTPDataStore as it does not maintain a persistent connection. +func (h *HTTPDataStore) Close() error { + return nil +} diff --git a/support/datastore/http_test.go b/support/datastore/http_test.go new file mode 100644 index 0000000000..4b82302bb0 --- /dev/null +++ b/support/datastore/http_test.go @@ -0,0 +1,392 @@ +package datastore + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// mockHTTPServer holds test data and simulates HTTP responses +type mockHTTPServer struct { + files map[string]mockHTTPFile +} + +type mockHTTPFile struct { + content string + lastModified time.Time + headers map[string]string + exists bool +} + +func (s *mockHTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + path := strings.TrimPrefix(r.URL.Path, "/") + + // Check for authentication header if required + if strings.HasPrefix(path, "private/") { + authHeader := r.Header.Get("Authorization") + if authHeader != "Bearer test-token" { + w.WriteHeader(http.StatusUnauthorized) + return + } + } + + file, exists := s.files[path] + if !exists || !file.exists { + w.WriteHeader(http.StatusNotFound) + return + } + + // Set custom headers + for key, value := range file.headers { + w.Header().Set(key, value) + } + + // Set standard headers + w.Header().Set("Content-Length", strconv.Itoa(len(file.content))) + w.Header().Set("Last-Modified", file.lastModified.Format(http.TimeFormat)) + + switch r.Method { + case http.MethodHead: + w.WriteHeader(http.StatusOK) + case http.MethodGet: + w.WriteHeader(http.StatusOK) + w.Write([]byte(file.content)) + default: + w.WriteHeader(http.StatusMethodNotAllowed) + } +} + +func setupMockServer() (*httptest.Server, *mockHTTPServer) { + now := time.Now() + mockServer := &mockHTTPServer{ + files: map[string]mockHTTPFile{ + "test.txt": { + content: "Hello, World!", + lastModified: now, + headers: map[string]string{"Content-Type": "text/plain"}, + exists: true, + }, + "data/file.json": { + content: `{"key": "value"}`, + lastModified: now.Add(-time.Hour), + headers: map[string]string{"Content-Type": "application/json"}, + exists: true, + }, + "private/secret.txt": { + content: "secret data", + lastModified: now, + headers: map[string]string{"Content-Type": "text/plain"}, + exists: true, + }, + }, + } + + server := httptest.NewServer(mockServer) + return server, mockServer +} + +func TestNewHTTPDataStore(t *testing.T) { + t.Run("valid config", func(t *testing.T) { + config := DataStoreConfig{ + Type: "HTTP", + Params: map[string]string{ + "base_url": "https://example.com/", + }, + } + + ds, err := NewHTTPDataStore(config) + require.NoError(t, err) + require.NotNil(t, ds) + + httpDS := ds.(*HTTPDataStore) + require.Equal(t, "https://example.com/", httpDS.baseURL) + require.Empty(t, httpDS.headers) + }) + + t.Run("config with path in base_url", func(t *testing.T) { + config := DataStoreConfig{ + Type: "HTTP", + Params: map[string]string{ + "base_url": "https://example.com/data/exports", + }, + } + + ds, err := NewHTTPDataStore(config) + require.NoError(t, err) + + httpDS := ds.(*HTTPDataStore) + require.Equal(t, "https://example.com/data/exports/", httpDS.baseURL) + }) + + t.Run("config with custom headers", func(t *testing.T) { + config := DataStoreConfig{ + Type: "HTTP", + Params: map[string]string{ + "base_url": "https://example.com/", + "header_Authorization": "Bearer token", + "header_X-API-Key": "key123", + }, + } + + ds, err := NewHTTPDataStore(config) + require.NoError(t, err) + + httpDS := ds.(*HTTPDataStore) + require.Equal(t, "Bearer token", httpDS.headers["Authorization"]) + require.Equal(t, "key123", httpDS.headers["X-API-Key"]) + }) + + t.Run("missing base_url", func(t *testing.T) { + config := DataStoreConfig{ + Type: "HTTP", + Params: map[string]string{}, + } + + _, err := NewHTTPDataStore(config) + require.Error(t, err) + require.Contains(t, err.Error(), "no base_url") + }) +} + +func TestHTTPDataStore_GetFile(t *testing.T) { + server, _ := setupMockServer() + defer server.Close() + + config := DataStoreConfig{ + Type: "HTTP", + Params: map[string]string{ + "base_url": server.URL + "/", + }, + } + + ds, err := NewHTTPDataStore(config) + require.NoError(t, err) + + t.Run("get existing file", func(t *testing.T) { + reader, err := ds.GetFile(context.Background(), "test.txt") + require.NoError(t, err) + defer reader.Close() + + content, err := io.ReadAll(reader) + require.NoError(t, err) + require.Equal(t, "Hello, World!", string(content)) + }) + + t.Run("get file with path", func(t *testing.T) { + reader, err := ds.GetFile(context.Background(), "data/file.json") + require.NoError(t, err) + defer reader.Close() + + content, err := io.ReadAll(reader) + require.NoError(t, err) + require.Equal(t, `{"key": "value"}`, string(content)) + }) + + t.Run("file not found", func(t *testing.T) { + _, err := ds.GetFile(context.Background(), "nonexistent.txt") + require.Error(t, err) + require.ErrorIs(t, err, os.ErrNotExist) + }) +} + +func TestHTTPDataStore_WithCustomHeaders(t *testing.T) { + server, _ := setupMockServer() + defer server.Close() + + config := DataStoreConfig{ + Type: "HTTP", + Params: map[string]string{ + "base_url": server.URL + "/", + "header_Authorization": "Bearer test-token", + }, + } + + ds, err := NewHTTPDataStore(config) + require.NoError(t, err) + + t.Run("access private file with auth", func(t *testing.T) { + reader, err := ds.GetFile(context.Background(), "private/secret.txt") + require.NoError(t, err) + defer reader.Close() + + content, err := io.ReadAll(reader) + require.NoError(t, err) + require.Equal(t, "secret data", string(content)) + }) +} + +func TestHTTPDataStore_GetFileMetadata(t *testing.T) { + server, _ := setupMockServer() + defer server.Close() + + config := DataStoreConfig{ + Type: "HTTP", + Params: map[string]string{ + "base_url": server.URL + "/", + }, + } + + ds, err := NewHTTPDataStore(config) + require.NoError(t, err) + + t.Run("get metadata for existing file", func(t *testing.T) { + metadata, err := ds.GetFileMetadata(context.Background(), "test.txt") + require.NoError(t, err) + require.Equal(t, "text/plain", metadata["content-type"]) + require.Equal(t, "13", metadata["content-length"]) + require.NotEmpty(t, metadata["last-modified"]) + }) + + t.Run("metadata for nonexistent file", func(t *testing.T) { + _, err := ds.GetFileMetadata(context.Background(), "nonexistent.txt") + require.Error(t, err) + require.ErrorIs(t, err, os.ErrNotExist) + }) +} + +func TestHTTPDataStore_GetFileLastModified(t *testing.T) { + server, _ := setupMockServer() + defer server.Close() + + config := DataStoreConfig{ + Type: "HTTP", + Params: map[string]string{ + "base_url": server.URL + "/", + }, + } + + ds, err := NewHTTPDataStore(config) + require.NoError(t, err) + + t.Run("get last modified time", func(t *testing.T) { + lastModified, err := ds.GetFileLastModified(context.Background(), "test.txt") + require.NoError(t, err) + require.False(t, lastModified.IsZero()) + }) + + t.Run("last modified for nonexistent file", func(t *testing.T) { + _, err := ds.GetFileLastModified(context.Background(), "nonexistent.txt") + require.Error(t, err) + require.ErrorIs(t, err, os.ErrNotExist) + }) +} + +func TestHTTPDataStore_Exists(t *testing.T) { + server, _ := setupMockServer() + defer server.Close() + + config := DataStoreConfig{ + Type: "HTTP", + Params: map[string]string{ + "base_url": server.URL + "/", + }, + } + + ds, err := NewHTTPDataStore(config) + require.NoError(t, err) + + t.Run("existing file", func(t *testing.T) { + exists, err := ds.Exists(context.Background(), "test.txt") + require.NoError(t, err) + require.True(t, exists) + }) + + t.Run("nonexistent file", func(t *testing.T) { + exists, err := ds.Exists(context.Background(), "nonexistent.txt") + require.NoError(t, err) + require.False(t, exists) + }) +} + +func TestHTTPDataStore_Size(t *testing.T) { + server, _ := setupMockServer() + defer server.Close() + + config := DataStoreConfig{ + Type: "HTTP", + Params: map[string]string{ + "base_url": server.URL + "/", + }, + } + + ds, err := NewHTTPDataStore(config) + require.NoError(t, err) + + t.Run("get file size", func(t *testing.T) { + size, err := ds.Size(context.Background(), "test.txt") + require.NoError(t, err) + require.Equal(t, int64(13), size) // "Hello, World!" is 13 bytes + }) + + t.Run("size for nonexistent file", func(t *testing.T) { + _, err := ds.Size(context.Background(), "nonexistent.txt") + require.Error(t, err) + require.ErrorIs(t, err, os.ErrNotExist) + }) +} + +func TestHTTPDataStore_ReadOnlyOperations(t *testing.T) { + server, _ := setupMockServer() + defer server.Close() + + config := DataStoreConfig{ + Type: "HTTP", + Params: map[string]string{ + "base_url": server.URL + "/", + }, + } + + ds, err := NewHTTPDataStore(config) + require.NoError(t, err) + + t.Run("PutFile not supported", func(t *testing.T) { + err := ds.PutFile(context.Background(), "test.txt", nil, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "read-only") + }) + + t.Run("PutFileIfNotExists not supported", func(t *testing.T) { + _, err := ds.PutFileIfNotExists(context.Background(), "test.txt", nil, nil) + require.Error(t, err) + require.Contains(t, err.Error(), "read-only") + }) + + t.Run("ListFilePaths not supported", func(t *testing.T) { + _, err := ds.ListFilePaths(context.Background(), "", 10) + require.Error(t, err) + require.Contains(t, err.Error(), "does not support listing") + }) +} + +func TestHTTPDataStore_WithPathInBaseURL(t *testing.T) { + server, _ := setupMockServer() + defer server.Close() + + config := DataStoreConfig{ + Type: "HTTP", + Params: map[string]string{ + "base_url": server.URL + "/data/", + }, + } + + ds, err := NewHTTPDataStore(config) + require.NoError(t, err) + + t.Run("get file with base URL path", func(t *testing.T) { + reader, err := ds.GetFile(context.Background(), "file.json") + require.NoError(t, err) + defer reader.Close() + + content, err := io.ReadAll(reader) + require.NoError(t, err) + require.Equal(t, `{"key": "value"}`, string(content)) + }) +}