From e68db3c8bb45b91260fbbd2b4d7393aff37adfa9 Mon Sep 17 00:00:00 2001 From: peichins Date: Wed, 3 Jul 2024 15:40:44 +1000 Subject: [PATCH 1/3] added scripts to transcode from parquet to tfrecord files --- .../transcode_embeddings.py | 55 +++++++++++++++++++ tests/app_tests/test_transcode.py | 24 ++++++++ 2 files changed, 79 insertions(+) create mode 100644 src/transcode_embeddings/transcode_embeddings.py create mode 100644 tests/app_tests/test_transcode.py diff --git a/src/transcode_embeddings/transcode_embeddings.py b/src/transcode_embeddings/transcode_embeddings.py new file mode 100644 index 0000000..925a05f --- /dev/null +++ b/src/transcode_embeddings/transcode_embeddings.py @@ -0,0 +1,55 @@ +# Takes embeddings from parquet files and writes them to TFRecord files + +import tensorflow as tf +import pandas as pd +import numpy as np + +from chirp.inference.tf_examples import EmbeddingsTFRecordMultiWriter, bytes_feature, int_feature, float_feature, serialize_tensor + +from src.data_frames import df_to_embeddings + +def get_parquet_file_list(parquet_folder): + """ + Recursively finds all parquet files in a folder + """ + return [f for f in parquet_folder.rglob('*.parquet')] + +def transcode_from_parquet(parquet_filepaths, output_path): + + print(f"transcoding {len(parquet_filepaths)} parquet files to {output_path}") + + + with EmbeddingsTFRecordMultiWriter(output_path, num_files=256) as writer: + for i, fp in enumerate(parquet_filepaths): + + #print a dot without a newline every 10th file and + # print i of total every 100 files + if i % 10 == 0: + if i % 100 == 0: + print(f"\n{i} of {len(parquet_filepaths)}") + else: + print('.', end='', flush=True) + + # read the parquet file with pandas + embeddings_table = df_to_embeddings(pd.read_parquet(fp)) + embeddings = np.array(embeddings_table[:,:,2:1282], dtype=np.float16) + embeddings = tf.convert_to_tensor(embeddings, dtype=tf.float16) + features = { + 'filename': bytes_feature(embeddings_table[0][0][0].encode()), + 
'timestamp_s': float_feature(0.0), + 'embedding': bytes_feature(serialize_tensor(embeddings, tf.float16)), + 'embedding_shape': int_feature(tuple(embeddings.shape)) + } + ex = tf.train.Example(features=tf.train.Features(feature=features)) + writer.write(ex.SerializeToString()) + +# def filename_to_url(filename, domain): + +# # filename is made of 3 parts: datetime, site, and file number, followed by a file extension +# # the 3 parts are separated by underscores. The site name might also contain an underscore +# # the datetime is in the format YYYYMMDDTHHmmssZ, file number is an integer, and the file extension is .parquet +# # we need to construct a url like this: https://[domain]/ + + + +# return f"https://storage.googleapis.com/urban-sound-classification/{filename}" \ No newline at end of file diff --git a/tests/app_tests/test_transcode.py b/tests/app_tests/test_transcode.py new file mode 100644 index 0000000..b740b38 --- /dev/null +++ b/tests/app_tests/test_transcode.py @@ -0,0 +1,24 @@ +import os +from pathlib import Path +from src.transcode_embeddings.transcode_embeddings import transcode_from_parquet, get_parquet_file_list + + + +def test_transcode_from_parquet(): + # Define the input file path + input_folder = Path("./tests/files/embeddings") + + output_folder = Path("./tests/output/") + + parquet_files = get_parquet_file_list(input_folder) + + # Call the transcode_from_parquet function + transcode_from_parquet(parquet_files, output_folder) + + # Assert that the output files exist by checking that + # there are 256 files in the output folder with filenames embeddings-[date]-%[file_num]-of-00256 + # where date is a timestamp and file_num is a number between 0 and 255 with leading zeros + # by getting a list of files that match that pattern, and checking that the length of the list is 256 + output_files = [f for f in output_folder.rglob('embeddings-*-*-of-00256')] + assert len(output_files) == 256 + From e298509a660ec36b585231f8ad1e5505add00509 Mon Sep 17 
00:00:00 2001 From: peichins Date: Wed, 7 Aug 2024 16:22:08 +1000 Subject: [PATCH 2/3] remove debug print statements --- src/transcode_embeddings/transcode_embeddings.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/transcode_embeddings/transcode_embeddings.py b/src/transcode_embeddings/transcode_embeddings.py index 925a05f..f760ba0 100644 --- a/src/transcode_embeddings/transcode_embeddings.py +++ b/src/transcode_embeddings/transcode_embeddings.py @@ -33,7 +33,9 @@ def transcode_from_parquet(parquet_filepaths, output_path): # read the parquet file with pandas embeddings_table = df_to_embeddings(pd.read_parquet(fp)) embeddings = np.array(embeddings_table[:,:,2:1282], dtype=np.float16) + #print(f"embeddings shape: {embeddings.shape}") embeddings = tf.convert_to_tensor(embeddings, dtype=tf.float16) + #print(f"embeddings shape: {embeddings.shape}") features = { 'filename': bytes_feature(embeddings_table[0][0][0].encode()), 'timestamp_s': float_feature(0.0), From ea413f0c00674fdaf2ad79cc1e0f2a65acc5e0bd Mon Sep 17 00:00:00 2001 From: peichins Date: Mon, 2 Dec 2024 22:15:42 +1000 Subject: [PATCH 3/3] change default number of tf record files to transcode to --- src/transcode_embeddings/transcode_embeddings.py | 4 ++-- tests/app_tests/test_transcode.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/transcode_embeddings/transcode_embeddings.py b/src/transcode_embeddings/transcode_embeddings.py index f760ba0..9fad634 100644 --- a/src/transcode_embeddings/transcode_embeddings.py +++ b/src/transcode_embeddings/transcode_embeddings.py @@ -14,12 +14,12 @@ def get_parquet_file_list(parquet_folder): """ return [f for f in parquet_folder.rglob('*.parquet')] -def transcode_from_parquet(parquet_filepaths, output_path): +def transcode_from_parquet(parquet_filepaths, output_path, num_files=10): print(f"transcoding {len(parquet_filepaths)} parquet files to {output_path}") - with EmbeddingsTFRecordMultiWriter(output_path, num_files=256) as writer: + 
with EmbeddingsTFRecordMultiWriter(output_path, num_files=num_files) as writer: for i, fp in enumerate(parquet_filepaths): #print a dot without a newline every 10th file and diff --git a/tests/app_tests/test_transcode.py b/tests/app_tests/test_transcode.py index b740b38..0e8b86c 100644 --- a/tests/app_tests/test_transcode.py +++ b/tests/app_tests/test_transcode.py @@ -13,7 +13,7 @@ def test_transcode_from_parquet(): parquet_files = get_parquet_file_list(input_folder) # Call the transcode_from_parquet function - transcode_from_parquet(parquet_files, output_folder) + transcode_from_parquet(parquet_files, output_folder, num_files=256) # Assert that the output files exist by checking that # there are 256 files in the output folder with filenames embeddings-[date]-%[file_num]-of-00256