-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
out_s3: added blob handling feature #9907
base: master
Are you sure you want to change the base?
Conversation
Signed-off-by: Leonardo Alminana <[email protected]>
Signed-off-by: Leonardo Alminana <[email protected]>
Signed-off-by: Leonardo Alminana <[email protected]>
Signed-off-by: Leonardo Alminana <[email protected]>
Signed-off-by: Leonardo Alminana <[email protected]>
Signed-off-by: Leonardo Alminana <[email protected]>
Signed-off-by: Leonardo Alminana <[email protected]>
Signed-off-by: Leonardo Alminana <[email protected]>
Signed-off-by: Leonardo Alminana <[email protected]>
Signed-off-by: Leonardo Alminana <[email protected]>
Signed-off-by: Leonardo Alminana <[email protected]>
Signed-off-by: Leonardo Alminana <[email protected]>
Signed-off-by: Leonardo Alminana <[email protected]>
Signed-off-by: Leonardo Alminana <[email protected]>
Signed-off-by: Leonardo Alminana <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some comments that I'd prefer to clarify and/or change before merging this.
Curious why the ubuntu unit tests are failing, as well.
|
||
/* A match against "$TAG[" indicates an invalid or out of bounds tag part. */ | ||
if (strstr(s3_key, tmp)){ | ||
flb_warn("[s3_key] Invalid / Out of bounds tag part: At most 10 tag parts " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Is going above 10 tag parts the most likely cause of this issue? The "first part" of the error message implies that tags in the wrong place also trigger this check, in which case the "second part" should mention that in more detail as well to avoid confusing operators.
|
||
valid_blob_path = (char *) blob_path; | ||
|
||
while (*valid_blob_path == '.' || |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this value ever use the ../
format? If so the semantics here would ignore that motion, which is incorrect.
struct multipart_upload *m_upload, | ||
char *pre_signed_url); | ||
|
||
int abort_multipart_upload(struct flb_s3 *ctx, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we creating this function? I see we're using it to abort multipart blob uploads in s3.c
, but why is that needed here?
|
||
int flb_blob_db_lock(struct flb_blob_db *context) | ||
{ | ||
return flb_lock_acquire(&context->global_lock, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I understand every blob
publish operation is dependent on manipulating the same sqlite table. Do we know how this affects performance when parallelizing publishes? Is there a way to get a narrower lock here?
|
||
int flb_blob_db_unlock(struct flb_blob_db *context) | ||
{ | ||
return flb_lock_release(&context->global_lock, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: out of curiosity, why are we using global_lock
here but db_lock
in azure_blob_db.c
?
|
||
s3_client = ctx->s3_client; | ||
if (s3_plugin_under_test() == FLB_TRUE) { | ||
/* c = mock_s3_call("TEST_ABORT_MULTIPART_UPLOAD_ERROR", "AbortMultipartUpload"); */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Do we need this line?
time_t upload_parts_freshness_threshold; | ||
int file_delivery_attempt_limit; | ||
int part_delivery_attempt_limit; | ||
flb_sds_t authorization_endpoint_url; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How are we using this authorization endpoint? Isn't the process of publishing binary data to S3 equivalent to string data, +/- the declared body type in the HTTP requests?
sched = flb_sched_ctx_get(); | ||
|
||
/* convert from seconds to milliseconds (scheduler needs ms) */ | ||
ms = ctx->upload_parts_timeout * 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the correct semantics?
From the description of upload_parts_timeout
below it seems like a maximum age to retry blob publishing, past which we will drop a blob instead of retrying.
As I understand the code, here we're instead inserting blob parts into the database and then attempting to publish them every upload_parts_timeout
seconds, dropping them on the first failure.
@@ -732,6 +732,203 @@ char* strtok_concurrent( | |||
#endif | |||
} | |||
|
|||
/* Constructs S3 object key as per the blob format. */ | |||
flb_sds_t flb_get_s3_blob_key(const char *format, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A majority of this is duplicated in flb_get_s3_key(...)
, does it make sense to try and introduce two helpers:
- https://github.com/fluent/fluent-bit/blob/leonardo-master-s3-blob/src/aws/flb_aws_util.c#L936-L1053 - initial validation/replacement
- https://github.com/fluent/fluent-bit/blob/leonardo-master-s3-blob/src/aws/flb_aws_util.c#L1080-L1100 - random alpha replacement
This could avoid possible divergence between the two methods over time
char *tag, | ||
char *source, | ||
char *destination, | ||
char *path, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are other instances where we are using cfl_sds_t
in place of char*, should we be consistent and migrate all char* here to cfl_sds_t
?
|
||
/* Fluent Bit | ||
* ========== | ||
* Copyright (C) 2015-2024 The Fluent Bit Authors |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the copyright block include our current year? 🫣
} | ||
|
||
|
||
/* file destination update */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this comment include remote-id (similar to file part below)?
return FLB_BLOB_DB_ERROR_PREPARING_STATEMENT_GET_NEXT_FILE_PART; | ||
} | ||
|
||
result = sqlite3_prepare_v2(context->db->handler, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these next results contain a comment above (similar to prior prepared statement blocks)?
|
||
static int flb_blob_db_file_reset_part_upload_states(struct flb_blob_db *context, | ||
uint64_t id, | ||
char *path) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to above comments on unused parameters
|
||
int flb_blob_db_file_reset_upload_states(struct flb_blob_db *context, | ||
uint64_t id, | ||
char *path) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to above comments on unused parameters
uint64_t part_id, | ||
size_t offset_start, | ||
size_t offset_end, | ||
int64_t *out_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to above comments on unused parameters
|
||
flb_blob_db_lock(&ctx->blob_db); | ||
|
||
while (1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A large portion of these while(1) loops are duplicating code:
- https://github.com/fluent/fluent-bit/blob/leonardo-master-s3-blob/plugins/out_s3/s3.c#L2582-L2645
- https://github.com/fluent/fluent-bit/blob/leonardo-master-s3-blob/plugins/out_s3/s3.c#L2674-L2736
Can we consider moving to a helper method to avoid the duplication?
return ret; | ||
} | ||
|
||
static int blob_fetch_multipart_complete_pre_signed_url(struct flb_s3 *context, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These fetch_*_pre_signed_url methods are near duplicates with minor changes to the tmp (path variable), can we consider using a helper method for getting the presigned url and have each method handle their own tmp/path generation to reduce code?
No description provided.