Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions agent/agents/postgres/pgstatmonitor/pgstatmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ const (
pgStatMonitorVersion21PG15
pgStatMonitorVersion21PG16
pgStatMonitorVersion21PG17
pgStatMonitorVersion23PG13
pgStatMonitorVersion23PG14
pgStatMonitorVersion23PG15
pgStatMonitorVersion23PG16
pgStatMonitorVersion23PG17
pgStatMonitorVersion23PG18
)

const (
Expand Down Expand Up @@ -173,6 +179,7 @@ func getPGVersion(q *reform.Querier) (pgVersion, error) {
return pgVersion(parsed), err
}

//nolint:mnd
func getPGMonitorVersion(q *reform.Querier) (pgStatMonitorVersion, pgStatMonitorPrerelease, error) {
var result string
err := q.QueryRow(fmt.Sprintf("SELECT /* %s */ pg_stat_monitor_version()", queryTag)).Scan(&result)
Expand All @@ -191,6 +198,21 @@ func getPGMonitorVersion(q *reform.Querier) (pgStatMonitorVersion, pgStatMonitor

var version pgStatMonitorVersion
switch {
case vPGSM.Core().GreaterThanOrEqual(v23):
switch {
case vPG >= 18:
version = pgStatMonitorVersion23PG18
case vPG >= 17:
version = pgStatMonitorVersion23PG17
case vPG >= 16:
version = pgStatMonitorVersion23PG16
case vPG >= 15:
version = pgStatMonitorVersion23PG15
case vPG >= 14:
version = pgStatMonitorVersion23PG14
default:
version = pgStatMonitorVersion23PG13
}
case vPGSM.Core().GreaterThanOrEqual(v21):
switch {
case vPG >= 17:
Expand Down Expand Up @@ -638,6 +660,16 @@ func (m *PGStatMonitorQAN) makeBuckets(current, cache map[time.Time]map[string]*
{float32(currentPSM.WalFpi - prevPSM.WalFpi), &mb.Postgresql.MWalFpiSum, &mb.Postgresql.MWalFpiCnt},
{float32(currentPSM.WalRecords - prevPSM.WalRecords), &mb.Postgresql.MWalRecordsSum, &mb.Postgresql.MWalRecordsCnt},
{float32(currentPSM.WalBytes - prevPSM.WalBytes), &mb.Postgresql.MWalBytesSum, &mb.Postgresql.MWalBytesCnt},
{float32(currentPSM.WalBuffersFull - prevPSM.WalBuffersFull), &mb.Postgresql.MWalBuffersFullSum, &mb.Postgresql.MWalBuffersFullCnt},

{
float32(currentPSM.ParallelWorkersToLaunch - prevPSM.ParallelWorkersToLaunch),
&mb.Postgresql.MParallelWorkersToLaunchSum, &mb.Postgresql.MParallelWorkersToLaunchCnt,
},
{
float32(currentPSM.ParallelWorkersLaunched - prevPSM.ParallelWorkersLaunched),
&mb.Postgresql.MParallelWorkersLaunchedSum, &mb.Postgresql.MParallelWorkersLaunchedCnt,
},

// convert milliseconds to seconds
{float32(currentPSM.TotalExecTime-prevPSM.TotalExecTime) / 1000, &mb.Common.MQueryTimeSum, &mb.Common.MQueryTimeCnt},
Expand All @@ -649,8 +681,6 @@ func (m *PGStatMonitorQAN) makeBuckets(current, cache map[time.Time]map[string]*
// convert microseconds to seconds
{float32(cpuSysTime) / 1000000, &mb.Postgresql.MCpuSysTimeSum, &mb.Postgresql.MCpuSysTimeCnt},
{float32(cpuUserTime) / 1000000, &mb.Postgresql.MCpuUserTimeSum, &mb.Postgresql.MCpuUserTimeCnt},

{float32(currentPSM.WalBytes - prevPSM.WalBytes), &mb.Postgresql.MWalBytesSum, &mb.Postgresql.MWalBytesCnt},
} {
if p.value != 0 {
*p.sum = p.value
Expand Down
17 changes: 16 additions & 1 deletion agent/agents/postgres/pgstatmonitor/pgstatmonitor_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
)

var (
v23 = version.Must(version.NewVersion("2.3.0"))
v21 = version.Must(version.NewVersion("2.1.0"))
v20 = version.Must(version.NewVersion("2.0.0"))
v11 = version.Must(version.NewVersion("1.1.0"))
Expand Down Expand Up @@ -99,6 +100,10 @@ type pgStatMonitor struct {
WalRecords int64
WalFpi int64
WalBytes int64
WalBuffersFull int64

ParallelWorkersToLaunch int64
ParallelWorkersLaunched int64

// reform related fields
pointers []interface{}
Expand Down Expand Up @@ -226,6 +231,13 @@ func newPgStatMonitorStructs(vPGSM pgStatMonitorVersion, vPG pgVersion) (*pgStat
}
}

if vPGSM >= pgStatMonitorVersion23PG18 {
fields = append(fields,
field{info: parse.FieldInfo{Name: "WalBuffersFull", Type: "int64", Column: "wal_buffers_full"}, pointer: &s.WalBuffersFull},
field{info: parse.FieldInfo{Name: "ParrallelWorkersToLaunch", Type: "int64", Column: "parallel_workers_to_launch"}, pointer: &s.ParallelWorkersToLaunch},
field{info: parse.FieldInfo{Name: "ParallelWorkersLaunched", Type: "int64", Column: "parallel_workers_launched"}, pointer: &s.ParallelWorkersLaunched})
}

if vPGSM >= pgStatMonitorVersion08 && vPGSM < pgStatMonitorVersion20PG12 {
fields = append(fields,
field{info: parse.FieldInfo{Name: "BucketStartTimeString", Type: "string", Column: "bucket_start_time"}, pointer: &s.BucketStartTimeString},
Expand Down Expand Up @@ -289,7 +301,7 @@ func (v *pgStatMonitorAllViewType) NewStruct() reform.Struct { //nolint:ireturn

// String returns a string representation of this struct or record.
func (s pgStatMonitor) String() string {
res := make([]string, 51)
res := make([]string, 54) //nolint:mnd
res[0] = "Bucket: " + reform.Inspect(s.Bucket, true)
res[1] = "BucketStartTime: " + reform.Inspect(s.BucketStartTime, true)
res[2] = "UserID: " + reform.Inspect(s.UserID, true)
Expand Down Expand Up @@ -341,6 +353,9 @@ func (s pgStatMonitor) String() string {
res[48] = "WalRecords: " + reform.Inspect(s.WalRecords, true)
res[49] = "WalFpi: " + reform.Inspect(s.WalFpi, true)
res[50] = "WalBytes: " + reform.Inspect(s.WalBytes, true)
res[51] = "WalBuffersFull: " + reform.Inspect(s.WalBuffersFull, true)
res[52] = "ParrallelWorkersToLaunch: " + reform.Inspect(s.ParallelWorkersToLaunch, true)
res[53] = "ParallelWorkersLaunched: " + reform.Inspect(s.ParallelWorkersLaunched, true)
return strings.Join(res, ", ")
}

Expand Down
66 changes: 36 additions & 30 deletions agent/agents/postgres/pgstatmonitor/pgstatmonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,36 +551,42 @@ func TestPGStatMonitorSchema(t *testing.T) {
Tables: []string{fmt.Sprintf("public.%s", tableName)},
},
Postgresql: &agentv1.MetricsBucket_PostgreSQL{
MSharedBlkReadTimeCnt: float32(n),
MSharedBlkReadTimeSum: actual.Postgresql.MSharedBlkReadTimeSum,
MLocalBlkReadTimeCnt: actual.Postgresql.MLocalBlkReadTimeCnt,
MLocalBlkReadTimeSum: actual.Postgresql.MLocalBlkReadTimeSum,
MSharedBlksReadCnt: actual.Postgresql.MSharedBlksReadCnt,
MSharedBlksReadSum: actual.Postgresql.MSharedBlksReadSum,
MSharedBlksWrittenCnt: actual.Postgresql.MSharedBlksWrittenCnt,
MSharedBlksWrittenSum: actual.Postgresql.MSharedBlksWrittenSum,
MSharedBlksDirtiedCnt: actual.Postgresql.MSharedBlksDirtiedCnt,
MSharedBlksDirtiedSum: actual.Postgresql.MSharedBlksDirtiedSum,
MSharedBlksHitCnt: actual.Postgresql.MSharedBlksHitCnt,
MSharedBlksHitSum: actual.Postgresql.MSharedBlksHitSum,
MRowsCnt: float32(n),
MRowsSum: float32(n),
MCpuUserTimeCnt: actual.Postgresql.MCpuUserTimeCnt,
MCpuUserTimeSum: actual.Postgresql.MCpuUserTimeSum,
MCpuSysTimeCnt: actual.Postgresql.MCpuSysTimeCnt,
MCpuSysTimeSum: actual.Postgresql.MCpuSysTimeSum,
CmdType: insertCMDType,
HistogramItems: actual.Postgresql.HistogramItems,
MPlansCallsSum: actual.Postgresql.MPlansCallsSum,
MPlansCallsCnt: actual.Postgresql.MPlansCallsCnt,
MPlanTimeCnt: actual.Postgresql.MPlanTimeCnt,
MPlanTimeSum: actual.Postgresql.MPlanTimeSum,
MPlanTimeMin: actual.Postgresql.MPlanTimeMin,
MPlanTimeMax: actual.Postgresql.MPlanTimeMax,
MWalBytesCnt: actual.Postgresql.MWalBytesCnt,
MWalBytesSum: actual.Postgresql.MWalBytesSum,
MWalRecordsSum: actual.Postgresql.MWalRecordsSum,
MWalRecordsCnt: actual.Postgresql.MWalRecordsCnt,
MSharedBlkReadTimeCnt: float32(n),
MSharedBlkReadTimeSum: actual.Postgresql.MSharedBlkReadTimeSum,
MLocalBlkReadTimeCnt: actual.Postgresql.MLocalBlkReadTimeCnt,
MLocalBlkReadTimeSum: actual.Postgresql.MLocalBlkReadTimeSum,
MSharedBlksReadCnt: actual.Postgresql.MSharedBlksReadCnt,
MSharedBlksReadSum: actual.Postgresql.MSharedBlksReadSum,
MSharedBlksWrittenCnt: actual.Postgresql.MSharedBlksWrittenCnt,
MSharedBlksWrittenSum: actual.Postgresql.MSharedBlksWrittenSum,
MSharedBlksDirtiedCnt: actual.Postgresql.MSharedBlksDirtiedCnt,
MSharedBlksDirtiedSum: actual.Postgresql.MSharedBlksDirtiedSum,
MSharedBlksHitCnt: actual.Postgresql.MSharedBlksHitCnt,
MSharedBlksHitSum: actual.Postgresql.MSharedBlksHitSum,
MRowsCnt: float32(n),
MRowsSum: float32(n),
MCpuUserTimeCnt: actual.Postgresql.MCpuUserTimeCnt,
MCpuUserTimeSum: actual.Postgresql.MCpuUserTimeSum,
MCpuSysTimeCnt: actual.Postgresql.MCpuSysTimeCnt,
MCpuSysTimeSum: actual.Postgresql.MCpuSysTimeSum,
CmdType: insertCMDType,
HistogramItems: actual.Postgresql.HistogramItems,
MPlansCallsSum: actual.Postgresql.MPlansCallsSum,
MPlansCallsCnt: actual.Postgresql.MPlansCallsCnt,
MPlanTimeCnt: actual.Postgresql.MPlanTimeCnt,
MPlanTimeSum: actual.Postgresql.MPlanTimeSum,
MPlanTimeMin: actual.Postgresql.MPlanTimeMin,
MPlanTimeMax: actual.Postgresql.MPlanTimeMax,
MWalBytesCnt: actual.Postgresql.MWalBytesCnt,
MWalBytesSum: actual.Postgresql.MWalBytesSum,
MWalRecordsSum: actual.Postgresql.MWalRecordsSum,
MWalRecordsCnt: actual.Postgresql.MWalRecordsCnt,
MWalBuffersFullCnt: actual.Postgresql.MWalBuffersFullCnt,
MWalBuffersFullSum: actual.Postgresql.MWalBuffersFullSum,
MParallelWorkersToLaunchCnt: actual.Postgresql.MParallelWorkersToLaunchCnt,
MParallelWorkersToLaunchSum: actual.Postgresql.MParallelWorkersToLaunchSum,
MParallelWorkersLaunchedCnt: actual.Postgresql.MParallelWorkersLaunchedCnt,
MParallelWorkersLaunchedSum: actual.Postgresql.MParallelWorkersLaunchedSum,
},
}
tests.AssertBucketsEqual(t, expected, actual)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,11 +154,12 @@ func (m *PGStatStatementsQAN) Run(ctx context.Context) {
}()

// add current stat statements to cache, so they are not send as new on first iteration with incorrect timestamps
var current statementsMap
var running bool
var err error
m.changes <- agents.Change{Status: inventoryv1.AgentStatus_AGENT_STATUS_STARTING}

if current, _, err := m.getStatStatementsExtended(ctx); err == nil {
if current, _, err = m.getStatStatementsExtended(ctx); err == nil {
if err = m.statementsCache.Set(current); err == nil {
m.l.Debugf("Got %d initial stat statements.", len(current))
running = true
Expand Down
65 changes: 61 additions & 4 deletions api/agent/v1/collector.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions api/agent/v1/collector.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions api/agent/v1/collector.proto
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,23 @@ message MetricsBucket {
// Total bytes of WAL (Write-ahead logging) records.
float m_wal_bytes_sum = 37;
float m_wal_bytes_cnt = 38;
// Total number of times WAL buffers become full.
float m_wal_buffers_full_sum = 54;
float m_wal_buffers_full_cnt = 55;

// Sum, count, min, max of plan time.
float m_plan_time_sum = 39;
float m_plan_time_cnt = 40;
float m_plan_time_min = 41;
float m_plan_time_max = 42;

// Total number of parallel workers to launch.
float m_parallel_workers_to_launch_sum = 56;
float m_parallel_workers_to_launch_cnt = 57;
// Total number of parallel workers launched.
float m_parallel_workers_launched_sum = 58;
float m_parallel_workers_launched_cnt = 59;

// Metrics skipped due to different bucket_time in pg_stat_monitor (1min in PMM, 5min in pg_stat_monitor):
// min_time, max_time, mean_time
// plan_mean_time
Expand Down
Loading
Loading