Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions app/vlinsert/insertutil/common_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
)

Expand Down Expand Up @@ -155,6 +156,7 @@ type LogMessageProcessor interface {
}

type logMessageProcessor struct {
isStreamMode bool
mu sync.Mutex
wg sync.WaitGroup
stopCh chan struct{}
Expand Down Expand Up @@ -183,6 +185,10 @@ func (lmp *logMessageProcessor) initPeriodicFlush() {
case <-lmp.stopCh:
return
case <-ticker.C:
if vlstorage.CanWriteData() != nil {
continue
}

lmp.mu.Lock()
if time.Since(lmp.lastFlushTime) >= d {
lmp.flushLocked()
Expand All @@ -197,6 +203,10 @@ func (lmp *logMessageProcessor) initPeriodicFlush() {
//
// If streamFields is non-nil, then it is used as log stream fields instead of the pre-configured stream fields.
func (lmp *logMessageProcessor) AddRow(timestamp int64, fields, streamFields []logstorage.Field) {
if !lmp.canWriteData() {
lmp.waitUntilStorageIsAvailable()
}

lmp.rowsIngestedTotal.Inc()
n := logstorage.EstimatedJSONRowLen(fields)
lmp.bytesIngestedTotal.Add(n)
Expand Down Expand Up @@ -225,6 +235,23 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields, streamFields []l
}
}

// waitUntilStorageIsAvailable blocks until lmp.canWriteData() reports true
// or until lmp.stopCh is closed, re-checking the storage state once per second.
func (lmp *logMessageProcessor) waitUntilStorageIsAvailable() {
	// Loop while the storage is NOT writable; the caller invokes this method
	// only after canWriteData() returned false.
	for !lmp.canWriteData() {
		// Obtain a fresh timer on every iteration: a timer returned to the pool
		// must not be waited on again, since it may be handed out to another goroutine.
		t := timerpool.Get(time.Second)
		select {
		case <-lmp.stopCh:
			timerpool.Put(t)
			return
		case <-t.C:
			timerpool.Put(t)
		}
	}
}

// canWriteData reports whether rows may be accepted right now.
//
// Outside of stream mode it always returns true. In stream mode it returns true
// only when vlstorage.CanWriteData() returns a nil error.
func (lmp *logMessageProcessor) canWriteData() bool {
	if !lmp.isStreamMode {
		return true
	}
	return vlstorage.CanWriteData() == nil
}

// InsertRowProcessor is used by native data ingestion protocol parser.
type InsertRowProcessor interface {
// AddInsertRow must add r to the underlying storage.
Expand Down Expand Up @@ -286,8 +313,9 @@ func (cp *CommonParams) NewLogMessageProcessor(protocolName string, isStreamMode
rowsIngestedTotal := metrics.GetOrCreateCounter(fmt.Sprintf("vl_rows_ingested_total{type=%q}", protocolName))
bytesIngestedTotal := metrics.GetOrCreateCounter(fmt.Sprintf("vl_bytes_ingested_total{type=%q}", protocolName))
lmp := &logMessageProcessor{
cp: cp,
lr: lr,
isStreamMode: isStreamMode,
cp: cp,
lr: lr,

rowsIngestedTotal: rowsIngestedTotal,
bytesIngestedTotal: bytesIngestedTotal,
Expand Down
12 changes: 9 additions & 3 deletions app/vlstorage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,18 +255,24 @@ func processForceFlush(w http.ResponseWriter, r *http.Request) bool {

// CanWriteData returns non-nil error if it cannot write data to vlstorage
func CanWriteData() error {
if localStorage == nil {
// The data can be always written in non-local mode.
if netstorageInsert != nil {
if netstorageInsert.IsBroken() {
return &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("cannot write data to vlstorage because all the storage nodes are broken"),
StatusCode: http.StatusTooManyRequests,
}
}
return nil
}

if localStorage.IsReadOnly() {
if localStorage != nil && localStorage.IsReadOnly() {
return &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("cannot add rows into storage in read-only mode; the storage can be in read-only mode "+
"because of lack of free disk space at -storageDataPath=%s", *storageDataPath),
StatusCode: http.StatusTooManyRequests,
}
}

return nil
}

Expand Down
83 changes: 74 additions & 9 deletions app/vlstorage/netinsert/netinsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package netinsert

import (
"errors"
"flag"
"fmt"
"io"
"net/http"
Expand All @@ -23,6 +24,11 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
)

var (
// maxInsertRetries limits the number of one-second retry rounds performed by
// mustSendInsertRequest before the data block is moved to the retry buffer.
// Non-positive values disable the limit, i.e. the data is retried until it is sent
// or until the storage is stopped.
maxInsertRetries = flag.Int("insert.maxRetries", 20, "The maximum number of retry attempts when sending data to storage nodes. "+
"After exhausting retries, the data is queued in a retry buffer and new ingestion requests are rejected with HTTP 429 until storage nodes recover.")
)

// maxInsertBlockSize is the maximum size of a single data block sent to storage node.
const maxInsertBlockSize = 2 * 1024 * 1024

Expand All @@ -41,6 +47,10 @@ type Storage struct {

pendingDataBuffers chan *bytesutil.ByteBuffer

retryDataBuffersMu sync.Mutex
needDrainRetryData atomic.Bool
retryDataBuffers []*bytesutil.ByteBuffer

stopCh chan struct{}
wg sync.WaitGroup
}
Expand Down Expand Up @@ -110,6 +120,10 @@ func (sn *storageNode) backgroundFlusher() {
case <-sn.s.stopCh:
return
case <-t.C:
if sn.s.needDrainRetryData.Load() {
sn.flushRetryData()
continue
}
sn.flushPendingData()
}
}
Expand All @@ -126,7 +140,31 @@ func (sn *storageNode) flushPendingData() {
pendingData := sn.grabPendingDataForFlushLocked()
sn.pendingDataMu.Unlock()

sn.mustSendInsertRequest(pendingData)
_ = sn.mustSendInsertRequest(pendingData)
}

// flushRetryData tries to send all the data blocks queued in sn.s.retryDataBuffers.
//
// Blocks are taken from the end of the queue one at a time. The function stops
// at the first block that cannot be sent; mustSendInsertRequest puts such a block
// back into the retry buffer via addRetryData, so needDrainRetryData stays set.
// The needDrainRetryData flag is cleared only when the queue is observed to be empty.
func (sn *storageNode) flushRetryData() {
	s := sn.s
	for {
		s.retryDataBuffersMu.Lock()
		n := len(s.retryDataBuffers)
		if n == 0 {
			// Clear the flag while holding the lock, so a concurrent addRetryData
			// cannot have its Store(true) overwritten by this Store(false).
			s.needDrainRetryData.Store(false)
			s.retryDataBuffersMu.Unlock()
			return
		}
		pendingData := s.retryDataBuffers[n-1]
		// Drop the reference from the backing array, so it doesn't pin the buffer.
		s.retryDataBuffers[n-1] = nil
		s.retryDataBuffers = s.retryDataBuffers[:n-1]
		s.retryDataBuffersMu.Unlock()

		// The lock must not be held here: mustSendInsertRequest may block for a long time
		// while retrying, and on failure it calls addRetryData, which acquires
		// retryDataBuffersMu itself. sync.Mutex is not reentrant, so holding the lock
		// across this call would deadlock.
		if !sn.mustSendInsertRequest(pendingData) {
			return
		}
	}
}

// addRetryData appends pendingData to the shared retry buffer of the storage
// and raises the needDrainRetryData flag, so the queued data is drained later.
func (sn *storageNode) addRetryData(pendingData *bytesutil.ByteBuffer) {
	s := sn.s

	s.retryDataBuffersMu.Lock()
	s.retryDataBuffers = append(s.retryDataBuffers, pendingData)
	s.needDrainRetryData.Store(true)
	s.retryDataBuffersMu.Unlock()
}

func (sn *storageNode) addRow(r *logstorage.InsertRow) {
Expand All @@ -152,7 +190,7 @@ func (sn *storageNode) addRow(r *logstorage.InsertRow) {
bbPool.Put(bb)

if pendingData != nil {
sn.mustSendInsertRequest(pendingData)
_ = sn.mustSendInsertRequest(pendingData)
}
}

Expand All @@ -166,33 +204,51 @@ func (sn *storageNode) grabPendingDataForFlushLocked() *bytesutil.ByteBuffer {
return pendingData
}

func (sn *storageNode) mustSendInsertRequest(pendingData *bytesutil.ByteBuffer) {
// mustSendInsertRequest guarantees that data will be sent to storage nodes or buffered for retry.
// It attempts to send pendingData to storage nodes with retry logic and returns:
// - true: data was handled (successfully sent to a storage node, or operation was cancelled during shutdown)
// - false: all storage nodes are unavailable after maxInsertRetries attempts, data has been added to retry buffer
//
// When this method returns false, it indicates that the storages are temporarily unavailable
// and the data has been queued in the retry buffer for later processing when nodes become available.
// The retry buffer prevents data loss while protecting against infinite memory accumulation.
func (sn *storageNode) mustSendInsertRequest(pendingData *bytesutil.ByteBuffer) (handled bool) {
defer func() {
pendingData.Reset()
sn.s.pendingDataBuffers <- pendingData
if handled {
pendingData.Reset()
sn.s.pendingDataBuffers <- pendingData
}
}()

err := sn.sendInsertRequest(pendingData)
if err == nil {
return
return true
}

if !errors.Is(err, errTemporarilyDisabled) {
logger.Warnf("%s; re-routing the data block to the remaining nodes", err)
}
for !sn.s.sendInsertRequestToAnyNode(pendingData) {

for i := 0; !sn.s.sendInsertRequestToAnyNode(pendingData); i++ {
if *maxInsertRetries > 0 && i >= *maxInsertRetries {
sn.addRetryData(pendingData)
return false
}

logger.Errorf("cannot send pending data to all storage nodes, since all of them are unavailable; re-trying to send the data in a second")

t := timerpool.Get(time.Second)
select {
case <-sn.s.stopCh:
timerpool.Put(t)
logger.Errorf("dropping %d bytes of data, since there are no available storage nodes", pendingData.Len())
return
return true
case <-t.C:
timerpool.Put(t)
}
}

return true
}

func (sn *storageNode) sendInsertRequest(pendingData *bytesutil.ByteBuffer) error {
Expand Down Expand Up @@ -272,13 +328,16 @@ var zstdBufPool bytesutil.ByteBufferPool
// Call MustStop on the returned storage when it is no longer needed.
func NewStorage(addrs []string, authCfgs []*promauth.Config, isTLSs []bool, concurrency int, disableCompression bool) *Storage {
pendingDataBuffers := make(chan *bytesutil.ByteBuffer, concurrency*len(addrs))
for i := 0; i < cap(pendingDataBuffers); i++ {
for range cap(pendingDataBuffers) {
pendingDataBuffers <- &bytesutil.ByteBuffer{}
}

retryDataBuffers := []*bytesutil.ByteBuffer{}

s := &Storage{
disableCompression: disableCompression,
pendingDataBuffers: pendingDataBuffers,
retryDataBuffers: retryDataBuffers,
stopCh: make(chan struct{}),
}

Expand Down Expand Up @@ -307,6 +366,12 @@ func (s *Storage) AddRow(streamHash uint64, r *logstorage.InsertRow) {
sn.addRow(r)
}

// IsBroken reports whether the storage still has data queued in the retry buffer.
//
// The underlying flag is raised when a data block is added to the retry buffer
// and is lowered once the retry buffer has been drained.
func (s *Storage) IsBroken() bool {
	hasRetryData := s.needDrainRetryData.Load()
	return hasRetryData
}

func (s *Storage) sendInsertRequestToAnyNode(pendingData *bytesutil.ByteBuffer) bool {
startIdx := int(fastrand.Uint32n(uint32(len(s.sns))))
for i := range s.sns {
Expand Down
1 change: 1 addition & 0 deletions docs/victorialogs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta

## tip

* FEATURE: [vlinsert](https://docs.victoriametrics.com/victorialogs/data-ingestion/): add `-insert.maxRetries` command-line flag for limiting retry attempts when sending data to storage nodes. After exhausting retries, data is queued in a retry buffer and new ingestion requests are rejected with HTTP 429 until storage nodes recover. See [#9121](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9121).
* BUGFIX: [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/): support `-` as a timestamp value, as described in [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424#section-6.2.3).
* FEATURE: [`delete` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#delete-pipe): allow deleting all the fields with common prefix via `... | delete prefix*` syntax.
* FEATURE: [`fields` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#fields-pipe): allow keeping all the fields with common prefix via `... | fields prefix*` syntax.
Expand Down
2 changes: 2 additions & 0 deletions docs/victorialogs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,8 @@ Pass `-help` to VictoriaLogs in order to see the list of supported command-line
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 262144)
-insert.maxQueueDuration duration
The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s)
-insert.maxRetries int
The maximum number of retry attempts when sending data to storage nodes. After exhausting retries, the data is queued in a retry buffer and new ingestion requests are rejected with HTTP 429 until storage nodes recover. (default 20)
-internStringCacheExpireDuration duration
The expiry duration for caches for interned strings. See https://en.wikipedia.org/wiki/String_interning . See also -internStringMaxLen and -internStringDisableCache (default 6m0s)
-internStringDisableCache
Expand Down