From b1191e658d19264305cc396b4384ec7cb1e872a2 Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Wed, 20 Aug 2025 16:40:57 +0530 Subject: [PATCH 01/25] Replication - Add endpoint to Create New Batch --- v2/arangodb/client_impl.go | 2 + v2/arangodb/client_replication.go | 44 ++++++++++ v2/arangodb/client_replication_impl.go | 113 +++++++++++++++++++++++++ 3 files changed, 159 insertions(+) create mode 100644 v2/arangodb/client_replication.go create mode 100644 v2/arangodb/client_replication_impl.go diff --git a/v2/arangodb/client_impl.go b/v2/arangodb/client_impl.go index cbb3dad4..01968e23 100644 --- a/v2/arangodb/client_impl.go +++ b/v2/arangodb/client_impl.go @@ -40,6 +40,7 @@ func newClient(connection connection.Connection) *client { c.clientAsyncJob = newClientAsyncJob(c) c.clientFoxx = newClientFoxx(c) c.clientTask = newClientTask(c) + c.clientReplication = newClientReplication(c) c.Requests = NewRequests(connection) @@ -58,6 +59,7 @@ type client struct { *clientAsyncJob *clientFoxx *clientTask + *clientReplication Requests } diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go new file mode 100644 index 00000000..a9bc1e1a --- /dev/null +++ b/v2/arangodb/client_replication.go @@ -0,0 +1,44 @@ +// +// DISCLAIMER +// +// Copyright 2023-2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package arangodb + +import "context" + +// ClientReplication defines replication API methods. +type ClientReplication interface { + // CreateNewBatch creates a new replication batch. + CreateNewBatch(ctx context.Context, dbName string, DBserver *string, state *bool, opt CreateNewBatchOptions) (CreateNewBatchResponse, error) +} + +// CreateNewBatchOptions represents the request body for creating a batch. +type CreateNewBatchOptions struct { + Ttl int `json:"ttl"` +} + +// CreateNewBatchResponse represents the response for batch creation. +type CreateNewBatchResponse struct { + // The ID of the created batch + ID string `json:"id"` + // The last tick of the created batch + LastTick string `json:"lastTick"` + // Only present if the state URL parameter was set to true + State map[string]interface{} `json:"state,omitempty"` +} diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go new file mode 100644 index 00000000..d117a1b1 --- /dev/null +++ b/v2/arangodb/client_replication_impl.go @@ -0,0 +1,113 @@ +// +// DISCLAIMER +// +// Copyright 2023-2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package arangodb + +import ( + "context" + "fmt" + "net/http" + "net/url" + + "github.com/pkg/errors" + + "github.com/arangodb/go-driver/v2/arangodb/shared" + "github.com/arangodb/go-driver/v2/connection" +) + +type clientReplication struct { + client *client +} + +func newClientReplication(client *client) *clientReplication { + return &clientReplication{ + client: client, + } +} + +var _ ClientReplication = &clientReplication{} + +func (c *clientReplication) url(dbName string, pathSegments []string, queryParams map[string]interface{}) string { + + base := connection.NewUrl("_db", url.PathEscape(dbName), "_api", "replication") + for _, seg := range pathSegments { + base = fmt.Sprintf("%s/%s", base, url.PathEscape(seg)) + } + + if len(queryParams) > 0 { + q := url.Values{} + for k, v := range queryParams { + switch val := v.(type) { + case string: + q.Set(k, val) + case bool: + q.Set(k, fmt.Sprintf("%t", val)) + case int, int64, float64: + q.Set(k, fmt.Sprintf("%v", val)) + default: + // skip unsupported types or handle as needed + } + } + base = fmt.Sprintf("%s?%s", base, q.Encode()) + } + return base +} + +func (c *clientReplication) CreateNewBatch(ctx context.Context, dbName string, DBserver *string, state *bool, opt CreateNewBatchOptions) (CreateNewBatchResponse, error) { + // Build query params + queryParams := map[string]interface{}{} + if state != nil { + queryParams["state"] = *state + } + + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return CreateNewBatchResponse{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + if DBserver == nil || *DBserver == "" { + return CreateNewBatchResponse{}, errors.New("DBserver must be specified when creating a batch on a coordinator") + } + queryParams["DBserver"] = *DBserver + } + + // Build URL + url := c.url(dbName, []string{"batch"}, queryParams) + + // Prepare response wrapper + var response struct { + shared.ResponseStruct `json:",inline"` + CreateNewBatchResponse `json:",inline"` + } + + resp, err := connection.CallPost(ctx, c.client.connection, url, opt, &response) + if err != nil { + return CreateNewBatchResponse{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.CreateNewBatchResponse, nil + default: + return CreateNewBatchResponse{}, response.AsArangoErrorWithCode(code) + } +} From 535cc9c4f6b9f69f1a505b7e20ad8682dfddec7a Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Wed, 20 Aug 2025 19:22:51 +0530 Subject: [PATCH 02/25] Replication - Add test case for Create New Batch --- v2/arangodb/client.go | 1 + v2/tests/client_replication_test.go | 68 +++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 v2/tests/client_replication_test.go diff --git a/v2/arangodb/client.go b/v2/arangodb/client.go index ad52721e..7c046548 100644 --- a/v2/arangodb/client.go +++ b/v2/arangodb/client.go @@ -37,4 +37,5 @@ type Client interface { ClientAsyncJob ClientFoxx ClientTasks + ClientReplication } diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go new file mode 100644 index 00000000..3d243261 --- /dev/null +++ b/v2/tests/client_replication_test.go @@ -0,0 +1,68 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package tests + +import ( + "context" + "testing" + + "github.com/arangodb/go-driver/v2/arangodb" + "github.com/arangodb/go-driver/v2/utils" + "github.com/stretchr/testify/require" +) + +func Test_CreateNewBatch(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + + var dbServer *string + state := utils.NewType(true) + + if serverRole == arangodb.ServerRoleCoordinator { + clusterHealth, err := client.Health(ctx) // Ensure the client is healthy + require.NoError(t, err) + for id, db := range clusterHealth.Health { + if db.Role == arangodb.ServerRoleDBServer { + s := string(id) + dbServer = &s + break + } + } + } + + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + + batch, err := client.CreateNewBatch(ctx, db.Name(), dbServer, state, arangodb.CreateNewBatchOptions{ + Ttl: 300, + }) + require.NoError(t, err) + require.NotNil(t, batch) + require.NotEmpty(t, batch.ID) + require.NotEmpty(t, batch.LastTick) + require.NotNil(t, batch.State) + }) + }) +} From 69ec96cba77a7e93427cb63697655117f71b41f3 Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Thu, 21 Aug 2025 11:30:56 +0530 Subject: [PATCH 03/25] modified passing params in post call --- v2/arangodb/client_replication_impl.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index d117a1b1..406b6a36 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -99,7 +99,7 @@ func (c *clientReplication) CreateNewBatch(ctx context.Context, dbName string, D CreateNewBatchResponse `json:",inline"` } - resp, err := connection.CallPost(ctx, c.client.connection, url, opt, &response) + resp, err := connection.CallPost(ctx, c.client.connection, url, &response, opt) if err != nil { return CreateNewBatchResponse{}, errors.WithStack(err) } From b169576a6196165d800e2e45be8f090728f79f90 Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Thu, 21 Aug 2025 14:33:02 +0530 Subject: [PATCH 04/25] Replication: Add end point to retrieves the inventory of a replication batch --- v2/arangodb/client_replication.go | 183 ++++++++++++++++++++++++- v2/arangodb/client_replication_impl.go | 60 ++++++++ 2 files changed, 242 insertions(+), 1 deletion(-) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index a9bc1e1a..e9a470ab 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -20,12 +20,17 @@ package arangodb -import "context" +import ( + "context" + "time" +) // ClientReplication defines replication API methods. type ClientReplication interface { // CreateNewBatch creates a new replication batch. CreateNewBatch(ctx context.Context, dbName string, DBserver *string, state *bool, opt CreateNewBatchOptions) (CreateNewBatchResponse, error) + // GetInventory retrieves the inventory of a replication batch. + GetInventory(ctx context.Context, dbName string, params InventoryQueryParams) (InventoryResponse, error) } // CreateNewBatchOptions represents the request body for creating a batch. @@ -42,3 +47,179 @@ type CreateNewBatchResponse struct { // Only present if the state URL parameter was set to true State map[string]interface{} `json:"state,omitempty"` } + +// InventoryQueryParams represents the query parameters for the replication inventory API. +type InventoryQueryParams struct { + // IncludeSystem indicates whether to include system collections in the inventory. + IncludeSystem *bool `json:"includeSystem"` + // Global indicates whether to return global inventory or not. + // If true, the inventory will include all collections across all DBServers. + Global *bool `json:"global"` + // BatchID is the ID of the replication batch to query. + BatchID int64 `json:"batchId"` + // Collection is the name of the collection to restrict inventory to. + Collection *string `json:"collection,omitempty"` + + // Only for Coordinators + // Restrict to a specific DBserver in cluster + DBserver *string `json:"DBserver,omitempty"` +} + +// InventoryResponse represents the full response from the replication inventory API. +type InventoryResponse struct { + // Collections is the list of collections in the inventory. + Collections []CollectionsInventoryResponse `json:"collections,omitempty"` + // Database properties. + Properties PropertiesInventoryResponse `json:"properties,omitempty"` + // Views present in the database. + Views []ViewInventoryResponse `json:"views,omitempty"` + // Replication state information. + State StateInventoryResponse `json:"state,omitempty"` + // Last log tick at the time of inventory. + Tick *string `json:"tick,omitempty"` +} + +// CollectionsInventoryResponse represents a collection entry in the inventory. +type CollectionsInventoryResponse struct { + // Indexes defined on the collection. + // Note: Primary indexes and edge indexes are not included in this array. + Indexes []IndexesInventoryResponse `json:"indexes,omitempty"` + // Collection properties and metadata. + Parameters ParametersInventoryResponse `json:"parameters,omitempty"` +} + +// ParametersInventoryResponse represents metadata and settings of a collection. +type ParametersInventoryResponse struct { + // AllowUserKeys indicates whether user keys are allowed. + AllowUserKeys *bool `json:"allowUserKeys,omitempty"` + // CacheEnabled indicates whether in-memory cache is enabled. + CacheEnabled *bool `json:"cacheEnabled,omitempty"` + // Cid is the collection ID. + Cid *string `json:"cid,omitempty"` + // ComputedValues holds the computed values for the collection. + ComputedValues interface{} `json:"computedValues,omitempty"` + // Deleted indicates whether the collection is deleted. + Deleted *bool `json:"deleted,omitempty"` + // GloballyUniqueId is the globally unique identifier for the collection. + GloballyUniqueId *string `json:"globallyUniqueId,omitempty"` + // ID is the collection ID. + ID *string `json:"id,omitempty"` + // InternalValidatorType is the internal validator type. + InternalValidatorType *int `json:"internalValidatorType,omitempty"` + // IsDisjoint indicates whether disjoint smart graphs are used. + IsDisjoint *bool `json:"isDisjoint,omitempty"` + // IsSmart indicates whether the collection is a smart graph collection. + IsSmart *bool `json:"isSmart,omitempty"` + // IsSmartChild indicates whether the collection this is a child shard of a smart graph. + IsSmartChild *bool `json:"isSmartChild,omitempty"` + // IsSystem indicates whether the collection is a system collection. + IsSystem *bool `json:"isSystem,omitempty"` + // KeyOptions defines the key generation options for the collection. + KeyOptions *KeyOpts `json:"keyOptions,omitempty"` + // MinReplicationFactor defines the minimum replication factor for the collection. + MinReplicationFactor *int `json:"minReplicationFactor,omitempty"` + // Name is the name of the collection. + Name *string `json:"name,omitempty"` + // NumberOfShards defines the number of shards for the collection. + NumberOfShards *int `json:"numberOfShards,omitempty"` + // PlanId is the plan ID for the collection. + PlanId *string `json:"planId,omitempty"` + // ReplicationFactor defines the replication factor for the collection. + ReplicationFactor *int `json:"replicationFactor,omitempty"` + // Schema defines the schema for the collection. + Schema interface{} `json:"schema,omitempty"` + // ShardKeys defines the shard keys for the collection. + ShardKeys []string `json:"shardKeys,omitempty"` + // ShardingStrategy defines the sharding strategy for the collection. + ShardingStrategy *string `json:"shardingStrategy,omitempty"` + // Shards defines the shards for the collection. + Shards map[string][]string `json:"shards,omitempty"` + // Status defines the Collection status code. + Status *int `json:"status,omitempty"` + // SyncByRevision indicates whether the collection is synced by revision. + SyncByRevision *bool `json:"syncByRevision,omitempty"` + // Type defines the Collection type (document/edge). + Type *int `json:"type,omitempty"` + // UsesRevisionsAsDocumentIds indicates whether document revisions are used as document IDs. + UsesRevisionsAsDocumentIds *bool `json:"usesRevisionsAsDocumentIds,omitempty"` + // Version defines the version of the collection. + Version *int `json:"version,omitempty"` + // WaitForSync indicates whether the collection should wait for sync. + WaitForSync *bool `json:"waitForSync,omitempty"` + // WriteConcern defines the write concern level for the collection. + WriteConcern *int `json:"writeConcern,omitempty"` +} + +// IndexesInventoryResponse represents metadata for an index in the collection. +type IndexesInventoryResponse struct { + // Index ID + ID *string `json:"id,omitempty"` + // Index type (hash, skiplist, etc.) + Type *string `json:"type,omitempty"` + // Index name + Name *string `json:"name,omitempty"` + // Indexed fields + Fields []string `json:"fields,omitempty"` + // Unique indicates whether the index enforces uniqueness. + Unique *bool `json:"unique,omitempty"` + // Sparse indicates whether the index skips null values. + Sparse *bool `json:"sparse,omitempty"` + // Deduplicate indicates whether the index enforces deduplication. + Deduplicate *bool `json:"deduplicate,omitempty"` + // Estimates indicates whether the index supports estimates. + Estimates *bool `json:"estimates,omitempty"` + // CacheEnabled indicates whether the index is cache enabled. + CacheEnabled *bool `json:"cacheEnabled,omitempty"` +} + +// KeyOpts represents options for document key generation. +type KeyOpts struct { + // Whether user-supplied keys are allowed + AllowUserKeys *bool `json:"allowUserKeys,omitempty"` + // Key type (autoincrement, traditional, etc.) + Type *string `json:"type,omitempty"` + // Last value for autoincrement keys + LastValue *int `json:"lastValue,omitempty"` +} + +// PropertiesInventoryResponse represents database-level properties. +type PropertiesInventoryResponse struct { + // Database ID + ID *string `json:"id,omitempty"` + // Database name + Name *string `json:"name,omitempty"` + // Whether this is a system database + IsSystem *bool `json:"isSystem,omitempty"` + // Default sharding method + Sharding *string `json:"sharding,omitempty"` + // Default replication factor + ReplicationFactor *int `json:"replicationFactor,omitempty"` + // Default write concern + WriteConcern *int `json:"writeConcern,omitempty"` + // Replication protocol version + ReplicationVersion *string `json:"replicationVersion,omitempty"` +} + +// StateInventoryResponse represents replication state at the time of inventory. +type StateInventoryResponse struct { + // Whether replication is running + Running *bool `json:"running,omitempty"` + // Last committed log tick + LastLogTick *string `json:"lastLogTick,omitempty"` + // Last uncommitted log tick + LastUncommittedLogTick *string `json:"lastUncommittedLogTick,omitempty"` + // Total number of events + TotalEvents *int `json:"totalEvents,omitempty"` + // Timestamp of the state + Time *time.Time `json:"time,omitempty"` +} + +// ViewInventoryResponse represents a view entry in the inventory. +type ViewInventoryResponse struct { + // View ID + ID *string `json:"id,omitempty"` + // View name + Name *string `json:"name,omitempty"` + // View type (e.g. "arangosearch") + Type *string `json:"type,omitempty"` +} diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index 406b6a36..24fda49f 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -30,6 +30,7 @@ import ( "github.com/arangodb/go-driver/v2/arangodb/shared" "github.com/arangodb/go-driver/v2/connection" + "github.com/arangodb/go-driver/v2/utils" ) type clientReplication struct { @@ -111,3 +112,62 @@ func (c *clientReplication) CreateNewBatch(ctx context.Context, dbName string, D return CreateNewBatchResponse{}, response.AsArangoErrorWithCode(code) } } + +func (c *clientReplication) GetInventory(ctx context.Context, dbName string, params InventoryQueryParams) (InventoryResponse, error) { + // Build query params + queryParams := map[string]interface{}{} + + if params.IncludeSystem == nil { + queryParams["includeSystem"] = utils.NewType(true) + } else { + queryParams["includeSystem"] = *params.IncludeSystem + } + + if params.Global == nil { + queryParams["global"] = utils.NewType(false) + } else { + queryParams["global"] = *params.Global + } + + if params.BatchID == 0 { + return InventoryResponse{}, errors.New("batchId must be specified when querying inventory") + } + queryParams["batchId"] = params.BatchID + + if params.Collection != nil { + queryParams["collection"] = *params.Collection + } + + // Check server role + serverRole, err := c.client.ServerRole(ctx) + if err != nil { + return InventoryResponse{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + if params.DBserver == nil || *params.DBserver == "" { + return InventoryResponse{}, errors.New("DBserver must be specified when querying inventory on a coordinator") + } + queryParams["DBserver"] = *params.DBserver + } + + // Build URL + url := c.url(dbName, []string{"inventory"}, queryParams) + + // Prepare response wrapper + var response struct { + shared.ResponseStruct `json:",inline"` + InventoryResponse `json:",inline"` + } + + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return InventoryResponse{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.InventoryResponse, nil + default: + return InventoryResponse{}, response.AsArangoErrorWithCode(code) + } +} From d28a6416b1279312b6da1953f1c5bda987690507 Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Thu, 21 Aug 2025 17:07:59 +0530 Subject: [PATCH 05/25] Replication: Add test case to retrieves the inventory of a replication batch --- v2/arangodb/client_replication.go | 40 ++++++++++++++------------ v2/arangodb/client_replication_impl.go | 2 +- v2/tests/client_replication_test.go | 7 +++++ 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index e9a470ab..ef73d344 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -56,7 +56,7 @@ type InventoryQueryParams struct { // If true, the inventory will include all collections across all DBServers. Global *bool `json:"global"` // BatchID is the ID of the replication batch to query. - BatchID int64 `json:"batchId"` + BatchID string `json:"batchId"` // Collection is the name of the collection to restrict inventory to. Collection *string `json:"collection,omitempty"` @@ -90,6 +90,8 @@ type CollectionsInventoryResponse struct { // ParametersInventoryResponse represents metadata and settings of a collection. type ParametersInventoryResponse struct { + // Reusable basic properties like ID and Name + BasicProperties // AllowUserKeys indicates whether user keys are allowed. AllowUserKeys *bool `json:"allowUserKeys,omitempty"` // CacheEnabled indicates whether in-memory cache is enabled. @@ -102,8 +104,6 @@ type ParametersInventoryResponse struct { Deleted *bool `json:"deleted,omitempty"` // GloballyUniqueId is the globally unique identifier for the collection. GloballyUniqueId *string `json:"globallyUniqueId,omitempty"` - // ID is the collection ID. - ID *string `json:"id,omitempty"` // InternalValidatorType is the internal validator type. InternalValidatorType *int `json:"internalValidatorType,omitempty"` // IsDisjoint indicates whether disjoint smart graphs are used. @@ -118,14 +118,12 @@ type ParametersInventoryResponse struct { KeyOptions *KeyOpts `json:"keyOptions,omitempty"` // MinReplicationFactor defines the minimum replication factor for the collection. MinReplicationFactor *int `json:"minReplicationFactor,omitempty"` - // Name is the name of the collection. - Name *string `json:"name,omitempty"` // NumberOfShards defines the number of shards for the collection. NumberOfShards *int `json:"numberOfShards,omitempty"` // PlanId is the plan ID for the collection. PlanId *string `json:"planId,omitempty"` // ReplicationFactor defines the replication factor for the collection. - ReplicationFactor *int `json:"replicationFactor,omitempty"` + ReplicationFactor interface{} `json:"replicationFactor,omitempty"` // Schema defines the schema for the collection. Schema interface{} `json:"schema,omitempty"` // ShardKeys defines the shard keys for the collection. @@ -152,12 +150,10 @@ type ParametersInventoryResponse struct { // IndexesInventoryResponse represents metadata for an index in the collection. type IndexesInventoryResponse struct { - // Index ID - ID *string `json:"id,omitempty"` + // Reusable basic properties like ID and Name + BasicProperties // Index type (hash, skiplist, etc.) Type *string `json:"type,omitempty"` - // Index name - Name *string `json:"name,omitempty"` // Indexed fields Fields []string `json:"fields,omitempty"` // Unique indicates whether the index enforces uniqueness. @@ -184,16 +180,14 @@ type KeyOpts struct { // PropertiesInventoryResponse represents database-level properties. type PropertiesInventoryResponse struct { - // Database ID - ID *string `json:"id,omitempty"` - // Database name - Name *string `json:"name,omitempty"` + // Reusable basic properties like ID and Name + BasicProperties // Whether this is a system database IsSystem *bool `json:"isSystem,omitempty"` // Default sharding method Sharding *string `json:"sharding,omitempty"` // Default replication factor - ReplicationFactor *int `json:"replicationFactor,omitempty"` + ReplicationFactor interface{} `json:"replicationFactor,omitempty"` // Default write concern WriteConcern *int `json:"writeConcern,omitempty"` // Replication protocol version @@ -216,10 +210,18 @@ type StateInventoryResponse struct { // ViewInventoryResponse represents a view entry in the inventory. type ViewInventoryResponse struct { - // View ID - ID *string `json:"id,omitempty"` - // View name - Name *string `json:"name,omitempty"` + // Reusable basic properties like ID and Name + BasicProperties // View type (e.g. "arangosearch") Type *string `json:"type,omitempty"` + // View properties + Properties map[string]interface{} `json:"properties,omitempty"` +} + +// BasicProperties represents reusable ID and Name fields common to collections, views, etc. +type BasicProperties struct { + // Unique identifier (collection ID, view ID, etc.) + ID *string `json:"id,omitempty"` + // Human-readable name + Name *string `json:"name,omitempty"` } diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index 24fda49f..d20034bc 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -129,7 +129,7 @@ func (c *clientReplication) GetInventory(ctx context.Context, dbName string, par queryParams["global"] = *params.Global } - if params.BatchID == 0 { + if params.BatchID == "" { return InventoryResponse{}, errors.New("batchId must be specified when querying inventory") } queryParams["batchId"] = params.BatchID diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index 3d243261..26417555 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -63,6 +63,13 @@ func Test_CreateNewBatch(t *testing.T) { require.NotEmpty(t, batch.ID) require.NotEmpty(t, batch.LastTick) require.NotNil(t, batch.State) + + resp, err := client.GetInventory(ctx, db.Name(), arangodb.InventoryQueryParams{ + BatchID: batch.ID, + DBserver: dbServer, + }) + require.NoError(t, err) + require.NotNil(t, resp) }) }) } From edf7b324ebf4d9bcd7895b2c3962393026db25da Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Thu, 21 Aug 2025 19:55:21 +0530 Subject: [PATCH 06/25] Replication: Add endpoint for delete batch --- v2/arangodb/client_replication.go | 2 ++ v2/arangodb/client_replication_impl.go | 32 ++++++++++++++++++++++++++ v2/tests/client_replication_test.go | 17 ++++++++++---- 3 files changed, 46 insertions(+), 5 deletions(-) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index ef73d344..5f7ef9cb 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -31,6 +31,8 @@ type ClientReplication interface { CreateNewBatch(ctx context.Context, dbName string, DBserver *string, state *bool, opt CreateNewBatchOptions) (CreateNewBatchResponse, error) // GetInventory retrieves the inventory of a replication batch. GetInventory(ctx context.Context, dbName string, params InventoryQueryParams) (InventoryResponse, error) + // DeleteBatch deletes a replication batch. + DeleteBatch(ctx context.Context, dbName string, DBserver *string, batchId string) error } // CreateNewBatchOptions represents the request body for creating a batch. diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index d20034bc..f31a2fa4 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -171,3 +171,35 @@ func (c *clientReplication) GetInventory(ctx context.Context, dbName string, par return InventoryResponse{}, response.AsArangoErrorWithCode(code) } } + +func (c *clientReplication) DeleteBatch(ctx context.Context, dbName string, DBserver *string, batchId string) error { + params := map[string]interface{}{} + // Check server role + serverRole, err := c.client.ServerRole(ctx) + if err != nil { + return errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + if DBserver == nil || *DBserver == "" { + return errors.New("DBserver must be specified when querying inventory on a coordinator") + } + params["DBserver"] = *DBserver + } + + // Build URL + url := c.url(dbName, []string{"batch", batchId}, params) + + // Prepare response wrapper + // var response shared.ResponseStruct + resp, err := connection.CallDelete(ctx, c.client.connection, url, nil) + if err != nil { + return errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusNoContent: + return nil + default: + return shared.NewResponseStruct().AsArangoErrorWithCode(code) + } +} diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index 26417555..eb4e7f0d 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -64,12 +64,19 @@ func Test_CreateNewBatch(t *testing.T) { require.NotEmpty(t, batch.LastTick) require.NotNil(t, batch.State) - resp, err := client.GetInventory(ctx, db.Name(), arangodb.InventoryQueryParams{ - BatchID: batch.ID, - DBserver: dbServer, + t.Run("GetInventory", func(t *testing.T) { + resp, err := client.GetInventory(ctx, db.Name(), arangodb.InventoryQueryParams{ + BatchID: batch.ID, + DBserver: dbServer, + }) + require.NoError(t, err) + require.NotNil(t, resp) + }) + + t.Run("DeleteBatch", func(t *testing.T) { + err := client.DeleteBatch(ctx, db.Name(), dbServer, batch.ID) + require.NoError(t, err) }) - require.NoError(t, err) - require.NotNil(t, resp) }) }) } From f4b8c12bf92c643528569516f392d06fd47f4dd0 Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Fri, 22 Aug 2025 13:10:05 +0530 Subject: [PATCH 07/25] Replication: Add endpoint for extend batch --- v2/arangodb/client_replication.go | 2 ++ v2/arangodb/client_replication_impl.go | 38 ++++++++++++++++++++++++-- v2/tests/client_replication_test.go | 7 +++++ 3 files changed, 45 insertions(+), 2 deletions(-) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index 5f7ef9cb..4a5cc3a2 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -33,6 +33,8 @@ type ClientReplication interface { GetInventory(ctx context.Context, dbName string, params InventoryQueryParams) (InventoryResponse, error) // DeleteBatch deletes a replication batch. DeleteBatch(ctx context.Context, dbName string, DBserver *string, batchId string) error + // ExtendBatch extends the TTL of a replication batch. + ExtendBatch(ctx context.Context, dbName string, DBserver *string, batchId string, opt CreateNewBatchOptions) error } // CreateNewBatchOptions represents the request body for creating a batch. diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index f31a2fa4..2f74afca 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -189,8 +189,6 @@ func (c *clientReplication) DeleteBatch(ctx context.Context, dbName string, DBse // Build URL url := c.url(dbName, []string{"batch", batchId}, params) - // Prepare response wrapper - // var response shared.ResponseStruct resp, err := connection.CallDelete(ctx, c.client.connection, url, nil) if err != nil { return errors.WithStack(err) @@ -203,3 +201,39 @@ func (c *clientReplication) DeleteBatch(ctx context.Context, dbName string, DBse return shared.NewResponseStruct().AsArangoErrorWithCode(code) } } + +func (c *clientReplication) ExtendBatch(ctx context.Context, dbName string, DBserver *string, batchId string, opt CreateNewBatchOptions) error { + + if batchId == "" { + return errors.New("batchId must be specified for extend batch") + } + + // Build query params + queryParams := map[string]interface{}{} + // Check server role + serverRole, err := c.client.ServerRole(ctx) + if err != nil { + return errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + if DBserver == nil || *DBserver == "" { + return errors.New("DBserver must be specified when extending a batch on a coordinator") + } + queryParams["DBserver"] = *DBserver + } + + // Build URL + url := c.url(dbName, []string{"batch", batchId}, queryParams) + + resp, err := connection.CallPut(ctx, c.client.connection, url, nil, opt) + if err != nil { + return errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusNoContent: + return nil + default: + return shared.NewResponseStruct().AsArangoErrorWithCode(code) + } +} diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index eb4e7f0d..1f8b5101 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -73,6 +73,13 @@ func Test_CreateNewBatch(t *testing.T) { require.NotNil(t, resp) }) + t.Run("ExtendBatch", func(t *testing.T) { + err := client.ExtendBatch(ctx, db.Name(), dbServer, batch.ID, arangodb.CreateNewBatchOptions{ + Ttl: 600, + }) + require.NoError(t, err) + }) + t.Run("DeleteBatch", func(t *testing.T) { err := client.DeleteBatch(ctx, db.Name(), dbServer, batch.ID) require.NoError(t, err) From c60e93f1515395ce38942328f37b7678a3237947 Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Fri, 22 Aug 2025 19:39:12 +0530 Subject: [PATCH 08/25] Replication: Add end point to dump collection --- v2/arangodb/client_replication.go | 11 ++++++ v2/arangodb/client_replication_impl.go | 50 ++++++++++++++++++++++++++ v2/tests/client_replication_test.go | 29 +++++++++++++++ 3 files changed, 90 insertions(+) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index 4a5cc3a2..175a46d3 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -35,6 +35,8 @@ type ClientReplication interface { DeleteBatch(ctx context.Context, dbName string, DBserver *string, batchId string) error // ExtendBatch extends the TTL of a replication batch. ExtendBatch(ctx context.Context, dbName string, DBserver *string, batchId string, opt CreateNewBatchOptions) error + + Dump(ctx context.Context, dbName string, params ReplicationDumpParams) ([]byte, error) } // CreateNewBatchOptions represents the request body for creating a batch. @@ -229,3 +231,12 @@ type BasicProperties struct { // Human-readable name Name *string `json:"name,omitempty"` } + +type ReplicationDumpParams struct { + // Collection name + Collection string `json:"collection"` + // Size of each chunk in bytes + ChunkSize *int32 `json:"chunkSize,omitempty"` + // BatchID is the ID of the replication batch. + BatchID string `json:"batchId"` +} diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index 2f74afca..3fdc2f8e 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -23,6 +23,7 @@ package arangodb import ( "context" "fmt" + "io" "net/http" "net/url" @@ -237,3 +238,52 @@ func (c *clientReplication) ExtendBatch(ctx context.Context, dbName string, DBse return shared.NewResponseStruct().AsArangoErrorWithCode(code) } } + +func (c *clientReplication) Dump(ctx context.Context, dbName string, params ReplicationDumpParams) ([]byte, error) { + + role, err := c.client.ServerRole(ctx) + if err != nil { + return nil, errors.WithStack(err) + } + if role != ServerRoleSingle { + return nil, errors.Errorf("replication dump not supported on role %s", role) + } + + // Build query params + queryParams := map[string]interface{}{} + if params.ChunkSize != nil && *params.ChunkSize != 0 { + queryParams["chunkSize"] = params.ChunkSize + } + if params.Collection == "" { + return nil, errors.New("collection must be specified when querying replication dump") + } + queryParams["collection"] = params.Collection + if params.BatchID == "" { + return nil, errors.New("batchId must be specified when querying replication dump") + } + queryParams["batchId"] = params.BatchID + + // Build URL + url := c.url(dbName, []string{"dump"}, queryParams) + req, err := c.client.Connection().NewRequest(http.MethodGet, url) + if err != nil { + return nil, err + } + + var data []byte + // Call Do with nil result (we'll handle body manually) + resp, err := c.client.Connection().Do(ctx, req, &data, http.StatusOK, http.StatusNoContent) + if err != nil { + return nil, err + } + defer resp.RawResponse().Body.Close() + + if resp.Code() == http.StatusNoContent { + return nil, nil + } + if resp.Code() != http.StatusOK { + return nil, (&shared.ResponseStruct{}).AsArangoErrorWithCode(resp.Code()) + } + + return io.ReadAll(resp.RawResponse().Body) +} diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index 1f8b5101..e74e2ba3 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -23,6 +23,7 @@ package tests import ( "context" "testing" + "time" "github.com/arangodb/go-driver/v2/arangodb" "github.com/arangodb/go-driver/v2/utils" @@ -80,6 +81,34 @@ func Test_CreateNewBatch(t *testing.T) { require.NoError(t, err) }) + t.Run("GetReplicationDump", func(t *testing.T) { + WithCollectionV2(t, db, nil, func(col arangodb.Collection) { + docs := []map[string]interface{}{ + {"_key": "doc1", "name": "Alice"}, + {"_key": "doc2", "name": "Bob"}, + {"_key": "doc3", "name": "Charlie"}, + } + for _, doc := range docs { + resp, err := col.CreateDocument(ctx, doc) + require.NoError(t, err) + require.NotNil(t, resp) + } + + // Give Arango some time to flush + time.Sleep(200 * time.Millisecond) + // Attempt to dump the collection + if serverRole == arangodb.ServerRoleSingle { + _, err := client.Dump(ctx, db.Name(), arangodb.ReplicationDumpParams{ + BatchID: batch.ID, + Collection: col.Name(), + }) + require.NoError(t, err) + } else { + t.Skipf("Dump only allowed for single server deployments. This is a %s server", serverRole) + } + }) + }) + t.Run("DeleteBatch", func(t *testing.T) { err := client.DeleteBatch(ctx, db.Name(), dbServer, batch.ID) require.NoError(t, err) From 3d260e1593052fb0428b9ded6d6e197d87c3a9d8 Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Mon, 25 Aug 2025 14:44:30 +0530 Subject: [PATCH 09/25] Replication: add endpoint to get the replication logger state --- v2/arangodb/client_replication.go | 31 ++++++++++++++++++++++- v2/arangodb/client_replication_impl.go | 34 ++++++++++++++++++++++++++ v2/tests/client_replication_test.go | 31 +++++++++++++++++++++++ 3 files changed, 95 insertions(+), 1 deletion(-) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index 175a46d3..e7997bfe 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -35,8 +35,10 @@ type ClientReplication interface { DeleteBatch(ctx context.Context, dbName string, DBserver *string, batchId string) error // ExtendBatch extends the TTL of a replication batch. ExtendBatch(ctx context.Context, dbName string, DBserver *string, batchId string, opt CreateNewBatchOptions) error - + // Dump retrieves a chunk of data from a collection in a replication batch. Dump(ctx context.Context, dbName string, params ReplicationDumpParams) ([]byte, error) + // LoggerState retrieves the state of the replication logger. + LoggerState(ctx context.Context, dbName string, DBserver *string) (LoggerStateResponse, error) } // CreateNewBatchOptions represents the request body for creating a batch. @@ -240,3 +242,30 @@ type ReplicationDumpParams struct { // BatchID is the ID of the replication batch. BatchID string `json:"batchId"` } + +type State struct { + // Whether replication is running + Running *bool `json:"running"` + // Last committed log tick + LastLogTick *string `json:"lastLogTick"` + // Last uncommitted log tick + LastUncommittedLogTick *string `json:"lastUncommittedLogTick"` + // Total number of events + TotalEvents *int64 `json:"totalEvents"` + // Timestamp of the state + Time *time.Time `json:"time"` +} + +type Server struct { + // Version of the server + Version *string `json:"version"` + ServerId *string `json:"serverId"` + // Engine of the server + Engine *string `json:"engine"` +} + +type LoggerStateResponse struct { + State State `json:"state"` + Server Server `json:"server"` + Clients []map[string]interface{} `json:"clients,inline"` +} diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index 3fdc2f8e..e1aaa5e0 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -287,3 +287,37 @@ func (c *clientReplication) Dump(ctx context.Context, dbName string, params Repl return io.ReadAll(resp.RawResponse().Body) } + +func (c *clientReplication) LoggerState(ctx context.Context, dbName string, DBserver *string) (LoggerStateResponse, error) { + // Build query params + queryParams := map[string]interface{}{} + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return LoggerStateResponse{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + if DBserver == nil || *DBserver == "" { + return LoggerStateResponse{}, errors.New("DBserver must be specified when creating a batch on a coordinator") + } + queryParams["DBserver"] = *DBserver + } + // Build URL + url := c.url(dbName, []string{"logger-state"}, queryParams) + + var response struct { + shared.ResponseStruct `json:",inline"` + LoggerStateResponse `json:",inline"` + } + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return LoggerStateResponse{}, errors.WithStack(err) + } + switch code := resp.Code(); code { + case http.StatusOK: + return response.LoggerStateResponse, nil + default: + return LoggerStateResponse{}, response.AsArangoErrorWithCode(code) + } +} diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index e74e2ba3..a843c2bc 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -116,3 +116,34 @@ func Test_CreateNewBatch(t *testing.T) { }) }) } + +func Test_LoggerState(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + WithDatabase(t, client, nil, func(db arangodb.Database) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + + var dbServer *string + if serverRole == arangodb.ServerRoleCoordinator { + clusterHealth, err := client.Health(ctx) // Ensure the client is healthy + require.NoError(t, err) + for id, db := range clusterHealth.Health { + if db.Role == arangodb.ServerRoleDBServer { + s := string(id) + dbServer = &s + break + } + } + } + resp, err := client.LoggerState(ctx, db.Name(), dbServer) + require.NoError(t, err) + require.NotNil(t, resp) + require.NotEmpty(t, resp.State) + require.NotEmpty(t, resp.Server) + require.GreaterOrEqual(t, len(resp.Clients), 0) + }) + }) + }) +} From a5c2852209d127408fc9c873d39a075942469259 Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Mon, 25 Aug 2025 16:55:07 +0530 Subject: [PATCH 10/25] Replication: add endpoint to get the first available replication tick value --- v2/arangodb/client_replication.go | 7 ++++++ v2/arangodb/client_replication_impl.go | 30 ++++++++++++++++++++++++++ v2/tests/client_replication_test.go | 21 ++++++++++++++++++ 3 files changed, 58 insertions(+) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index e7997bfe..ec7a1860 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -39,6 +39,8 @@ type ClientReplication interface { Dump(ctx context.Context, dbName string, params ReplicationDumpParams) ([]byte, error) // LoggerState retrieves the state of the replication logger. LoggerState(ctx context.Context, dbName string, DBserver *string) (LoggerStateResponse, error) + // LoggerFirstTick retrieves the first tick of the replication logger. + LoggerFirstTick(ctx context.Context, dbName string) (LoggerFirstTickResponse, error) } // CreateNewBatchOptions represents the request body for creating a batch. @@ -269,3 +271,8 @@ type LoggerStateResponse struct { Server Server `json:"server"` Clients []map[string]interface{} `json:"clients,inline"` } + +type LoggerFirstTickResponse struct { + // The first tick of the logger + FirstTick *string `json:"firstTick,omitempty"` +} diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index e1aaa5e0..c3b7dc0e 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -321,3 +321,33 @@ func (c *clientReplication) LoggerState(ctx context.Context, dbName string, DBse return LoggerStateResponse{}, response.AsArangoErrorWithCode(code) } } + +func (c *clientReplication) LoggerFirstTick(ctx context.Context, dbName string) (LoggerFirstTickResponse, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return LoggerFirstTickResponse{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return LoggerFirstTickResponse{}, errors.New("replication logger-first-tick is not supported on Coordinators") + + } + // Build URL + url := c.url(dbName, []string{"logger-first-tick"}, nil) + + var response struct { + shared.ResponseStruct `json:",inline"` + LoggerFirstTickResponse `json:",inline"` + } + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return LoggerFirstTickResponse{}, errors.WithStack(err) + } + switch code := resp.Code(); code { + case http.StatusOK: + return response.LoggerFirstTickResponse, nil + default: + return LoggerFirstTickResponse{}, response.AsArangoErrorWithCode(code) + } +} diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index a843c2bc..35d7b334 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -147,3 +147,24 @@ func Test_LoggerState(t *testing.T) { }) }) } + +func Test_LoggerFirstTick(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + WithDatabase(t, client, nil, func(db arangodb.Database) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + + resp, err := client.LoggerFirstTick(ctx, db.Name()) + require.NoError(t, err) + require.NotNil(t, resp) + require.NotEmpty(t, resp.FirstTick) + }) + }) + }) +} From bdcf33659429b94338f5b09780737a744a019ff2 Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Mon, 25 Aug 2025 18:58:19 +0530 Subject: [PATCH 11/25] Replication: Add endpoint for logger-tick-ranges --- v2/arangodb/client_replication.go | 13 +++++++++++++ v2/arangodb/client_replication_impl.go | 25 +++++++++++++++++++++++++ v2/tests/client_replication_test.go | 20 ++++++++++++++++++++ 3 files changed, 58 insertions(+) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index ec7a1860..dc48b915 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -41,6 +41,8 @@ type ClientReplication interface { LoggerState(ctx context.Context, dbName string, DBserver *string) (LoggerStateResponse, error) // LoggerFirstTick retrieves the first tick of the replication logger. LoggerFirstTick(ctx context.Context, dbName string) (LoggerFirstTickResponse, error) + // LoggerTickRange retrieves the currently available ranges of tick values for all currently available WAL logfiles. + LoggerTickRange(ctx context.Context, dbName string) ([]LoggerTickRangeResponseObj, error) } // CreateNewBatchOptions represents the request body for creating a batch. @@ -276,3 +278,14 @@ type LoggerFirstTickResponse struct { // The first tick of the logger FirstTick *string `json:"firstTick,omitempty"` } + +type LoggerTickRangeResponseObj struct { + // Name of the logfile + Datafile *string `json:"datafile,omitempty"` + // Status of the datafile, in textual form (e.g. "sealed", "open") + Status *string `json:"status,omitempty"` + // Minimum tick value contained in logfile + TickMin *string `json:"tickMin,omitempty"` + // Maximum tick value contained in logfile + TickMax *string `json:"tickMax,omitempty"` +} diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index c3b7dc0e..bf7844d2 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -351,3 +351,28 @@ func (c *clientReplication) LoggerFirstTick(ctx context.Context, dbName string) return LoggerFirstTickResponse{}, response.AsArangoErrorWithCode(code) } } + +func (c *clientReplication) LoggerTickRange(ctx context.Context, dbName string) ([]LoggerTickRangeResponseObj, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + if err != nil { + return nil, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return nil, errors.New("replication logger-tick-ranges is not supported on Coordinators") + } + // Build URL + url := c.url(dbName, []string{"logger-tick-ranges"}, nil) + + var response []LoggerTickRangeResponseObj + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return nil, errors.WithStack(err) + } + switch code := resp.Code(); code { + case http.StatusOK: + return response, nil + default: + return nil, (&shared.ResponseStruct{}).AsArangoErrorWithCode(resp.Code()) + } +} diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index 35d7b334..cca62dbd 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -168,3 +168,23 @@ func Test_LoggerFirstTick(t *testing.T) { }) }) } + +func Test_LoggerTickRange(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + WithDatabase(t, client, nil, func(db arangodb.Database) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + + resp, err := client.LoggerTickRange(ctx, db.Name()) + require.NoError(t, err) + require.NotNil(t, resp) + }) + }) + }) +} From 7c7a4a72dad7e5581f7f0f59a1155f54410e534e Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Tue, 26 Aug 2025 15:54:13 +0530 Subject: [PATCH 12/25] Replication: Add endpoint to get the applier configuration --- v2/arangodb/client_replication.go | 59 ++++++++++++++++++++++++++ v2/arangodb/client_replication_impl.go | 38 ++++++++++++++++- v2/tests/client_replication_test.go | 27 ++++++++++++ 3 files changed, 123 insertions(+), 1 deletion(-) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index dc48b915..72c254b5 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -43,6 +43,8 @@ type ClientReplication interface { LoggerFirstTick(ctx context.Context, dbName string) (LoggerFirstTickResponse, error) // LoggerTickRange retrieves the currently available ranges of tick values for all currently available WAL logfiles. LoggerTickRange(ctx context.Context, dbName string) ([]LoggerTickRangeResponseObj, error) + // GetApplierConfig retrieves the configuration of the replication applier. + GetApplierConfig(ctx context.Context, dbName string, global *bool) (ApplierConfigResponse, error) } // CreateNewBatchOptions represents the request body for creating a batch. @@ -289,3 +291,60 @@ type LoggerTickRangeResponseObj struct { // Maximum tick value contained in logfile TickMax *string `json:"tickMax,omitempty"` } + +type ApplierConfigResponse struct { + // Logger server endpoint (e.g., tcp://127.0.0.1:8529) + Endpoint *string `json:"endpoint,omitempty"` + // Database name (e.g., "_system") + Database *string `json:"database,omitempty"` + // Optional username for authentication + Username *string `json:"username,omitempty"` + // Password for authentication + Password *string `json:"password,omitempty"` + // Maximum connection attempts before stopping + MaxConnectRetries int `json:"maxConnectRetries"` + // Timeout (seconds) for connecting to endpoint + ConnectTimeout int `json:"connectTimeout"` + // Timeout (seconds) for individual requests + RequestTimeout int `json:"requestTimeout"` + // Max size of log transfer packets + ChunkSize int `json:"chunkSize"` + // Whether applier auto-starts on server startup + AutoStart bool `json:"autoStart"` + // Whether adaptive polling is used + AdaptivePolling bool `json:"adaptivePolling"` + // Whether system collections are included + IncludeSystem bool `json:"includeSystem"` + // Whether full automatic resync is performed if needed + AutoResync bool `json:"autoResync"` + // Number of auto-resync retries before giving up + AutoResyncRetries int `json:"autoResyncRetries"` + // Max wait time (seconds) for initial sync + InitialSyncMaxWaitTime int `json:"initialSyncMaxWaitTime"` + // Idle time (seconds) before retrying failed connection + ConnectionRetryWaitTime int `json:"connectionRetryWaitTime"` + // Minimum idle wait time (seconds) when no new data + IdleMinWaitTime int `json:"idleMinWaitTime"` + // Maximum idle wait time (seconds) when no new data (may be fractional, hence float64) + IdleMaxWaitTime float64 `json:"idleMaxWaitTime"` + // If true, aborts if start tick not available on leader + RequireFromPresent bool `json:"requireFromPresent"` + // If true, logs each applier operation (debugging only) + Verbose bool `json:"verbose"` + // Type of collection restriction ("include" or "exclude") + RestrictType string `json:"restrictType"` + // Collections included/excluded depending on RestrictType + RestrictCollections []string `json:"restrictCollections"` + // Max number of errors to ignore + IgnoreErrors *int `json:"ignoreErrors,omitempty"` + // SSL protocol version + SslProtocol *int `json:"sslProtocol,omitempty"` + // Whether to skip create/drop collection operations + SkipCreateDrop *bool `json:"skipCreateDrop,omitempty"` + // Max packet size (bytes) + MaxPacketSize *int64 `json:"maxPacketSize,omitempty"` + // Whether to include Foxx queues + IncludeFoxxQueues *bool `json:"includeFoxxQueues,omitempty"` + // Whether incremental sync is used + Incremental *bool `json:"incremental,omitempty"` +} diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index bf7844d2..799599aa 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -331,7 +331,6 @@ func (c *clientReplication) LoggerFirstTick(ctx context.Context, dbName string) } if serverRole == ServerRoleCoordinator { return LoggerFirstTickResponse{}, errors.New("replication logger-first-tick is not supported on Coordinators") - } // Build URL url := c.url(dbName, []string{"logger-first-tick"}, nil) @@ -376,3 +375,40 @@ func (c *clientReplication) LoggerTickRange(ctx context.Context, dbName string) return nil, (&shared.ResponseStruct{}).AsArangoErrorWithCode(resp.Code()) } } + +func (c *clientReplication) GetApplierConfig(ctx context.Context, dbName string, global *bool) (ApplierConfigResponse, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return ApplierConfigResponse{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return ApplierConfigResponse{}, errors.New("replication applier-config is not supported on Coordinators") + } + + // Build query params + queryParams := map[string]interface{}{} + if global != nil { + queryParams["global"] = *global + } + + // Build URL + url := c.url(dbName, []string{"applier-config"}, queryParams) + + var response struct { + shared.ResponseStruct `json:",inline"` + ApplierConfigResponse `json:",inline"` + } + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return ApplierConfigResponse{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.ApplierConfigResponse, nil + default: + return ApplierConfigResponse{}, response.AsArangoErrorWithCode(code) + } +} diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index cca62dbd..fed61aee 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -188,3 +188,30 @@ func Test_LoggerTickRange(t *testing.T) { }) }) } + +func Test_GetApplierConfig(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + t.Run("Running applier config with setting global:true", func(t *testing.T) { + + resp, err := client.GetApplierConfig(ctx, db.Name(), utils.NewType(false)) + require.NoError(t, err) + require.NotNil(t, resp) + }) + t.Run("Running applier config with setting global:nil", func(t *testing.T) { + resp, err := client.GetApplierConfig(ctx, db.Name(), nil) + require.NoError(t, err) + require.NotNil(t, resp) + }) + }) + }) +} From 070623c218f8ce6f4ea84c4c9ef8c55a95a11b95 Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Thu, 28 Aug 2025 14:20:56 +0530 Subject: [PATCH 13/25] Replication: Add endpoint to update applier config --- v2/arangodb/client_replication.go | 88 ++++++++++++++++++- v2/arangodb/client_replication_impl.go | 114 +++++++++++++++++++++++++ v2/tests/client_replication_test.go | 37 ++++++++ 3 files changed, 238 insertions(+), 1 deletion(-) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index 72c254b5..b6b28c36 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -45,6 +45,8 @@ type ClientReplication interface { LoggerTickRange(ctx context.Context, dbName string) ([]LoggerTickRangeResponseObj, error) // GetApplierConfig retrieves the configuration of the replication applier. GetApplierConfig(ctx context.Context, dbName string, global *bool) (ApplierConfigResponse, error) + // UpdateApplierConfig updates the configuration of the replication applier. + UpdateApplierConfig(ctx context.Context, dbName string, global *bool, opts UpdateApplierConfigOptions) (ApplierConfigResponse, error) } // CreateNewBatchOptions represents the request body for creating a batch. @@ -297,7 +299,7 @@ type ApplierConfigResponse struct { Endpoint *string `json:"endpoint,omitempty"` // Database name (e.g., "_system") Database *string `json:"database,omitempty"` - // Optional username for authentication + // Username for authentication Username *string `json:"username,omitempty"` // Password for authentication Password *string `json:"password,omitempty"` @@ -348,3 +350,87 @@ type ApplierConfigResponse struct { // Whether incremental sync is used Incremental *bool `json:"incremental,omitempty"` } + +// UpdateApplierConfigOptions holds the configuration options for the replication applier. +// These settings can only be changed when the applier is not running. +type UpdateApplierConfigOptions struct { + // AdaptivePolling controls whether the replication applier uses adaptive polling. + AdaptivePolling *bool `json:"adaptivePolling"` + + // AutoResync, if set to true, allows the applier to automatically + // trigger a full resynchronization if it falls too far behind. + AutoResync *bool `json:"autoResync"` + + // AutoResyncRetries defines how many times the applier should retry + // automatic resynchronization after failure. + AutoResyncRetries *int `json:"autoResyncRetries"` + + // AutoStart indicates if the applier should start automatically + // once configured. + AutoStart *bool `json:"autoStart"` + + // ChunkSize is the maximum size (in bytes) of the data batches + // fetched by the applier. + ChunkSize *int `json:"chunkSize"` + + // ConnectTimeout is the timeout (in seconds) for the initial + // connection attempt to the master endpoint. + ConnectTimeout *int `json:"connectTimeout"` + + // ConnectionRetryWaitTime is the wait time (in seconds) before retrying + // a failed connection attempt. + ConnectionRetryWaitTime *int `json:"connectionRetryWaitTime"` + + // Database is the name of the database on the master that the applier + // should replicate from. + Database *string `json:"database"` + + // Endpoint specifies the master server endpoint (e.g., "tcp://127.0.0.1:8529") + // from which replication data is pulled. This is required. + Endpoint *string `json:"endpoint"` + + // IdleMaxWaitTime is the maximum wait time (in seconds) between + // polling requests when the applier is idle. + IdleMaxWaitTime *int `json:"idleMaxWaitTime"` + + // IdleMinWaitTime is the minimum wait time (in seconds) between + // polling requests when the applier is idle. + IdleMinWaitTime *int `json:"idleMinWaitTime"` + + // IncludeSystem specifies whether system collections should be + // replicated as well. + IncludeSystem *bool `json:"includeSystem"` + + // InitialSyncMaxWaitTime defines the maximum wait time (in seconds) + // for the initial synchronization step. + InitialSyncMaxWaitTime *int `json:"initialSyncMaxWaitTime"` + + // MaxConnectRetries is the maximum number of retries for + // initial connection attempts. + MaxConnectRetries *int `json:"maxConnectRetries"` + + // Password is the password used when connecting to the master. + Password *string `json:"password"` + + // RequestTimeout specifies the timeout (in seconds) for individual + // HTTP requests made by the applier. + RequestTimeout *int `json:"requestTimeout"` + + // RequireFromPresent, if true, requires the replication to start from + // the present and not accept missing history. + RequireFromPresent *bool `json:"requireFromPresent"` + + // RestrictCollections is an optional list of collections to include + // or exclude in replication, depending on RestrictType. + RestrictCollections *[]string `json:"restrictCollections"` + + // RestrictType determines how RestrictCollections is interpreted: + // "include" or "exclude". + RestrictType *string `json:"restrictType"` + + // Username is the username used when connecting to the master. + Username *string `json:"username"` + + // Verbose controls the verbosity of the applier's logging. + Verbose *bool `json:"verbose"` +} diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index 799599aa..4f2da6ca 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -412,3 +412,117 @@ func (c *clientReplication) GetApplierConfig(ctx context.Context, dbName string, return ApplierConfigResponse{}, response.AsArangoErrorWithCode(code) } } + +func formUpdateApplierConfigParams(opts UpdateApplierConfigOptions) (map[string]interface{}, error) { + params := map[string]interface{}{} + + // Required + if opts.Endpoint == nil || *opts.Endpoint == "" { + return nil, RequiredFieldError("endpoint") + } + params["endpoint"] = *opts.Endpoint + + // Optional + if opts.Database != nil { + params["database"] = *opts.Database + } + if opts.Username != nil { + params["username"] = *opts.Username + } + if opts.Password != nil { + params["password"] = *opts.Password + } + if opts.MaxConnectRetries != nil { + params["maxConnectRetries"] = *opts.MaxConnectRetries + } + if opts.ConnectTimeout != nil { + params["connectTimeout"] = *opts.ConnectTimeout + } + if opts.RequestTimeout != nil { + params["requestTimeout"] = *opts.RequestTimeout + } + if opts.IdleMinWaitTime != nil { + params["idleMinWaitTime"] = *opts.IdleMinWaitTime + } + if opts.IdleMaxWaitTime != nil { + params["idleMaxWaitTime"] = *opts.IdleMaxWaitTime + } + if opts.InitialSyncMaxWaitTime != nil { + params["initialSyncMaxWaitTime"] = *opts.InitialSyncMaxWaitTime + } + if opts.IncludeSystem != nil { + params["includeSystem"] = *opts.IncludeSystem + } + if opts.ChunkSize != nil { + params["chunkSize"] = *opts.ChunkSize + } + if opts.AutoStart != nil { + params["autoStart"] = *opts.AutoStart + } + if opts.RestrictCollections != nil { + params["restrictCollections"] = *opts.RestrictCollections + } + if opts.RestrictType != nil { + params["restrictType"] = *opts.RestrictType + } + if opts.AdaptivePolling != nil { + params["adaptivePolling"] = *opts.AdaptivePolling + } + if opts.AutoResync != nil { + params["autoResync"] = *opts.AutoResync + } + if opts.AutoResyncRetries != nil { + params["autoResyncRetries"] = *opts.AutoResyncRetries + } + if opts.RequireFromPresent != nil { + params["requireFromPresent"] = *opts.RequireFromPresent + } + if opts.Verbose != nil { + params["verbose"] = *opts.Verbose + } + + return params, nil +} + +func (c *clientReplication) UpdateApplierConfig(ctx context.Context, dbName string, global *bool, opts UpdateApplierConfigOptions) (ApplierConfigResponse, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return ApplierConfigResponse{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return ApplierConfigResponse{}, errors.New("replication applier-config is not supported on Coordinators") + } + + // Build query params + queryParams := map[string]interface{}{} + if global != nil { + queryParams["global"] = *global + } + + // Build URL + url := c.url(dbName, []string{"applier-config"}, queryParams) + + var response struct { + shared.ResponseStruct `json:",inline"` + ApplierConfigResponse `json:",inline"` + } + + requestParams, err := formUpdateApplierConfigParams(opts) + if err != nil { + return ApplierConfigResponse{}, errors.WithStack(err) + } + + resp, err := connection.CallPut(ctx, c.client.connection, url, &response, requestParams) + if err != nil { + return ApplierConfigResponse{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.ApplierConfigResponse, nil + default: + return ApplierConfigResponse{}, response.AsArangoErrorWithCode(code) + } +} diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index fed61aee..6b121c3b 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -215,3 +215,40 @@ func Test_GetApplierConfig(t *testing.T) { }) }) } + +func Test_UpdateApplierConfig(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + t.Run("Update applier config with setting global:true", func(t *testing.T) { + resp, err := client.UpdateApplierConfig(ctx, db.Name(), utils.NewType(true), arangodb.UpdateApplierConfigOptions{ + ChunkSize: utils.NewType(1234), + AutoStart: utils.NewType(true), + Endpoint: utils.NewType("tcp://127.0.0.1:8529"), + Database: utils.NewType(db.Name()), + Username: utils.NewType("root"), + }) + require.NoError(t, err) + require.NotNil(t, resp) + }) + t.Run("Update applier config with setting global:false", func(t *testing.T) { + resp, err := client.UpdateApplierConfig(ctx, db.Name(), utils.NewType(false), arangodb.UpdateApplierConfigOptions{ + ChunkSize: utils.NewType(2596), + AutoStart: utils.NewType(false), + Endpoint: utils.NewType("tcp://127.0.0.1:8529"), + Database: utils.NewType(db.Name()), + }) + require.NoError(t, err) + require.NotNil(t, resp) + }) + }) + }) +} From 89d3ec0fac268a54991d41a82e9129d375d16210 Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Thu, 28 Aug 2025 18:46:07 +0530 Subject: [PATCH 14/25] Replication: Add endpoints for Start, Stop and Get the replication applier --- v2/arangodb/client_replication.go | 153 +++++++++++++++++++++---- v2/arangodb/client_replication_impl.go | 117 +++++++++++++++++++ v2/tests/client_replication_test.go | 123 ++++++++++++++++++++ 3 files changed, 370 insertions(+), 23 deletions(-) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index b6b28c36..7974e846 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -47,6 +47,12 @@ type ClientReplication interface { GetApplierConfig(ctx context.Context, dbName string, global *bool) (ApplierConfigResponse, error) // UpdateApplierConfig updates the configuration of the replication applier. UpdateApplierConfig(ctx context.Context, dbName string, global *bool, opts UpdateApplierConfigOptions) (ApplierConfigResponse, error) + // ApplierStart starts the replication applier. + ApplierStart(ctx context.Context, dbName string, global *bool, from *string) (ApplierStateResp, error) + // ApplierStop stops the replication applier. + ApplierStop(ctx context.Context, dbName string, global *bool) (ApplierStateResp, error) + // GetApplierState retrieves the state of the replication applier. + GetApplierState(ctx context.Context, dbName string, global *bool) (ApplierStateResp, error) } // CreateNewBatchOptions represents the request body for creating a batch. @@ -67,10 +73,10 @@ type CreateNewBatchResponse struct { // InventoryQueryParams represents the query parameters for the replication inventory API. type InventoryQueryParams struct { // IncludeSystem indicates whether to include system collections in the inventory. - IncludeSystem *bool `json:"includeSystem"` + IncludeSystem *bool `json:"includeSystem,omitempty"` // Global indicates whether to return global inventory or not. // If true, the inventory will include all collections across all DBServers. - Global *bool `json:"global"` + Global *bool `json:"global,omitempty"` // BatchID is the ID of the replication batch to query. BatchID string `json:"batchId"` // Collection is the name of the collection to restrict inventory to. @@ -355,82 +361,183 @@ type ApplierConfigResponse struct { // These settings can only be changed when the applier is not running. type UpdateApplierConfigOptions struct { // AdaptivePolling controls whether the replication applier uses adaptive polling. - AdaptivePolling *bool `json:"adaptivePolling"` + AdaptivePolling *bool `json:"adaptivePolling,omitempty"` // AutoResync, if set to true, allows the applier to automatically // trigger a full resynchronization if it falls too far behind. - AutoResync *bool `json:"autoResync"` + AutoResync *bool `json:"autoResync,omitempty"` // AutoResyncRetries defines how many times the applier should retry // automatic resynchronization after failure. - AutoResyncRetries *int `json:"autoResyncRetries"` + AutoResyncRetries *int `json:"autoResyncRetries,omitempty"` // AutoStart indicates if the applier should start automatically // once configured. - AutoStart *bool `json:"autoStart"` + AutoStart *bool `json:"autoStart,omitempty"` // ChunkSize is the maximum size (in bytes) of the data batches // fetched by the applier. - ChunkSize *int `json:"chunkSize"` + ChunkSize *int `json:"chunkSize,omitempty"` // ConnectTimeout is the timeout (in seconds) for the initial // connection attempt to the master endpoint. - ConnectTimeout *int `json:"connectTimeout"` + ConnectTimeout *int `json:"connectTimeout,omitempty"` // ConnectionRetryWaitTime is the wait time (in seconds) before retrying // a failed connection attempt. - ConnectionRetryWaitTime *int `json:"connectionRetryWaitTime"` + ConnectionRetryWaitTime *int `json:"connectionRetryWaitTime,omitempty"` // Database is the name of the database on the master that the applier // should replicate from. - Database *string `json:"database"` + Database *string `json:"database,omitempty"` // Endpoint specifies the master server endpoint (e.g., "tcp://127.0.0.1:8529") // from which replication data is pulled. This is required. - Endpoint *string `json:"endpoint"` + Endpoint *string `json:"endpoint,omitempty"` // IdleMaxWaitTime is the maximum wait time (in seconds) between // polling requests when the applier is idle. - IdleMaxWaitTime *int `json:"idleMaxWaitTime"` + IdleMaxWaitTime *int `json:"idleMaxWaitTime,omitempty"` // IdleMinWaitTime is the minimum wait time (in seconds) between // polling requests when the applier is idle. - IdleMinWaitTime *int `json:"idleMinWaitTime"` + IdleMinWaitTime *int `json:"idleMinWaitTime,omitempty"` // IncludeSystem specifies whether system collections should be // replicated as well. - IncludeSystem *bool `json:"includeSystem"` + IncludeSystem *bool `json:"includeSystem,omitempty"` // InitialSyncMaxWaitTime defines the maximum wait time (in seconds) // for the initial synchronization step. - InitialSyncMaxWaitTime *int `json:"initialSyncMaxWaitTime"` + InitialSyncMaxWaitTime *int `json:"initialSyncMaxWaitTime,omitempty"` // MaxConnectRetries is the maximum number of retries for // initial connection attempts. - MaxConnectRetries *int `json:"maxConnectRetries"` + MaxConnectRetries *int `json:"maxConnectRetries,omitempty"` // Password is the password used when connecting to the master. - Password *string `json:"password"` + Password *string `json:"password,omitempty"` // RequestTimeout specifies the timeout (in seconds) for individual // HTTP requests made by the applier. - RequestTimeout *int `json:"requestTimeout"` + RequestTimeout *int `json:"requestTimeout,omitempty"` // RequireFromPresent, if true, requires the replication to start from // the present and not accept missing history. - RequireFromPresent *bool `json:"requireFromPresent"` + RequireFromPresent *bool `json:"requireFromPresent,omitempty"` // RestrictCollections is an optional list of collections to include // or exclude in replication, depending on RestrictType. - RestrictCollections *[]string `json:"restrictCollections"` + RestrictCollections *[]string `json:"restrictCollections,omitempty"` // RestrictType determines how RestrictCollections is interpreted: // "include" or "exclude". - RestrictType *string `json:"restrictType"` + RestrictType *string `json:"restrictType,omitempty"` // Username is the username used when connecting to the master. - Username *string `json:"username"` + Username *string `json:"username,omitempty"` // Verbose controls the verbosity of the applier's logging. - Verbose *bool `json:"verbose"` + Verbose *bool `json:"verbose,omitempty"` +} + +// ApplierState represents the current state of the replication applier. +type ApplierState struct { + // Started indicates when the applier was started. + Started *string `json:"started"` + + // Running is true if the applier is currently running. + Running *bool `json:"running"` + + // Phase describes the current applier phase (e.g., "running", "inactive"). + Phase *string `json:"phase"` + + // LastAppliedContinuousTick is the tick of the last operation applied by the applier. + LastAppliedContinuousTick *string `json:"lastAppliedContinuousTick"` + + // LastProcessedContinuousTick is the tick of the last operation processed. + LastProcessedContinuousTick *string `json:"lastProcessedContinuousTick"` + + // LastAvailableContinuousTick is the last tick available on the replication logger. + LastAvailableContinuousTick *string `json:"lastAvailableContinuousTick"` + + // SafeResumeTick is the tick from which the applier can safely resume. + SafeResumeTick *string `json:"safeResumeTick"` + + // TicksBehind indicates how many ticks the applier is behind the latest log. + TicksBehind *int64 `json:"ticksBehind,omitempty"` + + // Progress provides detailed information about the last progress event. + Progress *ApplierProgress `json:"progress,omitempty"` + + // TotalRequests is the total number of requests made by the applier. + TotalRequests *int `json:"totalRequests,omitempty"` + + // TotalFailedConnects counts the number of failed connection attempts. + TotalFailedConnects *int `json:"totalFailedConnects,omitempty"` + + // TotalEvents is the total number of replication events processed. + TotalEvents *int `json:"totalEvents,omitempty"` + + // TotalDocuments is the number of document operations applied. + TotalDocuments *int `json:"totalDocuments,omitempty"` + + // TotalRemovals is the number of document removal operations applied. + TotalRemovals *int `json:"totalRemovals,omitempty"` + + // TotalResyncs counts how many times a resync was triggered. + TotalResyncs *int `json:"totalResyncs,omitempty"` + + // TotalOperationsExcluded is the number of operations ignored (due to filters, etc.). + TotalOperationsExcluded *int `json:"totalOperationsExcluded,omitempty"` + + // TotalApplyTime is the cumulative time (in ms) spent applying operations. + TotalApplyTime *int `json:"totalApplyTime,omitempty"` + + // AverageApplyTime is the average time (in ms) spent applying operations. + AverageApplyTime *int `json:"averageApplyTime,omitempty"` + + // TotalFetchTime is the cumulative time (in ms) spent fetching operations. + TotalFetchTime *int `json:"totalFetchTime,omitempty"` + + // AverageFetchTime is the average time (in ms) spent fetching operations. + AverageFetchTime *int `json:"averageFetchTime,omitempty"` + + // LastError contains information about the last error, if any. + LastError *struct { + // ErrorNum is the numeric error code of the last error. + ErrorNum *int `json:"errorNum,omitempty"` + } `json:"lastError,omitempty"` + + // Time is the timestamp of this applier state snapshot. + Time time.Time `json:"time,omitempty"` +} + +// ApplierProgress contains details about the applier's last progress event. +type ApplierProgress struct { + // Time is when the progress message was recorded. + Time *string `json:"time,omitempty"` + + // Message provides a short description of the progress (e.g., "applied batch"). + Message *string `json:"message,omitempty"` + + // FailedConnects counts failed connection attempts at this progress point. + FailedConnects *int `json:"failedConnects,omitempty"` +} + +type ApplierStateResp struct { + // State holds detailed information about the applier's current state. + State ApplierState `json:"state"` + + // Server contains information about the server providing this state. + Server struct { + // Version is the ArangoDB version. + Version *string `json:"version"` + + // ServerId is the unique ID of the server. + ServerId *string `json:"serverId"` + } `json:"server"` + + // Endpoint is the endpoint this applier is connected to. + Endpoint *string `json:"endpoint"` } diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index 4f2da6ca..b6263237 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -526,3 +526,120 @@ func (c *clientReplication) UpdateApplierConfig(ctx context.Context, dbName stri return ApplierConfigResponse{}, response.AsArangoErrorWithCode(code) } } + +func (c *clientReplication) ApplierStart(ctx context.Context, dbName string, global *bool, from *string) (ApplierStateResp, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return ApplierStateResp{}, errors.New("replication applier-start is not supported on Coordinators") + } + + // Build query params + queryParams := map[string]interface{}{} + if global != nil { + queryParams["global"] = *global + } + if from != nil && *from != "" { + queryParams["from"] = *from + } + + // Build URL + url := c.url(dbName, []string{"applier-start"}, queryParams) + + var response struct { + shared.ResponseStruct `json:",inline"` + ApplierStateResp `json:",inline"` + } + + resp, err := connection.CallPut(ctx, c.client.connection, url, &response, nil) + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.ApplierStateResp, nil + default: + return ApplierStateResp{}, response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) ApplierStop(ctx context.Context, dbName string, global *bool) (ApplierStateResp, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return ApplierStateResp{}, errors.New("replication applier-stop is not supported on Coordinators") + } + + // Build query params + queryParams := map[string]interface{}{} + if global != nil { + queryParams["global"] = *global + } + + // Build URL + url := c.url(dbName, []string{"applier-stop"}, queryParams) + + var response struct { + shared.ResponseStruct `json:",inline"` + ApplierStateResp `json:",inline"` + } + + resp, err := connection.CallPut(ctx, c.client.connection, url, &response, nil) + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.ApplierStateResp, nil + default: + return ApplierStateResp{}, response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) GetApplierState(ctx context.Context, dbName string, global *bool) (ApplierStateResp, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return ApplierStateResp{}, errors.New("replication applier-stop is not supported on Coordinators") + } + + // Build query params + queryParams := map[string]interface{}{} + if global != nil { + queryParams["global"] = *global + } + + // Build URL + url := c.url(dbName, []string{"applier-state"}, queryParams) + + var response struct { + shared.ResponseStruct `json:",inline"` + ApplierStateResp `json:",inline"` + } + + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.ApplierStateResp, nil + default: + return ApplierStateResp{}, response.AsArangoErrorWithCode(code) + } +} diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index 6b121c3b..3a85a4a8 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -252,3 +252,126 @@ func Test_UpdateApplierConfig(t *testing.T) { }) }) } + +func Test_ApplierStart(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + time.Sleep(1 * time.Second) + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + + batch, err := client.CreateNewBatch(ctx, db.Name(), nil, utils.NewType(true), arangodb.CreateNewBatchOptions{ + Ttl: 600, + }) + require.NoError(t, err) + require.NotNil(t, batch) + t.Run("Update applier config with setting global:true", func(t *testing.T) { + resp, err := client.UpdateApplierConfig(ctx, db.Name(), utils.NewType(true), arangodb.UpdateApplierConfigOptions{ + ChunkSize: utils.NewType(2596), + AutoStart: utils.NewType(false), + Endpoint: utils.NewType("tcp://127.0.0.1:8529"), + Database: utils.NewType(db.Name()), + }) + require.NoError(t, err) + require.NotNil(t, resp) + }) + t.Logf("Batch ID: %s", batch.ID) + t.Run("Applier Start with query params", func(t *testing.T) { + resp, err := client.ApplierStart(ctx, db.Name(), utils.NewType(true), utils.NewType(batch.ID)) + require.NoError(t, err) + require.NotNil(t, resp) + // Log useful debug info + t.Logf("Applier start:\n running=%v\n phase=%s\n message=%s\n failedConnects=%d", + *resp.State.Running, + *resp.State.Phase, + *resp.State.Progress.Message, + *resp.State.Progress.FailedConnects, + ) + }) + t.Run("Applier_State_with_query_params", func(t *testing.T) { + ctx := context.Background() + + state, err := client.GetApplierState(ctx, db.Name(), utils.NewType(true)) + require.NoError(t, err, "failed to get applier state") + require.NotNil(t, state.State) + + // Log useful debug info + t.Logf("Applier state:\n running=%v\n phase=%s\n message=%s\n failedConnects=%d", + *state.State.Running, + *state.State.Phase, + *state.State.Progress.Message, + *state.State.Progress.FailedConnects, + ) + }) + t.Run("Applier Stop with query params", func(t *testing.T) { + resp, err := client.ApplierStop(ctx, db.Name(), utils.NewType(true)) + require.NoError(t, err) + require.NotNil(t, resp) + // Log useful debug info + t.Logf("Applier stop:\n running=%v\n phase=%s\n message=%s\n failedConnects=%d", + *resp.State.Running, + *resp.State.Phase, + *resp.State.Progress.Message, + *resp.State.Progress.FailedConnects, + ) + }) + t.Run("Update applier config with out query params", func(t *testing.T) { + resp, err := client.UpdateApplierConfig(ctx, db.Name(), nil, arangodb.UpdateApplierConfigOptions{ + ChunkSize: utils.NewType(2596), + AutoStart: utils.NewType(false), + Endpoint: utils.NewType("tcp://127.0.0.1:8529"), + Database: utils.NewType(db.Name()), + }) + require.NoError(t, err) + require.NotNil(t, resp) + }) + t.Logf("Batch ID: %s", batch.ID) + t.Run("Applier Start with out query params", func(t *testing.T) { + resp, err := client.ApplierStart(ctx, db.Name(), nil, nil) + require.NoError(t, err) + require.NotNil(t, resp) + // Log useful debug info + t.Logf("Applier start 346:\n running=%v\n phase=%s\n message=%s\n failedConnects=%d", + *resp.State.Running, + *resp.State.Phase, + *resp.State.Progress.Message, + *resp.State.Progress.FailedConnects, + ) + }) + t.Run("Applier State with out query params", func(t *testing.T) { + ctx := context.Background() + + state, err := client.GetApplierState(ctx, db.Name(), nil) + require.NoError(t, err, "failed to get applier state") + require.NotNil(t, state.State) + + // Log useful debug info + t.Logf("Applier state:\n running=%v\n phase=%s\n message=%s\n failedConnects=%d", + *state.State.Running, + *state.State.Phase, + *state.State.Progress.Message, + *state.State.Progress.FailedConnects, + ) + }) + t.Run("Applier Stop with out query params", func(t *testing.T) { + resp, err := client.ApplierStop(ctx, db.Name(), nil) + require.NoError(t, err) + require.NotNil(t, resp) + // Log useful debug info + t.Logf("Applier stop:\n running=%v\n phase=%s\n message=%s\n failedConnects=%d", + *resp.State.Running, + *resp.State.Phase, + *resp.State.Progress.Message, + *resp.State.Progress.FailedConnects, + ) + }) + }) + }) +} From 10148c537f3603790e168f6248deb75fb483309f Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Fri, 29 Aug 2025 13:25:55 +0530 Subject: [PATCH 15/25] Replication: Add endpoint to fetch Replication Server Id --- v2/arangodb/client_replication.go | 2 ++ v2/arangodb/client_replication_impl.go | 23 +++++++++++++++++++++++ v2/tests/client_replication_test.go | 15 +++++++++++++++ 3 files changed, 40 insertions(+) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index 7974e846..d86f9977 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -53,6 +53,8 @@ type ClientReplication interface { ApplierStop(ctx context.Context, dbName string, global *bool) (ApplierStateResp, error) // GetApplierState retrieves the state of the replication applier. GetApplierState(ctx context.Context, dbName string, global *bool) (ApplierStateResp, error) + // GetReplicationServerId retrieves the server ID used for replication. + GetReplicationServerId(ctx context.Context, dbName string) (string, error) } // CreateNewBatchOptions represents the request body for creating a batch. diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index b6263237..16a7de09 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -643,3 +643,26 @@ func (c *clientReplication) GetApplierState(ctx context.Context, dbName string, return ApplierStateResp{}, response.AsArangoErrorWithCode(code) } } + +func (c *clientReplication) GetReplicationServerId(ctx context.Context, dbName string) (string, error) { + + // Build URL + url := c.url(dbName, []string{"server-id"}, nil) + + var response struct { + shared.ResponseStruct `json:",inline"` + ServerId string `json:"serverId"` + } + + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return "", errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.ServerId, nil + default: + return "", response.AsArangoErrorWithCode(code) + } +} diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index 3a85a4a8..77060c30 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -375,3 +375,18 @@ func Test_ApplierStart(t *testing.T) { }) }) } + +func Test_GetReplicationServerId(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + t.Run("Get replication server ID", func(t *testing.T) { + resp, err := client.GetReplicationServerId(ctx, db.Name()) + require.NoError(t, err) + require.NotNil(t, resp) + t.Logf("Replication Server ID: %s", resp) + }) + }) + }) +} From a91f2a4703cd1af6666ed0d4552de838b24c3b4a Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Fri, 29 Aug 2025 15:13:03 +0530 Subject: [PATCH 16/25] Replication: Add new end point make-follower --- v2/arangodb/client_replication.go | 12 ++++++-- v2/arangodb/client_replication_impl.go | 42 ++++++++++++++++++++++++-- v2/tests/client_replication_test.go | 34 ++++++++++++++++++--- 3 files changed, 78 insertions(+), 10 deletions(-) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index d86f9977..ce0b79d7 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -46,7 +46,7 @@ type ClientReplication interface { // GetApplierConfig retrieves the configuration of the replication applier. GetApplierConfig(ctx context.Context, dbName string, global *bool) (ApplierConfigResponse, error) // UpdateApplierConfig updates the configuration of the replication applier. - UpdateApplierConfig(ctx context.Context, dbName string, global *bool, opts UpdateApplierConfigOptions) (ApplierConfigResponse, error) + UpdateApplierConfig(ctx context.Context, dbName string, global *bool, opts ApplierOptions) (ApplierConfigResponse, error) // ApplierStart starts the replication applier. ApplierStart(ctx context.Context, dbName string, global *bool, from *string) (ApplierStateResp, error) // ApplierStop stops the replication applier. @@ -55,6 +55,8 @@ type ClientReplication interface { GetApplierState(ctx context.Context, dbName string, global *bool) (ApplierStateResp, error) // GetReplicationServerId retrieves the server ID used for replication. GetReplicationServerId(ctx context.Context, dbName string) (string, error) + // MakeFollower makes the current server a follower of the specified leader. + MakeFollower(ctx context.Context, dbName string, opts ApplierOptions) (ApplierStateResp, error) } // CreateNewBatchOptions represents the request body for creating a batch. @@ -359,9 +361,9 @@ type ApplierConfigResponse struct { Incremental *bool `json:"incremental,omitempty"` } -// UpdateApplierConfigOptions holds the configuration options for the replication applier. +// ApplierOptions holds the configuration options for the replication applier. // These settings can only be changed when the applier is not running. -type UpdateApplierConfigOptions struct { +type ApplierOptions struct { // AdaptivePolling controls whether the replication applier uses adaptive polling. AdaptivePolling *bool `json:"adaptivePolling,omitempty"` @@ -509,6 +511,10 @@ type ApplierState struct { LastError *struct { // ErrorNum is the numeric error code of the last error. ErrorNum *int `json:"errorNum,omitempty"` + // ErrorMessage is the descriptive message of the last error. + ErrorMessage *string `json:"errorMessage,omitempty"` + // Time is the timestamp of the last error. + Time time.Time `json:"time,omitempty"` } `json:"lastError,omitempty"` // Time is the timestamp of this applier state snapshot. diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index 16a7de09..8a4bdf12 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -413,7 +413,7 @@ func (c *clientReplication) GetApplierConfig(ctx context.Context, dbName string, } } -func formUpdateApplierConfigParams(opts UpdateApplierConfigOptions) (map[string]interface{}, error) { +func formApplierParams(opts ApplierOptions) (map[string]interface{}, error) { params := map[string]interface{}{} // Required @@ -484,7 +484,7 @@ func formUpdateApplierConfigParams(opts UpdateApplierConfigOptions) (map[string] return params, nil } -func (c *clientReplication) UpdateApplierConfig(ctx context.Context, dbName string, global *bool, opts UpdateApplierConfigOptions) (ApplierConfigResponse, error) { +func (c *clientReplication) UpdateApplierConfig(ctx context.Context, dbName string, global *bool, opts ApplierOptions) (ApplierConfigResponse, error) { // Check server role serverRole, err := c.client.ServerRole(ctx) @@ -509,7 +509,7 @@ func (c *clientReplication) UpdateApplierConfig(ctx context.Context, dbName stri ApplierConfigResponse `json:",inline"` } - requestParams, err := formUpdateApplierConfigParams(opts) + requestParams, err := formApplierParams(opts) if err != nil { return ApplierConfigResponse{}, errors.WithStack(err) } @@ -666,3 +666,39 @@ func (c *clientReplication) GetReplicationServerId(ctx context.Context, dbName s return "", response.AsArangoErrorWithCode(code) } } + +func (c *clientReplication) MakeFollower(ctx context.Context, dbName string, opts ApplierOptions) (ApplierStateResp, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return ApplierStateResp{}, errors.New("replication make-follower is not supported on Coordinators") + } + + // Build URL + url := c.url(dbName, []string{"make-follower"}, nil) + + var response struct { + shared.ResponseStruct `json:",inline"` + ApplierStateResp `json:",inline"` + } + requestParams, err := formApplierParams(opts) + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + + resp, err := connection.CallPut(ctx, c.client.connection, url, &response, requestParams) + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.ApplierStateResp, nil + default: + return ApplierStateResp{}, response.AsArangoErrorWithCode(code) + } +} diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index 77060c30..4cff4c70 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -229,7 +229,7 @@ func Test_UpdateApplierConfig(t *testing.T) { db, err := client.GetDatabase(ctx, "_system", nil) require.NoError(t, err) t.Run("Update applier config with setting global:true", func(t *testing.T) { - resp, err := client.UpdateApplierConfig(ctx, db.Name(), utils.NewType(true), arangodb.UpdateApplierConfigOptions{ + resp, err := client.UpdateApplierConfig(ctx, db.Name(), utils.NewType(true), arangodb.ApplierOptions{ ChunkSize: utils.NewType(1234), AutoStart: utils.NewType(true), Endpoint: utils.NewType("tcp://127.0.0.1:8529"), @@ -240,7 +240,7 @@ func Test_UpdateApplierConfig(t *testing.T) { require.NotNil(t, resp) }) t.Run("Update applier config with setting global:false", func(t *testing.T) { - resp, err := client.UpdateApplierConfig(ctx, db.Name(), utils.NewType(false), arangodb.UpdateApplierConfigOptions{ + resp, err := client.UpdateApplierConfig(ctx, db.Name(), utils.NewType(false), arangodb.ApplierOptions{ ChunkSize: utils.NewType(2596), AutoStart: utils.NewType(false), Endpoint: utils.NewType("tcp://127.0.0.1:8529"), @@ -273,7 +273,7 @@ func Test_ApplierStart(t *testing.T) { require.NoError(t, err) require.NotNil(t, batch) t.Run("Update applier config with setting global:true", func(t *testing.T) { - resp, err := client.UpdateApplierConfig(ctx, db.Name(), utils.NewType(true), arangodb.UpdateApplierConfigOptions{ + resp, err := client.UpdateApplierConfig(ctx, db.Name(), utils.NewType(true), arangodb.ApplierOptions{ ChunkSize: utils.NewType(2596), AutoStart: utils.NewType(false), Endpoint: utils.NewType("tcp://127.0.0.1:8529"), @@ -323,7 +323,7 @@ func Test_ApplierStart(t *testing.T) { ) }) t.Run("Update applier config with out query params", func(t *testing.T) { - resp, err := client.UpdateApplierConfig(ctx, db.Name(), nil, arangodb.UpdateApplierConfigOptions{ + resp, err := client.UpdateApplierConfig(ctx, db.Name(), nil, arangodb.ApplierOptions{ ChunkSize: utils.NewType(2596), AutoStart: utils.NewType(false), Endpoint: utils.NewType("tcp://127.0.0.1:8529"), @@ -390,3 +390,29 @@ func Test_GetReplicationServerId(t *testing.T) { }) }) } + +func Test_MakeFollower(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + t.Run("Make Follower", func(t *testing.T) { + resp, err := client.MakeFollower(ctx, db.Name(), arangodb.ApplierOptions{ + ChunkSize: utils.NewType(1234), + Endpoint: utils.NewType("tcp://127.0.0.1:8529"), + Database: utils.NewType(db.Name()), + Username: utils.NewType("root"), + }) + require.NoError(t, err) + require.NotNil(t, resp) + }) + }) + }) +} From ab3a51039c1cba9d208c2a4d3a789fc4fbb63343 Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Fri, 29 Aug 2025 16:26:27 +0530 Subject: [PATCH 17/25] Replication: Add endpoint to fetch the tick ranges available in the WAL --- v2/arangodb/client_replication.go | 29 ++++++++++++++++++------ v2/arangodb/client_replication_impl.go | 31 ++++++++++++++++++++++++++ v2/tests/client_replication_test.go | 22 ++++++++++++++++++ 3 files changed, 75 insertions(+), 7 deletions(-) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index ce0b79d7..9a80fec8 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -57,6 +57,8 @@ type ClientReplication interface { GetReplicationServerId(ctx context.Context, dbName string) (string, error) // MakeFollower makes the current server a follower of the specified leader. MakeFollower(ctx context.Context, dbName string, opts ApplierOptions) (ApplierStateResp, error) + // GetWalRange retrieves the WAL range information. + GetWalRange(ctx context.Context, dbName string) (WalRangeResponse, error) } // CreateNewBatchOptions represents the request body for creating a batch. @@ -538,14 +540,27 @@ type ApplierStateResp struct { State ApplierState `json:"state"` // Server contains information about the server providing this state. - Server struct { - // Version is the ArangoDB version. - Version *string `json:"version"` - - // ServerId is the unique ID of the server. - ServerId *string `json:"serverId"` - } `json:"server"` + Server ApplierServer `json:"server"` // Endpoint is the endpoint this applier is connected to. Endpoint *string `json:"endpoint"` } + +type ApplierServer struct { + // Version is the ArangoDB version. + Version *string `json:"version"` + + // ServerId is the unique ID of the server. + ServerId *string `json:"serverId"` +} + +type WalRangeResponse struct { + // Time is the timestamp when the range was recorded. + Time time.Time `json:"time"` + // Minimum tick in the range + TickMin string `json:"tickMin"` + // Maximum tick in the range + TickMax string `json:"tickMax"` + // Server information + Server ApplierServer `json:"server"` +} diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index 8a4bdf12..415a3623 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -702,3 +702,34 @@ func (c *clientReplication) MakeFollower(ctx context.Context, dbName string, opt return ApplierStateResp{}, response.AsArangoErrorWithCode(code) } } + +func (c *clientReplication) GetWalRange(ctx context.Context, dbName string) (WalRangeResponse, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return WalRangeResponse{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return WalRangeResponse{}, errors.New("WAL range is not supported on Coordinators") + } + // Build URL + url := connection.NewUrl("_db", url.PathEscape(dbName), "_api", "wal", "range") + + var response struct { + shared.ResponseStruct `json:",inline"` + WalRangeResponse `json:",inline"` + } + + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return WalRangeResponse{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.WalRangeResponse, nil + default: + return WalRangeResponse{}, response.AsArangoErrorWithCode(code) + } +} diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index 4cff4c70..01337f21 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -416,3 +416,25 @@ func Test_MakeFollower(t *testing.T) { }) }) } + +func Test_GetWalRange(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + + t.Run("Get WAL range", func(t *testing.T) { + resp, err := client.GetWalRange(ctx, db.Name()) + require.NoError(t, err) + require.NotNil(t, resp) + }) + }) + }) +} From 124140977300c80585ad3703caa88dddfa9a651f Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Fri, 29 Aug 2025 17:21:53 +0530 Subject: [PATCH 18/25] Replication: Add endpoint to fetch the last available tick value --- v2/arangodb/client_replication.go | 17 ++++++++-- v2/arangodb/client_replication_impl.go | 45 ++++++++++++++++++++++---- v2/tests/client_replication_test.go | 10 ++++-- 3 files changed, 60 insertions(+), 12 deletions(-) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index 9a80fec8..a8db2200 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -57,8 +57,10 @@ type ClientReplication interface { GetReplicationServerId(ctx context.Context, dbName string) (string, error) // MakeFollower makes the current server a follower of the specified leader. MakeFollower(ctx context.Context, dbName string, opts ApplierOptions) (ApplierStateResp, error) - // GetWalRange retrieves the WAL range information. - GetWalRange(ctx context.Context, dbName string) (WalRangeResponse, error) + // GetWALRange retrieves the WAL range information. + GetWALRange(ctx context.Context, dbName string) (WALRangeResponse, error) + // GetWALLastTick retrieves the last available tick information. + GetWALLastTick(ctx context.Context, dbName string) (WALLastTickResponse, error) } // CreateNewBatchOptions represents the request body for creating a batch. @@ -554,7 +556,7 @@ type ApplierServer struct { ServerId *string `json:"serverId"` } -type WalRangeResponse struct { +type WALRangeResponse struct { // Time is the timestamp when the range was recorded. Time time.Time `json:"time"` // Minimum tick in the range @@ -564,3 +566,12 @@ type WalRangeResponse struct { // Server information Server ApplierServer `json:"server"` } + +type WALLastTickResponse struct { + // Time is the timestamp when the range was recorded. + Time time.Time `json:"time"` + // Tick contains the last available tick + Tick string `json:"tick"` + // Server information + Server ApplierServer `json:"server"` +} diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index 415a3623..8566f9fa 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -703,33 +703,64 @@ func (c *clientReplication) MakeFollower(ctx context.Context, dbName string, opt } } -func (c *clientReplication) GetWalRange(ctx context.Context, dbName string) (WalRangeResponse, error) { +func (c *clientReplication) GetWALRange(ctx context.Context, dbName string) (WALRangeResponse, error) { // Check server role serverRole, err := c.client.ServerRole(ctx) if err != nil { - return WalRangeResponse{}, errors.WithStack(err) + return WALRangeResponse{}, errors.WithStack(err) } if serverRole == ServerRoleCoordinator { - return WalRangeResponse{}, errors.New("WAL range is not supported on Coordinators") + return WALRangeResponse{}, errors.New("WAL range is not supported on Coordinators") } // Build URL url := connection.NewUrl("_db", url.PathEscape(dbName), "_api", "wal", "range") var response struct { shared.ResponseStruct `json:",inline"` - WalRangeResponse `json:",inline"` + WALRangeResponse `json:",inline"` } resp, err := connection.CallGet(ctx, c.client.connection, url, &response) if err != nil { - return WalRangeResponse{}, errors.WithStack(err) + return WALRangeResponse{}, errors.WithStack(err) } switch code := resp.Code(); code { case http.StatusOK: - return response.WalRangeResponse, nil + return response.WALRangeResponse, nil default: - return WalRangeResponse{}, response.AsArangoErrorWithCode(code) + return WALRangeResponse{}, response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) GetWALLastTick(ctx context.Context, dbName string) (WALLastTickResponse, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return WALLastTickResponse{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return WALLastTickResponse{}, errors.New("WAL last tick is not supported on Coordinators") + } + // Build URL + url := connection.NewUrl("_db", url.PathEscape(dbName), "_api", "wal", "lastTick") + + var response struct { + shared.ResponseStruct `json:",inline"` + WALLastTickResponse `json:",inline"` + } + + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return WALLastTickResponse{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.WALLastTickResponse, nil + default: + return WALLastTickResponse{}, response.AsArangoErrorWithCode(code) } } diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index 01337f21..0456db5b 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -417,7 +417,7 @@ func Test_MakeFollower(t *testing.T) { }) } -func Test_GetWalRange(t *testing.T) { +func Test_GetWALReplicationEndpoints(t *testing.T) { Wrap(t, func(t *testing.T, client arangodb.Client) { withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { db, err := client.GetDatabase(ctx, "_system", nil) @@ -431,7 +431,13 @@ func Test_GetWalRange(t *testing.T) { } t.Run("Get WAL range", func(t *testing.T) { - resp, err := client.GetWalRange(ctx, db.Name()) + resp, err := client.GetWALRange(ctx, db.Name()) + require.NoError(t, err) + require.NotNil(t, resp) + }) + + t.Run("Get WAL last tick", func(t *testing.T) { + resp, err := client.GetWALLastTick(ctx, db.Name()) require.NoError(t, err) require.NotNil(t, resp) }) From b9f585ea55509c92f57ded5bf6e8f399e49a357c Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Mon, 1 Sep 2025 11:36:12 +0530 Subject: [PATCH 19/25] Replication: Add this endpoint _api/wal/tail --- v2/arangodb/client_replication.go | 32 +++++++++++ v2/arangodb/client_replication_impl.go | 78 ++++++++++++++++++++++++++ v2/tests/client_replication_test.go | 55 +++++++++++++++++- 3 files changed, 164 insertions(+), 1 deletion(-) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index a8db2200..e313fed6 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -61,6 +61,8 @@ type ClientReplication interface { GetWALRange(ctx context.Context, dbName string) (WALRangeResponse, error) // GetWALLastTick retrieves the last available tick information. GetWALLastTick(ctx context.Context, dbName string) (WALLastTickResponse, error) + // GetWALTail retrieves the tail of the WAL. + GetWALTail(ctx context.Context, dbName string, params *WALTailOptions) ([]byte, error) } // CreateNewBatchOptions represents the request body for creating a batch. @@ -575,3 +577,33 @@ type WALLastTickResponse struct { // Server information Server ApplierServer `json:"server"` } + +type WALTailOptions struct { + // Global indicates whether operations for all databases should be included. + // If set to false, only the operations for the current database are included. + // The value true is only valid on the _system database. + Global *bool `json:"global,omitempty"` + + // From specifies the exclusive lower bound tick value for the replication. + From *int64 `json:"from,omitempty"` + + // To specifies the inclusive upper bound tick value for the replication. + To *int64 `json:"to,omitempty"` + + // LastScanned specifies the last scanned tick value (for RocksDB multi-response support). + LastScanned *int `json:"lastScanned,omitempty"` + + // ChunkSize specifies the approximate maximum size of the returned result in bytes. + ChunkSize *int `json:"chunkSize,omitempty"` + + // SyncerId specifies the ID of the client used to tail results. + // Required if ServerId is not provided. + SyncerId *int64 `json:"syncerId,omitempty"` + + // ServerId specifies the ID of the client machine. + // Required if SyncerId is not provided. + ServerId *int64 `json:"serverId,omitempty"` + + // ClientInfo provides a short description of the client (informational only). + ClientInfo *string `json:"clientInfo,omitempty"` +} diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index 8566f9fa..55abb2b4 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -21,6 +21,7 @@ package arangodb import ( + "bytes" "context" "fmt" "io" @@ -764,3 +765,80 @@ func (c *clientReplication) GetWALLastTick(ctx context.Context, dbName string) ( return WALLastTickResponse{}, response.AsArangoErrorWithCode(code) } } + +func (c *clientReplication) formQueryParamsForTail(params *WALTailOptions) map[string]interface{} { + queryParams := map[string]interface{}{} + if params == nil { + return nil + } + if params.Global != nil { + queryParams["global"] = *params.Global + } + if params.From != nil { + queryParams["from"] = *params.From + } + if params.To != nil { + queryParams["to"] = *params.To + } + if params.LastScanned != nil { + queryParams["lastScanned"] = *params.LastScanned + } + if params.ChunkSize != nil { + queryParams["chunkSize"] = *params.ChunkSize + } + if params.SyncerId != nil { + queryParams["syncerId"] = *params.SyncerId + } + if params.ServerId != nil { + queryParams["serverId"] = *params.ServerId + } + if params.ClientInfo != nil { + queryParams["clientInfo"] = *params.ClientInfo + } + return queryParams +} + +func (c *clientReplication) GetWALTail(ctx context.Context, dbName string, params *WALTailOptions) ([]byte, error) { + + role, err := c.client.ServerRole(ctx) + if err != nil { + return nil, errors.WithStack(err) + } + if role == ServerRoleCoordinator { + return nil, errors.Errorf("replication Tail not supported on role %s", role) + } + + // Build query params + queryParams := c.formQueryParamsForTail(params) + + // Build URL + url := connection.NewUrl("_db", url.PathEscape(dbName), "_api", "wal", "tail") + req, err := c.client.Connection().NewRequest(http.MethodGet, url) + if err != nil { + return nil, err + } + + // Add query params + for k, v := range queryParams { + req.AddQuery(k, fmt.Sprintf("%v", v)) + } + + // Use a bytes.Buffer to capture the response + var buf bytes.Buffer + resp, err := c.client.Connection().Do(ctx, req, &buf, http.StatusOK, http.StatusNoContent) + if err != nil { + return nil, err + } + + fmt.Printf("resp code %d\n", resp.Code()) + + if resp.Code() == http.StatusNoContent { + return nil, nil + } + + if resp.Code() != http.StatusOK { + return nil, (&shared.ResponseStruct{}).AsArangoErrorWithCode(resp.Code()) + } + + return buf.Bytes(), nil +} diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index 0456db5b..c16d7f76 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -22,6 +22,8 @@ package tests import ( "context" + "fmt" + "strconv" "testing" "time" @@ -98,11 +100,12 @@ func Test_CreateNewBatch(t *testing.T) { time.Sleep(200 * time.Millisecond) // Attempt to dump the collection if serverRole == arangodb.ServerRoleSingle { - _, err := client.Dump(ctx, db.Name(), arangodb.ReplicationDumpParams{ + resp, err := client.Dump(ctx, db.Name(), arangodb.ReplicationDumpParams{ BatchID: batch.ID, Collection: col.Name(), }) require.NoError(t, err) + require.GreaterOrEqual(t, len(resp), 0) } else { t.Skipf("Dump only allowed for single server deployments. This is a %s server", serverRole) } @@ -441,6 +444,56 @@ func Test_GetWALReplicationEndpoints(t *testing.T) { require.NoError(t, err) require.NotNil(t, resp) }) + + WithCollectionV2(t, db, nil, func(coll arangodb.Collection) { + // WAL range before inserts + rangeResp, err := client.GetWALRange(ctx, db.Name()) + require.NoError(t, err) + fromTick, err := strconv.ParseInt(rangeResp.TickMax, 10, 64) + require.NoError(t, err) + t.Logf("Starting fromTick: %d\n", fromTick) + t.Run("Update applier config with out query params", func(t *testing.T) { + resp, err := client.UpdateApplierConfig(ctx, db.Name(), nil, arangodb.ApplierOptions{ + Endpoint: utils.NewType("tcp://127.0.0.1:8529"), + Database: utils.NewType(db.Name()), + Verbose: utils.NewType(true), + }) + require.NoError(t, err) + require.NotNil(t, resp) + }) + t.Run("Applier Start with out query params", func(t *testing.T) { + resp, err := client.ApplierStart(ctx, db.Name(), nil, nil) + require.NoError(t, err) + require.NotNil(t, resp) + }) + // Insert docs + t.Run("Inserting 5 documents", func(t *testing.T) { + for i := 0; i < 5; i++ { + resp, err := coll.CreateDocument(ctx, map[string]string{"foo": fmt.Sprintf("bar-%d", i)}) + require.NoError(t, err) + require.NotNil(t, resp) + } + }) + // Force sync and check WAL range again + time.Sleep(500 * time.Millisecond) // Increase sleep time + t.Run("Get WAL Tail with query params", func(t *testing.T) { + tailResp, err := client.GetWALTail(ctx, db.Name(), + &arangodb.WALTailOptions{ + Global: utils.NewType(true), + From: utils.NewType(fromTick), + ChunkSize: utils.NewType(1024 * 1024), + LastScanned: utils.NewType(0), + }) + require.NoError(t, err) + require.GreaterOrEqual(t, len(tailResp), 0) + }) + + t.Run("Applier Stop with out query params", func(t *testing.T) { + resp, err := client.ApplierStop(ctx, db.Name(), nil) + require.NoError(t, err) + require.NotNil(t, resp) + }) + }) }) }) } From a97756aa47fc55d029feb6cc7649728610b7d63b Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Mon, 1 Sep 2025 18:13:51 +0530 Subject: [PATCH 20/25] Replication: Add revisions tree endpoint --- v2/arangodb/client_replication.go | 5 ++ v2/arangodb/client_replication_impl.go | 82 ++++++++++++++++++++++++++ v2/tests/client_replication_test.go | 48 +++++++++++++++ 3 files changed, 135 insertions(+) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index e313fed6..8a9ba0b0 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -22,6 +22,7 @@ package arangodb import ( "context" + "encoding/json" "time" ) @@ -63,6 +64,10 @@ type ClientReplication interface { GetWALLastTick(ctx context.Context, dbName string) (WALLastTickResponse, error) // GetWALTail retrieves the tail of the WAL. GetWALTail(ctx context.Context, dbName string, params *WALTailOptions) ([]byte, error) + // RebuildShardRevisionTree rebuilds the Merkle tree for a shard. + RebuildShardRevisionTree(ctx context.Context, dbName string, shardID ShardID) error + // GetShardRevisionTree retrieves the Merkle tree for a shard. + GetShardRevisionTree(ctx context.Context, dbName string, shardID ShardID, batchId string) (json.RawMessage, error) } // CreateNewBatchOptions represents the request body for creating a batch. diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index 55abb2b4..438c3cbb 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -23,6 +23,7 @@ package arangodb import ( "bytes" "context" + "encoding/json" "fmt" "io" "net/http" @@ -188,6 +189,9 @@ func (c *clientReplication) DeleteBatch(ctx context.Context, dbName string, DBse params["DBserver"] = *DBserver } + if batchId == "" { + return errors.New("batchId must be specified for delete batch") + } // Build URL url := c.url(dbName, []string{"batch", batchId}, params) @@ -704,6 +708,84 @@ func (c *clientReplication) MakeFollower(ctx context.Context, dbName string, opt } } +// RebuildShardRevisionTree triggers a rebuild of the Merkle tree for a specific shard. +// This API must be called directly against a DBServer (not a Coordinator). +func (c *clientReplication) RebuildShardRevisionTree(ctx context.Context, dbName string, shardID ShardID) error { + // Ensure we are on a DBServer + role, err := c.client.ServerRole(ctx) + if err != nil { + return errors.WithStack(err) + } + if role != ServerRoleDBServer { + return fmt.Errorf("rebuild revision tree is only supported on DBServers, got role=%s", role) + } + + if shardID == "" { + return RequiredFieldError("shardID") + } + + // Build URL + queryParams := map[string]interface{}{ + "collection": shardID, + } + url := c.url(dbName, []string{"revisions", "tree"}, queryParams) + + var response struct { + shared.ResponseStruct `json:",inline"` + } + + resp, err := connection.CallPost(ctx, c.client.connection, url, &response, nil) + if err != nil { + return errors.WithStack(err) + } + + if resp.Code() == http.StatusNoContent { + return nil + } + return response.AsArangoErrorWithCode(resp.Code()) +} + +func (c *clientReplication) GetShardRevisionTree(ctx context.Context, dbName string, shardID ShardID, batchId string) (json.RawMessage, error) { + role, err := c.client.ServerRole(ctx) + if err != nil { + return nil, errors.WithStack(err) + } + if role != ServerRoleDBServer { + return nil, fmt.Errorf("get revision tree is only supported on DBServers, got role=%s", role) + } + + if shardID == "" { + return nil, RequiredFieldError("shardID") + } + if batchId == "" { + return nil, RequiredFieldError("batchId") + } + + queryParams := map[string]interface{}{ + "collection": shardID, + "batchId": batchId, + } + + url := c.url(dbName, []string{"revisions", "tree"}, queryParams) + + var response struct { + shared.ResponseStruct `json:",inline"` + RevisionTree json.RawMessage `json:"revisionTree,omitempty"` + } + + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return nil, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.RevisionTree, nil + default: + return nil, response.AsArangoErrorWithCode(code) + } +} + func (c *clientReplication) GetWALRange(ctx context.Context, dbName string) (WALRangeResponse, error) { // Check server role serverRole, err := c.client.ServerRole(ctx) diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index c16d7f76..3ec04fc1 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -23,6 +23,7 @@ package tests import ( "context" "fmt" + "os" "strconv" "testing" "time" @@ -497,3 +498,50 @@ func Test_GetWALReplicationEndpoints(t *testing.T) { }) }) } + +func Test_RebuildShardRevisionTree(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + if os.Getenv("TEST_CONNECTION") == "vst" { + skipBelowVersion(client, ctx, "3.8", t) + } + + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + + if serverRole != arangodb.ServerRoleDBServer { + t.Skipf("Not supported on role: %s", serverRole) + } + + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + + WithCollectionV2(t, db, nil, func(col arangodb.Collection) { + docs := []map[string]interface{}{ + {"_key": "doc1", "name": "Alice"}, + {"_key": "doc2", "name": "Bob"}, + {"_key": "doc3", "name": "Charlie"}, + } + for _, doc := range docs { + resp, err := col.CreateDocument(ctx, doc) + require.NoError(t, err) + require.NotNil(t, resp) + } + + var shardId arangodb.ShardID + shards, err := col.Shards(ctx, true) + require.NoError(t, err) + require.NotNil(t, shards) + + for existingShardId := range shards.Shards { + shardId = existingShardId + break + } + + err = client.RebuildShardRevisionTree(ctx, db.Name(), shardId) + require.NoError(t, err) + }) + }) + }) +} From 19e36b1c992fb94d427fccaa34e1c91609605009 Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Mon, 1 Sep 2025 20:06:52 +0530 Subject: [PATCH 21/25] Replication: Add endpoint to list document revisions in range --- v2/arangodb/client_replication.go | 10 ++++ v2/arangodb/client_replication_impl.go | 53 ++++++++++++++++++++ v2/tests/client_replication_test.go | 68 ++++++++++++++++++++++++++ 3 files changed, 131 insertions(+) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index 8a9ba0b0..4d8d0c0d 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -68,6 +68,8 @@ type ClientReplication interface { RebuildShardRevisionTree(ctx context.Context, dbName string, shardID ShardID) error // GetShardRevisionTree retrieves the Merkle tree for a shard. GetShardRevisionTree(ctx context.Context, dbName string, shardID ShardID, batchId string) (json.RawMessage, error) + // ListDocumentRevisionsInRange retrieves documents by their revision IDs. + ListDocumentRevisionsInRange(ctx context.Context, dbName string, queryParams RevisionQueryParams, opts [][2]string) ([][2]string, error) } // CreateNewBatchOptions represents the request body for creating a batch. @@ -612,3 +614,11 @@ type WALTailOptions struct { // ClientInfo provides a short description of the client (informational only). ClientInfo *string `json:"clientInfo,omitempty"` } + +type RevisionQueryParams struct { + // Collection Name + Collection string `json:"collection"` + BatchId string `json:"batchId"` + // The revisionId at which to resume, if a previous request was truncated + Resume *string `json:"resume,omitempty"` +} diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index 438c3cbb..f3ad531d 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -786,6 +786,59 @@ func (c *clientReplication) GetShardRevisionTree(ctx context.Context, dbName str } } +func (c *clientReplication) checkRevisionQueryParams(queryParams RevisionQueryParams) (map[string]interface{}, error) { + params := map[string]interface{}{} + if queryParams.Collection == "" { + return nil, RequiredFieldError("collection") + } + if queryParams.BatchId == "" { + return nil, RequiredFieldError("batchId") + } + if queryParams.Resume != nil { + params["resume"] = *queryParams.Resume + } + params["collection"] = queryParams.Collection + params["batchId"] = queryParams.BatchId + return params, nil +} + +func (c *clientReplication) ListDocumentRevisionsInRange(ctx context.Context, dbName string, queryParams RevisionQueryParams, opts [][2]string) ([][2]string, error) { + + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return nil, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return nil, errors.New("WAL range is not supported on Coordinators") + } + params, err := c.checkRevisionQueryParams(queryParams) + if err != nil { + return nil, err + } + // Build URL + + url := c.url(dbName, []string{"revisions", "ranges"}, params) + + var response struct { + shared.ResponseStruct `json:",inline"` + Ranges [][2]string `json:"ranges,omitempty"` + } + + resp, err := connection.CallPut(ctx, c.client.connection, url, &response, opts) + if err != nil { + return nil, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.Ranges, nil + default: + return nil, response.AsArangoErrorWithCode(code) + } +} + func (c *clientReplication) GetWALRange(ctx context.Context, dbName string) (WALRangeResponse, error) { // Check server role serverRole, err := c.client.ServerRole(ctx) diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index 3ec04fc1..303c0585 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -545,3 +545,71 @@ func Test_RebuildShardRevisionTree(t *testing.T) { }) }) } +func Test_ListDocumentRevisionsInRange(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + if os.Getenv("TEST_CONNECTION") == "vst" { + skipBelowVersion(client, ctx, "3.8", t) + } + + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + + WithCollectionV2(t, db, nil, func(col arangodb.Collection) { + var revs []string + // Insert documents + for i := 1; i <= 10; i++ { + doc := map[string]interface{}{ + "_key": fmt.Sprintf("doc%d", i), + "name": fmt.Sprintf("User %d", i), + } + resp, err := col.CreateDocument(ctx, doc) + require.NoError(t, err) + require.NotNil(t, resp) + revs = append(revs, resp.Rev) + } + require.NotEmpty(t, revs) + time.Sleep(500 * time.Millisecond) + + // Determine DBServer for cluster + var dbServer *string + state := utils.NewType(true) + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole: %s", serverRole) + + if serverRole == arangodb.ServerRoleCoordinator { + clusterHealth, err := client.Health(ctx) + require.NoError(t, err) + for id, db := range clusterHealth.Health { + if db.Role == arangodb.ServerRoleDBServer { + s := string(id) + dbServer = &s + break + } + } + } + + // Create a replication batch + batch, err := client.CreateNewBatch(ctx, db.Name(), dbServer, state, arangodb.CreateNewBatchOptions{Ttl: 300}) + require.NoError(t, err) + require.NotNil(t, batch) + require.NotEmpty(t, batch.ID) + + // Prepare pairs for ListDocumentRevisionsInRange + var opts [][2]string + for i := 0; i < len(revs)-1; i++ { + opts = append(opts, [2]string{revs[i], revs[i+1]}) + } + + // Call ListDocumentRevisionsInRange + revIds, err := client.ListDocumentRevisionsInRange(ctx, db.Name(), arangodb.RevisionQueryParams{ + BatchId: batch.ID, + Collection: col.Name(), + }, opts) + require.NoError(t, err) + require.NotNil(t, revIds) + }) + }) + }) +} From ef0909b9b30c18bf480c7f8ebff8a21c9ece4dba Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Tue, 2 Sep 2025 12:54:33 +0530 Subject: [PATCH 22/25] Repliation: Add endpoint to fetch revision documents --- v2/arangodb/client_replication.go | 2 + v2/arangodb/client_replication_impl.go | 38 +++++++++- v2/tests/client_replication_test.go | 96 ++++++++++++++++++++------ 3 files changed, 114 insertions(+), 22 deletions(-) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index 4d8d0c0d..03036b6e 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -70,6 +70,8 @@ type ClientReplication interface { GetShardRevisionTree(ctx context.Context, dbName string, shardID ShardID, batchId string) (json.RawMessage, error) // ListDocumentRevisionsInRange retrieves documents by their revision IDs. ListDocumentRevisionsInRange(ctx context.Context, dbName string, queryParams RevisionQueryParams, opts [][2]string) ([][2]string, error) + // FetchRevisionDocuments retrieves documents by their revision IDs. + FetchRevisionDocuments(ctx context.Context, dbName string, queryParams RevisionQueryParams, opts []string) ([]map[string]interface{}, error) } // CreateNewBatchOptions represents the request body for creating a batch. diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index f3ad531d..1b1e7920 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -811,7 +811,7 @@ func (c *clientReplication) ListDocumentRevisionsInRange(ctx context.Context, db return nil, errors.WithStack(err) } if serverRole == ServerRoleCoordinator { - return nil, errors.New("WAL range is not supported on Coordinators") + return nil, errors.New("replication revisions range is not supported on Coordinators") } params, err := c.checkRevisionQueryParams(queryParams) if err != nil { @@ -839,6 +839,42 @@ func (c *clientReplication) ListDocumentRevisionsInRange(ctx context.Context, db } } +func (c *clientReplication) FetchRevisionDocuments(ctx context.Context, dbName string, queryParams RevisionQueryParams, opts []string) ([]map[string]interface{}, error) { + + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return nil, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return nil, errors.New("replication revisions documents is not supported on Coordinators") + } + params, err := c.checkRevisionQueryParams(queryParams) + if err != nil { + return nil, err + } + + // Build URL + url := c.url(dbName, []string{"revisions", "documents"}, params) + + var response []map[string]interface{} + + resp, err := connection.CallPut(ctx, c.client.connection, url, &response, opts) + if err != nil { + return nil, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response, nil + default: + return nil, (&shared.ResponseStruct{}).AsArangoErrorWithCode(resp.Code()) + } +} + +//startReplicationSync + func (c *clientReplication) GetWALRange(ctx context.Context, dbName string) (WALRangeResponse, error) { // Check server role serverRole, err := c.client.ServerRole(ctx) diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index 303c0585..fac86e3f 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -502,10 +502,12 @@ func Test_GetWALReplicationEndpoints(t *testing.T) { func Test_RebuildShardRevisionTree(t *testing.T) { Wrap(t, func(t *testing.T, client arangodb.Client) { withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + // Version checking if os.Getenv("TEST_CONNECTION") == "vst" { skipBelowVersion(client, ctx, "3.8", t) } + // Role check serverRole, err := client.ServerRole(ctx) require.NoError(t, err) t.Logf("ServerRole is %s\n", serverRole) @@ -514,10 +516,12 @@ func Test_RebuildShardRevisionTree(t *testing.T) { t.Skipf("Not supported on role: %s", serverRole) } + // Check for DB existence db, err := client.GetDatabase(ctx, "_system", nil) require.NoError(t, err) WithCollectionV2(t, db, nil, func(col arangodb.Collection) { + // Insert documents docs := []map[string]interface{}{ {"_key": "doc1", "name": "Alice"}, {"_key": "doc2", "name": "Bob"}, @@ -538,7 +542,7 @@ func Test_RebuildShardRevisionTree(t *testing.T) { shardId = existingShardId break } - + // Call Rebuild Shard Revision Tree err = client.RebuildShardRevisionTree(ctx, db.Name(), shardId) require.NoError(t, err) }) @@ -548,10 +552,20 @@ func Test_RebuildShardRevisionTree(t *testing.T) { func Test_ListDocumentRevisionsInRange(t *testing.T) { Wrap(t, func(t *testing.T, client arangodb.Client) { withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + // Version checking if os.Getenv("TEST_CONNECTION") == "vst" { skipBelowVersion(client, ctx, "3.8", t) } + // Role check + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole: %s", serverRole) + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + + // Check for DB existence db, err := client.GetDatabase(ctx, "_system", nil) require.NoError(t, err) @@ -571,27 +585,9 @@ func Test_ListDocumentRevisionsInRange(t *testing.T) { require.NotEmpty(t, revs) time.Sleep(500 * time.Millisecond) - // Determine DBServer for cluster - var dbServer *string - state := utils.NewType(true) - serverRole, err := client.ServerRole(ctx) - require.NoError(t, err) - t.Logf("ServerRole: %s", serverRole) - - if serverRole == arangodb.ServerRoleCoordinator { - clusterHealth, err := client.Health(ctx) - require.NoError(t, err) - for id, db := range clusterHealth.Health { - if db.Role == arangodb.ServerRoleDBServer { - s := string(id) - dbServer = &s - break - } - } - } - // Create a replication batch - batch, err := client.CreateNewBatch(ctx, db.Name(), dbServer, state, arangodb.CreateNewBatchOptions{Ttl: 300}) + state := utils.NewType(true) + batch, err := client.CreateNewBatch(ctx, db.Name(), nil, state, arangodb.CreateNewBatchOptions{Ttl: 300}) require.NoError(t, err) require.NotNil(t, batch) require.NotEmpty(t, batch.ID) @@ -613,3 +609,61 @@ func Test_ListDocumentRevisionsInRange(t *testing.T) { }) }) } + +func Test_FetchRevisionDocuments(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + // Version checking + if os.Getenv("TEST_CONNECTION") == "vst" { + skipBelowVersion(client, ctx, "3.8", t) + } + + // Role check + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole: %s", serverRole) + + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + + // Check for DB existence + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + + WithCollectionV2(t, db, nil, func(col arangodb.Collection) { + var revs []string + // Insert documents + docs := []map[string]interface{}{} + docs = append(docs, map[string]interface{}{"_key": "doc1", "name": "Alice"}) + docs = append(docs, map[string]interface{}{"_key": "doc2", "subjects": []string{"English", "Maths"}}) + docs = append(docs, map[string]interface{}{"_key": "doc3", "age": 30, "active": true}) + docs = append(docs, map[string]interface{}{"_key": "doc4", "profile": map[string]interface{}{"city": "Berlin", "country": "Germany"}}) + + for _, doc := range docs { + resp, err := col.CreateDocument(ctx, doc) + require.NoError(t, err) + require.NotNil(t, resp) + revs = append(revs, resp.Rev) + } + require.NotEmpty(t, revs) + time.Sleep(500 * time.Millisecond) + + // Create a replication batch + state := utils.NewType(true) + batch, err := client.CreateNewBatch(ctx, db.Name(), nil, state, arangodb.CreateNewBatchOptions{Ttl: 300}) + require.NoError(t, err) + require.NotNil(t, batch) + require.NotEmpty(t, batch.ID) + + // Call FetchRevisionDocuments + revDocs, err := client.FetchRevisionDocuments(ctx, db.Name(), arangodb.RevisionQueryParams{ + BatchId: batch.ID, + Collection: col.Name(), + }, revs) + require.NoError(t, err) + require.NotNil(t, revDocs) + }) + }) + }) +} From c2a13b3275879f29a9e7767cb13eb0c38067ea68 Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Tue, 2 Sep 2025 15:59:54 +0530 Subject: [PATCH 23/25] Replication: Add endpoint to ttart replication from a remote endpoint --- v2/arangodb/client_replication.go | 57 +++++++++++++++++++++ v2/arangodb/client_replication_impl.go | 68 +++++++++++++++++++++++++- v2/tests/client_replication_test.go | 44 +++++++++++++++++ 3 files changed, 168 insertions(+), 1 deletion(-) diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go index 03036b6e..2c00c9a7 100644 --- a/v2/arangodb/client_replication.go +++ b/v2/arangodb/client_replication.go @@ -72,6 +72,8 @@ type ClientReplication interface { ListDocumentRevisionsInRange(ctx context.Context, dbName string, queryParams RevisionQueryParams, opts [][2]string) ([][2]string, error) // FetchRevisionDocuments retrieves documents by their revision IDs. FetchRevisionDocuments(ctx context.Context, dbName string, queryParams RevisionQueryParams, opts []string) ([]map[string]interface{}, error) + // StartReplicationSync starts the replication synchronization process. + StartReplicationSync(ctx context.Context, dbName string, opts ReplicationSyncOptions) (ReplicationSyncResult, error) } // CreateNewBatchOptions represents the request body for creating a batch. @@ -624,3 +626,58 @@ type RevisionQueryParams struct { // The revisionId at which to resume, if a previous request was truncated Resume *string `json:"resume,omitempty"` } + +// ReplicationSyncResult is the response format from /_api/replication/sync +type ReplicationSyncResult struct { + // Collections is an array of collections that were synchronized + // from the leader to the local database. + Collections []map[string]interface{} `json:"collections"` + + // LastLogTick is the log tick value from the leader at the time + // the synchronization started. This value can be used later as + // the "from" tick when setting up continuous replication. + LastLogTick string `json:"lastLogTick"` +} + +// ReplicationSyncOptions holds optional parameters for sync +type ReplicationSyncOptions struct { + // Database specifies the name of the leader database to replicate from. + // If not provided, defaults to the current database. + Database *string `json:"database,omitempty"` + + // Endpoint is the leader endpoint (e.g., "http+tcp://192.168.1.65:8529"). + // This field is required. + Endpoint string `json:"endpoint,omitempty"` + + // Username is the login name used for authentication on the leader endpoint. + Username string `json:"username,omitempty"` + + // Password is the corresponding password used for authentication + // on the leader endpoint. + Password string `json:"password,omitempty"` + + // IncludeSystem determines whether system collections should be included + // in the synchronization. Defaults to false. + IncludeSystem *bool `json:"includeSystem,omitempty"` + + // Incremental indicates if incremental synchronization should be used. + // When true, only differences are transferred, which is faster if the + // local database already has some of the leader’s data. + // When false (default), a full synchronization is performed. + Incremental *bool `json:"incremental,omitempty"` + + // RestrictType controls how RestrictCollections is applied. + // Possible values: + // - "include": only collections listed in RestrictCollections are replicated. + // - "exclude": all collections except those listed are replicated. + RestrictType *string `json:"restrictType,omitempty"` + + // RestrictCollections is the list of collections used together with + // RestrictType to filter which collections are synchronized. + RestrictCollections *[]string `json:"restrictCollections,omitempty"` + + // InitialSyncMaxWaitSec specifies the maximum wait time (in seconds) + // the synchronization will wait for a response from the leader during + // initial collection data fetch. A value of 0 disables the wait time limit. + InitialSyncMaxWaitSec int `json:"initialSyncMaxWaitSec,omitempty"` +} diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index 1b1e7920..de3bd898 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -873,7 +873,73 @@ func (c *clientReplication) FetchRevisionDocuments(ctx context.Context, dbName s } } -//startReplicationSync +func (c *clientReplication) formSyncBodyParams(opts ReplicationSyncOptions) (map[string]interface{}, error) { + params := map[string]interface{}{} + if opts.Endpoint == "" { + return nil, RequiredFieldError("endpoint") + } + params["endpoint"] = opts.Endpoint + if opts.Database != nil && *opts.Database != "" { + params["database"] = *opts.Database + } + if opts.Username != "" { + params["username"] = opts.Username + } + if opts.Password != "" { + params["password"] = opts.Password + } + if opts.IncludeSystem != nil { + params["includeSystem"] = *opts.IncludeSystem + } + if opts.Incremental != nil { + params["incremental"] = *opts.Incremental + } + if opts.RestrictType != nil && *opts.RestrictType != "" { + params["restrictType"] = *opts.RestrictType + } + if opts.RestrictCollections != nil && len(*opts.RestrictCollections) > 0 { + params["restrictCollections"] = *opts.RestrictCollections + } + params["initialSyncMaxWaitTime"] = opts.InitialSyncMaxWaitSec + return params, nil +} + +func (c *clientReplication) StartReplicationSync(ctx context.Context, dbName string, opts ReplicationSyncOptions) (ReplicationSyncResult, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return ReplicationSyncResult{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return ReplicationSyncResult{}, errors.New("replication sync is not supported on Coordinators") + } + // Form request body params + body, err := c.formSyncBodyParams(opts) + if err != nil { + return ReplicationSyncResult{}, err + } + + // Build URL + url := c.url(dbName, []string{"sync"}, nil) + + var response struct { + shared.ResponseStruct `json:",inline"` + ReplicationSyncResult `json:",inline"` + } + + resp, err := connection.CallPut(ctx, c.client.connection, url, &response, body) + if err != nil { + return ReplicationSyncResult{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.ReplicationSyncResult, nil + default: + return ReplicationSyncResult{}, response.AsArangoErrorWithCode(code) + } +} func (c *clientReplication) GetWALRange(ctx context.Context, dbName string) (WALRangeResponse, error) { // Check server role diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index fac86e3f..ae60916f 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -667,3 +667,47 @@ func Test_FetchRevisionDocuments(t *testing.T) { }) }) } + +func Test_StartReplicationSync(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + // Version checking + if os.Getenv("TEST_CONNECTION") == "vst" { + skipBelowVersion(client, ctx, "3.8", t) + } + + // Role check + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole: %s", serverRole) + + if serverRole == arangodb.ServerRoleCoordinator || serverRole == arangodb.ServerRoleSingle { + t.Skipf("Replication sync not supported on role: %s", serverRole) + } + + // Check for DB existence + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + + opts := arangodb.ReplicationSyncOptions{ + Endpoint: "http+tcp://127.0.0.1:8529", + Username: "root", + // Password: "passwd", + // RestrictType: utils.NewType("include"), + // RestrictCollections: utils.NewType([]string{"airports"}), + } + + result, err := client.StartReplicationSync(ctx, db.Name(), opts) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.Collections) == 0 { + t.Errorf("expected collections in result") + } + if result.LastLogTick == "" { + t.Errorf("expected lastLogTick in result") + } + }) + }) +} From 8b60f122242e13b1f35f6e43ee4cefba9ffc9d58 Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Tue, 2 Sep 2025 16:03:53 +0530 Subject: [PATCH 24/25] Add note in CHNAGELOG file --- v2/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/v2/CHANGELOG.md b/v2/CHANGELOG.md index bf1b2b5b..c377d572 100644 --- a/v2/CHANGELOG.md +++ b/v2/CHANGELOG.md @@ -9,6 +9,7 @@ - Add SSO auth token implementation - Add missing endpoints from foxx to v2 - Switch to Go 1.23.12 +- Add missing endpoints from replication to v2 ## [2.1.3](https://github.com/arangodb/go-driver/tree/v2.1.3) (2025-02-21) - Switch to Go 1.22.11 From b9e27110ffa96bb07d0fdbfc77ef8ae60b677114 Mon Sep 17 00:00:00 2001 From: bluepal-prasanthi-moparthi Date: Fri, 5 Sep 2025 13:50:32 +0530 Subject: [PATCH 25/25] Resolved merge conflicts --- v2/CHANGELOG.md | 2 +- v2/arangodb/client_replication_impl.go | 2 -- v2/tests/client_replication_test.go | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/v2/CHANGELOG.md b/v2/CHANGELOG.md index c377d572..1068504e 100644 --- a/v2/CHANGELOG.md +++ b/v2/CHANGELOG.md @@ -1,6 +1,7 @@ # Change Log ## [master](https://github.com/arangodb/go-driver/tree/master) (N/A) +- Add missing endpoints from replication to v2 ## [2.1.5](https://github.com/arangodb/go-driver/tree/v2.1.5) (2025-08-31) - Add tasks endpoints to v2 @@ -9,7 +10,6 @@ - Add SSO auth token implementation - Add missing endpoints from foxx to v2 - Switch to Go 1.23.12 -- Add missing endpoints from replication to v2 ## [2.1.3](https://github.com/arangodb/go-driver/tree/v2.1.3) (2025-02-21) - Switch to Go 1.22.11 diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go index de3bd898..38514be3 100644 --- a/v2/arangodb/client_replication_impl.go +++ b/v2/arangodb/client_replication_impl.go @@ -1067,8 +1067,6 @@ func (c *clientReplication) GetWALTail(ctx context.Context, dbName string, param return nil, err } - fmt.Printf("resp code %d\n", resp.Code()) - if resp.Code() == http.StatusNoContent { return nil, nil } diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go index ae60916f..73e53e64 100644 --- a/v2/tests/client_replication_test.go +++ b/v2/tests/client_replication_test.go @@ -342,7 +342,7 @@ func Test_ApplierStart(t *testing.T) { require.NoError(t, err) require.NotNil(t, resp) // Log useful debug info - t.Logf("Applier start 346:\n running=%v\n phase=%s\n message=%s\n failedConnects=%d", + t.Logf("Applier start:\n running=%v\n phase=%s\n message=%s\n failedConnects=%d", *resp.State.Running, *resp.State.Phase, *resp.State.Progress.Message,