-
Notifications
You must be signed in to change notification settings - Fork 123
DATA-4320 - Implement BinaryDataToDataset in datamanager in RDK #5145
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…data for datasets file
go.mod
Outdated
@@ -444,3 +444,5 @@ require ( | |||
github.com/ziutek/mymysql v1.5.4 // indirect | |||
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e | |||
) | |||
|
|||
replace go.viam.com/api => github.com/viamrobotics/api v0.1.458-0.20250717192712-d9437d8203b6 // upload-to-dataset |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a reminder that we’ll have to update the API proper before merging
@@ -259,3 +260,34 @@ func lookupCollectorConfigsByResource( | |||
} | |||
return collectorConfigsByResource, nil | |||
} | |||
|
|||
func (b *builtIn) UploadBinaryDataToDataset(ctx context.Context, | |||
image []byte, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rename this binary_data
?
fileContents = append(fileContents, datasetBytes...) | ||
timeoutCtx, timeoutFn := context.WithTimeout(context.Background(), time.Second*5) | ||
defer timeoutFn() | ||
for { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry — not quite understanding why this is in a retry loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we expecting UploadBinaryDataToDataset to potentially fail and need to be retried?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm actually not sure, was mainly following the structure above for Syncing
if !tc.serviceFail { | ||
// Validate first metadata message. | ||
test.That(t, uploadCount.Load(), test.ShouldEqual, 1) | ||
expectedUploadedCount := 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this a variable that runs per test. Meaning, TestA has 3 expectedUploadedCount, TestB has 2...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the test relies on conditionals and checking only the expected count within this condition, so I'm keen to leave as is as to not introduce an even larger diff if that's ok
@@ -476,6 +480,35 @@ func (s *Sync) syncArbitraryFile(f *os.File, tags []string, fileLastModifiedMill | |||
s.atomicUploadStats.arbitrary.uploadedBytes.Add(bytesUploaded) | |||
} | |||
|
|||
// UploadBinaryDataToDataset simultaneously uploads binary data and adds it to a dataset. | |||
func (s *Sync) UploadBinaryDataToDataset(ctx context.Context, data []byte, datasetIDs, tags []string, mimeType v1.MimeType) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the ask was for the writing to file to happen async as well? Like, to wrap all of this into its own goroutine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done!
services/datamanager/client.go
Outdated
mimeType datasyncpb.MimeType, | ||
extra map[string]interface{}, | ||
) error { | ||
ext, err := protoutils.StructToStructPb(extra) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: can we change this to extra
. Just we also have the word extension being thrown around and that’s shortened to ext
@@ -259,3 +260,34 @@ func lookupCollectorConfigsByResource( | |||
} | |||
return collectorConfigsByResource, nil | |||
} | |||
|
|||
func (b *builtIn) UploadBinaryDataToDataset(ctx context.Context, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func (b *builtIn) UploadBinaryDataToDataset(ctx context.Context, | |
func (b *builtIn) UploadBinaryDataToDatasets(ctx context.Context, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although I guess it looks like the name was already solidified?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ended up changing this, thank you!
return err | ||
} | ||
return b.sync.UploadBinaryDataToDataset(ctx, imgBytes, datasetIDs, tags, mimeType) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are errors from these new functions all safe to surface to the user?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I believe these errors should be safe to surface to the user
fileContents = append(fileContents, datasetBytes...) | ||
timeoutCtx, timeoutFn := context.WithTimeout(context.Background(), time.Second*5) | ||
defer timeoutFn() | ||
for { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we expecting UploadBinaryDataToDataset to potentially fail and need to be retried?
if info.IsDir() && info.Name() == FailedDir { | ||
// Do not sync the files in the corrupted data directory or in the directory that holds files | ||
// that are simultaneously uploaded and added to a dataset. | ||
if info.IsDir() && (info.Name() == FailedDir || info.Name() == DatasetDir) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd be more comfortable if the skipped directory had a name that was very unlikely to be used by users. "dataset"
doesn't feel out of the realm of possibility 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This directory would be .viam/capture/dataset not just the directory dataset. I can change it to viamUploadToDataset or something? Open to suggestions
services/datamanager/client.go
Outdated
|
||
func (c *client) UploadBinaryDataToDataset( | ||
ctx context.Context, | ||
image []byte, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
image []byte, | |
binaryData []byte, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated!
} | ||
|
||
// ConvertImageToBytes converts an image.Image to a byte slice based on the specified MIME type. | ||
func ConvertImageToBytes(image image.Image, mimeType datasyncpb.MimeType) ([]byte, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: Do we have concerns about loading the entire image into memory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so, though I can add this as a follow up. This is a paradigm that's repeated throughout RDK
services/datamanager/server.go
Outdated
@@ -34,6 +34,21 @@ func (server *serviceServer) Sync(ctx context.Context, req *pb.SyncRequest) (*pb | |||
return &pb.SyncResponse{}, nil | |||
} | |||
|
|||
func (server *serviceServer) UploadImageToDataset( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the naming of this confusing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated, was a typo
binaryData []byte, | ||
datasetIDs, tags []string, | ||
mimeType v1.MimeType, | ||
extra map[string]interface{}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't look like extra
is used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
Did a quick pass, but I'm going to add @vijayvuyyuru because I'm going to be mostly OOO next week. |
mimeType v1.MimeType, | ||
_ map[string]interface{}, | ||
) error { | ||
b.logger.Info("UploadBinaryDataToDatasets START") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these should be debug level logs? Seems excessive on every upload call.
mimeType v1.MimeType, | ||
_ map[string]interface{}, | ||
) error { | ||
b.logger.Info("UploadImageToDataset START") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
u got it
) error { | ||
b.logger.Info("UploadImageToDataset START") | ||
defer b.logger.Info("UploadImageToDataset END") | ||
b.mu.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like this locking should only be around the upload itself? I feel like safe to leave unlocked if we haven't converted the image.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep!
binaryData []byte, | ||
datasetIDs, tags []string, | ||
mimeType v1.MimeType, | ||
_ map[string]interface{}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these should be passed through as part of the Extra field. Same for UploadBinaryImage
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method calls the sync client's UploadBinaryDataToDatasets and that does not take an extra
go func() { | ||
defer close(errChan) | ||
// Create a new directory CaptureDir/DatasetDir | ||
newDir := filepath.Join(s.config.CaptureDir, DatasetDir) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[q] Feel free to ignore this. But I feel like this logic could be moved to reconfigure? Like making the directory as that unformation only changes then as opposed to trying to do it on each upload call?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we would only want to create this directory if we were actively trying to make the upload call. If the directory already exists, this should just be a no-op
return err | ||
} | ||
|
||
imgBytes, err := ConvertImageToBytes(image, mimeType) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit] Could we change this function to just call c.UploadBinaryDataToDataSets? That way we dont need the marshalling of extra from map -> structpb defined twice
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done!
Overview
This PR implements the new endpoint UploadBinaryDataToDataset as part of the DataManager Service. We also implement both UploadBinaryDataToDataset and UploadImageToDataset as part of the Go SDK, where the latter is just a convenience method that converts an image.Image into bytes before calling the same underlying upload binary data method.
Scope: https://docs.google.com/document/d/1YSJ3lTz5sC5HP5xguvd0VqSAQNoUVPSWrvj3zldW0Uo/edit?tab=t.0#heading=h.tcicyojyqi6c
Testing