diff --git a/cli/data.go b/cli/data.go
index 05e3f72c906..c895703c112 100644
--- a/cli/data.go
+++ b/cli/data.go
@@ -2,7 +2,6 @@ package cli
 import (
 	"bufio"
-	"bytes"
 	"compress/gzip"
 	"context"
 	"encoding/json"
@@ -23,8 +22,6 @@ import (
 	datapb "go.viam.com/api/app/data/v1"
 	"go.viam.com/utils"
 	"go.viam.com/utils/rpc"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/encoding/protojson"
 	"google.golang.org/protobuf/types/known/structpb"
@@ -365,14 +362,55 @@ func (c *viamClient) dataExportBinaryAction(cCtx *cli.Context, args dataExportBi
 }
 
 func (c *viamClient) dataExportBinaryIDsAction(args dataExportBinaryIDsArgs) error {
-	if len(args.BinaryDataIDs) > 0 {
-		result := c.downloadBinary(args.Destination, args.Timeout, args.BinaryDataIDs...)
-		if result == nil { // nil result means success
-			printf(c.c.App.Writer, "Downloaded %d files", len(args.BinaryDataIDs))
+	if len(args.BinaryDataIDs) == 0 {
+		return nil
+	}
+
+	ctx := c.c.Context
+
+	// Fetch metadata for all requested IDs in a single request, retrying on failure
+	var resp *datapb.BinaryDataByIDsResponse
+	var err error
+	for count := 0; count < maxRetryCount; count++ {
+		resp, err = c.dataClient.BinaryDataByIDs(ctx, &datapb.BinaryDataByIDsRequest{
+			BinaryDataIds: args.BinaryDataIDs,
+			IncludeBinary: false,
+		})
+		if err == nil {
+			break
 		}
-		return result
 	}
-	return nil
+	if err != nil {
+		return errors.Wrapf(err, serverErrorMessage)
+	}
+
+	data := resp.GetData()
+	if len(data) != len(args.BinaryDataIDs) {
+		return errors.Errorf("expected %d responses for %d files, received %d responses",
+			len(args.BinaryDataIDs), len(args.BinaryDataIDs), len(data))
+	}
+
+	// Download all files in parallel, capping the worker pool at 10. The early
+	// return above guarantees at least one ID, so parallelWorkers is never zero.
+	parallelWorkers := uint(min(len(args.BinaryDataIDs), 10))
+
+	// downloadBinaryFilesInParallel reports the final count through the progress
+	// callback, so no separate success message is printed here.
+	return c.downloadBinaryFilesInParallel(
+		ctx,
+		args.Destination,
+		time.Duration(args.Timeout)*time.Second,
+		data,
+		parallelWorkers,
+		func(i int32) {
+			printf(c.c.App.Writer, "Downloaded %d files", i)
+		},
+	)
 }
 
 func (c *viamClient) dataExportTabularAction(cCtx *cli.Context, args dataExportTabularArgs) error {
@@ -390,11 +428,30 @@ func (c *viamClient) dataExportTabularAction(cCtx *cli.Context, args dataExportT
 
 // BinaryData downloads binary data matching filter to dst.
 func (c *viamClient) binaryData(dst string, filter *datapb.Filter, parallelDownloads, timeout uint) error {
-	return c.performActionOnBinaryDataFromFilter(
-		func(id string) error {
-			return c.downloadBinary(dst, timeout, id)
-		},
-		filter, parallelDownloads,
+	ctx := c.c.Context
+	// If limit is too high the request can time out, so limit each call to a maximum value of 100.
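+	// The clamp below also guards against a zero parallelDownloads, which would otherwise be sent as a page size of 0.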
+	limit := min(parallelDownloads, maxLimit)
+	if limit == 0 {
+		limit = 1
+	}
+
+	// Collect all metadata matching the filter
+	allData, _, err := getMatchingBinaryMetadata(ctx, c.dataClient, filter, limit)
+	if err != nil {
+		return err
+	}
+
+	if len(allData) == 0 {
+		return nil
+	}
+
+	// Download all files in parallel
+	return c.downloadBinaryFilesInParallel(
+		ctx,
+		dst,
+		time.Duration(timeout)*time.Second,
+		allData,
+		parallelDownloads,
 		func(i int32) {
 			printf(c.c.App.Writer, "Downloaded %d files", i)
 		},
@@ -420,11 +477,22 @@ func (c *viamClient) performActionOnBinaryDataFromFilter(actionOnBinaryData func
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
+		defer close(ids)
 		// If limit is too high the request can time out, so limit each call to a maximum value of 100.
 		limit := min(parallelActions, maxLimit)
-		if err := getMatchingBinaryIDs(ctx, c.dataClient, filter, ids, limit); err != nil {
+		if limit == 0 {
+			limit = 1
+		}
+		allData, _, err := getMatchingBinaryMetadata(ctx, c.dataClient, filter, limit)
+		if err != nil {
 			errs <- err
 			cancel()
+			return
+		}
+		for _, bd := range allData {
+			if bd.GetMetadata() != nil {
+				ids <- bd.GetMetadata().GetBinaryDataId()
+			}
 		}
 	}()
 
@@ -472,18 +540,23 @@ func (c *viamClient) performActionOnBinaryDataFromFilter(actionOnBinaryData func
 	return allErrs
 }
 
-// getMatchingBinaryIDs queries client for all BinaryData matching filter, and passes each of their ids into ids.
-func getMatchingBinaryIDs(ctx context.Context, client datapb.DataServiceClient, filter *datapb.Filter,
-	ids chan string, limit uint,
-) error {
+// getMatchingBinaryMetadata fetches all BinaryData metadata matching the filter.
+// It returns the full BinaryData objects (whose metadata includes the download URIs)
+// and the last pagination token.
+func getMatchingBinaryMetadata(
+	ctx context.Context,
+	client datapb.DataServiceClient,
+	filter *datapb.Filter,
+	limit uint,
+) ([]*datapb.BinaryData, string, error) {
+	var allData []*datapb.BinaryData
 	var last string
-	defer close(ids)
+
 	for {
 		if err := ctx.Err(); err != nil {
-			return err
+			return nil, "", err
 		}
-		resp, err := client.BinaryDataByFilter(ctx, &datapb.BinaryDataByFilterRequest{
+		req := &datapb.BinaryDataByFilterRequest{
 			DataRequest: &datapb.DataRequest{
 				Filter: filter,
 				Limit:  uint64(limit),
@@ -491,188 +564,245 @@ func getMatchingBinaryIDs(ctx context.Context, client datapb.DataServiceClient,
 			},
 			CountOnly:     false,
 			IncludeBinary: false,
-		})
+		}
+
+		resp, err := client.BinaryDataByFilter(ctx, req)
 		if err != nil {
-			return err
+			return nil, "", err
 		}
 		// If no data is returned, there is no more data.
 		if len(resp.GetData()) == 0 {
-			return nil
+			return allData, last, nil
 		}
 		last = resp.GetLast()
-		for _, bd := range resp.GetData() {
-			ids <- bd.GetMetadata().GetBinaryDataId()
+		allData = append(allData, resp.GetData()...)
+
+		// If last is empty, we've fetched all data
+		if last == "" {
+			return allData, last, nil
 		}
 	}
 }
 
-func (c *viamClient) downloadBinary(dst string, timeout uint, ids ...string) error {
+// downloadSingleBinaryFile downloads a single binary file given its metadata.
+// It writes the metadata JSON, performs the HTTP download, and saves the file.
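+// Files are saved under <dst>/<dataDir>/ and their metadata JSON under <dst>/<metadataDir>/.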
+func (c *viamClient) downloadSingleBinaryFile( + ctx context.Context, + dst string, + timeout time.Duration, + datum *datapb.BinaryData, +) error { args, err := getGlobalArgs(c.c) if err != nil { return err } - debugf(c.c.App.Writer, args.Debug, "Attempting to download binary files: %v", ids) - var resp *datapb.BinaryDataByIDsResponse - largeFile := false - // To begin, we assume the files are small and downloadable, so we try getting the binary directly - for count := 0; count < maxRetryCount; count++ { - resp, err = c.dataClient.BinaryDataByIDs(c.c.Context, &datapb.BinaryDataByIDsRequest{ - BinaryDataIds: ids, - IncludeBinary: !largeFile, - }) - // If any file is too large, we break and try a different pathway for downloading - if err == nil || status.Code(err) == codes.ResourceExhausted || status.Code(err) == codes.Unavailable { - debugf(c.c.App.Writer, args.Debug, "Small file download for files %v: attempt %d/%d succeeded", ids, count+1, maxRetryCount) - break - } - debugf(c.c.App.Writer, args.Debug, "Small file download for files %v: attempt %d/%d failed", ids, count+1, maxRetryCount) - } - // For large files, we get the metadata but not the binary itself - // Resource exhausted is returned when the message we're receiving exceeds the GRPC maximum limit - // Unavailable (such as error 'upstream connect error or disconnect/reset before headers. reset reason: connection termination') - // can also be returned when the file is too large. - if err != nil && (status.Code(err) == codes.ResourceExhausted || status.Code(err) == codes.Unavailable) { - largeFile = true - for count := 0; count < maxRetryCount; count++ { - resp, err = c.dataClient.BinaryDataByIDs(c.c.Context, &datapb.BinaryDataByIDsRequest{ - BinaryDataIds: ids, - IncludeBinary: !largeFile, - }) - if err == nil { - debugf(c.c.App.Writer, args.Debug, "Metadata fetch for files %v: attempt %d/%d succeeded", ids, count+1, maxRetryCount) - break - } - debugf(c.c.App.Writer, args.Debug, "Metadata fetch for files %v: attempt %d/%d failed", ids, count+1, maxRetryCount) - } + metadata := datum.GetMetadata() + if metadata == nil { + return errors.New("metadata is nil") + } + + id := metadata.GetBinaryDataId() + fileName := filenameForDownload(metadata) + + // Modify the file name in the metadata to reflect what it will be saved as. + metadata.FileName = fileName + + // Write metadata JSON + jsonPath := filepath.Join(dst, metadataDir, fileName+".json") + if err := os.MkdirAll(filepath.Dir(jsonPath), 0o700); err != nil { + return errors.Wrapf(err, "could not create metadata directory %s", filepath.Dir(jsonPath)) } + //nolint:gosec + jsonFile, err := os.Create(jsonPath) if err != nil { - return errors.Wrapf(err, serverErrorMessage) + return err } + defer jsonFile.Close() - data := resp.GetData() + mdJSONBytes, err := protojson.Marshal(metadata) + if err != nil { + return err + } + if _, err := jsonFile.Write(mdJSONBytes); err != nil { + return err + } - // Loop through responses and download each file - if len(data) != len(ids) { - return errors.Errorf("expected %d responses for %d files, received %d responses", len(ids), len(ids), len(data)) + // HTTP download with retry + uri := metadata.GetUri() + if uri == "" { + return errors.Errorf("URI is empty for binary data %s", id) } - for i, datum := range data { - id := ids[i] - fileName := filenameForDownload(datum.GetMetadata()) - // Modify the file name in the metadata to reflect what it will be saved as. 
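+	// Downloading over HTTP from the URI avoids the gRPC message-size limit that the
+	// old BinaryDataByIDs path could hit on large files.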
-		metadata := datum.GetMetadata()
-		metadata.FileName = fileName
+	debugf(c.c.App.Writer, args.Debug, "Attempting HTTP download for file %s", id)
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, uri, nil)
+	if err != nil {
+		return errors.Wrapf(err, serverErrorMessage)
+	}
 
-		jsonPath := filepath.Join(dst, metadataDir, fileName+".json")
-		if err := os.MkdirAll(filepath.Dir(jsonPath), 0o700); err != nil {
-			return errors.Wrapf(err, "could not create metadata directory %s", filepath.Dir(jsonPath))
+	// Set the headers so HTTP requests that are not gRPC calls can still be authenticated in app.
+	// We can authenticate via token or API key, so we try both.
+	token, ok := c.conf.Auth.(*token)
+	if ok {
+		req.Header.Add(rpc.MetadataFieldAuthorization, rpc.AuthorizationValuePrefixBearer+token.AccessToken)
+	}
+	apiKey, ok := c.conf.Auth.(*apiKey)
+	if ok {
+		req.Header.Add("key_id", apiKey.KeyID)
+		req.Header.Add("key", apiKey.KeyCrypto)
+	}
+
+	httpClient := &http.Client{Timeout: timeout}
+
+	var res *http.Response
+	for count := 0; count < maxRetryCount; count++ {
+		res, err = httpClient.Do(req)
+
+		if err == nil && res.StatusCode == http.StatusOK {
+			debugf(c.c.App.Writer, args.Debug,
+				"HTTP download for file %s: attempt %d/%d succeeded", id, count+1, maxRetryCount)
+			break
 		}
-		//nolint:gosec
-		jsonFile, err := os.Create(jsonPath)
-		if err != nil {
-			return err
+		debugf(c.c.App.Writer, args.Debug, "HTTP download for file %s: attempt %d/%d failed", id, count+1, maxRetryCount)
+		// Close the body of a failed attempt before retrying so its connection is not
+		// leaked; the final attempt's body is closed by the error handling below.
+		if err == nil && res != nil && count < maxRetryCount-1 {
+			utils.UncheckedError(res.Body.Close())
+		}
+	}
+
+	if err != nil {
+		debugf(c.c.App.Writer, args.Debug, "Failed downloading file %s: %s", id, err)
+		return errors.Wrapf(err, serverErrorMessage)
+	}
+	if res == nil || res.StatusCode != http.StatusOK {
+		statusCode := 0
+		if res != nil {
+			statusCode = res.StatusCode
 		}
-		mdJSONBytes, err := protojson.Marshal(metadata)
-		if err != nil {
-			return err
+		debugf(c.c.App.Writer, args.Debug, "Failed downloading file %s: Server returned %d response", id, statusCode)
+		if res != nil {
+			utils.UncheckedError(res.Body.Close())
 		}
-		if _, err := jsonFile.Write(mdJSONBytes); err != nil {
+		return errors.New(serverErrorMessage)
+	}
+	defer func() {
+		utils.UncheckedError(res.Body.Close())
+	}()
+
+	r := res.Body
+
+	// Handle file path and gzip
+	dataPath := filepath.Join(dst, dataDir, fileName)
+	ext := metadata.GetFileExt()
+
+	// If the file is gzipped, unzip.
+	if ext == gzFileExt {
+		r, err = gzip.NewReader(r)
+		if err != nil {
+			debugf(c.c.App.Writer, args.Debug, "Failed unzipping file %s: %s", id, err)
 			return err
 		}
+		defer func() {
+			utils.UncheckedError(r.Close())
+		}()
+	} else if filepath.Ext(dataPath) != ext {
+		// If the file name did not already include the extension (e.g. for data capture files), add it.
+		// Don't do this for files that we're unzipping.
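+		// For example, a capture file saved under the hypothetical name "1712345678" with FileExt ".jpg" becomes "1712345678.jpg".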
+ dataPath += ext + } - var r io.ReadCloser - if largeFile { - debugf(c.c.App.Writer, args.Debug, "Attempting file %s as a large file download", id) - // Make request to the URI for large files since we exceed the message limit for gRPC - req, err := http.NewRequestWithContext(c.c.Context, http.MethodGet, datum.GetMetadata().GetUri(), nil) - if err != nil { - return errors.Wrapf(err, serverErrorMessage) - } + if err := os.MkdirAll(filepath.Dir(dataPath), 0o700); err != nil { + debugf(c.c.App.Writer, args.Debug, "Failed creating data directory %s: %s", dataPath, err) + return errors.Wrapf(err, "could not create data directory %s", filepath.Dir(dataPath)) + } + //nolint:gosec + dataFile, err := os.Create(dataPath) + if err != nil { + debugf(c.c.App.Writer, args.Debug, "Failed creating file %s: %s", id, err) + return errors.Wrapf(err, "could not create file for datum %s", id) + } + defer dataFile.Close() - // Set the headers so HTTP requests that are not gRPC calls can still be authenticated in app - // We can authenticate via token or API key, so we try both. - token, ok := c.conf.Auth.(*token) - if ok { - req.Header.Add(rpc.MetadataFieldAuthorization, rpc.AuthorizationValuePrefixBearer+token.AccessToken) - } - apiKey, ok := c.conf.Auth.(*apiKey) - if ok { - req.Header.Add("key_id", apiKey.KeyID) - req.Header.Add("key", apiKey.KeyCrypto) - } + //nolint:gosec + if _, err := io.Copy(dataFile, r); err != nil { + debugf(c.c.App.Writer, args.Debug, "Failed writing data to file %s: %s", id, err) + return err + } - httpClient := &http.Client{Timeout: time.Duration(timeout) * time.Second} + return nil +} - var res *http.Response - for count := 0; count < maxRetryCount; count++ { - res, err = httpClient.Do(req) +// downloadBinaryFilesInParallel downloads multiple binary files in parallel using a worker pool. 
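+// Each worker stops at its first error; errors from all workers are merged with multierr and returned.
+// Progress is reported through logProgress every logEveryN files and once more at the end.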
+func (c *viamClient) downloadBinaryFilesInParallel(
+	ctx context.Context,
+	dst string,
+	timeout time.Duration,
+	data []*datapb.BinaryData,
+	parallelWorkers uint,
+	logProgress func(int32),
+) error {
+	if len(data) == 0 {
+		return nil
+	}
+	if parallelWorkers == 0 {
+		parallelWorkers = 1
+	}
+
+	// Derive a cancelable context so the work-sending goroutine below exits
+	// if we return before every item has been consumed.
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	// Channel to distribute work
+	workChan := make(chan *datapb.BinaryData, parallelWorkers)
+	errChan := make(chan error, parallelWorkers)
+	var wg sync.WaitGroup
+	var numFilesProcessed atomic.Int32
+
+	// Start worker goroutines
+	for i := uint(0); i < parallelWorkers; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case datum, ok := <-workChan:
+					if !ok {
+						return // Channel closed, no more work
+					}
+					err := c.downloadSingleBinaryFile(ctx, dst, timeout, datum)
+					if err != nil {
+						errChan <- err
+						return
+					}
+					numFilesProcessed.Add(1)
+					if numFilesProcessed.Load()%logEveryN == 0 {
+						logProgress(numFilesProcessed.Load())
+					}
 				}
-			debugf(c.c.App.Writer, args.Debug, "Large file download for file %s: attempt %d/%d failed", id, count+1, maxRetryCount)
 			}
+		}()
+	}
 
-		if err != nil {
-			debugf(c.c.App.Writer, args.Debug, "Failed downloading large file %s: %s", id, err)
-			return errors.Wrapf(err, serverErrorMessage)
-		}
-		if res.StatusCode != http.StatusOK {
-			debugf(c.c.App.Writer, args.Debug, "Failed downloading large file %s: Server returned %d response", id, res.StatusCode)
-			return errors.New(serverErrorMessage)
+	// Send work items
+	go func() {
+		defer close(workChan)
+		for _, datum := range data {
+			select {
+			case <-ctx.Done():
+				return
+			case workChan <- datum:
 			}
-		defer func() {
-			utils.UncheckedError(res.Body.Close())
-		}()
-
-		r = res.Body
-	} else {
-		// If the binary has not already been populated as large file download,
-		// get the binary data from the response.
-		r = io.NopCloser(bytes.NewReader(datum.GetBinary()))
 		}
+	}()
 
-	dataPath := filepath.Join(dst, dataDir, fileName)
-	ext := datum.GetMetadata().GetFileExt()
+	// Wait for completion
+	wg.Wait()
+	close(errChan)
 
-	// If the file is gzipped, unzip.
-	if ext == gzFileExt {
-		r, err = gzip.NewReader(r)
-		if err != nil {
-			debugf(c.c.App.Writer, args.Debug, "Failed unzipping file %s: %s", id, err)
-			return err
-		}
-	} else if filepath.Ext(dataPath) != ext {
-		// If the file name did not already include the extension (e.g. for data capture files), add it.
-		// Don't do this for files that we're unzipping.
-			dataPath += ext
-		}

-		if err := os.MkdirAll(filepath.Dir(dataPath), 0o700); err != nil {
-			debugf(c.c.App.Writer, args.Debug, "Failed creating data directory %s: %s", dataPath, err)
-			return errors.Wrapf(err, "could not create data directory %s", filepath.Dir(dataPath))
-		}
-		//nolint:gosec
-		dataFile, err := os.Create(dataPath)
-		if err != nil {
-			debugf(c.c.App.Writer, args.Debug, "Failed creating file %s: %s", id, err)
-			return errors.Wrapf(err, "could not create file for datum %s", id)
-		}
-		//nolint:gosec
-		if _, err := io.Copy(dataFile, r); err != nil {
-			debugf(c.c.App.Writer, args.Debug, "Failed writing data to file %s: %s", id, err)
-			return err
-		}
-		if err := r.Close(); err != nil {
-			debugf(c.c.App.Writer, args.Debug, "Failed closing file %s: %s", id, err)
-			return err
-		}
+	// Check for errors
+	var allErrs error
+	for err := range errChan {
+		allErrs = multierr.Append(allErrs, err)
+	}
+
+	// Final progress log
+	if numFilesProcessed.Load()%logEveryN != 0 {
+		logProgress(numFilesProcessed.Load())
 	}
-	return nil
+
+	return allErrs
 }
 
 // transform datum's filename to a destination path on this computer.
diff --git a/cli/dataset.go b/cli/dataset.go
index cfb2f0a1f35..1b064b838ef 100644
--- a/cli/dataset.go
+++ b/cli/dataset.go
@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"time"
 
 	"github.com/pkg/errors"
 	"github.com/urfave/cli/v2"
@@ -246,25 +248,89 @@ func (c *viamClient) downloadDataset(dst, datasetID string, onlyJSONLines, force
 		return fmt.Errorf("%s does not match any dataset IDs", datasetID)
 	}
 
-	return c.performActionOnBinaryDataFromFilter(
-		func(id string) error {
-			var downloadErr error
-			var datasetFilePath string
-			if !onlyJSONLines {
-				downloadErr = c.downloadBinary(dst, timeout, id)
-				datasetFilePath = filepath.Join(dst, dataDir)
+	ctx := c.c.Context
+	// Collect all metadata matching the filter in a single paginated fetch
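+	// As in binaryData, the page size is clamped to maxLimit so each request stays small enough not to time out.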
+	limit := min(parallelDownloads, maxLimit)
+	if limit == 0 {
+		limit = 1
+	}
+	allData, _, err := getMatchingBinaryMetadata(ctx, c.dataClient, &datapb.Filter{
+		DatasetId: datasetID,
+	}, limit)
+	if err != nil {
+		return err
+	}
+
+	if len(allData) == 0 {
+		return nil
+	}
+
+	datasetFilePath := filepath.Join(dst, dataDir)
+	var downloadErr error
+
+	// Download the files in parallel unless only the JSON lines were requested
+	if !onlyJSONLines {
+		downloadErr = c.downloadBinaryFilesInParallel(
+			ctx,
+			dst,
+			time.Duration(timeout)*time.Second,
+			allData,
+			parallelDownloads,
+			func(i int32) {
+				printf(c.c.App.Writer, "Downloaded %d files", i)
+			},
+		)
+	}
+
+	// Write the JSON lines sequentially from the metadata we already have:
+	// ImageMetadataToJSONLines appends to a single shared file, so parallel
+	// writers would only contend on a lock.
+	var jsonErrs error
+	for _, datum := range allData {
+		if err := binaryDataToJSONLinesWithMetadata(datasetFilePath, datasetFile, datum, forceLinuxPath); err != nil {
+			jsonErrs = multierr.Append(jsonErrs, err)
+		}
+	}
-			}
-			datasetErr := binaryDataToJSONLines(c.c.Context, c.dataClient, datasetFilePath, datasetFile, id, forceLinuxPath)
-
-			return multierr.Combine(downloadErr, datasetErr)
-		},
-		&datapb.Filter{
-			DatasetId: datasetID,
-		}, parallelDownloads,
-		func(i int32) {
-			printf(c.c.App.Writer, "Downloaded %d files", i)
-		},
-	)
+
+	if jsonErrs == nil {
+		printf(c.c.App.Writer, "Wrote JSON lines for %d files", len(allData))
+	}
+
+	return multierr.Combine(downloadErr, jsonErrs)
 }
 
 // Annotation holds the label associated with the image.
@@ -295,6 +361,45 @@ type BBoxAnnotation struct {
 	YMaxNormalized float64 `json:"y_max_normalized"`
 }
 
+// binaryDataToJSONLinesWithMetadata writes JSON lines for a binary data object using its metadata.
+// It accepts the metadata directly instead of fetching it, avoiding a redundant API call per datum.
+func binaryDataToJSONLinesWithMetadata(dst string, file *os.File, datum *datapb.BinaryData, forceLinuxPath bool) error {
+	metadata := datum.GetMetadata()
+	if metadata == nil {
+		return errors.New("metadata is nil")
+	}
+
+	fileName := filepath.Join(dst, filenameForDownload(metadata))
+	if forceLinuxPath {
+		fileName = filepath.ToSlash(fileName)
+	}
+	ext := metadata.GetFileExt()
+	// Gzipped files are decompressed on download, so the .gz extension is never appended.
+	if ext != gzFileExt && filepath.Ext(fileName) != ext {
+		// If the file name did not already include the extension (e.g. for data capture files), add it.
+		// Don't do this for files that we're unzipping.
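+		// This mirrors the extension logic in downloadSingleBinaryFile so the path
+		// recorded in the JSON lines matches the file on disk.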
+ fileName += ext + } + + imageMetadata := &utilsml.ImageMetadata{ + Timestamp: metadata.GetTimeRequested().AsTime(), + Tags: metadata.GetCaptureMetadata().GetTags(), + Annotations: metadata.GetAnnotations(), + Path: fileName, + BinaryDataID: metadata.GetBinaryDataId(), + OrganizationID: metadata.GetCaptureMetadata().GetOrganizationId(), + LocationID: metadata.GetCaptureMetadata().GetLocationId(), + RobotID: metadata.GetCaptureMetadata().GetRobotId(), + PartID: metadata.GetCaptureMetadata().GetPartId(), + ComponentName: metadata.GetCaptureMetadata().GetComponentName(), + } + err := utilsml.ImageMetadataToJSONLines([]*utilsml.ImageMetadata{imageMetadata}, nil, mlpb.ModelType_MODEL_TYPE_UNSPECIFIED, file) + if err != nil { + return errors.Wrap(err, "error writing to file") + } + return nil +} + func binaryDataToJSONLines(ctx context.Context, client datapb.DataServiceClient, dst string, file *os.File, id string, forceLinuxPath bool, ) error { @@ -319,33 +424,5 @@ func binaryDataToJSONLines(ctx context.Context, client datapb.DataServiceClient, } datum := data[0] - fileName := filepath.Join(dst, filenameForDownload(datum.GetMetadata())) - if forceLinuxPath { - fileName = filepath.ToSlash(fileName) - } - ext := datum.GetMetadata().GetFileExt() - // If the file is gzipped, unzip. - if ext != gzFileExt && filepath.Ext(fileName) != ext { - // If the file name did not already include the extension (e.g. for data capture files), add it. - // Don't do this for files that we're unzipping. - fileName += ext - } - - imageMetadata := &utilsml.ImageMetadata{ - Timestamp: datum.GetMetadata().GetTimeRequested().AsTime(), - Tags: datum.GetMetadata().GetCaptureMetadata().GetTags(), - Annotations: datum.GetMetadata().GetAnnotations(), - Path: fileName, - BinaryDataID: datum.GetMetadata().GetBinaryDataId(), - OrganizationID: datum.GetMetadata().GetCaptureMetadata().GetOrganizationId(), - LocationID: datum.GetMetadata().GetCaptureMetadata().GetLocationId(), - RobotID: datum.GetMetadata().GetCaptureMetadata().GetRobotId(), - PartID: datum.GetMetadata().GetCaptureMetadata().GetPartId(), - ComponentName: datum.GetMetadata().GetCaptureMetadata().GetComponentName(), - } - err = utilsml.ImageMetadataToJSONLines([]*utilsml.ImageMetadata{imageMetadata}, nil, mlpb.ModelType_MODEL_TYPE_UNSPECIFIED, file) - if err != nil { - return errors.Wrap(err, "error writing to file") - } - return nil + return binaryDataToJSONLinesWithMetadata(dst, file, datum, forceLinuxPath) }