Skip to content

Commit eccd4c6

Browse files
Fix credentials being exposed in `ps` command output; add a flow-control feature to prevent OOMs on the destination
1 parent d84e7b2 commit eccd4c6

File tree

12 files changed

+540
-37
lines changed

12 files changed

+540
-37
lines changed

.DS_Store

6 KB
Binary file not shown.

cmd/.DS_Store

0 Bytes
Binary file not shown.

cmd/docStreamer/main.go

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/Percona-Lab/percona-docstreamer/internal/config"
3434
"github.com/Percona-Lab/percona-docstreamer/internal/dbops"
3535
"github.com/Percona-Lab/percona-docstreamer/internal/discover"
36+
"github.com/Percona-Lab/percona-docstreamer/internal/flow"
3637
"github.com/Percona-Lab/percona-docstreamer/internal/logging"
3738
"github.com/Percona-Lab/percona-docstreamer/internal/pid"
3839
"github.com/Percona-Lab/percona-docstreamer/internal/status"
@@ -221,12 +222,8 @@ func startAction(cmd *cobra.Command, args []string) {
221222
os.Exit(1)
222223
}
223224

224-
var hiddenargs = []string{"run",
225-
"--docdb-user", docdbUser,
226-
"--mongo-user", mongoUser,
227-
"--docdb-pass", docdbPass,
228-
"--mongo-pass", mongoPass,
229-
}
225+
var hiddenargs = []string{"run"}
226+
230227
if config.Cfg.Migration.Destroy {
231228
hiddenargs = append(hiddenargs, "--destroy")
232229
}
@@ -241,6 +238,15 @@ func startAction(cmd *cobra.Command, args []string) {
241238
runCmd.SysProcAttr = &syscall.SysProcAttr{
242239
Setsid: true, // Detach
243240
}
241+
242+
// Inherit the current environment
243+
runCmd.Env = os.Environ()
244+
// Append credentials explicitly
245+
runCmd.Env = append(runCmd.Env, fmt.Sprintf("DOCDB_USER=%s", docdbUser))
246+
runCmd.Env = append(runCmd.Env, fmt.Sprintf("MONGO_USER=%s", mongoUser))
247+
runCmd.Env = append(runCmd.Env, fmt.Sprintf("DOCDB_PASS=%s", docdbPass))
248+
runCmd.Env = append(runCmd.Env, fmt.Sprintf("MONGO_PASS=%s", mongoPass))
249+
244250
if err := runCmd.Start(); err != nil {
245251
logging.PrintError(fmt.Sprintf("Failed to launch background process: %v", err), 0)
246252
os.Exit(1)
@@ -539,9 +545,25 @@ func runMigrationProcess(cmd *cobra.Command, args []string) {
539545
}()
540546

541547
docdbUser, _ := cmd.Flags().GetString("docdb-user")
548+
if docdbUser == "" {
549+
docdbUser = os.Getenv("DOCDB_USER")
550+
}
551+
542552
mongoUser, _ := cmd.Flags().GetString("mongo-user")
553+
if mongoUser == "" {
554+
mongoUser = os.Getenv("MONGO_USER")
555+
}
556+
543557
docdbPass, _ := cmd.Flags().GetString("docdb-pass")
558+
if docdbPass == "" {
559+
docdbPass = os.Getenv("DOCDB_PASS")
560+
}
561+
544562
mongoPass, _ := cmd.Flags().GetString("mongo-pass")
563+
if mongoPass == "" {
564+
mongoPass = os.Getenv("MONGO_PASS")
565+
}
566+
545567
destroy, _ := cmd.Flags().GetBool("destroy")
546568

547569
docdbURI := config.Cfg.BuildDocDBURI(docdbUser, docdbPass)
@@ -578,6 +600,7 @@ func runMigrationProcess(cmd *cobra.Command, args []string) {
578600
logging.PrintError(err.Error(), 0)
579601
return
580602
}
603+
581604
// --- Disconnect with timeout ---
582605
defer func() {
583606
dCtx, dCancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -647,8 +670,15 @@ func runMigrationProcess(cmd *cobra.Command, args []string) {
647670
valStore := validator.NewStore(targetClient)
648671

649672
apiServer = api.NewServer(config.Cfg.Migration.StatusHTTPPort)
673+
// Initialize Status Manager BEFORE Flow Manager
650674
statusManager = status.NewManager(targetClient, false)
651675

676+
// --- FLOW CONTROL ---
677+
// Pass mongoUser and mongoPass so we can connect to shards if discovered
678+
flowManager := flow.NewManager(targetClient, statusManager, mongoUser, mongoPass)
679+
flowManager.Start()
680+
defer flowManager.Stop()
681+
652682
validationManager := validator.NewManager(sourceClient, targetClient, tracker, valStore, statusManager)
653683
defer validationManager.Close()
654684

@@ -730,7 +760,7 @@ func runMigrationProcess(cmd *cobra.Command, args []string) {
730760
logging.PrintPhase("4", "FULL DATA LOAD")
731761
statusManager.SetState("running", "Initial Sync (Full Load)")
732762

733-
_, err = launchFullLoadWorkers(ctx, sourceClient, targetClient, toRun, statusManager, checkpointManager)
763+
_, err = launchFullLoadWorkers(ctx, sourceClient, targetClient, toRun, statusManager, checkpointManager, flowManager)
734764
if err != nil {
735765
if err != context.Canceled {
736766
statusManager.SetError(err.Error())
@@ -785,6 +815,8 @@ func runMigrationProcess(cmd *cobra.Command, args []string) {
785815

786816
startAt = resumeAt
787817
statusManager.SetCloneCompleted()
818+
819+
statusManager.SetInitialSyncCompleted(0)
788820
}
789821

790822
if ctx.Err() != nil {
@@ -805,6 +837,7 @@ func runMigrationProcess(cmd *cobra.Command, args []string) {
805837
tracker,
806838
valStore,
807839
validationManager,
840+
flowManager,
808841
)
809842

810843
cdcManager.Start(ctx)
@@ -824,7 +857,7 @@ func extractDBNames(collections []discover.CollectionInfo) []string {
824857
return dbNames
825858
}
826859

827-
func launchFullLoadWorkers(ctx context.Context, source, target *mongo.Client, collections []discover.CollectionInfo, statusMgr *status.Manager, checkpointMgr *checkpoint.Manager) (bson.Timestamp, error) {
860+
func launchFullLoadWorkers(ctx context.Context, source, target *mongo.Client, collections []discover.CollectionInfo, statusMgr *status.Manager, checkpointMgr *checkpoint.Manager, flowMgr *flow.Manager) (bson.Timestamp, error) {
828861
jobs := make(chan discover.CollectionInfo, len(collections))
829862
for _, c := range collections {
830863
jobs <- c
@@ -844,7 +877,7 @@ func launchFullLoadWorkers(ctx context.Context, source, target *mongo.Client, co
844877
}
845878
ns := collInfo.Namespace
846879
logging.PrintStep(fmt.Sprintf("[Worker %d] Starting full load for %s", workerID, ns), 0)
847-
copier := cloner.NewCopyManager(source, target, collInfo, statusMgr, checkpointMgr, config.Cfg.Migration.CheckpointDocID)
880+
copier := cloner.NewCopyManager(source, target, collInfo, statusMgr, checkpointMgr, config.Cfg.Migration.CheckpointDocID, flowMgr)
848881
docCount, _, err := copier.Do(ctx)
849882
start := time.Now()
850883
logging.LogFullLoadOp(start, ns, docCount, err)

config.yaml

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,11 +143,15 @@ cloner:
143143
# Increase writers to achieve 1:8 ratio with readers, this typically provides an effective performance
144144
# however it is important to keep in mind this is per collection, so tune this wisely (see max_concurrent_workers)
145145
# e.g. If num_read_workers = 4 then set num_insert_workers to 32 (e.g. 4 * 8 = 32)
146+
# CRITICAL: Reduce insert workers to limit concurrent writes.
147+
# High values here can OOM a shard by flooding it with connections/data.
148+
# Recommendation: 4-8 per collection
146149
num_insert_workers: 8
147150

148151
# read_batch_size: Number of documents per read batch
149152
read_batch_size: 1000
150153

154+
# Reduce batch size to lower memory pressure per request on the cluster.
151155
# insert_batch_size: Number of documents per insert batch
152156
insert_batch_size: 1000
153157

@@ -205,4 +209,41 @@ cdc:
205209
# write_timeout_ms: The maximum time to wait for a BulkWrite operation to complete.
206210
# If the network hangs, this ensures the worker doesn't freeze forever.
207211
# Default: 30000 (30 seconds)
208-
write_timeout_ms: 30000
212+
write_timeout_ms: 30000
213+
214+
# -----------------------------------------------
215+
# Adaptive Flow Control (Throttling)
216+
# -----------------------------------------------
217+
flow_control:
218+
# Enabled: If true, docStreamer continuously monitors the target MongoDB's health.
219+
# It polls `db.serverStatus()` on the target (and all shards, if discovered) at the configured interval (default: every second).
220+
# If any node shows signs of overload (high queues or memory usage),
221+
# docStreamer will temporarily pause fetching new data from the source.
222+
enabled: true
223+
224+
# Check Interval: How often (in milliseconds) to poll the target database for its status.
225+
# Default: 1000 (1 second)
226+
check_interval_ms: 1000
227+
228+
# Target Max Queued Ops: The safety limit for the Target's Global Lock Queue.
229+
# Source Metric: db.serverStatus().globalLock.currentQueue.total
230+
# Checks BOTH Mongos and all backend Shards.
231+
# If ANY node has more than this many operations queued, docStreamer pauses.
232+
# Default: 50
233+
target_max_queued_ops: 50
234+
235+
# Target Max Resident MB: The safety limit for Target RAM usage.
236+
# Source Metric: db.serverStatus().mem.resident
237+
# If the target's Resident Memory exceeds this value (in Megabytes), docStreamer will pause.
238+
# This is critical for preventing OOM (Out of Memory) kills on the target host.
239+
# SHARDED CLUSTERS NOTE:
240+
# If the migration user exists on the backend shards, this setting PROTECTS THE SHARDS directly.
241+
# If the user only exists on Mongos, this setting only protects the Mongos router.
242+
# Set to 0 to disable memory-based throttling.
243+
# Default: 0 (Disabled)
244+
target_max_resident_mb: 0
245+
246+
# Pause Duration: How long (in milliseconds) to sleep when an overload is detected.
247+
# The application will re-check the status after this duration.
248+
# Default: 500
249+
pause_duration_ms: 500

faq.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,43 @@ Indexes created on the source **after** migration starts (during CDC) are **not*
212212

213213
---
214214

215+
### Adaptive Flow Control
216+
217+
**Q: Why do I need Flow Control? Can't I just increase the number of workers?**
218+
**A:** Increasing workers speeds up data *reading*, but it can easily overwhelm your target database. If you push data faster than MongoDB can write it to disk, you risk:
219+
1. **Lock Contention:** Operations pile up, causing latency spikes for other applications using the cluster.
220+
2. **Memory Saturation:** MongoDB may consume all available RAM, causing the operating system to kill the process (an OOM crash).
221+
3. **Replica Set Lag:** Secondary nodes may fall too far behind, jeopardizing data consistency or triggering elections.
222+
223+
Flow Control acts as an intelligent "brake," ensuring migration speed never exceeds the target's physical capacity.
224+
225+
**Q: What exactly happens when the destination is overloaded?**
226+
**A:** When `docStreamer` detects that the target is stressed (high queue depth or memory usage):
227+
1. **Status Change:** The application enters a `PAUSED` state and logs a warning (e.g., `[WARN] THROTTLING PAUSED`).
228+
2. **Source Throttle:** It stops fetching new documents from DocumentDB immediately.
229+
3. **Connection Keep-Alive:** Existing connections remain open, but no new write operations are sent.
230+
4. **Auto-Resume:** The background monitor keeps polling at the configured interval. As soon as the target's metrics drop below your configured thresholds, the migration automatically resumes exactly where it left off.
231+
232+
**Q: Does this work with Sharded Clusters?**
233+
**A:** **Yes.** This is a Cluster-Aware feature.
234+
* **Standard tools** often only monitor the `mongos` router, which usually reports "healthy" metrics (0 queues) even when backend shards are struggling.
235+
* **docStreamer** automatically discovers your cluster topology and opens direct monitoring connections to **every backend shard**.
236+
* **Protection:** If *any* single shard becomes overloaded, the entire migration pauses. This prevents a "hot shard" scenario where one specific shard causes a cluster-wide failure.
237+
238+
**Q: I see "targetQueuedOps: 0" in the status output. Is Flow Control working?**
239+
**A:** **Yes, this is normal.** MongoDB is highly optimized. A value of `0` means your database is handling the current write load instantly without any backlog.
240+
* **Healthy:** `0 - 10`
241+
* **Warning:** `10 - 50` (Micro-bursts)
242+
* **Critical:** `> 50` (Sustained saturation)
243+
Flow Control only steps in when it sees the critical values you defined in `config.yaml`.
244+
245+
**Q: Will Flow Control slow down my migration?**
246+
**A:** It might slightly extend the total duration, but it **prevents failure**.
247+
* *Without Flow Control:* You might migrate 20% faster, but risk crashing the production database or forcing a restart due to errors.
248+
* *With Flow Control:* You get the maximum *safe* speed your hardware can handle, with zero manual intervention required to prevent crashes.
249+
250+
---
251+
215252
## Q: What happens if the source environment is a Sharded Amazon DocumentDB cluster? Will docStreamer work?
216253
**A:** docStreamer is designed to work with sharded source environments, as long as the connection details point to the cluster's router/endpoint (equivalent to a mongos instance in a MongoDB sharded cluster).
217254

internal/.DS_Store

0 Bytes
Binary file not shown.

internal/cdc/cdc.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/Percona-Lab/percona-docstreamer/internal/checkpoint"
1313
"github.com/Percona-Lab/percona-docstreamer/internal/config"
14+
"github.com/Percona-Lab/percona-docstreamer/internal/flow"
1415
"github.com/Percona-Lab/percona-docstreamer/internal/logging"
1516
"github.com/Percona-Lab/percona-docstreamer/internal/status"
1617
"github.com/Percona-Lab/percona-docstreamer/internal/validator"
@@ -39,6 +40,7 @@ type CDCManager struct {
3940
excludeDBs map[string]bool
4041
excludeColls map[string]bool
4142
fatalErrorChan chan error
43+
flowMgr *flow.Manager
4244
}
4345

4446
// shouldRetry checks if an error is a transient network or connection issue
@@ -57,7 +59,7 @@ func shouldRetry(err error) bool {
5759
strings.Contains(msg, "server selection error")
5860
}
5961

60-
func NewManager(source, target *mongo.Client, checkpointDocID string, startAt bson.Timestamp, checkpoint *checkpoint.Manager, statusMgr *status.Manager, tracker *validator.InFlightTracker, store *validator.Store, valMgr *validator.Manager) *CDCManager {
62+
func NewManager(source, target *mongo.Client, checkpointDocID string, startAt bson.Timestamp, checkpoint *checkpoint.Manager, statusMgr *status.Manager, tracker *validator.InFlightTracker, store *validator.Store, valMgr *validator.Manager, flowMgr *flow.Manager) *CDCManager {
6163
resumeTS, found := checkpoint.GetResumeTimestamp(context.Background(), checkpointDocID)
6264

6365
if !found {
@@ -114,7 +116,8 @@ func NewManager(source, target *mongo.Client, checkpointDocID string, startAt bs
114116
checkpointDocID: checkpointDocID,
115117
excludeDBs: excludeDBs,
116118
excludeColls: excludeColls,
117-
fatalErrorChan: make(chan error, workerCount+1), // Buffer slightly to prevent blocking
119+
fatalErrorChan: make(chan error, workerCount+1),
120+
flowMgr: flowMgr,
118121
}
119122
mgr.totalEventsApplied.Store(initialEvents)
120123
return mgr
@@ -368,6 +371,10 @@ func (m *CDCManager) processChanges(ctx context.Context) {
368371
defer retryTicker.Stop()
369372

370373
for {
374+
// CHECK THROTTLE
375+
if m.flowMgr != nil {
376+
m.flowMgr.Wait()
377+
}
371378
select {
372379
case <-ctx.Done():
373380
for i, writer := range m.bulkWriters {

internal/cloner/copy.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/Percona-Lab/percona-docstreamer/internal/checkpoint"
1313
"github.com/Percona-Lab/percona-docstreamer/internal/config"
1414
"github.com/Percona-Lab/percona-docstreamer/internal/discover"
15+
"github.com/Percona-Lab/percona-docstreamer/internal/flow"
1516
"github.com/Percona-Lab/percona-docstreamer/internal/indexer"
1617
"github.com/Percona-Lab/percona-docstreamer/internal/logging"
1718
"github.com/Percona-Lab/percona-docstreamer/internal/status"
@@ -196,16 +197,19 @@ type CopyManager struct {
196197
checkpointMgr *checkpoint.Manager
197198
checkpointDocID string
198199
initialMaxKey bson.RawValue
200+
flowMgr *flow.Manager
199201
}
200202

201-
func NewCopyManager(source, target *mongo.Client, collInfo discover.CollectionInfo, statusMgr *status.Manager, checkpointMgr *checkpoint.Manager, checkpointDocID string) *CopyManager {
203+
// Update: NewCopyManager now accepts flowMgr
204+
func NewCopyManager(source, target *mongo.Client, collInfo discover.CollectionInfo, statusMgr *status.Manager, checkpointMgr *checkpoint.Manager, checkpointDocID string, flowMgr *flow.Manager) *CopyManager {
202205
return &CopyManager{
203206
sourceClient: source,
204207
targetClient: target,
205208
collInfo: collInfo,
206209
statusMgr: statusMgr,
207210
checkpointMgr: checkpointMgr,
208211
checkpointDocID: checkpointDocID,
212+
flowMgr: flowMgr,
209213
}
210214
}
211215

@@ -486,6 +490,10 @@ func (cm *CopyManager) readWorker(
486490
logging.PrintStep(fmt.Sprintf("[%s] Read Worker %d started", ns, workerID), 4)
487491

488492
for segment := range segmentQueue {
493+
// CHECK THROTTLE BEFORE READING
494+
if cm.flowMgr != nil {
495+
cm.flowMgr.Wait()
496+
}
489497
var minVal, maxVal interface{}
490498
if err := segment.Min.Unmarshal(&minVal); err != nil {
491499
logging.PrintError(fmt.Sprintf("Min unmarshal failed: %v", err), 0)

internal/config/config.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,25 @@ type ValidationConfig struct {
9494
QueueSize int `mapstructure:"queue_size"`
9595
}
9696

97+
// FlowControlConfig holds settings for adaptive throttling
98+
type FlowControlConfig struct {
99+
Enabled bool `mapstructure:"enabled"`
100+
CheckIntervalMS int `mapstructure:"check_interval_ms"`
101+
TargetMaxQueuedOps int `mapstructure:"target_max_queued_ops"`
102+
TargetMaxResidentMB int `mapstructure:"target_max_resident_mb"`
103+
PauseDurationMS int `mapstructure:"pause_duration_ms"`
104+
}
105+
97106
// Config holds all configuration for the application
98107
type Config struct {
99-
Logging LoggingConfig `mapstructure:"logging"`
100-
DocDB DocDBConfig `mapstructure:"docdb"`
101-
Mongo MongoConfig `mapstructure:"mongo"`
102-
Migration MigrationConfig `mapstructure:"migration"`
103-
Cloner ClonerConfig `mapstructure:"cloner"`
104-
CDC CDCConfig `mapstructure:"cdc"`
105-
Validation ValidationConfig `mapstructure:"validation"`
108+
Logging LoggingConfig `mapstructure:"logging"`
109+
DocDB DocDBConfig `mapstructure:"docdb"`
110+
Mongo MongoConfig `mapstructure:"mongo"`
111+
Migration MigrationConfig `mapstructure:"migration"`
112+
Cloner ClonerConfig `mapstructure:"cloner"`
113+
CDC CDCConfig `mapstructure:"cdc"`
114+
Validation ValidationConfig `mapstructure:"validation"`
115+
FlowControl FlowControlConfig `mapstructure:"flow_control"`
106116
}
107117

108118
// Cfg is the global config object
@@ -175,6 +185,13 @@ func LoadConfig() {
175185
viper.SetDefault("validation.max_retries", 3)
176186
viper.SetDefault("validation.queue_size", 2000)
177187

188+
// Flow control Defaults
189+
viper.SetDefault("flow_control.enabled", true)
190+
viper.SetDefault("flow_control.check_interval_ms", 1000)
191+
viper.SetDefault("flow_control.target_max_queued_ops", 50) // Conservative default
192+
viper.SetDefault("flow_control.target_max_resident_mb", 0) // 0 = disabled
193+
viper.SetDefault("flow_control.pause_duration_ms", 500)
194+
178195
// --- 2. Read config file ---
179196
viper.SetConfigName("config")
180197
viper.SetConfigType("yaml")

0 commit comments

Comments
 (0)