fix: map udf metrics #2588
Signed-off-by: Takashi Menjo <[email protected]>
Codecov Report
All modified and coverable lines are covered by tests ✅

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2588      +/-   ##
==========================================
+ Coverage   70.44%   70.86%   +0.41%
==========================================
  Files         395      395
  Lines       62180    62603     +423
==========================================
+ Hits        43803    44362     +559
+ Misses      17253    17128     -125
+ Partials     1124     1113      -11
| `forwarder_drop_bytes_total` | Counter | `pipeline=<pipeline-name>` <br> `vertex=<vertex-name>` <br> `vertex_type=<vertex-type>` <br> `replica=<replica-index>` <br> `partition_name=<partition-name>` | Provides the total number of bytes dropped by a given Vertex due to a full Inter-Step Buffer Partition |
| `forwarder_udf_read_total` | Counter | `pipeline=<pipeline-name>` <br> `vertex=<vertex-name>` <br> `vertex_type=<vertex-type>` <br> `replica=<replica-index>` <br> `partition_name=<partition-name>` | Provides the total number of messages read by UDF |
| `forwarder_udf_write_total` | Counter | `pipeline=<pipeline-name>` <br> `vertex=<vertex-name>` <br> `vertex_type=<vertex-type>` <br> `replica=<replica-index>` <br> `partition_name=<partition-name>` | Provides the total number of messages written by UDF |
| Metric name | Metric type | Labels | Which partition | Description |
What does `Which partition` mean? Is it an explanation of `partition_name`?
Here "which" means upstream or downstream, not a partition name. Sorry for the confusion.
I believe this column helps operators understand each metric more correctly. I will choose a more appropriate term.
@@ -476,7 +478,7 @@ func (isdf *InterStepDataForward) streamMessage(ctx context.Context, dataMessage
return nil, fmt.Errorf("failed to applyUDF, error: %w", err)
}

metrics.UDFProcessingTime.With(metricLabels).Observe(float64(time.Since(start).Microseconds()))
metrics.ConcurrentUDFProcessingTime.With(metricLabels).Observe(float64(time.Since(start).Microseconds()))
Neither of these metrics makes sense in stream mode, since we don't have a way to exclude the buffer writing time.
> we don't have a way to exclude the buffer writing time.

Yes, I know that. My idea is to redefine ConcurrentUDFProcessingTime so that it includes both the UDF processing time and the time spent writing to buffers. I described that in the metrics document, in the third commit (doc: relationship between...).
udfResults, err = isdf.applyUDF(ctx, dataMessages)
if err != nil {
isdf.opts.logger.Errorw("failed to applyUDF", zap.Error(err))
// As there's no partial failure, non-ack all the readOffsets
isdf.fromBufferPartition.NoAck(ctx, readOffsets)
return err
}
metrics.UDFProcessingTime.With(metricLabels).Observe(float64(time.Since(udfStart).Microseconds()))
Should this be ConcurrentUDFProcessingTime?
It would be in batch mode, but what about unary mode? In my understanding, calling applyUDF() blocks until all the UDF results are received.
Would it be better to observe either the concurrent or the non-concurrent metric, depending on the mode?
Hello committers, could you review this pull request again? Otherwise, I'd like to hear whether this is being put off like #2624.
Close #2498, #2502.
fix: latency metrics for UDF processing time in map UDF
This PR has map UDF...
doc: add forwarder_concurrent_udf_processing_time
This and the following commits in this PR update the Metrics document. This one adds forwarder_concurrent_udf_processing_time to the document.
doc: relationship between UDF and write processing time in Map UDF
This clarifies the relationship between forwarder_udf_processing_time/forwarder_concurrent_udf_processing_time and forwarder_write_processing_time. It should help pipeline developers analyze these metrics.
doc: upstream or downstream partition for each LET metric
This adds a new column, `Which partition`, to the latency, traffic, and error metrics tables in the document to clarify what `partition_name=<partition-name>` means. It should help Numaflow users understand each metric correctly.