diff --git a/v2/CHANGELOG.md b/v2/CHANGELOG.md index bf1b2b5b..1068504e 100644 --- a/v2/CHANGELOG.md +++ b/v2/CHANGELOG.md @@ -1,6 +1,7 @@ # Change Log ## [master](https://github.com/arangodb/go-driver/tree/master) (N/A) +- Add missing endpoints from replication to v2 ## [2.1.5](https://github.com/arangodb/go-driver/tree/v2.1.5) (2025-08-31) - Add tasks endpoints to v2 diff --git a/v2/arangodb/client.go b/v2/arangodb/client.go index ad52721e..7c046548 100644 --- a/v2/arangodb/client.go +++ b/v2/arangodb/client.go @@ -37,4 +37,5 @@ type Client interface { ClientAsyncJob ClientFoxx ClientTasks + ClientReplication } diff --git a/v2/arangodb/client_impl.go b/v2/arangodb/client_impl.go index cbb3dad4..01968e23 100644 --- a/v2/arangodb/client_impl.go +++ b/v2/arangodb/client_impl.go @@ -40,6 +40,7 @@ func newClient(connection connection.Connection) *client { c.clientAsyncJob = newClientAsyncJob(c) c.clientFoxx = newClientFoxx(c) c.clientTask = newClientTask(c) + c.clientReplication = newClientReplication(c) c.Requests = NewRequests(connection) @@ -58,6 +59,7 @@ type client struct { *clientAsyncJob *clientFoxx *clientTask + *clientReplication Requests } diff --git a/v2/arangodb/client_replication.go b/v2/arangodb/client_replication.go new file mode 100644 index 00000000..2c00c9a7 --- /dev/null +++ b/v2/arangodb/client_replication.go @@ -0,0 +1,683 @@ +// +// DISCLAIMER +// +// Copyright 2023-2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package arangodb + +import ( + "context" + "encoding/json" + "time" +) + +// ClientReplication defines replication API methods. +type ClientReplication interface { + // CreateNewBatch creates a new replication batch. + CreateNewBatch(ctx context.Context, dbName string, DBserver *string, state *bool, opt CreateNewBatchOptions) (CreateNewBatchResponse, error) + // GetInventory retrieves the inventory of a replication batch. + GetInventory(ctx context.Context, dbName string, params InventoryQueryParams) (InventoryResponse, error) + // DeleteBatch deletes a replication batch. + DeleteBatch(ctx context.Context, dbName string, DBserver *string, batchId string) error + // ExtendBatch extends the TTL of a replication batch. + ExtendBatch(ctx context.Context, dbName string, DBserver *string, batchId string, opt CreateNewBatchOptions) error + // Dump retrieves a chunk of data from a collection in a replication batch. + Dump(ctx context.Context, dbName string, params ReplicationDumpParams) ([]byte, error) + // LoggerState retrieves the state of the replication logger. + LoggerState(ctx context.Context, dbName string, DBserver *string) (LoggerStateResponse, error) + // LoggerFirstTick retrieves the first tick of the replication logger. + LoggerFirstTick(ctx context.Context, dbName string) (LoggerFirstTickResponse, error) + // LoggerTickRange retrieves the currently available ranges of tick values for all currently available WAL logfiles. + LoggerTickRange(ctx context.Context, dbName string) ([]LoggerTickRangeResponseObj, error) + // GetApplierConfig retrieves the configuration of the replication applier. + GetApplierConfig(ctx context.Context, dbName string, global *bool) (ApplierConfigResponse, error) + // UpdateApplierConfig updates the configuration of the replication applier. + UpdateApplierConfig(ctx context.Context, dbName string, global *bool, opts ApplierOptions) (ApplierConfigResponse, error) + // ApplierStart starts the replication applier. + ApplierStart(ctx context.Context, dbName string, global *bool, from *string) (ApplierStateResp, error) + // ApplierStop stops the replication applier. + ApplierStop(ctx context.Context, dbName string, global *bool) (ApplierStateResp, error) + // GetApplierState retrieves the state of the replication applier. + GetApplierState(ctx context.Context, dbName string, global *bool) (ApplierStateResp, error) + // GetReplicationServerId retrieves the server ID used for replication. + GetReplicationServerId(ctx context.Context, dbName string) (string, error) + // MakeFollower makes the current server a follower of the specified leader. + MakeFollower(ctx context.Context, dbName string, opts ApplierOptions) (ApplierStateResp, error) + // GetWALRange retrieves the WAL range information. + GetWALRange(ctx context.Context, dbName string) (WALRangeResponse, error) + // GetWALLastTick retrieves the last available tick information. + GetWALLastTick(ctx context.Context, dbName string) (WALLastTickResponse, error) + // GetWALTail retrieves the tail of the WAL. + GetWALTail(ctx context.Context, dbName string, params *WALTailOptions) ([]byte, error) + // RebuildShardRevisionTree rebuilds the Merkle tree for a shard. + RebuildShardRevisionTree(ctx context.Context, dbName string, shardID ShardID) error + // GetShardRevisionTree retrieves the Merkle tree for a shard. + GetShardRevisionTree(ctx context.Context, dbName string, shardID ShardID, batchId string) (json.RawMessage, error) + // ListDocumentRevisionsInRange retrieves documents by their revision IDs. + ListDocumentRevisionsInRange(ctx context.Context, dbName string, queryParams RevisionQueryParams, opts [][2]string) ([][2]string, error) + // FetchRevisionDocuments retrieves documents by their revision IDs. + FetchRevisionDocuments(ctx context.Context, dbName string, queryParams RevisionQueryParams, opts []string) ([]map[string]interface{}, error) + // StartReplicationSync starts the replication synchronization process. + StartReplicationSync(ctx context.Context, dbName string, opts ReplicationSyncOptions) (ReplicationSyncResult, error) +} + +// CreateNewBatchOptions represents the request body for creating a batch. +type CreateNewBatchOptions struct { + Ttl int `json:"ttl"` +} + +// CreateNewBatchResponse represents the response for batch creation. +type CreateNewBatchResponse struct { + // The ID of the created batch + ID string `json:"id"` + // The last tick of the created batch + LastTick string `json:"lastTick"` + // Only present if the state URL parameter was set to true + State map[string]interface{} `json:"state,omitempty"` +} + +// InventoryQueryParams represents the query parameters for the replication inventory API. +type InventoryQueryParams struct { + // IncludeSystem indicates whether to include system collections in the inventory. + IncludeSystem *bool `json:"includeSystem,omitempty"` + // Global indicates whether to return global inventory or not. + // If true, the inventory will include all collections across all DBServers. + Global *bool `json:"global,omitempty"` + // BatchID is the ID of the replication batch to query. + BatchID string `json:"batchId"` + // Collection is the name of the collection to restrict inventory to. + Collection *string `json:"collection,omitempty"` + + // Only for Coordinators + // Restrict to a specific DBserver in cluster + DBserver *string `json:"DBserver,omitempty"` +} + +// InventoryResponse represents the full response from the replication inventory API. +type InventoryResponse struct { + // Collections is the list of collections in the inventory. + Collections []CollectionsInventoryResponse `json:"collections,omitempty"` + // Database properties. + Properties PropertiesInventoryResponse `json:"properties,omitempty"` + // Views present in the database. + Views []ViewInventoryResponse `json:"views,omitempty"` + // Replication state information. + State StateInventoryResponse `json:"state,omitempty"` + // Last log tick at the time of inventory. + Tick *string `json:"tick,omitempty"` +} + +// CollectionsInventoryResponse represents a collection entry in the inventory. +type CollectionsInventoryResponse struct { + // Indexes defined on the collection. + // Note: Primary indexes and edge indexes are not included in this array. + Indexes []IndexesInventoryResponse `json:"indexes,omitempty"` + // Collection properties and metadata. + Parameters ParametersInventoryResponse `json:"parameters,omitempty"` +} + +// ParametersInventoryResponse represents metadata and settings of a collection. +type ParametersInventoryResponse struct { + // Reusable basic properties like ID and Name + BasicProperties + // AllowUserKeys indicates whether user keys are allowed. + AllowUserKeys *bool `json:"allowUserKeys,omitempty"` + // CacheEnabled indicates whether in-memory cache is enabled. + CacheEnabled *bool `json:"cacheEnabled,omitempty"` + // Cid is the collection ID. + Cid *string `json:"cid,omitempty"` + // ComputedValues holds the computed values for the collection. + ComputedValues interface{} `json:"computedValues,omitempty"` + // Deleted indicates whether the collection is deleted. + Deleted *bool `json:"deleted,omitempty"` + // GloballyUniqueId is the globally unique identifier for the collection. + GloballyUniqueId *string `json:"globallyUniqueId,omitempty"` + // InternalValidatorType is the internal validator type. + InternalValidatorType *int `json:"internalValidatorType,omitempty"` + // IsDisjoint indicates whether disjoint smart graphs are used. + IsDisjoint *bool `json:"isDisjoint,omitempty"` + // IsSmart indicates whether the collection is a smart graph collection. + IsSmart *bool `json:"isSmart,omitempty"` + // IsSmartChild indicates whether the collection this is a child shard of a smart graph. + IsSmartChild *bool `json:"isSmartChild,omitempty"` + // IsSystem indicates whether the collection is a system collection. + IsSystem *bool `json:"isSystem,omitempty"` + // KeyOptions defines the key generation options for the collection. + KeyOptions *KeyOpts `json:"keyOptions,omitempty"` + // MinReplicationFactor defines the minimum replication factor for the collection. + MinReplicationFactor *int `json:"minReplicationFactor,omitempty"` + // NumberOfShards defines the number of shards for the collection. + NumberOfShards *int `json:"numberOfShards,omitempty"` + // PlanId is the plan ID for the collection. + PlanId *string `json:"planId,omitempty"` + // ReplicationFactor defines the replication factor for the collection. + ReplicationFactor interface{} `json:"replicationFactor,omitempty"` + // Schema defines the schema for the collection. + Schema interface{} `json:"schema,omitempty"` + // ShardKeys defines the shard keys for the collection. + ShardKeys []string `json:"shardKeys,omitempty"` + // ShardingStrategy defines the sharding strategy for the collection. + ShardingStrategy *string `json:"shardingStrategy,omitempty"` + // Shards defines the shards for the collection. + Shards map[string][]string `json:"shards,omitempty"` + // Status defines the Collection status code. + Status *int `json:"status,omitempty"` + // SyncByRevision indicates whether the collection is synced by revision. + SyncByRevision *bool `json:"syncByRevision,omitempty"` + // Type defines the Collection type (document/edge). + Type *int `json:"type,omitempty"` + // UsesRevisionsAsDocumentIds indicates whether document revisions are used as document IDs. + UsesRevisionsAsDocumentIds *bool `json:"usesRevisionsAsDocumentIds,omitempty"` + // Version defines the version of the collection. + Version *int `json:"version,omitempty"` + // WaitForSync indicates whether the collection should wait for sync. + WaitForSync *bool `json:"waitForSync,omitempty"` + // WriteConcern defines the write concern level for the collection. + WriteConcern *int `json:"writeConcern,omitempty"` +} + +// IndexesInventoryResponse represents metadata for an index in the collection. +type IndexesInventoryResponse struct { + // Reusable basic properties like ID and Name + BasicProperties + // Index type (hash, skiplist, etc.) + Type *string `json:"type,omitempty"` + // Indexed fields + Fields []string `json:"fields,omitempty"` + // Unique indicates whether the index enforces uniqueness. + Unique *bool `json:"unique,omitempty"` + // Sparse indicates whether the index skips null values. + Sparse *bool `json:"sparse,omitempty"` + // Deduplicate indicates whether the index enforces deduplication. + Deduplicate *bool `json:"deduplicate,omitempty"` + // Estimates indicates whether the index supports estimates. + Estimates *bool `json:"estimates,omitempty"` + // CacheEnabled indicates whether the index is cache enabled. + CacheEnabled *bool `json:"cacheEnabled,omitempty"` +} + +// KeyOpts represents options for document key generation. +type KeyOpts struct { + // Whether user-supplied keys are allowed + AllowUserKeys *bool `json:"allowUserKeys,omitempty"` + // Key type (autoincrement, traditional, etc.) + Type *string `json:"type,omitempty"` + // Last value for autoincrement keys + LastValue *int `json:"lastValue,omitempty"` +} + +// PropertiesInventoryResponse represents database-level properties. +type PropertiesInventoryResponse struct { + // Reusable basic properties like ID and Name + BasicProperties + // Whether this is a system database + IsSystem *bool `json:"isSystem,omitempty"` + // Default sharding method + Sharding *string `json:"sharding,omitempty"` + // Default replication factor + ReplicationFactor interface{} `json:"replicationFactor,omitempty"` + // Default write concern + WriteConcern *int `json:"writeConcern,omitempty"` + // Replication protocol version + ReplicationVersion *string `json:"replicationVersion,omitempty"` +} + +// StateInventoryResponse represents replication state at the time of inventory. +type StateInventoryResponse struct { + // Whether replication is running + Running *bool `json:"running,omitempty"` + // Last committed log tick + LastLogTick *string `json:"lastLogTick,omitempty"` + // Last uncommitted log tick + LastUncommittedLogTick *string `json:"lastUncommittedLogTick,omitempty"` + // Total number of events + TotalEvents *int `json:"totalEvents,omitempty"` + // Timestamp of the state + Time *time.Time `json:"time,omitempty"` +} + +// ViewInventoryResponse represents a view entry in the inventory. +type ViewInventoryResponse struct { + // Reusable basic properties like ID and Name + BasicProperties + // View type (e.g. "arangosearch") + Type *string `json:"type,omitempty"` + // View properties + Properties map[string]interface{} `json:"properties,omitempty"` +} + +// BasicProperties represents reusable ID and Name fields common to collections, views, etc. +type BasicProperties struct { + // Unique identifier (collection ID, view ID, etc.) + ID *string `json:"id,omitempty"` + // Human-readable name + Name *string `json:"name,omitempty"` +} + +type ReplicationDumpParams struct { + // Collection name + Collection string `json:"collection"` + // Size of each chunk in bytes + ChunkSize *int32 `json:"chunkSize,omitempty"` + // BatchID is the ID of the replication batch. + BatchID string `json:"batchId"` +} + +type State struct { + // Whether replication is running + Running *bool `json:"running"` + // Last committed log tick + LastLogTick *string `json:"lastLogTick"` + // Last uncommitted log tick + LastUncommittedLogTick *string `json:"lastUncommittedLogTick"` + // Total number of events + TotalEvents *int64 `json:"totalEvents"` + // Timestamp of the state + Time *time.Time `json:"time"` +} + +type Server struct { + // Version of the server + Version *string `json:"version"` + ServerId *string `json:"serverId"` + // Engine of the server + Engine *string `json:"engine"` +} + +type LoggerStateResponse struct { + State State `json:"state"` + Server Server `json:"server"` + Clients []map[string]interface{} `json:"clients,inline"` +} + +type LoggerFirstTickResponse struct { + // The first tick of the logger + FirstTick *string `json:"firstTick,omitempty"` +} + +type LoggerTickRangeResponseObj struct { + // Name of the logfile + Datafile *string `json:"datafile,omitempty"` + // Status of the datafile, in textual form (e.g. "sealed", "open") + Status *string `json:"status,omitempty"` + // Minimum tick value contained in logfile + TickMin *string `json:"tickMin,omitempty"` + // Maximum tick value contained in logfile + TickMax *string `json:"tickMax,omitempty"` +} + +type ApplierConfigResponse struct { + // Logger server endpoint (e.g., tcp://127.0.0.1:8529) + Endpoint *string `json:"endpoint,omitempty"` + // Database name (e.g., "_system") + Database *string `json:"database,omitempty"` + // Username for authentication + Username *string `json:"username,omitempty"` + // Password for authentication + Password *string `json:"password,omitempty"` + // Maximum connection attempts before stopping + MaxConnectRetries int `json:"maxConnectRetries"` + // Timeout (seconds) for connecting to endpoint + ConnectTimeout int `json:"connectTimeout"` + // Timeout (seconds) for individual requests + RequestTimeout int `json:"requestTimeout"` + // Max size of log transfer packets + ChunkSize int `json:"chunkSize"` + // Whether applier auto-starts on server startup + AutoStart bool `json:"autoStart"` + // Whether adaptive polling is used + AdaptivePolling bool `json:"adaptivePolling"` + // Whether system collections are included + IncludeSystem bool `json:"includeSystem"` + // Whether full automatic resync is performed if needed + AutoResync bool `json:"autoResync"` + // Number of auto-resync retries before giving up + AutoResyncRetries int `json:"autoResyncRetries"` + // Max wait time (seconds) for initial sync + InitialSyncMaxWaitTime int `json:"initialSyncMaxWaitTime"` + // Idle time (seconds) before retrying failed connection + ConnectionRetryWaitTime int `json:"connectionRetryWaitTime"` + // Minimum idle wait time (seconds) when no new data + IdleMinWaitTime int `json:"idleMinWaitTime"` + // Maximum idle wait time (seconds) when no new data (may be fractional, hence float64) + IdleMaxWaitTime float64 `json:"idleMaxWaitTime"` + // If true, aborts if start tick not available on leader + RequireFromPresent bool `json:"requireFromPresent"` + // If true, logs each applier operation (debugging only) + Verbose bool `json:"verbose"` + // Type of collection restriction ("include" or "exclude") + RestrictType string `json:"restrictType"` + // Collections included/excluded depending on RestrictType + RestrictCollections []string `json:"restrictCollections"` + // Max number of errors to ignore + IgnoreErrors *int `json:"ignoreErrors,omitempty"` + // SSL protocol version + SslProtocol *int `json:"sslProtocol,omitempty"` + // Whether to skip create/drop collection operations + SkipCreateDrop *bool `json:"skipCreateDrop,omitempty"` + // Max packet size (bytes) + MaxPacketSize *int64 `json:"maxPacketSize,omitempty"` + // Whether to include Foxx queues + IncludeFoxxQueues *bool `json:"includeFoxxQueues,omitempty"` + // Whether incremental sync is used + Incremental *bool `json:"incremental,omitempty"` +} + +// ApplierOptions holds the configuration options for the replication applier. +// These settings can only be changed when the applier is not running. +type ApplierOptions struct { + // AdaptivePolling controls whether the replication applier uses adaptive polling. + AdaptivePolling *bool `json:"adaptivePolling,omitempty"` + + // AutoResync, if set to true, allows the applier to automatically + // trigger a full resynchronization if it falls too far behind. + AutoResync *bool `json:"autoResync,omitempty"` + + // AutoResyncRetries defines how many times the applier should retry + // automatic resynchronization after failure. + AutoResyncRetries *int `json:"autoResyncRetries,omitempty"` + + // AutoStart indicates if the applier should start automatically + // once configured. + AutoStart *bool `json:"autoStart,omitempty"` + + // ChunkSize is the maximum size (in bytes) of the data batches + // fetched by the applier. + ChunkSize *int `json:"chunkSize,omitempty"` + + // ConnectTimeout is the timeout (in seconds) for the initial + // connection attempt to the master endpoint. + ConnectTimeout *int `json:"connectTimeout,omitempty"` + + // ConnectionRetryWaitTime is the wait time (in seconds) before retrying + // a failed connection attempt. + ConnectionRetryWaitTime *int `json:"connectionRetryWaitTime,omitempty"` + + // Database is the name of the database on the master that the applier + // should replicate from. + Database *string `json:"database,omitempty"` + + // Endpoint specifies the master server endpoint (e.g., "tcp://127.0.0.1:8529") + // from which replication data is pulled. This is required. + Endpoint *string `json:"endpoint,omitempty"` + + // IdleMaxWaitTime is the maximum wait time (in seconds) between + // polling requests when the applier is idle. + IdleMaxWaitTime *int `json:"idleMaxWaitTime,omitempty"` + + // IdleMinWaitTime is the minimum wait time (in seconds) between + // polling requests when the applier is idle. + IdleMinWaitTime *int `json:"idleMinWaitTime,omitempty"` + + // IncludeSystem specifies whether system collections should be + // replicated as well. + IncludeSystem *bool `json:"includeSystem,omitempty"` + + // InitialSyncMaxWaitTime defines the maximum wait time (in seconds) + // for the initial synchronization step. + InitialSyncMaxWaitTime *int `json:"initialSyncMaxWaitTime,omitempty"` + + // MaxConnectRetries is the maximum number of retries for + // initial connection attempts. + MaxConnectRetries *int `json:"maxConnectRetries,omitempty"` + + // Password is the password used when connecting to the master. + Password *string `json:"password,omitempty"` + + // RequestTimeout specifies the timeout (in seconds) for individual + // HTTP requests made by the applier. + RequestTimeout *int `json:"requestTimeout,omitempty"` + + // RequireFromPresent, if true, requires the replication to start from + // the present and not accept missing history. + RequireFromPresent *bool `json:"requireFromPresent,omitempty"` + + // RestrictCollections is an optional list of collections to include + // or exclude in replication, depending on RestrictType. + RestrictCollections *[]string `json:"restrictCollections,omitempty"` + + // RestrictType determines how RestrictCollections is interpreted: + // "include" or "exclude". + RestrictType *string `json:"restrictType,omitempty"` + + // Username is the username used when connecting to the master. + Username *string `json:"username,omitempty"` + + // Verbose controls the verbosity of the applier's logging. + Verbose *bool `json:"verbose,omitempty"` +} + +// ApplierState represents the current state of the replication applier. +type ApplierState struct { + // Started indicates when the applier was started. + Started *string `json:"started"` + + // Running is true if the applier is currently running. + Running *bool `json:"running"` + + // Phase describes the current applier phase (e.g., "running", "inactive"). + Phase *string `json:"phase"` + + // LastAppliedContinuousTick is the tick of the last operation applied by the applier. + LastAppliedContinuousTick *string `json:"lastAppliedContinuousTick"` + + // LastProcessedContinuousTick is the tick of the last operation processed. + LastProcessedContinuousTick *string `json:"lastProcessedContinuousTick"` + + // LastAvailableContinuousTick is the last tick available on the replication logger. + LastAvailableContinuousTick *string `json:"lastAvailableContinuousTick"` + + // SafeResumeTick is the tick from which the applier can safely resume. + SafeResumeTick *string `json:"safeResumeTick"` + + // TicksBehind indicates how many ticks the applier is behind the latest log. + TicksBehind *int64 `json:"ticksBehind,omitempty"` + + // Progress provides detailed information about the last progress event. + Progress *ApplierProgress `json:"progress,omitempty"` + + // TotalRequests is the total number of requests made by the applier. + TotalRequests *int `json:"totalRequests,omitempty"` + + // TotalFailedConnects counts the number of failed connection attempts. + TotalFailedConnects *int `json:"totalFailedConnects,omitempty"` + + // TotalEvents is the total number of replication events processed. + TotalEvents *int `json:"totalEvents,omitempty"` + + // TotalDocuments is the number of document operations applied. + TotalDocuments *int `json:"totalDocuments,omitempty"` + + // TotalRemovals is the number of document removal operations applied. + TotalRemovals *int `json:"totalRemovals,omitempty"` + + // TotalResyncs counts how many times a resync was triggered. + TotalResyncs *int `json:"totalResyncs,omitempty"` + + // TotalOperationsExcluded is the number of operations ignored (due to filters, etc.). + TotalOperationsExcluded *int `json:"totalOperationsExcluded,omitempty"` + + // TotalApplyTime is the cumulative time (in ms) spent applying operations. + TotalApplyTime *int `json:"totalApplyTime,omitempty"` + + // AverageApplyTime is the average time (in ms) spent applying operations. + AverageApplyTime *int `json:"averageApplyTime,omitempty"` + + // TotalFetchTime is the cumulative time (in ms) spent fetching operations. + TotalFetchTime *int `json:"totalFetchTime,omitempty"` + + // AverageFetchTime is the average time (in ms) spent fetching operations. + AverageFetchTime *int `json:"averageFetchTime,omitempty"` + + // LastError contains information about the last error, if any. + LastError *struct { + // ErrorNum is the numeric error code of the last error. + ErrorNum *int `json:"errorNum,omitempty"` + // ErrorMessage is the descriptive message of the last error. + ErrorMessage *string `json:"errorMessage,omitempty"` + // Time is the timestamp of the last error. + Time time.Time `json:"time,omitempty"` + } `json:"lastError,omitempty"` + + // Time is the timestamp of this applier state snapshot. + Time time.Time `json:"time,omitempty"` +} + +// ApplierProgress contains details about the applier's last progress event. +type ApplierProgress struct { + // Time is when the progress message was recorded. + Time *string `json:"time,omitempty"` + + // Message provides a short description of the progress (e.g., "applied batch"). + Message *string `json:"message,omitempty"` + + // FailedConnects counts failed connection attempts at this progress point. + FailedConnects *int `json:"failedConnects,omitempty"` +} + +type ApplierStateResp struct { + // State holds detailed information about the applier's current state. + State ApplierState `json:"state"` + + // Server contains information about the server providing this state. + Server ApplierServer `json:"server"` + + // Endpoint is the endpoint this applier is connected to. + Endpoint *string `json:"endpoint"` +} + +type ApplierServer struct { + // Version is the ArangoDB version. + Version *string `json:"version"` + + // ServerId is the unique ID of the server. + ServerId *string `json:"serverId"` +} + +type WALRangeResponse struct { + // Time is the timestamp when the range was recorded. + Time time.Time `json:"time"` + // Minimum tick in the range + TickMin string `json:"tickMin"` + // Maximum tick in the range + TickMax string `json:"tickMax"` + // Server information + Server ApplierServer `json:"server"` +} + +type WALLastTickResponse struct { + // Time is the timestamp when the range was recorded. + Time time.Time `json:"time"` + // Tick contains the last available tick + Tick string `json:"tick"` + // Server information + Server ApplierServer `json:"server"` +} + +type WALTailOptions struct { + // Global indicates whether operations for all databases should be included. + // If set to false, only the operations for the current database are included. + // The value true is only valid on the _system database. + Global *bool `json:"global,omitempty"` + + // From specifies the exclusive lower bound tick value for the replication. + From *int64 `json:"from,omitempty"` + + // To specifies the inclusive upper bound tick value for the replication. + To *int64 `json:"to,omitempty"` + + // LastScanned specifies the last scanned tick value (for RocksDB multi-response support). + LastScanned *int `json:"lastScanned,omitempty"` + + // ChunkSize specifies the approximate maximum size of the returned result in bytes. + ChunkSize *int `json:"chunkSize,omitempty"` + + // SyncerId specifies the ID of the client used to tail results. + // Required if ServerId is not provided. + SyncerId *int64 `json:"syncerId,omitempty"` + + // ServerId specifies the ID of the client machine. + // Required if SyncerId is not provided. + ServerId *int64 `json:"serverId,omitempty"` + + // ClientInfo provides a short description of the client (informational only). + ClientInfo *string `json:"clientInfo,omitempty"` +} + +type RevisionQueryParams struct { + // Collection Name + Collection string `json:"collection"` + BatchId string `json:"batchId"` + // The revisionId at which to resume, if a previous request was truncated + Resume *string `json:"resume,omitempty"` +} + +// ReplicationSyncResult is the response format from /_api/replication/sync +type ReplicationSyncResult struct { + // Collections is an array of collections that were synchronized + // from the leader to the local database. + Collections []map[string]interface{} `json:"collections"` + + // LastLogTick is the log tick value from the leader at the time + // the synchronization started. This value can be used later as + // the "from" tick when setting up continuous replication. + LastLogTick string `json:"lastLogTick"` +} + +// ReplicationSyncOptions holds optional parameters for sync +type ReplicationSyncOptions struct { + // Database specifies the name of the leader database to replicate from. + // If not provided, defaults to the current database. + Database *string `json:"database,omitempty"` + + // Endpoint is the leader endpoint (e.g., "http+tcp://192.168.1.65:8529"). + // This field is required. + Endpoint string `json:"endpoint,omitempty"` + + // Username is the login name used for authentication on the leader endpoint. + Username string `json:"username,omitempty"` + + // Password is the corresponding password used for authentication + // on the leader endpoint. + Password string `json:"password,omitempty"` + + // IncludeSystem determines whether system collections should be included + // in the synchronization. Defaults to false. + IncludeSystem *bool `json:"includeSystem,omitempty"` + + // Incremental indicates if incremental synchronization should be used. + // When true, only differences are transferred, which is faster if the + // local database already has some of the leader’s data. + // When false (default), a full synchronization is performed. + Incremental *bool `json:"incremental,omitempty"` + + // RestrictType controls how RestrictCollections is applied. + // Possible values: + // - "include": only collections listed in RestrictCollections are replicated. + // - "exclude": all collections except those listed are replicated. + RestrictType *string `json:"restrictType,omitempty"` + + // RestrictCollections is the list of collections used together with + // RestrictType to filter which collections are synchronized. + RestrictCollections *[]string `json:"restrictCollections,omitempty"` + + // InitialSyncMaxWaitSec specifies the maximum wait time (in seconds) + // the synchronization will wait for a response from the leader during + // initial collection data fetch. A value of 0 disables the wait time limit. + InitialSyncMaxWaitSec int `json:"initialSyncMaxWaitSec,omitempty"` +} diff --git a/v2/arangodb/client_replication_impl.go b/v2/arangodb/client_replication_impl.go new file mode 100644 index 00000000..38514be3 --- /dev/null +++ b/v2/arangodb/client_replication_impl.go @@ -0,0 +1,1079 @@ +// +// DISCLAIMER +// +// Copyright 2023-2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package arangodb + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + + "github.com/pkg/errors" + + "github.com/arangodb/go-driver/v2/arangodb/shared" + "github.com/arangodb/go-driver/v2/connection" + "github.com/arangodb/go-driver/v2/utils" +) + +type clientReplication struct { + client *client +} + +func newClientReplication(client *client) *clientReplication { + return &clientReplication{ + client: client, + } +} + +var _ ClientReplication = &clientReplication{} + +func (c *clientReplication) url(dbName string, pathSegments []string, queryParams map[string]interface{}) string { + + base := connection.NewUrl("_db", url.PathEscape(dbName), "_api", "replication") + for _, seg := range pathSegments { + base = fmt.Sprintf("%s/%s", base, url.PathEscape(seg)) + } + + if len(queryParams) > 0 { + q := url.Values{} + for k, v := range queryParams { + switch val := v.(type) { + case string: + q.Set(k, val) + case bool: + q.Set(k, fmt.Sprintf("%t", val)) + case int, int64, float64: + q.Set(k, fmt.Sprintf("%v", val)) + default: + // skip unsupported types or handle as needed + } + } + base = fmt.Sprintf("%s?%s", base, q.Encode()) + } + return base +} + +func (c *clientReplication) CreateNewBatch(ctx context.Context, dbName string, DBserver *string, state *bool, opt CreateNewBatchOptions) (CreateNewBatchResponse, error) { + // Build query params + queryParams := map[string]interface{}{} + if state != nil { + queryParams["state"] = *state + } + + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return CreateNewBatchResponse{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + if DBserver == nil || *DBserver == "" { + return CreateNewBatchResponse{}, errors.New("DBserver must be specified when creating a batch on a coordinator") + } + queryParams["DBserver"] = *DBserver + } + + // Build URL + url := c.url(dbName, []string{"batch"}, queryParams) + + // Prepare response wrapper + var response struct { + shared.ResponseStruct `json:",inline"` + CreateNewBatchResponse `json:",inline"` + } + + resp, err := connection.CallPost(ctx, c.client.connection, url, &response, opt) + if err != nil { + return CreateNewBatchResponse{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.CreateNewBatchResponse, nil + default: + return CreateNewBatchResponse{}, response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) GetInventory(ctx context.Context, dbName string, params InventoryQueryParams) (InventoryResponse, error) { + // Build query params + queryParams := map[string]interface{}{} + + if params.IncludeSystem == nil { + queryParams["includeSystem"] = utils.NewType(true) + } else { + queryParams["includeSystem"] = *params.IncludeSystem + } + + if params.Global == nil { + queryParams["global"] = utils.NewType(false) + } else { + queryParams["global"] = *params.Global + } + + if params.BatchID == "" { + return InventoryResponse{}, errors.New("batchId must be specified when querying inventory") + } + queryParams["batchId"] = params.BatchID + + if params.Collection != nil { + queryParams["collection"] = *params.Collection + } + + // Check server role + serverRole, err := c.client.ServerRole(ctx) + if err != nil { + return InventoryResponse{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + if params.DBserver == nil || *params.DBserver == "" { + return InventoryResponse{}, errors.New("DBserver must be specified when querying inventory on a coordinator") + } + queryParams["DBserver"] = *params.DBserver + } + + // Build URL + url := c.url(dbName, []string{"inventory"}, queryParams) + + // Prepare response wrapper + var response struct { + shared.ResponseStruct `json:",inline"` + InventoryResponse `json:",inline"` + } + + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return InventoryResponse{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.InventoryResponse, nil + default: + return InventoryResponse{}, response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) DeleteBatch(ctx context.Context, dbName string, DBserver *string, batchId string) error { + params := map[string]interface{}{} + // Check server role + serverRole, err := c.client.ServerRole(ctx) + if err != nil { + return errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + if DBserver == nil || *DBserver == "" { + return errors.New("DBserver must be specified when querying inventory on a coordinator") + } + params["DBserver"] = *DBserver + } + + if batchId == "" { + return errors.New("batchId must be specified for delete batch") + } + // Build URL + url := c.url(dbName, []string{"batch", batchId}, params) + + resp, err := connection.CallDelete(ctx, c.client.connection, url, nil) + if err != nil { + return errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusNoContent: + return nil + default: + return shared.NewResponseStruct().AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) ExtendBatch(ctx context.Context, dbName string, DBserver *string, batchId string, opt CreateNewBatchOptions) error { + + if batchId == "" { + return errors.New("batchId must be specified for extend batch") + } + + // Build query params + queryParams := map[string]interface{}{} + // Check server role + serverRole, err := c.client.ServerRole(ctx) + if err != nil { + return errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + if DBserver == nil || *DBserver == "" { + return errors.New("DBserver must be specified when extending a batch on a coordinator") + } + queryParams["DBserver"] = *DBserver + } + + // Build URL + url := c.url(dbName, []string{"batch", batchId}, queryParams) + + resp, err := connection.CallPut(ctx, c.client.connection, url, nil, opt) + if err != nil { + return errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusNoContent: + return nil + default: + return shared.NewResponseStruct().AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) Dump(ctx context.Context, dbName string, params ReplicationDumpParams) ([]byte, error) { + + role, err := c.client.ServerRole(ctx) + if err != nil { + return nil, errors.WithStack(err) + } + if role != ServerRoleSingle { + return nil, errors.Errorf("replication dump not supported on role %s", role) + } + + // Build query params + queryParams := map[string]interface{}{} + if params.ChunkSize != nil && *params.ChunkSize != 0 { + queryParams["chunkSize"] = params.ChunkSize + } + if params.Collection == "" { + return nil, errors.New("collection must be specified when querying replication dump") + } + queryParams["collection"] = params.Collection + if params.BatchID == "" { + return nil, errors.New("batchId must be specified when querying replication dump") + } + queryParams["batchId"] = params.BatchID + + // Build URL + url := c.url(dbName, []string{"dump"}, queryParams) + req, err := c.client.Connection().NewRequest(http.MethodGet, url) + if err != nil { + return nil, err + } + + var data []byte + // Call Do with nil result (we'll handle body manually) + resp, err := c.client.Connection().Do(ctx, req, &data, http.StatusOK, http.StatusNoContent) + if err != nil { + return nil, err + } + defer resp.RawResponse().Body.Close() + + if resp.Code() == http.StatusNoContent { + return nil, nil + } + if resp.Code() != http.StatusOK { + return nil, (&shared.ResponseStruct{}).AsArangoErrorWithCode(resp.Code()) + } + + return io.ReadAll(resp.RawResponse().Body) +} + +func (c *clientReplication) LoggerState(ctx context.Context, dbName string, DBserver *string) (LoggerStateResponse, error) { + // Build query params + queryParams := map[string]interface{}{} + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return LoggerStateResponse{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + if DBserver == nil || *DBserver == "" { + return LoggerStateResponse{}, errors.New("DBserver must be specified when creating a batch on a coordinator") + } + queryParams["DBserver"] = *DBserver + } + // Build URL + url := c.url(dbName, []string{"logger-state"}, queryParams) + + var response struct { + shared.ResponseStruct `json:",inline"` + LoggerStateResponse `json:",inline"` + } + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return LoggerStateResponse{}, errors.WithStack(err) + } + switch code := resp.Code(); code { + case http.StatusOK: + return response.LoggerStateResponse, nil + default: + return LoggerStateResponse{}, response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) LoggerFirstTick(ctx context.Context, dbName string) (LoggerFirstTickResponse, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return LoggerFirstTickResponse{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return LoggerFirstTickResponse{}, errors.New("replication logger-first-tick is not supported on Coordinators") + } + // Build URL + url := c.url(dbName, []string{"logger-first-tick"}, nil) + + var response struct { + shared.ResponseStruct `json:",inline"` + LoggerFirstTickResponse `json:",inline"` + } + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return LoggerFirstTickResponse{}, errors.WithStack(err) + } + switch code := resp.Code(); code { + case http.StatusOK: + return response.LoggerFirstTickResponse, nil + default: + return LoggerFirstTickResponse{}, response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) LoggerTickRange(ctx context.Context, dbName string) ([]LoggerTickRangeResponseObj, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + if err != nil { + return nil, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return nil, errors.New("replication logger-tick-ranges is not supported on Coordinators") + } + // Build URL + url := c.url(dbName, []string{"logger-tick-ranges"}, nil) + + var response []LoggerTickRangeResponseObj + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return nil, errors.WithStack(err) + } + switch code := resp.Code(); code { + case http.StatusOK: + return response, nil + default: + return nil, (&shared.ResponseStruct{}).AsArangoErrorWithCode(resp.Code()) + } +} + +func (c *clientReplication) GetApplierConfig(ctx context.Context, dbName string, global *bool) (ApplierConfigResponse, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return ApplierConfigResponse{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return ApplierConfigResponse{}, errors.New("replication applier-config is not supported on Coordinators") + } + + // Build query params + queryParams := map[string]interface{}{} + if global != nil { + queryParams["global"] = *global + } + + // Build URL + url := c.url(dbName, []string{"applier-config"}, queryParams) + + var response struct { + shared.ResponseStruct `json:",inline"` + ApplierConfigResponse `json:",inline"` + } + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return ApplierConfigResponse{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.ApplierConfigResponse, nil + default: + return ApplierConfigResponse{}, response.AsArangoErrorWithCode(code) + } +} + +func formApplierParams(opts ApplierOptions) (map[string]interface{}, error) { + params := map[string]interface{}{} + + // Required + if opts.Endpoint == nil || *opts.Endpoint == "" { + return nil, RequiredFieldError("endpoint") + } + params["endpoint"] = *opts.Endpoint + + // Optional + if opts.Database != nil { + params["database"] = *opts.Database + } + if opts.Username != nil { + params["username"] = *opts.Username + } + if opts.Password != nil { + params["password"] = *opts.Password + } + if opts.MaxConnectRetries != nil { + params["maxConnectRetries"] = *opts.MaxConnectRetries + } + if opts.ConnectTimeout != nil { + params["connectTimeout"] = *opts.ConnectTimeout + } + if opts.RequestTimeout != nil { + params["requestTimeout"] = *opts.RequestTimeout + } + if opts.IdleMinWaitTime != nil { + params["idleMinWaitTime"] = *opts.IdleMinWaitTime + } + if opts.IdleMaxWaitTime != nil { + params["idleMaxWaitTime"] = *opts.IdleMaxWaitTime + } + if opts.InitialSyncMaxWaitTime != nil { + params["initialSyncMaxWaitTime"] = *opts.InitialSyncMaxWaitTime + } + if opts.IncludeSystem != nil { + params["includeSystem"] = *opts.IncludeSystem + } + if opts.ChunkSize != nil { + params["chunkSize"] = *opts.ChunkSize + } + if opts.AutoStart != nil { + params["autoStart"] = *opts.AutoStart + } + if opts.RestrictCollections != nil { + params["restrictCollections"] = *opts.RestrictCollections + } + if opts.RestrictType != nil { + params["restrictType"] = *opts.RestrictType + } + if opts.AdaptivePolling != nil { + params["adaptivePolling"] = *opts.AdaptivePolling + } + if opts.AutoResync != nil { + params["autoResync"] = *opts.AutoResync + } + if opts.AutoResyncRetries != nil { + params["autoResyncRetries"] = *opts.AutoResyncRetries + } + if opts.RequireFromPresent != nil { + params["requireFromPresent"] = *opts.RequireFromPresent + } + if opts.Verbose != nil { + params["verbose"] = *opts.Verbose + } + + return params, nil +} + +func (c *clientReplication) UpdateApplierConfig(ctx context.Context, dbName string, global *bool, opts ApplierOptions) (ApplierConfigResponse, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return ApplierConfigResponse{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return ApplierConfigResponse{}, errors.New("replication applier-config is not supported on Coordinators") + } + + // Build query params + queryParams := map[string]interface{}{} + if global != nil { + queryParams["global"] = *global + } + + // Build URL + url := c.url(dbName, []string{"applier-config"}, queryParams) + + var response struct { + shared.ResponseStruct `json:",inline"` + ApplierConfigResponse `json:",inline"` + } + + requestParams, err := formApplierParams(opts) + if err != nil { + return ApplierConfigResponse{}, errors.WithStack(err) + } + + resp, err := connection.CallPut(ctx, c.client.connection, url, &response, requestParams) + if err != nil { + return ApplierConfigResponse{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.ApplierConfigResponse, nil + default: + return ApplierConfigResponse{}, response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) ApplierStart(ctx context.Context, dbName string, global *bool, from *string) (ApplierStateResp, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return ApplierStateResp{}, errors.New("replication applier-start is not supported on Coordinators") + } + + // Build query params + queryParams := map[string]interface{}{} + if global != nil { + queryParams["global"] = *global + } + if from != nil && *from != "" { + queryParams["from"] = *from + } + + // Build URL + url := c.url(dbName, []string{"applier-start"}, queryParams) + + var response struct { + shared.ResponseStruct `json:",inline"` + ApplierStateResp `json:",inline"` + } + + resp, err := connection.CallPut(ctx, c.client.connection, url, &response, nil) + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.ApplierStateResp, nil + default: + return ApplierStateResp{}, response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) ApplierStop(ctx context.Context, dbName string, global *bool) (ApplierStateResp, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return ApplierStateResp{}, errors.New("replication applier-stop is not supported on Coordinators") + } + + // Build query params + queryParams := map[string]interface{}{} + if global != nil { + queryParams["global"] = *global + } + + // Build URL + url := c.url(dbName, []string{"applier-stop"}, queryParams) + + var response struct { + shared.ResponseStruct `json:",inline"` + ApplierStateResp `json:",inline"` + } + + resp, err := connection.CallPut(ctx, c.client.connection, url, &response, nil) + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.ApplierStateResp, nil + default: + return ApplierStateResp{}, response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) GetApplierState(ctx context.Context, dbName string, global *bool) (ApplierStateResp, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return ApplierStateResp{}, errors.New("replication applier-stop is not supported on Coordinators") + } + + // Build query params + queryParams := map[string]interface{}{} + if global != nil { + queryParams["global"] = *global + } + + // Build URL + url := c.url(dbName, []string{"applier-state"}, queryParams) + + var response struct { + shared.ResponseStruct `json:",inline"` + ApplierStateResp `json:",inline"` + } + + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.ApplierStateResp, nil + default: + return ApplierStateResp{}, response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) GetReplicationServerId(ctx context.Context, dbName string) (string, error) { + + // Build URL + url := c.url(dbName, []string{"server-id"}, nil) + + var response struct { + shared.ResponseStruct `json:",inline"` + ServerId string `json:"serverId"` + } + + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return "", errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.ServerId, nil + default: + return "", response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) MakeFollower(ctx context.Context, dbName string, opts ApplierOptions) (ApplierStateResp, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return ApplierStateResp{}, errors.New("replication make-follower is not supported on Coordinators") + } + + // Build URL + url := c.url(dbName, []string{"make-follower"}, nil) + + var response struct { + shared.ResponseStruct `json:",inline"` + ApplierStateResp `json:",inline"` + } + requestParams, err := formApplierParams(opts) + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + + resp, err := connection.CallPut(ctx, c.client.connection, url, &response, requestParams) + if err != nil { + return ApplierStateResp{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.ApplierStateResp, nil + default: + return ApplierStateResp{}, response.AsArangoErrorWithCode(code) + } +} + +// RebuildShardRevisionTree triggers a rebuild of the Merkle tree for a specific shard. +// This API must be called directly against a DBServer (not a Coordinator). +func (c *clientReplication) RebuildShardRevisionTree(ctx context.Context, dbName string, shardID ShardID) error { + // Ensure we are on a DBServer + role, err := c.client.ServerRole(ctx) + if err != nil { + return errors.WithStack(err) + } + if role != ServerRoleDBServer { + return fmt.Errorf("rebuild revision tree is only supported on DBServers, got role=%s", role) + } + + if shardID == "" { + return RequiredFieldError("shardID") + } + + // Build URL + queryParams := map[string]interface{}{ + "collection": shardID, + } + url := c.url(dbName, []string{"revisions", "tree"}, queryParams) + + var response struct { + shared.ResponseStruct `json:",inline"` + } + + resp, err := connection.CallPost(ctx, c.client.connection, url, &response, nil) + if err != nil { + return errors.WithStack(err) + } + + if resp.Code() == http.StatusNoContent { + return nil + } + return response.AsArangoErrorWithCode(resp.Code()) +} + +func (c *clientReplication) GetShardRevisionTree(ctx context.Context, dbName string, shardID ShardID, batchId string) (json.RawMessage, error) { + role, err := c.client.ServerRole(ctx) + if err != nil { + return nil, errors.WithStack(err) + } + if role != ServerRoleDBServer { + return nil, fmt.Errorf("get revision tree is only supported on DBServers, got role=%s", role) + } + + if shardID == "" { + return nil, RequiredFieldError("shardID") + } + if batchId == "" { + return nil, RequiredFieldError("batchId") + } + + queryParams := map[string]interface{}{ + "collection": shardID, + "batchId": batchId, + } + + url := c.url(dbName, []string{"revisions", "tree"}, queryParams) + + var response struct { + shared.ResponseStruct `json:",inline"` + RevisionTree json.RawMessage `json:"revisionTree,omitempty"` + } + + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return nil, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.RevisionTree, nil + default: + return nil, response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) checkRevisionQueryParams(queryParams RevisionQueryParams) (map[string]interface{}, error) { + params := map[string]interface{}{} + if queryParams.Collection == "" { + return nil, RequiredFieldError("collection") + } + if queryParams.BatchId == "" { + return nil, RequiredFieldError("batchId") + } + if queryParams.Resume != nil { + params["resume"] = *queryParams.Resume + } + params["collection"] = queryParams.Collection + params["batchId"] = queryParams.BatchId + return params, nil +} + +func (c *clientReplication) ListDocumentRevisionsInRange(ctx context.Context, dbName string, queryParams RevisionQueryParams, opts [][2]string) ([][2]string, error) { + + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return nil, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return nil, errors.New("replication revisions range is not supported on Coordinators") + } + params, err := c.checkRevisionQueryParams(queryParams) + if err != nil { + return nil, err + } + // Build URL + + url := c.url(dbName, []string{"revisions", "ranges"}, params) + + var response struct { + shared.ResponseStruct `json:",inline"` + Ranges [][2]string `json:"ranges,omitempty"` + } + + resp, err := connection.CallPut(ctx, c.client.connection, url, &response, opts) + if err != nil { + return nil, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.Ranges, nil + default: + return nil, response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) FetchRevisionDocuments(ctx context.Context, dbName string, queryParams RevisionQueryParams, opts []string) ([]map[string]interface{}, error) { + + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return nil, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return nil, errors.New("replication revisions documents is not supported on Coordinators") + } + params, err := c.checkRevisionQueryParams(queryParams) + if err != nil { + return nil, err + } + + // Build URL + url := c.url(dbName, []string{"revisions", "documents"}, params) + + var response []map[string]interface{} + + resp, err := connection.CallPut(ctx, c.client.connection, url, &response, opts) + if err != nil { + return nil, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response, nil + default: + return nil, (&shared.ResponseStruct{}).AsArangoErrorWithCode(resp.Code()) + } +} + +func (c *clientReplication) formSyncBodyParams(opts ReplicationSyncOptions) (map[string]interface{}, error) { + params := map[string]interface{}{} + if opts.Endpoint == "" { + return nil, RequiredFieldError("endpoint") + } + params["endpoint"] = opts.Endpoint + if opts.Database != nil && *opts.Database != "" { + params["database"] = *opts.Database + } + if opts.Username != "" { + params["username"] = opts.Username + } + if opts.Password != "" { + params["password"] = opts.Password + } + if opts.IncludeSystem != nil { + params["includeSystem"] = *opts.IncludeSystem + } + if opts.Incremental != nil { + params["incremental"] = *opts.Incremental + } + if opts.RestrictType != nil && *opts.RestrictType != "" { + params["restrictType"] = *opts.RestrictType + } + if opts.RestrictCollections != nil && len(*opts.RestrictCollections) > 0 { + params["restrictCollections"] = *opts.RestrictCollections + } + params["initialSyncMaxWaitTime"] = opts.InitialSyncMaxWaitSec + return params, nil +} + +func (c *clientReplication) StartReplicationSync(ctx context.Context, dbName string, opts ReplicationSyncOptions) (ReplicationSyncResult, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return ReplicationSyncResult{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return ReplicationSyncResult{}, errors.New("replication sync is not supported on Coordinators") + } + // Form request body params + body, err := c.formSyncBodyParams(opts) + if err != nil { + return ReplicationSyncResult{}, err + } + + // Build URL + url := c.url(dbName, []string{"sync"}, nil) + + var response struct { + shared.ResponseStruct `json:",inline"` + ReplicationSyncResult `json:",inline"` + } + + resp, err := connection.CallPut(ctx, c.client.connection, url, &response, body) + if err != nil { + return ReplicationSyncResult{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.ReplicationSyncResult, nil + default: + return ReplicationSyncResult{}, response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) GetWALRange(ctx context.Context, dbName string) (WALRangeResponse, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return WALRangeResponse{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return WALRangeResponse{}, errors.New("WAL range is not supported on Coordinators") + } + // Build URL + url := connection.NewUrl("_db", url.PathEscape(dbName), "_api", "wal", "range") + + var response struct { + shared.ResponseStruct `json:",inline"` + WALRangeResponse `json:",inline"` + } + + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return WALRangeResponse{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.WALRangeResponse, nil + default: + return WALRangeResponse{}, response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) GetWALLastTick(ctx context.Context, dbName string) (WALLastTickResponse, error) { + // Check server role + serverRole, err := c.client.ServerRole(ctx) + + if err != nil { + return WALLastTickResponse{}, errors.WithStack(err) + } + if serverRole == ServerRoleCoordinator { + return WALLastTickResponse{}, errors.New("WAL last tick is not supported on Coordinators") + } + // Build URL + url := connection.NewUrl("_db", url.PathEscape(dbName), "_api", "wal", "lastTick") + + var response struct { + shared.ResponseStruct `json:",inline"` + WALLastTickResponse `json:",inline"` + } + + resp, err := connection.CallGet(ctx, c.client.connection, url, &response) + if err != nil { + return WALLastTickResponse{}, errors.WithStack(err) + } + + switch code := resp.Code(); code { + case http.StatusOK: + return response.WALLastTickResponse, nil + default: + return WALLastTickResponse{}, response.AsArangoErrorWithCode(code) + } +} + +func (c *clientReplication) formQueryParamsForTail(params *WALTailOptions) map[string]interface{} { + queryParams := map[string]interface{}{} + if params == nil { + return nil + } + if params.Global != nil { + queryParams["global"] = *params.Global + } + if params.From != nil { + queryParams["from"] = *params.From + } + if params.To != nil { + queryParams["to"] = *params.To + } + if params.LastScanned != nil { + queryParams["lastScanned"] = *params.LastScanned + } + if params.ChunkSize != nil { + queryParams["chunkSize"] = *params.ChunkSize + } + if params.SyncerId != nil { + queryParams["syncerId"] = *params.SyncerId + } + if params.ServerId != nil { + queryParams["serverId"] = *params.ServerId + } + if params.ClientInfo != nil { + queryParams["clientInfo"] = *params.ClientInfo + } + return queryParams +} + +func (c *clientReplication) GetWALTail(ctx context.Context, dbName string, params *WALTailOptions) ([]byte, error) { + + role, err := c.client.ServerRole(ctx) + if err != nil { + return nil, errors.WithStack(err) + } + if role == ServerRoleCoordinator { + return nil, errors.Errorf("replication Tail not supported on role %s", role) + } + + // Build query params + queryParams := c.formQueryParamsForTail(params) + + // Build URL + url := connection.NewUrl("_db", url.PathEscape(dbName), "_api", "wal", "tail") + req, err := c.client.Connection().NewRequest(http.MethodGet, url) + if err != nil { + return nil, err + } + + // Add query params + for k, v := range queryParams { + req.AddQuery(k, fmt.Sprintf("%v", v)) + } + + // Use a bytes.Buffer to capture the response + var buf bytes.Buffer + resp, err := c.client.Connection().Do(ctx, req, &buf, http.StatusOK, http.StatusNoContent) + if err != nil { + return nil, err + } + + if resp.Code() == http.StatusNoContent { + return nil, nil + } + + if resp.Code() != http.StatusOK { + return nil, (&shared.ResponseStruct{}).AsArangoErrorWithCode(resp.Code()) + } + + return buf.Bytes(), nil +} diff --git a/v2/tests/client_replication_test.go b/v2/tests/client_replication_test.go new file mode 100644 index 00000000..73e53e64 --- /dev/null +++ b/v2/tests/client_replication_test.go @@ -0,0 +1,713 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package tests + +import ( + "context" + "fmt" + "os" + "strconv" + "testing" + "time" + + "github.com/arangodb/go-driver/v2/arangodb" + "github.com/arangodb/go-driver/v2/utils" + "github.com/stretchr/testify/require" +) + +func Test_CreateNewBatch(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + + var dbServer *string + state := utils.NewType(true) + + if serverRole == arangodb.ServerRoleCoordinator { + clusterHealth, err := client.Health(ctx) // Ensure the client is healthy + require.NoError(t, err) + for id, db := range clusterHealth.Health { + if db.Role == arangodb.ServerRoleDBServer { + s := string(id) + dbServer = &s + break + } + } + } + + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + + batch, err := client.CreateNewBatch(ctx, db.Name(), dbServer, state, arangodb.CreateNewBatchOptions{ + Ttl: 300, + }) + require.NoError(t, err) + require.NotNil(t, batch) + require.NotEmpty(t, batch.ID) + require.NotEmpty(t, batch.LastTick) + require.NotNil(t, batch.State) + + t.Run("GetInventory", func(t *testing.T) { + resp, err := client.GetInventory(ctx, db.Name(), arangodb.InventoryQueryParams{ + BatchID: batch.ID, + DBserver: dbServer, + }) + require.NoError(t, err) + require.NotNil(t, resp) + }) + + t.Run("ExtendBatch", func(t *testing.T) { + err := client.ExtendBatch(ctx, db.Name(), dbServer, batch.ID, arangodb.CreateNewBatchOptions{ + Ttl: 600, + }) + require.NoError(t, err) + }) + + t.Run("GetReplicationDump", func(t *testing.T) { + WithCollectionV2(t, db, nil, func(col arangodb.Collection) { + docs := []map[string]interface{}{ + {"_key": "doc1", "name": "Alice"}, + {"_key": "doc2", "name": "Bob"}, + {"_key": "doc3", "name": "Charlie"}, + } + for _, doc := range docs { + resp, err := col.CreateDocument(ctx, doc) + require.NoError(t, err) + require.NotNil(t, resp) + } + + // Give Arango some time to flush + time.Sleep(200 * time.Millisecond) + // Attempt to dump the collection + if serverRole == arangodb.ServerRoleSingle { + resp, err := client.Dump(ctx, db.Name(), arangodb.ReplicationDumpParams{ + BatchID: batch.ID, + Collection: col.Name(), + }) + require.NoError(t, err) + require.GreaterOrEqual(t, len(resp), 0) + } else { + t.Skipf("Dump only allowed for single server deployments. This is a %s server", serverRole) + } + }) + }) + + t.Run("DeleteBatch", func(t *testing.T) { + err := client.DeleteBatch(ctx, db.Name(), dbServer, batch.ID) + require.NoError(t, err) + }) + }) + }) +} + +func Test_LoggerState(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + WithDatabase(t, client, nil, func(db arangodb.Database) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + + var dbServer *string + if serverRole == arangodb.ServerRoleCoordinator { + clusterHealth, err := client.Health(ctx) // Ensure the client is healthy + require.NoError(t, err) + for id, db := range clusterHealth.Health { + if db.Role == arangodb.ServerRoleDBServer { + s := string(id) + dbServer = &s + break + } + } + } + resp, err := client.LoggerState(ctx, db.Name(), dbServer) + require.NoError(t, err) + require.NotNil(t, resp) + require.NotEmpty(t, resp.State) + require.NotEmpty(t, resp.Server) + require.GreaterOrEqual(t, len(resp.Clients), 0) + }) + }) + }) +} + +func Test_LoggerFirstTick(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + WithDatabase(t, client, nil, func(db arangodb.Database) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + + resp, err := client.LoggerFirstTick(ctx, db.Name()) + require.NoError(t, err) + require.NotNil(t, resp) + require.NotEmpty(t, resp.FirstTick) + }) + }) + }) +} + +func Test_LoggerTickRange(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + WithDatabase(t, client, nil, func(db arangodb.Database) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + + resp, err := client.LoggerTickRange(ctx, db.Name()) + require.NoError(t, err) + require.NotNil(t, resp) + }) + }) + }) +} + +func Test_GetApplierConfig(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + t.Run("Running applier config with setting global:true", func(t *testing.T) { + + resp, err := client.GetApplierConfig(ctx, db.Name(), utils.NewType(false)) + require.NoError(t, err) + require.NotNil(t, resp) + }) + t.Run("Running applier config with setting global:nil", func(t *testing.T) { + resp, err := client.GetApplierConfig(ctx, db.Name(), nil) + require.NoError(t, err) + require.NotNil(t, resp) + }) + }) + }) +} + +func Test_UpdateApplierConfig(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + t.Run("Update applier config with setting global:true", func(t *testing.T) { + resp, err := client.UpdateApplierConfig(ctx, db.Name(), utils.NewType(true), arangodb.ApplierOptions{ + ChunkSize: utils.NewType(1234), + AutoStart: utils.NewType(true), + Endpoint: utils.NewType("tcp://127.0.0.1:8529"), + Database: utils.NewType(db.Name()), + Username: utils.NewType("root"), + }) + require.NoError(t, err) + require.NotNil(t, resp) + }) + t.Run("Update applier config with setting global:false", func(t *testing.T) { + resp, err := client.UpdateApplierConfig(ctx, db.Name(), utils.NewType(false), arangodb.ApplierOptions{ + ChunkSize: utils.NewType(2596), + AutoStart: utils.NewType(false), + Endpoint: utils.NewType("tcp://127.0.0.1:8529"), + Database: utils.NewType(db.Name()), + }) + require.NoError(t, err) + require.NotNil(t, resp) + }) + }) + }) +} + +func Test_ApplierStart(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + time.Sleep(1 * time.Second) + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + + batch, err := client.CreateNewBatch(ctx, db.Name(), nil, utils.NewType(true), arangodb.CreateNewBatchOptions{ + Ttl: 600, + }) + require.NoError(t, err) + require.NotNil(t, batch) + t.Run("Update applier config with setting global:true", func(t *testing.T) { + resp, err := client.UpdateApplierConfig(ctx, db.Name(), utils.NewType(true), arangodb.ApplierOptions{ + ChunkSize: utils.NewType(2596), + AutoStart: utils.NewType(false), + Endpoint: utils.NewType("tcp://127.0.0.1:8529"), + Database: utils.NewType(db.Name()), + }) + require.NoError(t, err) + require.NotNil(t, resp) + }) + t.Logf("Batch ID: %s", batch.ID) + t.Run("Applier Start with query params", func(t *testing.T) { + resp, err := client.ApplierStart(ctx, db.Name(), utils.NewType(true), utils.NewType(batch.ID)) + require.NoError(t, err) + require.NotNil(t, resp) + // Log useful debug info + t.Logf("Applier start:\n running=%v\n phase=%s\n message=%s\n failedConnects=%d", + *resp.State.Running, + *resp.State.Phase, + *resp.State.Progress.Message, + *resp.State.Progress.FailedConnects, + ) + }) + t.Run("Applier_State_with_query_params", func(t *testing.T) { + ctx := context.Background() + + state, err := client.GetApplierState(ctx, db.Name(), utils.NewType(true)) + require.NoError(t, err, "failed to get applier state") + require.NotNil(t, state.State) + + // Log useful debug info + t.Logf("Applier state:\n running=%v\n phase=%s\n message=%s\n failedConnects=%d", + *state.State.Running, + *state.State.Phase, + *state.State.Progress.Message, + *state.State.Progress.FailedConnects, + ) + }) + t.Run("Applier Stop with query params", func(t *testing.T) { + resp, err := client.ApplierStop(ctx, db.Name(), utils.NewType(true)) + require.NoError(t, err) + require.NotNil(t, resp) + // Log useful debug info + t.Logf("Applier stop:\n running=%v\n phase=%s\n message=%s\n failedConnects=%d", + *resp.State.Running, + *resp.State.Phase, + *resp.State.Progress.Message, + *resp.State.Progress.FailedConnects, + ) + }) + t.Run("Update applier config with out query params", func(t *testing.T) { + resp, err := client.UpdateApplierConfig(ctx, db.Name(), nil, arangodb.ApplierOptions{ + ChunkSize: utils.NewType(2596), + AutoStart: utils.NewType(false), + Endpoint: utils.NewType("tcp://127.0.0.1:8529"), + Database: utils.NewType(db.Name()), + }) + require.NoError(t, err) + require.NotNil(t, resp) + }) + t.Logf("Batch ID: %s", batch.ID) + t.Run("Applier Start with out query params", func(t *testing.T) { + resp, err := client.ApplierStart(ctx, db.Name(), nil, nil) + require.NoError(t, err) + require.NotNil(t, resp) + // Log useful debug info + t.Logf("Applier start:\n running=%v\n phase=%s\n message=%s\n failedConnects=%d", + *resp.State.Running, + *resp.State.Phase, + *resp.State.Progress.Message, + *resp.State.Progress.FailedConnects, + ) + }) + t.Run("Applier State with out query params", func(t *testing.T) { + ctx := context.Background() + + state, err := client.GetApplierState(ctx, db.Name(), nil) + require.NoError(t, err, "failed to get applier state") + require.NotNil(t, state.State) + + // Log useful debug info + t.Logf("Applier state:\n running=%v\n phase=%s\n message=%s\n failedConnects=%d", + *state.State.Running, + *state.State.Phase, + *state.State.Progress.Message, + *state.State.Progress.FailedConnects, + ) + }) + t.Run("Applier Stop with out query params", func(t *testing.T) { + resp, err := client.ApplierStop(ctx, db.Name(), nil) + require.NoError(t, err) + require.NotNil(t, resp) + // Log useful debug info + t.Logf("Applier stop:\n running=%v\n phase=%s\n message=%s\n failedConnects=%d", + *resp.State.Running, + *resp.State.Phase, + *resp.State.Progress.Message, + *resp.State.Progress.FailedConnects, + ) + }) + }) + }) +} + +func Test_GetReplicationServerId(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + t.Run("Get replication server ID", func(t *testing.T) { + resp, err := client.GetReplicationServerId(ctx, db.Name()) + require.NoError(t, err) + require.NotNil(t, resp) + t.Logf("Replication Server ID: %s", resp) + }) + }) + }) +} + +func Test_MakeFollower(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + t.Run("Make Follower", func(t *testing.T) { + resp, err := client.MakeFollower(ctx, db.Name(), arangodb.ApplierOptions{ + ChunkSize: utils.NewType(1234), + Endpoint: utils.NewType("tcp://127.0.0.1:8529"), + Database: utils.NewType(db.Name()), + Username: utils.NewType("root"), + }) + require.NoError(t, err) + require.NotNil(t, resp) + }) + }) + }) +} + +func Test_GetWALReplicationEndpoints(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + + t.Run("Get WAL range", func(t *testing.T) { + resp, err := client.GetWALRange(ctx, db.Name()) + require.NoError(t, err) + require.NotNil(t, resp) + }) + + t.Run("Get WAL last tick", func(t *testing.T) { + resp, err := client.GetWALLastTick(ctx, db.Name()) + require.NoError(t, err) + require.NotNil(t, resp) + }) + + WithCollectionV2(t, db, nil, func(coll arangodb.Collection) { + // WAL range before inserts + rangeResp, err := client.GetWALRange(ctx, db.Name()) + require.NoError(t, err) + fromTick, err := strconv.ParseInt(rangeResp.TickMax, 10, 64) + require.NoError(t, err) + t.Logf("Starting fromTick: %d\n", fromTick) + t.Run("Update applier config with out query params", func(t *testing.T) { + resp, err := client.UpdateApplierConfig(ctx, db.Name(), nil, arangodb.ApplierOptions{ + Endpoint: utils.NewType("tcp://127.0.0.1:8529"), + Database: utils.NewType(db.Name()), + Verbose: utils.NewType(true), + }) + require.NoError(t, err) + require.NotNil(t, resp) + }) + t.Run("Applier Start with out query params", func(t *testing.T) { + resp, err := client.ApplierStart(ctx, db.Name(), nil, nil) + require.NoError(t, err) + require.NotNil(t, resp) + }) + // Insert docs + t.Run("Inserting 5 documents", func(t *testing.T) { + for i := 0; i < 5; i++ { + resp, err := coll.CreateDocument(ctx, map[string]string{"foo": fmt.Sprintf("bar-%d", i)}) + require.NoError(t, err) + require.NotNil(t, resp) + } + }) + // Force sync and check WAL range again + time.Sleep(500 * time.Millisecond) // Increase sleep time + t.Run("Get WAL Tail with query params", func(t *testing.T) { + tailResp, err := client.GetWALTail(ctx, db.Name(), + &arangodb.WALTailOptions{ + Global: utils.NewType(true), + From: utils.NewType(fromTick), + ChunkSize: utils.NewType(1024 * 1024), + LastScanned: utils.NewType(0), + }) + require.NoError(t, err) + require.GreaterOrEqual(t, len(tailResp), 0) + }) + + t.Run("Applier Stop with out query params", func(t *testing.T) { + resp, err := client.ApplierStop(ctx, db.Name(), nil) + require.NoError(t, err) + require.NotNil(t, resp) + }) + }) + }) + }) +} + +func Test_RebuildShardRevisionTree(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + // Version checking + if os.Getenv("TEST_CONNECTION") == "vst" { + skipBelowVersion(client, ctx, "3.8", t) + } + + // Role check + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole is %s\n", serverRole) + + if serverRole != arangodb.ServerRoleDBServer { + t.Skipf("Not supported on role: %s", serverRole) + } + + // Check for DB existence + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + + WithCollectionV2(t, db, nil, func(col arangodb.Collection) { + // Insert documents + docs := []map[string]interface{}{ + {"_key": "doc1", "name": "Alice"}, + {"_key": "doc2", "name": "Bob"}, + {"_key": "doc3", "name": "Charlie"}, + } + for _, doc := range docs { + resp, err := col.CreateDocument(ctx, doc) + require.NoError(t, err) + require.NotNil(t, resp) + } + + var shardId arangodb.ShardID + shards, err := col.Shards(ctx, true) + require.NoError(t, err) + require.NotNil(t, shards) + + for existingShardId := range shards.Shards { + shardId = existingShardId + break + } + // Call Rebuild Shard Revision Tree + err = client.RebuildShardRevisionTree(ctx, db.Name(), shardId) + require.NoError(t, err) + }) + }) + }) +} +func Test_ListDocumentRevisionsInRange(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + // Version checking + if os.Getenv("TEST_CONNECTION") == "vst" { + skipBelowVersion(client, ctx, "3.8", t) + } + // Role check + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole: %s", serverRole) + + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + + // Check for DB existence + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + + WithCollectionV2(t, db, nil, func(col arangodb.Collection) { + var revs []string + // Insert documents + for i := 1; i <= 10; i++ { + doc := map[string]interface{}{ + "_key": fmt.Sprintf("doc%d", i), + "name": fmt.Sprintf("User %d", i), + } + resp, err := col.CreateDocument(ctx, doc) + require.NoError(t, err) + require.NotNil(t, resp) + revs = append(revs, resp.Rev) + } + require.NotEmpty(t, revs) + time.Sleep(500 * time.Millisecond) + + // Create a replication batch + state := utils.NewType(true) + batch, err := client.CreateNewBatch(ctx, db.Name(), nil, state, arangodb.CreateNewBatchOptions{Ttl: 300}) + require.NoError(t, err) + require.NotNil(t, batch) + require.NotEmpty(t, batch.ID) + + // Prepare pairs for ListDocumentRevisionsInRange + var opts [][2]string + for i := 0; i < len(revs)-1; i++ { + opts = append(opts, [2]string{revs[i], revs[i+1]}) + } + + // Call ListDocumentRevisionsInRange + revIds, err := client.ListDocumentRevisionsInRange(ctx, db.Name(), arangodb.RevisionQueryParams{ + BatchId: batch.ID, + Collection: col.Name(), + }, opts) + require.NoError(t, err) + require.NotNil(t, revIds) + }) + }) + }) +} + +func Test_FetchRevisionDocuments(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + // Version checking + if os.Getenv("TEST_CONNECTION") == "vst" { + skipBelowVersion(client, ctx, "3.8", t) + } + + // Role check + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole: %s", serverRole) + + if serverRole == arangodb.ServerRoleCoordinator { + t.Skipf("Not supported on Coordinators (role: %s)", serverRole) + } + + // Check for DB existence + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + + WithCollectionV2(t, db, nil, func(col arangodb.Collection) { + var revs []string + // Insert documents + docs := []map[string]interface{}{} + docs = append(docs, map[string]interface{}{"_key": "doc1", "name": "Alice"}) + docs = append(docs, map[string]interface{}{"_key": "doc2", "subjects": []string{"English", "Maths"}}) + docs = append(docs, map[string]interface{}{"_key": "doc3", "age": 30, "active": true}) + docs = append(docs, map[string]interface{}{"_key": "doc4", "profile": map[string]interface{}{"city": "Berlin", "country": "Germany"}}) + + for _, doc := range docs { + resp, err := col.CreateDocument(ctx, doc) + require.NoError(t, err) + require.NotNil(t, resp) + revs = append(revs, resp.Rev) + } + require.NotEmpty(t, revs) + time.Sleep(500 * time.Millisecond) + + // Create a replication batch + state := utils.NewType(true) + batch, err := client.CreateNewBatch(ctx, db.Name(), nil, state, arangodb.CreateNewBatchOptions{Ttl: 300}) + require.NoError(t, err) + require.NotNil(t, batch) + require.NotEmpty(t, batch.ID) + + // Call FetchRevisionDocuments + revDocs, err := client.FetchRevisionDocuments(ctx, db.Name(), arangodb.RevisionQueryParams{ + BatchId: batch.ID, + Collection: col.Name(), + }, revs) + require.NoError(t, err) + require.NotNil(t, revDocs) + }) + }) + }) +} + +func Test_StartReplicationSync(t *testing.T) { + Wrap(t, func(t *testing.T, client arangodb.Client) { + withContextT(t, defaultTestTimeout, func(ctx context.Context, tb testing.TB) { + // Version checking + if os.Getenv("TEST_CONNECTION") == "vst" { + skipBelowVersion(client, ctx, "3.8", t) + } + + // Role check + serverRole, err := client.ServerRole(ctx) + require.NoError(t, err) + t.Logf("ServerRole: %s", serverRole) + + if serverRole == arangodb.ServerRoleCoordinator || serverRole == arangodb.ServerRoleSingle { + t.Skipf("Replication sync not supported on role: %s", serverRole) + } + + // Check for DB existence + db, err := client.GetDatabase(ctx, "_system", nil) + require.NoError(t, err) + + opts := arangodb.ReplicationSyncOptions{ + Endpoint: "http+tcp://127.0.0.1:8529", + Username: "root", + // Password: "passwd", + // RestrictType: utils.NewType("include"), + // RestrictCollections: utils.NewType([]string{"airports"}), + } + + result, err := client.StartReplicationSync(ctx, db.Name(), opts) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(result.Collections) == 0 { + t.Errorf("expected collections in result") + } + if result.LastLogTick == "" { + t.Errorf("expected lastLogTick in result") + } + }) + }) +}