Skip to content

Commit 6db4b1c

Browse files
add metrics to collect date level stats of ingestion, query and other object store calls
1 parent d88307a commit 6db4b1c

File tree

14 files changed

+1153
-677
lines changed

14 files changed

+1153
-677
lines changed

src/event/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use self::error::EventError;
2828
use crate::{
2929
LOCK_EXPECT,
3030
metadata::update_stats,
31+
metrics::{increment_events_ingested_by_date, increment_events_ingested_size_by_date},
3132
parseable::{PARSEABLE, StagingError},
3233
storage::StreamType,
3334
};
@@ -88,6 +89,11 @@ impl Event {
8889
self.parsed_timestamp.date(),
8990
);
9091

92+
// Track billing metrics for event ingestion
93+
let date_string = self.parsed_timestamp.date().to_string();
94+
increment_events_ingested_by_date(self.rb.num_rows() as u64, &date_string);
95+
increment_events_ingested_size_by_date(self.origin_size, &date_string);
96+
9197
crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
9298

9399
Ok(())

src/handlers/http/modal/ingest_server.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,6 @@ impl ParseableServer for IngestServer {
116116
})
117117
.await;
118118

119-
PARSEABLE.storage.register_store_metrics(prometheus);
120-
121119
migration::run_migration(&PARSEABLE).await?;
122120

123121
// local sync on init

