@@ -104,8 +104,10 @@ type MetricColumnAttrs struct {
104
104
}
105
105
106
106
type MetricAttrs struct {
107
- IsInstanceLevel bool `yaml:"is_instance_level"`
108
- MetricStorageName string `yaml:"metric_storage_name"`
107
+ IsInstanceLevel bool `yaml:"is_instance_level"`
108
+ MetricStorageName string `yaml:"metric_storage_name"`
109
+ ExtensionVersionOverrides []ExtensionOverrides `yaml:"extension_version_based_overrides"`
110
+ IsPrivate bool `yaml:"is_private"` // used only for extension overrides currently and ignored otherwise
109
111
}
110
112
111
113
type MetricVersionProperties struct {
@@ -164,13 +166,24 @@ type DBVersionMapEntry struct {
164
166
RealDbname string
165
167
SystemIdentifier string
166
168
IsSuperuser bool // if true and no helpers are installed, use superuser SQL version of metric if available
169
+ Extensions map [string ]decimal.Decimal
167
170
}
168
171
169
172
type ExistingPartitionInfo struct {
170
173
StartTime time.Time
171
174
EndTime time.Time
172
175
}
173
176
177
+ type ExtensionOverrides struct {
178
+ TargetMetric string `yaml:"target_metric"`
179
+ ExpectedExtensionVersions []ExtensionInfo `yaml:"expected_extension_versions"`
180
+ }
181
+
182
+ type ExtensionInfo struct {
183
+ ExtName string `yaml:"ext_name"`
184
+ ExtMinVersion decimal.Decimal `yaml:"ext_min_version"`
185
+ }
186
+
174
187
const EPOCH_COLUMN_NAME string = "epoch_ns" // this column (epoch in nanoseconds) is expected in every metric query
175
188
const TAG_PREFIX string = "tag_"
176
189
const METRIC_DEFINITION_REFRESH_TIME int64 = 120 // min time before checking for new/changed metric definitions
@@ -220,6 +233,7 @@ var metric_def_map_lock = sync.RWMutex{}
220
233
var host_metric_interval_map = make (map [string ]float64 ) // [db1_metric] = 30
221
234
var db_pg_version_map = make (map [string ]DBVersionMapEntry )
222
235
var db_pg_version_map_lock = sync.RWMutex {}
236
+ var db_get_pg_version_map_lock = make (map [string ]sync.RWMutex ) // synchronize initial PG version detection to 1 instance for each defined host
223
237
var monitored_db_cache map [string ]MonitoredDatabase
224
238
var monitored_db_cache_lock sync.RWMutex
225
239
var monitored_db_conn_cache map [string ]* sqlx.DB = make (map [string ]* sqlx.DB )
@@ -262,6 +276,7 @@ var instanceMetricCache = make(map[string]([]map[string]interface{})) // [db
262
276
var instanceMetricCacheLock = sync.RWMutex {}
263
277
var instanceMetricCacheTimestamp = make (map [string ]time.Time ) // [dbUnique+metric]last_fetch_time
264
278
var instanceMetricCacheTimestampLock = sync.RWMutex {}
279
+ var MinExtensionInfoAvailable , _ = decimal .NewFromString ("9.1" )
265
280
266
281
func IsPostgresDBType (dbType string ) bool {
267
282
if dbType == DBTYPE_BOUNCER {
@@ -1763,6 +1778,7 @@ func MetricsPersister(data_store string, storage_ch <-chan []MetricStoreMessage)
1763
1778
1764
1779
func DBGetPGVersion (dbUnique string , dbType string , noCache bool ) (DBVersionMapEntry , error ) {
1765
1780
var ver DBVersionMapEntry
1781
+ var verNew DBVersionMapEntry
1766
1782
var ok bool
1767
1783
sql := `
1768
1784
select /* pgwatch2_generated */ (regexp_matches(
@@ -1776,35 +1792,46 @@ func DBGetPGVersion(dbUnique string, dbType string, noCache bool) (DBVersionMapE
1776
1792
join pg_catalog.pg_roles b on (m.roleid = b.oid)
1777
1793
where m.member = r.oid and b.rolname = 'rds_superuser') as rolsuper
1778
1794
from pg_roles r where rolname = session_user;`
1795
+ sql_extensions := `select extname::text, (regexp_matches(extversion, $$\d+\.?\d+?$$))[1]::text as extversion from pg_extension order by 1;`
1779
1796
1780
1797
db_pg_version_map_lock .RLock ()
1798
+ get_ver_lock , ok := db_get_pg_version_map_lock [dbUnique ]
1799
+ if ! ok {
1800
+ log .Fatal ("db_get_pg_version_map_lock uninitialized" )
1801
+ }
1781
1802
ver , ok = db_pg_version_map [dbUnique ]
1782
1803
db_pg_version_map_lock .RUnlock ()
1783
1804
1784
1805
if ! noCache && ok && ver .LastCheckedOn .After (time .Now ().Add (time .Minute * - 2 )) { // use cached version for 2 min
1785
1806
//log.Debugf("using cached postgres version %s for %s", ver.Version.String(), dbUnique)
1786
1807
return ver , nil
1787
1808
} else {
1788
- log .Debug ("determining DB version for" , dbUnique )
1809
+ get_ver_lock .Lock () // limit to 1 concurrent version info fetch per DB
1810
+ defer get_ver_lock .Unlock ()
1811
+ log .Debugf ("[%s] determining DB version and recovery status..." , dbUnique )
1812
+
1813
+ if verNew .Extensions == nil {
1814
+ verNew .Extensions = make (map [string ]decimal.Decimal )
1815
+ }
1789
1816
1790
1817
if dbType == DBTYPE_BOUNCER {
1791
1818
data , err , _ := DBExecReadByDbUniqueName (dbUnique , "" , false , "show version" )
1792
1819
if err != nil {
1793
- return ver , err
1820
+ return verNew , err
1794
1821
}
1795
1822
if len (data ) == 0 {
1796
1823
// surprisingly pgbouncer 'show version' outputs in pre v1.12 is emitted as 'NOTICE' which cannot be accessed from Go lib/pg
1797
- ver .Version , _ = decimal .NewFromString ("0" )
1798
- ver .VersionStr = "0"
1824
+ verNew .Version , _ = decimal .NewFromString ("0" )
1825
+ verNew .VersionStr = "0"
1799
1826
} else {
1800
1827
rPBVer := regexp .MustCompile ("\\ d+\\ .+\\ d+" ) // "PgBouncer 1.12.0"
1801
1828
matches := rPBVer .FindStringSubmatch (data [0 ]["version" ].(string ))
1802
1829
if len (matches ) != 1 {
1803
1830
log .Errorf ("Unexpected PgBouncer version input: %s" , data [0 ]["version" ].(string ))
1804
1831
return ver , errors .New (fmt .Sprintf ("Unexpected PgBouncer version input: %s" , data [0 ]["version" ].(string )))
1805
1832
}
1806
- ver .VersionStr = matches [0 ]
1807
- ver .Version , _ = decimal .NewFromString (matches [0 ])
1833
+ verNew .VersionStr = matches [0 ]
1834
+ verNew .Version , _ = decimal .NewFromString (matches [0 ])
1808
1835
}
1809
1836
} else {
1810
1837
data , err , _ := DBExecReadByDbUniqueName (dbUnique , "" , useConnPooling , sql )
@@ -1816,33 +1843,51 @@ func DBGetPGVersion(dbUnique string, dbType string, noCache bool) (DBVersionMapE
1816
1843
return ver , nil
1817
1844
}
1818
1845
}
1819
- ver .Version , _ = decimal .NewFromString (data [0 ]["ver" ].(string ))
1820
- ver .VersionStr = data [0 ]["ver" ].(string )
1821
- ver .IsInRecovery = data [0 ]["pg_is_in_recovery" ].(bool )
1822
- ver .RealDbname = data [0 ]["current_database" ].(string )
1846
+ verNew .Version , _ = decimal .NewFromString (data [0 ]["ver" ].(string ))
1847
+ verNew .VersionStr = data [0 ]["ver" ].(string )
1848
+ verNew .IsInRecovery = data [0 ]["pg_is_in_recovery" ].(bool )
1849
+ verNew .RealDbname = data [0 ]["current_database" ].(string )
1823
1850
1824
- if ver .Version .GreaterThanOrEqual (decimal .NewFromFloat (10 )) && addSystemIdentifier {
1825
- log .Debugf ("determining system identifier version for %s ( ver: %v)" , dbUnique , ver .VersionStr )
1851
+ if verNew .Version .GreaterThanOrEqual (decimal .NewFromFloat (10 )) && addSystemIdentifier {
1852
+ log .Debugf ("[%s] determining system identifier version (pg ver: %v)" , dbUnique , verNew .VersionStr )
1826
1853
data , err , _ := DBExecReadByDbUniqueName (dbUnique , "" , useConnPooling , sql_sysid )
1827
1854
if err == nil && len (data ) > 0 {
1828
- ver .SystemIdentifier = data [0 ]["system_identifier" ].(string )
1855
+ verNew .SystemIdentifier = data [0 ]["system_identifier" ].(string )
1829
1856
}
1830
1857
}
1831
1858
1832
- log .Debugf ("determining if monitoring user is a superuser for %s " , dbUnique )
1859
+ log .Debugf ("[%s] determining if monitoring user is a superuser... " , dbUnique )
1833
1860
data , err , _ = DBExecReadByDbUniqueName (dbUnique , "" , useConnPooling , sql_su )
1834
1861
if err == nil {
1835
- ver .IsSuperuser = data [0 ]["rolsuper" ].(bool )
1862
+ verNew .IsSuperuser = data [0 ]["rolsuper" ].(bool )
1863
+ }
1864
+ log .Debugf ("[%s] superuser=%v" , dbUnique , verNew .IsSuperuser )
1865
+
1866
+ if verNew .Version .GreaterThanOrEqual (MinExtensionInfoAvailable ) {
1867
+ //log.Debugf("[%s] determining installed extensions info...", dbUnique)
1868
+ data , err , _ = DBExecReadByDbUniqueName (dbUnique , "" , useConnPooling , sql_extensions )
1869
+ if err != nil {
1870
+ log .Errorf ("[%s] failed to determine installed extensions info: %v" , dbUnique , err )
1871
+ } else {
1872
+ for _ , dr := range data {
1873
+ extver , err := decimal .NewFromString (dr ["extversion" ].(string ))
1874
+ if err != nil {
1875
+ log .Errorf ("[%s] failed to determine extension version info for extension %s: %v" , dbUnique , dr ["extname" ], err )
1876
+ continue
1877
+ }
1878
+ verNew .Extensions [dr ["extname" ].(string )] = extver
1879
+ }
1880
+ log .Debugf ("[%s] installed extensions: %+v" , dbUnique , verNew .Extensions )
1881
+ }
1836
1882
}
1837
- log .Debugf ("superuser=%v" , ver .IsSuperuser )
1838
1883
}
1839
1884
1840
- ver .LastCheckedOn = time .Now ()
1885
+ verNew .LastCheckedOn = time .Now ()
1841
1886
db_pg_version_map_lock .Lock ()
1842
- db_pg_version_map [dbUnique ] = ver
1887
+ db_pg_version_map [dbUnique ] = verNew
1843
1888
db_pg_version_map_lock .Unlock ()
1844
1889
}
1845
- return ver , nil
1890
+ return verNew , nil
1846
1891
}
1847
1892
1848
1893
// Need to define a sort interface as Go doesn't have support for Numeric/Decimal
@@ -1897,7 +1942,43 @@ func GetMetricVersionProperties(metric string, vme DBVersionMapEntry, metricDefM
1897
1942
return MetricVersionProperties {}, errors .New (fmt .Sprintf ("no suitable SQL found for metric \" %s\" , version \" %s\" " , metric , vme .VersionStr ))
1898
1943
}
1899
1944
1900
- return mdm [metric ][best_ver ], nil
1945
+ ret , _ := mdm [metric ][best_ver ]
1946
+
1947
+ // check if SQL def. override defined for some specific extension version and replace the metric SQL-s if so
1948
+ if ret .MetricAttrs .ExtensionVersionOverrides != nil && len (ret .MetricAttrs .ExtensionVersionOverrides ) > 0 {
1949
+ if vme .Extensions != nil && len (vme .Extensions ) > 0 {
1950
+ log .Debugf ("[%s] extension version based override request found: %+v" , metric , ret .MetricAttrs .ExtensionVersionOverrides )
1951
+ for _ , extOverride := range ret .MetricAttrs .ExtensionVersionOverrides {
1952
+ var matching = true
1953
+ for _ , extVer := range extOverride .ExpectedExtensionVersions { // "natural" sorting of metric definition assumed
1954
+ installedExtVer , ok := vme .Extensions [extVer .ExtName ]
1955
+ if ! ok || ! installedExtVer .GreaterThanOrEqual (extVer .ExtMinVersion ) {
1956
+ matching = false
1957
+ }
1958
+ }
1959
+ if matching { // all defined extensions / versions (if many) need to match
1960
+ _ , ok := mdm [extOverride .TargetMetric ]
1961
+ if ! ok {
1962
+ log .Warningf ("extension based override metric not found for metric %s. substitute metric name: %s" , metric , extOverride .TargetMetric )
1963
+ continue
1964
+ }
1965
+ mvp , err := GetMetricVersionProperties (extOverride .TargetMetric , vme , mdm , )
1966
+ if err != nil {
1967
+ log .Warningf ("undefined extension based override for metric %s, substitute metric name: %s, version: %s not found" , metric , extOverride .TargetMetric , best_ver )
1968
+ continue
1969
+ }
1970
+ log .Debugf ("overriding metric %s based on the extension_version_based_overrides metric attribute with %s:%s" , metric , extOverride .TargetMetric , best_ver )
1971
+ if mvp .Sql != "" {
1972
+ ret .Sql = mvp .Sql
1973
+ }
1974
+ if mvp .SqlSU != "" {
1975
+ ret .SqlSU = mvp .SqlSU
1976
+ }
1977
+ }
1978
+ }
1979
+ }
1980
+ }
1981
+ return ret , nil
1901
1982
}
1902
1983
1903
1984
func DetectSprocChanges (dbUnique string , vme DBVersionMapEntry , storage_ch chan <- []MetricStoreMessage , host_state map [string ]map [string ]string ) ChangeDetectionResults {
@@ -2219,7 +2300,7 @@ func GetAllRecoMetricsForVersion(vme DBVersionMapEntry) map[string]MetricVersion
2219
2300
mvp , err := GetMetricVersionProperties (m , vme , metric_def_map )
2220
2301
if err != nil {
2221
2302
log .Warningf ("Could not get SQL definition for metric \" %s\" , PG %s" , m , vme .VersionStr )
2222
- } else {
2303
+ } else if ! mvp . MetricAttrs . IsPrivate {
2223
2304
mvp_map [m ] = mvp
2224
2305
}
2225
2306
}
@@ -2233,7 +2314,7 @@ func CheckForRecommendationsAndStore(dbUnique string, vme DBVersionMapEntry) ([]
2233
2314
start_time_epoch_ns := time .Now ().UnixNano ()
2234
2315
2235
2316
reco_metrics := GetAllRecoMetricsForVersion (vme )
2236
- log .Infof ("Processing %d recommendation metrics for \" %s\" " , len (reco_metrics ), dbUnique )
2317
+ log .Debugf ("Processing %d recommendation metrics for \" %s\" " , len (reco_metrics ), dbUnique )
2237
2318
2238
2319
for m , mvp := range reco_metrics {
2239
2320
data , err , duration := DBExecReadByDbUniqueName (dbUnique , m , useConnPooling , mvp .Sql )
@@ -2775,7 +2856,9 @@ func ReadMetricDefinitionMapFromPostgres(failOnError bool) (map[string]map[decim
2775
2856
left join
2776
2857
pgwatch2.metric_attribute on (ma_metric_name = m_name)
2777
2858
where
2778
- m_is_active`
2859
+ m_is_active
2860
+ order by
2861
+ 1, 2`
2779
2862
2780
2863
log .Info ("updating metrics definitons from ConfigDB..." )
2781
2864
data , err := DBExecRead (configDb , CONFIGDB_IDENT , sql )
@@ -3125,7 +3208,7 @@ func ParseMetricAttrsFromString(jsonAttrs string) MetricAttrs {
3125
3208
func ReadMetricsFromFolder (folder string , failOnError bool ) (map [string ]map [decimal.Decimal ]MetricVersionProperties , error ) {
3126
3209
metrics_map := make (map [string ]map [decimal.Decimal ]MetricVersionProperties )
3127
3210
rIsDigitOrPunctuation := regexp .MustCompile ("^[\\ d\\ .]+$" )
3128
- metricNamePattern := "^[a-z0-9_]+$"
3211
+ metricNamePattern := "^[a-z0-9_\\ . ]+$"
3129
3212
rMetricNameFilter := regexp .MustCompile (metricNamePattern )
3130
3213
3131
3214
log .Infof ("Searching for metrics from path %s ..." , folder )
@@ -4022,7 +4105,7 @@ func main() {
4022
4105
_ , exists := db_conn_limiting_channel [db_unique ]
4023
4106
db_conn_limiting_channel_lock .RUnlock ()
4024
4107
4025
- if ! exists { // initialize DB connection limiting structure
4108
+ if ! exists { // new host, initialize DB connection limiting structure
4026
4109
db_conn_limiting_channel_lock .Lock ()
4027
4110
db_conn_limiting_channel [db_unique ] = make (chan bool , MAX_PG_CONNECTIONS_PER_MONITORED_DB )
4028
4111
i := 0
@@ -4032,6 +4115,10 @@ func main() {
4032
4115
i ++
4033
4116
}
4034
4117
db_conn_limiting_channel_lock .Unlock ()
4118
+
4119
+ db_pg_version_map_lock .Lock ()
4120
+ db_get_pg_version_map_lock [db_unique ] = sync.RWMutex {}
4121
+ db_pg_version_map_lock .Unlock ()
4035
4122
}
4036
4123
4037
4124
_ , connectFailedSoFar := failedInitialConnectHosts [db_unique ]
0 commit comments