Skip to content
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,7 @@ e2e_image_archive.tar
temp/
tmp/
data/
env/
env/

# Bob Shell session metadata
.bob/
234 changes: 134 additions & 100 deletions pkg/emitter/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,37 @@ import (
"github.com/opencost/opencost/core/pkg/log"
"github.com/opencost/opencost/core/pkg/opencost"
"github.com/opencost/opencost/core/pkg/source"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
// metricQueryFailures counts failed metric queries, partitioned by the
// "query_name" label. awaitWithLog increments it once each time an awaited
// query future returns an error, using the caller-supplied query name as the
// label value.
metricQueryFailures = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "finops_agent_metric_query_failures_total",
Help: "Total number of failed metric queries by query name",
},
[]string{"query_name"},
)
)

// queryFuture is the minimal view of a pending query result that awaitWithLog
// needs: a single Await method yielding a slice of *T and an error. The futures
// built with source.WithGroup in snapshotMetrics are passed to awaitWithLog
// through this interface.
type queryFuture[T any] interface {
// Await returns the query's results or an error. Presumably it blocks until
// the underlying query has completed — TODO confirm against the source package.
Await() ([]*T, error)
}

// awaitWithLog awaits a QueryGroupFuture and logs any errors, incrementing the failure counter
// Returns the result slice
func awaitWithLog[T any](name string, future queryFuture[T]) []*T {
result, err := future.Await()
if err != nil {
log.Warnf("metric query failed for %s: %v", name, err)
metricQueryFailures.WithLabelValues(name).Inc()
}
Comment thread
peatey marked this conversation as resolved.
return result
Comment thread
peatey marked this conversation as resolved.
}