src/handlers/http/modal/query_server.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ impl ParseableServer for QueryServer {
106106
prometheus: &PrometheusMetrics,
107107
shutdown_rx: oneshot::Receiver<()>,
108108
) -> anyhow::Result<()> {
109-
PARSEABLE.storage.register_store_metrics(prometheus);
110109
// write the ingestor metadata to storage
111110
QUERIER_META
112111
.get_or_init(|| async {

src/handlers/http/modal/server.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,6 @@ impl ParseableServer for Server {
128128
prometheus: &PrometheusMetrics,
129129
shutdown_rx: oneshot::Receiver<()>,
130130
) -> anyhow::Result<()> {
131-
PARSEABLE.storage.register_store_metrics(prometheus);
132-
133131
migration::run_migration(&PARSEABLE).await?;
134132

135133
// load on init

src/handlers/http/query.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use tokio::task::JoinSet;
4646
use tracing::{error, warn};
4747

4848
use crate::event::{DEFAULT_TIMESTAMP_KEY, commit_schema};
49-
use crate::metrics::QUERY_EXECUTE_TIME;
49+
use crate::metrics::{QUERY_EXECUTE_TIME, increment_query_calls_by_date};
5050
use crate::parseable::{PARSEABLE, StreamNotFound};
5151
use crate::query::error::ExecuteError;
5252
use crate::query::{CountsRequest, Query as LogicalQuery, execute};
@@ -123,6 +123,10 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
123123
user_auth_for_datasets(&permissions, &tables).await?;
124124
let time = Instant::now();
125125

126+
// Track billing metrics for query calls
127+
let current_date = chrono::Utc::now().date_naive().to_string();
128+
increment_query_calls_by_date(&current_date);
129+
126130
// if the query is `select count(*) from <dataset>`
127131
// we use the `get_bin_density` method to get the count of records in the dataset
128132
// instead of executing the query using datafusion
@@ -341,6 +345,10 @@ pub async fn get_counts(
341345
req: HttpRequest,
342346
counts_request: Json<CountsRequest>,
343347
) -> Result<impl Responder, QueryError> {
348+
// Track billing metrics for query calls
349+
let current_date = chrono::Utc::now().date_naive().to_string();
350+
increment_query_calls_by_date(&current_date);
351+
344352
let creds = extract_session_key_from_req(&req)?;
345353
let permissions = Users.get_permissions(&creds);
346354

src/metrics/mod.rs

Lines changed: 210 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
*/
1818

1919
pub mod prom_utils;
20-
pub mod storage;
21-
2220
use crate::{handlers::http::metrics_path, stats::FullStats};
2321
use actix_web::Responder;
2422
use actix_web_prometheus::{PrometheusMetrics, PrometheusMetricsBuilder};
@@ -228,6 +226,125 @@ pub static ALERTS_STATES: Lazy<IntCounterVec> = Lazy::new(|| {
228226
.expect("metric can be created")
229227
});
230228

229+
// Billing Metrics - Counter type metrics for billing/usage tracking
230+
pub static TOTAL_EVENTS_INGESTED_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
231+
IntCounterVec::new(
232+
Opts::new(
233+
"total_events_ingested_by_date",
234+
"Total events ingested by date (Counter for billing)",
235+
)
236+
.namespace(METRICS_NAMESPACE),
237+
&["date"],
238+
)
239+
.expect("metric can be created")
240+
});
241+
242+
pub static TOTAL_EVENTS_INGESTED_SIZE_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
243+
IntCounterVec::new(
244+
Opts::new(
245+
"total_events_ingested_size_by_date",
246+
"Total events ingested size in bytes by date (Counter for billing)",
247+
)
248+
.namespace(METRICS_NAMESPACE),
249+
&["date"],
250+
)
251+
.expect("metric can be created")
252+
});
253+
254+
pub static TOTAL_PARQUETS_STORED_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
255+
IntCounterVec::new(
256+
Opts::new(
257+
"total_parquets_stored_by_date",
258+
"Total parquet files stored by date (Counter for billing)",
259+
)
260+
.namespace(METRICS_NAMESPACE),
261+
&["date"],
262+
)
263+
.expect("metric can be created")
264+
});
265+
266+
pub static TOTAL_PARQUETS_STORED_SIZE_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
267+
IntCounterVec::new(
268+
Opts::new(
269+
"total_parquets_stored_size_by_date",
270+
"Total parquet files stored size in bytes by date (Counter for billing)",
271+
)
272+
.namespace(METRICS_NAMESPACE),
273+
&["date"],
274+
)
275+
.expect("metric can be created")
276+
});
277+
278+
pub static TOTAL_QUERY_CALLS_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
279+
IntCounterVec::new(
280+
Opts::new(
281+
"total_query_calls_by_date",
282+
"Total query calls by date (Counter for billing)",
283+
)
284+
.namespace(METRICS_NAMESPACE),
285+
&["date"],
286+
)
287+
.expect("metric can be created")
288+
});
289+
290+
pub static TOTAL_FILES_SCANNED_IN_QUERY_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
291+
IntCounterVec::new(
292+
Opts::new(
293+
"total_files_scanned_in_query_by_date",
294+
"Total files scanned in queries by date (Counter for billing)",
295+
)
296+
.namespace(METRICS_NAMESPACE),
297+
&["date"],
298+
)
299+
.expect("metric can be created")
300+
});
301+
302+
pub static TOTAL_BYTES_SCANNED_IN_QUERY_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
303+
IntCounterVec::new(
304+
Opts::new(
305+
"total_bytes_scanned_in_query_by_date",
306+
"Total bytes scanned in queries by date (Counter for billing)",
307+
)
308+
.namespace(METRICS_NAMESPACE),
309+
&["date"],
310+
)
311+
.expect("metric can be created")
312+
});
313+
314+
pub static TOTAL_OBJECT_STORE_CALLS_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| {
315+
IntCounterVec::new(
316+
Opts::new(
317+
"total_object_store_calls_by_date",
318+
"Total object store calls by date (Counter for billing)",
319+
)
320+
.namespace(METRICS_NAMESPACE),
321+
&["provider", "method", "date"],
322+
)
323+
.expect("metric can be created")
324+
});
325+
326+
pub static TOTAL_FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE: Lazy<IntCounterVec> =
327+
Lazy::new(|| {
328+
IntCounterVec::new(
329+
Opts::new(
330+
"total_files_scanned_in_object_store_calls_by_date",
331+
"Total files scanned in object store calls by date (Counter for billing)",
332+
)
333+
.namespace(METRICS_NAMESPACE),
334+
&["provider", "method", "date"],
335+
)
336+
.expect("metric can be created")
337+
});
338+
339+
pub static STORAGE_REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
340+
HistogramVec::new(
341+
HistogramOpts::new("storage_request_response_time", "Storage Request Latency")
342+
.namespace(METRICS_NAMESPACE),
343+
&["provider", "method", "status"],
344+
)
345+
.expect("metric can be created")
346+
});
347+
231348
fn custom_metrics(registry: &Registry) {
232349
registry
233350
.register(Box::new(EVENTS_INGESTED.clone()))
@@ -286,6 +403,39 @@ fn custom_metrics(registry: &Registry) {
286403
registry
287404
.register(Box::new(ALERTS_STATES.clone()))
288405
.expect("metric can be registered");
406+
// Register billing metrics
407+
registry
408+
.register(Box::new(TOTAL_EVENTS_INGESTED_BY_DATE.clone()))
409+
.expect("metric can be registered");
410+
registry
411+
.register(Box::new(TOTAL_EVENTS_INGESTED_SIZE_BY_DATE.clone()))
412+
.expect("metric can be registered");
413+
registry
414+
.register(Box::new(TOTAL_PARQUETS_STORED_BY_DATE.clone()))
415+
.expect("metric can be registered");
416+
registry
417+
.register(Box::new(TOTAL_PARQUETS_STORED_SIZE_BY_DATE.clone()))
418+
.expect("metric can be registered");
419+
registry
420+
.register(Box::new(TOTAL_QUERY_CALLS_BY_DATE.clone()))
421+
.expect("metric can be registered");
422+
registry
423+
.register(Box::new(TOTAL_FILES_SCANNED_IN_QUERY_BY_DATE.clone()))
424+
.expect("metric can be registered");
425+
registry
426+
.register(Box::new(TOTAL_BYTES_SCANNED_IN_QUERY_BY_DATE.clone()))
427+
.expect("metric can be registered");
428+
registry
429+
.register(Box::new(TOTAL_OBJECT_STORE_CALLS_BY_DATE.clone()))
430+
.expect("metric can be registered");
431+
registry
432+
.register(Box::new(
433+
TOTAL_FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE.clone(),
434+
))
435+
.expect("metric can be registered");
436+
registry
437+
.register(Box::new(STORAGE_REQUEST_RESPONSE_TIME.clone()))
438+
.expect("metric can be registered");
289439
}
290440

291441
pub fn build_metrics_handler() -> PrometheusMetrics {
@@ -345,6 +495,64 @@ pub async fn fetch_stats_from_storage(stream_name: &str, stats: FullStats) {
345495
.set(stats.lifetime_stats.storage as i64);
346496
}
347497

498+
// Helper functions for tracking billing metrics
499+
pub fn increment_events_ingested_by_date(count: u64, date: &str) {
500+
TOTAL_EVENTS_INGESTED_BY_DATE
501+
.with_label_values(&[date])
502+
.inc_by(count);
503+
}
504+
505+
pub fn increment_events_ingested_size_by_date(size: u64, date: &str) {
506+
TOTAL_EVENTS_INGESTED_SIZE_BY_DATE
507+
.with_label_values(&[date])
508+
.inc_by(size);
509+
}
510+
511+
pub fn increment_parquets_stored_by_date(date: &str) {
512+
TOTAL_PARQUETS_STORED_BY_DATE
513+
.with_label_values(&[date])
514+
.inc();
515+
}
516+
517+
pub fn increment_parquets_stored_size_by_date(size: u64, date: &str) {
518+
TOTAL_PARQUETS_STORED_SIZE_BY_DATE
519+
.with_label_values(&[date])
520+
.inc_by(size);
521+
}
522+
523+
pub fn increment_query_calls_by_date(date: &str) {
524+
TOTAL_QUERY_CALLS_BY_DATE.with_label_values(&[date]).inc();
525+
}
526+
527+
pub fn increment_files_scanned_in_query_by_date(count: u64, date: &str) {
528+
TOTAL_FILES_SCANNED_IN_QUERY_BY_DATE
529+
.with_label_values(&[date])
530+
.inc_by(count);
531+
}
532+
533+
pub fn increment_bytes_scanned_in_query_by_date(bytes: u64, date: &str) {
534+
TOTAL_BYTES_SCANNED_IN_QUERY_BY_DATE
535+
.with_label_values(&[date])
536+
.inc_by(bytes);
537+
}
538+
539+
pub fn increment_object_store_calls_by_date(provider: &str, method: &str, date: &str) {
540+
TOTAL_OBJECT_STORE_CALLS_BY_DATE
541+
.with_label_values(&[provider, method, date])
542+
.inc();
543+
}
544+
545+
pub fn increment_files_scanned_in_object_store_calls_by_date(
546+
provider: &str,
547+
method: &str,
548+
count: u64,
549+
date: &str,
550+
) {
551+
TOTAL_FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE
552+
.with_label_values(&[provider, method, date])
553+
.inc_by(count);
554+
}
555+
348556
use actix_web::HttpResponse;
349557

350558
pub async fn get() -> Result<impl Responder, MetricsError> {

0 commit comments

Comments
 (0)