// SnapshotProvider is an interface that defines a prototype for generating `ClusterSnapshot` instances
// leveraging the agent `DataSource`
type SnapshotProvider interface {
Expand Down Expand Up @@ -410,106 +439,111 @@ func snapshotMetrics(mq source.MetricsQuerier, start, end time.Time) (*MetricsSn
resourceQuotaStatusUsedRamLimitAvgFuture := source.WithGroup(grp, mq.QueryResourceQuotaStatusUsedRAMLimitAverage(start, end))
resourceQuotaStatusUsedRamLimitMaxFuture := source.WithGroup(grp, mq.QueryResourceQuotaStatusUsedRAMLimitMax(start, end))

pvActiveMinutes, _ := pvActiveMinutesFuture.Await()
pvUsedAverage, _ := pvUsedAverageFuture.Await()
pvUsedMax, _ := pvUsedMaxFuture.Await()
localStorageActiveMinutes, _ := localStorageActiveMinutesFuture.Await()
localStorageCost, _ := localStorageCostFuture.Await()
localStorageUsedCost, _ := localStorageUsedCostFuture.Await()
localStorageUsedAvg, _ := localStorageUsedAvgFuture.Await()
localStorageUsedMax, _ := localStorageUsedMaxFuture.Await()
localStorageBytes, _ := localStorageBytesFuture.Await()
nodeActiveMinutes, _ := nodeActiveMinutesFuture.Await()
nodeCPUCoresCapacity, _ := nodeCPUCoresCapacityFuture.Await()
nodeCPUCoresAllocatable, _ := nodeCPUCoresAllocatableFuture.Await()
nodeRAMBytesCapacity, _ := nodeRAMBytesCapacityFuture.Await()
nodeRAMBytesAllocatable, _ := nodeRAMBytesAllocatableFuture.Await()
nodeGPUCount, _ := nodeGPUCountFuture.Await()
nodeCPUModeTotal, _ := nodeCPUModeTotalFuture.Await()
nodeIsSpot, _ := nodeIsSpotFuture.Await()
nodeRAMSystemPercent, _ := nodeRAMSystemPercentFuture.Await()
nodeRAMUserPercent, _ := nodeRAMUserPercentFuture.Await()
lbActiveMinutes, _ := lbActiveMinutesFuture.Await()
lbPricePerHr, _ := lbPricePerHrFuture.Await()
clusterUptime, _ := clusterUptimeFuture.Await()
clusterManagementDuration, _ := clusterManagementDurationFuture.Await()
clusterManagementPricePerHr, _ := clusterManagementPricePerHrFuture.Await()
pods, _ := podsFuture.Await()
podsUID, _ := podsUIDFuture.Await()
ramBytesAllocated, _ := ramBytesAllocatedFuture.Await()
ramRequests, _ := ramRequestsFuture.Await()
ramLimits, _ := ramLimitsFuture.Await()
ramUsageAvg, _ := ramUsageAvgFuture.Await()
ramUsageMax, _ := ramUsageMaxFuture.Await()
nodeRAMPricePerGiBHr, _ := nodeRAMPricePerGiBHrFuture.Await()
cpuCoresAllocated, _ := cpuCoresAllocatedFuture.Await()
cpuRequests, _ := cpuRequestsFuture.Await()
cpuLimits, _ := cpuLimitsFuture.Await()
cpuUsageAvg, _ := cpuUsageAvgFuture.Await()
cpuUsageMax, _ := cpuUsageMaxFuture.Await()
nodeCPUPricePerHr, _ := nodeCPUPricePerHrFuture.Await()
gpusAllocated, _ := gpusAllocatedFuture.Await()
gpusRequested, _ := gpusRequestedFuture.Await()
gpusUsageAvg, _ := gpusUsageAvgFuture.Await()
gpusUsageMax, _ := gpusUsageMaxFuture.Await()
nodeGPUPricePerHr, _ := nodeGPUPricePerHrFuture.Await()
gpuInfo, _ := gpuInfoFuture.Await()
isGPUShared, _ := isGPUSharedFuture.Await()
podPVCAllocation, _ := podPVCAllocationFuture.Await()
pvcBytesRequested, _ := pvcBytesRequestedFuture.Await()
pvcInfo, _ := pvcInfoFuture.Await()
pvBytes, _ := pvBytesFuture.Await()
pvPricePerGiBHour, _ := pvPricePerGiBHourFuture.Await()
pvInfo, _ := pvInfoFuture.Await()
netZoneGiB, _ := netZoneGiBFuture.Await()
netZonePricePerGiB, _ := netZonePricePerGiBFuture.Await()
netRegionGiB, _ := netRegionGiBFuture.Await()
netRegionPricePerGiB, _ := netRegionPricePerGiBFuture.Await()
netInternetGiB, _ := netInternetGiBFuture.Await()
netInternetPricePerGiB, _ := netInternetPricePerGiBFuture.Await()
netInternetServiceGiB, _ := netInternetServiceGiBFuture.Await()
netNatGatewayPricePerGiB, _ := netNatGatewayPricePerGiBFuture.Await()
netNatGatewayGiB, _ := netNatGatewayGiBFuture.Await()
netTransferBytes, _ := netTransferBytesFuture.Await()
netZoneIngressGiB, _ := netZoneIngressGiBFuture.Await()
netRegionIngressGiB, _ := netRegionIngressGiBFuture.Await()
netInternetIngressGiB, _ := netInternetIngressGiBFuture.Await()
netInternetServiceIngressGiB, _ := netInternetServiceIngressGiBFuture.Await()
netNatGatewayIngressPricePerGiB, _ := netNatGatewayIngressPricePerGiBFuture.Await()
netNatGatewayIngressGiB, _ := netNatGatewayIngressGiBFuture.Await()
netReceiveBytes, _ := netReceiveBytesFuture.Await()
namespaceUptime, _ := namespaceUptimeFuture.Await()
namespaceAnnotations, _ := namespaceAnnotationsFuture.Await()
podAnnotations, _ := podAnnotationsFuture.Await()
nodeLabels, _ := nodeLabelsFuture.Await()
namespaceLabels, _ := namespaceLabelsFuture.Await()
podLabels, _ := podLabelsFuture.Await()
serviceLabels, _ := serviceLabelsFuture.Await()
deploymentLabels, _ := deploymentLabelsFuture.Await()
statefulSetLabels, _ := statefulSetLabelsFuture.Await()
daemonSetLabels, _ := daemonSetLabelsFuture.Await()
jobLabels, _ := jobLabelsFuture.Await()
podsWithReplicaSetOwner, _ := podsWithReplicaSetOwnerFuture.Await()
replicaSetsWithoutOwners, _ := replicaSetsWithoutOwnersFuture.Await()
replicaSetsWithRollout, _ := replicaSetsWithRolloutFuture.Await()
resourceQuotaUptime, _ := resourceQuotaUptimeFuture.Await()
resourceQuotaSpecCpuRequestAvg, _ := resourceQuotaSpecCpuRequestAvgFuture.Await()
resourceQuotaSpecCpuRequestMax, _ := resourceQuotaSpecCpuRequestMaxFuture.Await()
resourceQuotaSpecRamRequestAvg, _ := resourceQuotaSpecRamRequestAvgFuture.Await()
resourceQuotaSpecRamRequestMax, _ := resourceQuotaSpecRamRequestMaxFuture.Await()
resourceQuotaSpecCpuLimitAvg, _ := resourceQuotaSpecCpuLimitAvgFuture.Await()
resourceQuotaSpecCpuLimitMax, _ := resourceQuotaSpecCpuLimitMaxFuture.Await()
resourceQuotaSpecRamLimitAvg, _ := resourceQuotaSpecRamLimitAvgFuture.Await()
resourceQuotaSpecRamLimitMax, _ := resourceQuotaSpecRamLimitMaxFuture.Await()
resourceQuotaStatusUsedCpuRequestAvg, _ := resourceQuotaStatusUsedCpuRequestAvgFuture.Await()
resourceQuotaStatusUsedCpuRequestMax, _ := resourceQuotaStatusUsedCpuRequestMaxFuture.Await()
resourceQuotaStatusUsedRamRequestAvg, _ := resourceQuotaStatusUsedRamRequestAvgFuture.Await()
resourceQuotaStatusUsedRamRequestMax, _ := resourceQuotaStatusUsedRamRequestMaxFuture.Await()
resourceQuotaStatusUsedCpuLimitAvg, _ := resourceQuotaStatusUsedCpuLimitAvgFuture.Await()
resourceQuotaStatusUsedCpuLimitMax, _ := resourceQuotaStatusUsedCpuLimitMaxFuture.Await()
resourceQuotaStatusUsedRamLimitAvg, _ := resourceQuotaStatusUsedRamLimitAvgFuture.Await()
resourceQuotaStatusUsedRamLimitMax, _ := resourceQuotaStatusUsedRamLimitMaxFuture.Await()

pvActiveMinutes := awaitWithLog("pvActiveMinutes", pvActiveMinutesFuture)
pvUsedAverage := awaitWithLog("pvUsedAverage", pvUsedAverageFuture)
pvUsedMax := awaitWithLog("pvUsedMax", pvUsedMaxFuture)
localStorageActiveMinutes := awaitWithLog("localStorageActiveMinutes", localStorageActiveMinutesFuture)
localStorageCost := awaitWithLog("localStorageCost", localStorageCostFuture)
localStorageUsedCost := awaitWithLog("localStorageUsedCost", localStorageUsedCostFuture)
localStorageUsedAvg := awaitWithLog("localStorageUsedAvg", localStorageUsedAvgFuture)
localStorageUsedMax := awaitWithLog("localStorageUsedMax", localStorageUsedMaxFuture)
localStorageBytes := awaitWithLog("localStorageBytes", localStorageBytesFuture)
nodeActiveMinutes := awaitWithLog("nodeActiveMinutes", nodeActiveMinutesFuture)
nodeCPUCoresCapacity := awaitWithLog("nodeCPUCoresCapacity", nodeCPUCoresCapacityFuture)
nodeCPUCoresAllocatable := awaitWithLog("nodeCPUCoresAllocatable", nodeCPUCoresAllocatableFuture)
nodeRAMBytesCapacity := awaitWithLog("nodeRAMBytesCapacity", nodeRAMBytesCapacityFuture)
nodeRAMBytesAllocatable := awaitWithLog("nodeRAMBytesAllocatable", nodeRAMBytesAllocatableFuture)
nodeGPUCount := awaitWithLog("nodeGPUCount", nodeGPUCountFuture)
nodeCPUModeTotal := awaitWithLog("nodeCPUModeTotal", nodeCPUModeTotalFuture)
nodeIsSpot := awaitWithLog("nodeIsSpot", nodeIsSpotFuture)
nodeRAMSystemPercent := awaitWithLog("nodeRAMSystemPercent", nodeRAMSystemPercentFuture)
nodeRAMUserPercent := awaitWithLog("nodeRAMUserPercent", nodeRAMUserPercentFuture)
lbActiveMinutes := awaitWithLog("lbActiveMinutes", lbActiveMinutesFuture)
lbPricePerHr := awaitWithLog("lbPricePerHr", lbPricePerHrFuture)
clusterUptime := awaitWithLog("clusterUptime", clusterUptimeFuture)
clusterManagementDuration := awaitWithLog("clusterManagementDuration", clusterManagementDurationFuture)
clusterManagementPricePerHr := awaitWithLog("clusterManagementPricePerHr", clusterManagementPricePerHrFuture)
pods := awaitWithLog("pods", podsFuture)
podsUID := awaitWithLog("podsUID", podsUIDFuture)
ramBytesAllocated := awaitWithLog("ramBytesAllocated", ramBytesAllocatedFuture)
ramRequests := awaitWithLog("ramRequests", ramRequestsFuture)
ramLimits := awaitWithLog("ramLimits", ramLimitsFuture)
ramUsageAvg := awaitWithLog("ramUsageAvg", ramUsageAvgFuture)
ramUsageMax := awaitWithLog("ramUsageMax", ramUsageMaxFuture)
nodeRAMPricePerGiBHr := awaitWithLog("nodeRAMPricePerGiBHr", nodeRAMPricePerGiBHrFuture)
cpuCoresAllocated := awaitWithLog("cpuCoresAllocated", cpuCoresAllocatedFuture)
cpuRequests := awaitWithLog("cpuRequests", cpuRequestsFuture)
cpuLimits := awaitWithLog("cpuLimits", cpuLimitsFuture)
cpuUsageAvg := awaitWithLog("cpuUsageAvg", cpuUsageAvgFuture)
cpuUsageMax := awaitWithLog("cpuUsageMax", cpuUsageMaxFuture)
nodeCPUPricePerHr := awaitWithLog("nodeCPUPricePerHr", nodeCPUPricePerHrFuture)
gpusAllocated := awaitWithLog("gpusAllocated", gpusAllocatedFuture)
gpusRequested := awaitWithLog("gpusRequested", gpusRequestedFuture)
gpusUsageAvg := awaitWithLog("gpusUsageAvg", gpusUsageAvgFuture)
gpusUsageMax := awaitWithLog("gpusUsageMax", gpusUsageMaxFuture)
nodeGPUPricePerHr := awaitWithLog("nodeGPUPricePerHr", nodeGPUPricePerHrFuture)
gpuInfo := awaitWithLog("gpuInfo", gpuInfoFuture)
isGPUShared := awaitWithLog("isGPUShared", isGPUSharedFuture)
podPVCAllocation := awaitWithLog("podPVCAllocation", podPVCAllocationFuture)
pvcBytesRequested := awaitWithLog("pvcBytesRequested", pvcBytesRequestedFuture)
pvcInfo := awaitWithLog("pvcInfo", pvcInfoFuture)
pvBytes := awaitWithLog("pvBytes", pvBytesFuture)
pvPricePerGiBHour := awaitWithLog("pvPricePerGiBHour", pvPricePerGiBHourFuture)
pvInfo := awaitWithLog("pvInfo", pvInfoFuture)
netZoneGiB := awaitWithLog("netZoneGiB", netZoneGiBFuture)
netZonePricePerGiB := awaitWithLog("netZonePricePerGiB", netZonePricePerGiBFuture)
netRegionGiB := awaitWithLog("netRegionGiB", netRegionGiBFuture)
netRegionPricePerGiB := awaitWithLog("netRegionPricePerGiB", netRegionPricePerGiBFuture)
netInternetGiB := awaitWithLog("netInternetGiB", netInternetGiBFuture)
netInternetPricePerGiB := awaitWithLog("netInternetPricePerGiB", netInternetPricePerGiBFuture)
netInternetServiceGiB := awaitWithLog("netInternetServiceGiB", netInternetServiceGiBFuture)
netNatGatewayPricePerGiB := awaitWithLog("netNatGatewayPricePerGiB", netNatGatewayPricePerGiBFuture)
netNatGatewayGiB := awaitWithLog("netNatGatewayGiB", netNatGatewayGiBFuture)
netTransferBytes := awaitWithLog("netTransferBytes", netTransferBytesFuture)
netZoneIngressGiB := awaitWithLog("netZoneIngressGiB", netZoneIngressGiBFuture)
netRegionIngressGiB := awaitWithLog("netRegionIngressGiB", netRegionIngressGiBFuture)
netInternetIngressGiB := awaitWithLog("netInternetIngressGiB", netInternetIngressGiBFuture)
netInternetServiceIngressGiB := awaitWithLog("netInternetServiceIngressGiB", netInternetServiceIngressGiBFuture)
netNatGatewayIngressPricePerGiB := awaitWithLog("netNatGatewayIngressPricePerGiB", netNatGatewayIngressPricePerGiBFuture)
netNatGatewayIngressGiB := awaitWithLog("netNatGatewayIngressGiB", netNatGatewayIngressGiBFuture)
netReceiveBytes := awaitWithLog("netReceiveBytes", netReceiveBytesFuture)
namespaceUptime := awaitWithLog("namespaceUptime", namespaceUptimeFuture)
namespaceAnnotations := awaitWithLog("namespaceAnnotations", namespaceAnnotationsFuture)
podAnnotations := awaitWithLog("podAnnotations", podAnnotationsFuture)
nodeLabels := awaitWithLog("nodeLabels", nodeLabelsFuture)
namespaceLabels := awaitWithLog("namespaceLabels", namespaceLabelsFuture)
podLabels := awaitWithLog("podLabels", podLabelsFuture)
serviceLabels := awaitWithLog("serviceLabels", serviceLabelsFuture)
deploymentLabels := awaitWithLog("deploymentLabels", deploymentLabelsFuture)
statefulSetLabels := awaitWithLog("statefulSetLabels", statefulSetLabelsFuture)
daemonSetLabels := awaitWithLog("daemonSetLabels", daemonSetLabelsFuture)
jobLabels := awaitWithLog("jobLabels", jobLabelsFuture)
podsWithReplicaSetOwner := awaitWithLog("podsWithReplicaSetOwner", podsWithReplicaSetOwnerFuture)
replicaSetsWithoutOwners := awaitWithLog("replicaSetsWithoutOwners", replicaSetsWithoutOwnersFuture)
replicaSetsWithRollout := awaitWithLog("replicaSetsWithRollout", replicaSetsWithRolloutFuture)
resourceQuotaUptime := awaitWithLog("resourceQuotaUptime", resourceQuotaUptimeFuture)
resourceQuotaSpecCpuRequestAvg := awaitWithLog("resourceQuotaSpecCpuRequestAvg", resourceQuotaSpecCpuRequestAvgFuture)
resourceQuotaSpecCpuRequestMax := awaitWithLog("resourceQuotaSpecCpuRequestMax", resourceQuotaSpecCpuRequestMaxFuture)
resourceQuotaSpecRamRequestAvg := awaitWithLog("resourceQuotaSpecRamRequestAvg", resourceQuotaSpecRamRequestAvgFuture)
resourceQuotaSpecRamRequestMax := awaitWithLog("resourceQuotaSpecRamRequestMax", resourceQuotaSpecRamRequestMaxFuture)
resourceQuotaSpecCpuLimitAvg := awaitWithLog("resourceQuotaSpecCpuLimitAvg", resourceQuotaSpecCpuLimitAvgFuture)
resourceQuotaSpecCpuLimitMax := awaitWithLog("resourceQuotaSpecCpuLimitMax", resourceQuotaSpecCpuLimitMaxFuture)
resourceQuotaSpecRamLimitAvg := awaitWithLog("resourceQuotaSpecRamLimitAvg", resourceQuotaSpecRamLimitAvgFuture)
resourceQuotaSpecRamLimitMax := awaitWithLog("resourceQuotaSpecRamLimitMax", resourceQuotaSpecRamLimitMaxFuture)
resourceQuotaStatusUsedCpuRequestAvg := awaitWithLog("resourceQuotaStatusUsedCpuRequestAvg", resourceQuotaStatusUsedCpuRequestAvgFuture)
resourceQuotaStatusUsedCpuRequestMax := awaitWithLog("resourceQuotaStatusUsedCpuRequestMax", resourceQuotaStatusUsedCpuRequestMaxFuture)
resourceQuotaStatusUsedRamRequestAvg := awaitWithLog("resourceQuotaStatusUsedRamRequestAvg", resourceQuotaStatusUsedRamRequestAvgFuture)
resourceQuotaStatusUsedRamRequestMax := awaitWithLog("resourceQuotaStatusUsedRamRequestMax", resourceQuotaStatusUsedRamRequestMaxFuture)
resourceQuotaStatusUsedCpuLimitAvg := awaitWithLog("resourceQuotaStatusUsedCpuLimitAvg", resourceQuotaStatusUsedCpuLimitAvgFuture)
resourceQuotaStatusUsedCpuLimitMax := awaitWithLog("resourceQuotaStatusUsedCpuLimitMax", resourceQuotaStatusUsedCpuLimitMaxFuture)
resourceQuotaStatusUsedRamLimitAvg := awaitWithLog("resourceQuotaStatusUsedRamLimitAvg", resourceQuotaStatusUsedRamLimitAvgFuture)
resourceQuotaStatusUsedRamLimitMax := awaitWithLog("resourceQuotaStatusUsedRamLimitMax", resourceQuotaStatusUsedRamLimitMaxFuture)

// Phase 1: Observability only. Each failed query has already been logged and
// counted by awaitWithLog above, but the snapshot is still all-or-nothing: once
// every future has been awaited, any error recorded on the group causes the
// whole snapshot to be rejected. This preserves the existing behavior while
// giving visibility into which specific queries are failing.
// Phase 2: This could be relaxed to allow partial results, by inspecting the
// failures and deciding whether to return the snapshot with some data missing.
Comment thread
peatey marked this conversation as resolved.
if grp.HasErrors() {
return nil, grp.Error()
}
Expand Down
Loading
Loading