Skip to content

Commit

Permalink
Handle reading plaintext footer files without decryption properties
Browse files Browse the repository at this point in the history
  • Loading branch information
adamreeve committed Jan 24, 2025
1 parent 06cfe65 commit 7f94e39
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 62 deletions.
1 change: 0 additions & 1 deletion parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ sysinfo = ["dep:sysinfo"]
crc = ["dep:crc32fast"]
# Enable SIMD UTF-8 validation
simdutf8 = ["dep:simdutf8"]
#encryption = ["aes-gcm", "base64"]
# Enable Parquet modular encryption support
encryption = ["dep:ring"]

Expand Down
58 changes: 57 additions & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,6 @@ mod tests {
};
use arrow_select::concat::concat_batches;

#[cfg(feature = "encryption")]
use crate::arrow::arrow_reader::ArrowReaderMetadata;
use crate::arrow::arrow_reader::{
ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
Expand Down Expand Up @@ -1897,6 +1896,63 @@ mod tests {
verify_encryption_test_file_read(file, decryption_properties);
}

/// A file with a plaintext footer and encrypted columns must be usable without
/// decryption properties: the footer metadata and an unencrypted column are
/// readable, while reading an encrypted column surfaces a Parquet error.
#[test]
fn test_non_uniform_encryption_plaintext_footer_without_decryption() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/encrypt_columns_plaintext_footer.parquet.encrypted");
    let file = File::open(&path).unwrap();

    // Load the metadata with default options and no decryption properties.
    let reader_metadata = ArrowReaderMetadata::load(&file, Default::default(), None).unwrap();
    let footer = reader_metadata.metadata.file_metadata();

    assert_eq!(footer.num_rows(), 50);
    assert_eq!(footer.schema_descr().num_columns(), 8);
    assert_eq!(
        footer.created_by().unwrap(),
        "parquet-cpp-arrow version 19.0.0-SNAPSHOT"
    );

    for row_group in reader_metadata.metadata.row_groups() {
        assert_eq!(row_group.num_columns(), 8);
        assert_eq!(row_group.num_rows(), 50);
    }

    // Leaf 1 is not encrypted, so projecting onto it alone must succeed.
    let plain_builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let plain_mask = ProjectionMask::leaves(plain_builder.parquet_schema(), [1]);
    let plain_reader = plain_builder.with_projection(plain_mask).build().unwrap();

    let rows_read: usize = plain_reader
        .map(|batch| {
            let batch = batch.unwrap();
            let times = batch
                .column(0)
                .as_primitive::<types::Time32MillisecondType>();
            // Each value is expected to equal its position within the batch.
            for (idx, value) in times.iter().enumerate() {
                assert_eq!(value.unwrap(), idx as i32);
            }
            batch.num_rows()
        })
        .sum();

    assert_eq!(rows_read, footer.num_rows() as usize);

    // Leaf 4 is encrypted: the first batch read must fail with a Parquet error.
    let file = File::open(&path).unwrap();
    let encrypted_builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let encrypted_mask = ProjectionMask::leaves(encrypted_builder.parquet_schema(), [4]);
    let mut encrypted_reader = encrypted_builder
        .with_projection(encrypted_mask)
        .build()
        .unwrap();

    let Some(Err(ArrowError::ParquetError(message))) = encrypted_reader.next() else {
        panic!("Expected ArrowError::ParquetError");
    };
    assert!(message.contains("protocol error"));
}

#[test]
#[cfg(feature = "encryption")]
fn test_non_uniform_encryption() {
Expand Down
27 changes: 16 additions & 11 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,18 +655,23 @@ impl RowGroupMetaData {
.zip(schema_descr.columns())
.enumerate()
{
if c.encrypted_column_metadata.is_some() {
// TODO: Allow ignoring encrypted column metadata in plaintext mode when no
// decryptor is set
let decryptor = decryptor.unwrap();
let Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(crypto_metadata)) =
c.crypto_metadata.clone()
else {
todo!()
// Read encrypted metadata if it's present and we have a decryptor.
if let (true, Some(decryptor)) = (c.encrypted_column_metadata.is_some(), decryptor) {
let column_decryptor = match c.crypto_metadata.as_ref() {
None => {
return Err(general_err!(
"No crypto_metadata is set for column {}, which has encrypted metadata",
i
));
}
Some(ColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(crypto_metadata)) => {
let column_name = crypto_metadata.path_in_schema.join(".");
decryptor.get_column_metadata_decryptor(column_name.as_bytes())
}
Some(ColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_)) => {
decryptor.get_footer_decryptor()
}
};
let column_name = crypto_metadata.path_in_schema.join(".");
let column_decryptor =
decryptor.get_column_metadata_decryptor(column_name.as_bytes());

let column_aad = create_page_aad(
decryptor.file_aad(),
Expand Down
92 changes: 43 additions & 49 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,42 +717,31 @@ impl ParquetMetaDataReader {
}

#[cfg(feature = "encryption")]
let mut decryptor = None;
let mut file_decryptor = None;
#[cfg(feature = "encryption")]
let decrypted_fmd_buf;

#[cfg(feature = "encryption")]
if encrypted_footer {
if file_decryption_properties.is_none() {
return Err(general_err!("Parquet file has an encrypted footer but no decryption properties were provided"));
};

let t_file_crypto_metadata: TFileCryptoMetaData =
TFileCryptoMetaData::read_from_in_protocol(&mut prot)
.map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
let algo = t_file_crypto_metadata.encryption_algorithm;
let aes_gcm_algo = if let EncryptionAlgorithm::AESGCMV1(a) = algo {
a
} else {
unreachable!()
}; // todo decr: add support for GCMCTRV1

// todo decr: get key_metadata
let aad_file_unique = aes_gcm_algo.aad_file_unique.unwrap();
let aad_prefix: Vec<u8> = aes_gcm_algo.aad_prefix.unwrap_or_default();

decryptor = Some(FileDecryptor::new(
file_decryption_properties.unwrap(),
aad_file_unique,
aad_prefix,
));
let footer_decryptor = decryptor.clone().unwrap().get_footer_decryptor();
if let Some(file_decryption_properties) = file_decryption_properties {
let t_file_crypto_metadata: TFileCryptoMetaData =
TFileCryptoMetaData::read_from_in_protocol(&mut prot)
.map_err(|e| general_err!("Could not parse crypto metadata: {}", e))?;
let decryptor = get_file_decryptor(
t_file_crypto_metadata.encryption_algorithm,
file_decryption_properties,
);
let footer_decryptor = decryptor.get_footer_decryptor();
let aad_footer = create_footer_aad(decryptor.file_aad())?;

let aad_footer = create_footer_aad(decryptor.as_ref().unwrap().file_aad())?;
decrypted_fmd_buf =
footer_decryptor.decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())?;
prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());

decrypted_fmd_buf =
footer_decryptor.decrypt(prot.as_slice().as_ref(), aad_footer.as_ref())?;
prot = TCompactSliceInputProtocol::new(decrypted_fmd_buf.as_ref());
file_decryptor = Some(decryptor);
} else {
return Err(general_err!("Parquet file has an encrypted footer but no decryption properties were provided"));
}
}

let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
Expand All @@ -761,33 +750,21 @@ impl ParquetMetaDataReader {
let schema_descr = Arc::new(SchemaDescriptor::new(schema));

#[cfg(feature = "encryption")]
if t_file_metadata.encryption_algorithm.is_some() {
let algo = t_file_metadata.encryption_algorithm;
let aes_gcm_algo = if let Some(EncryptionAlgorithm::AESGCMV1(a)) = algo {
a
} else {
unreachable!()
}; // todo decr: add support for GCMCTRV1
let aad_file_unique = aes_gcm_algo.aad_file_unique.unwrap();
let aad_prefix: Vec<u8> = aes_gcm_algo.aad_prefix.unwrap_or_default();

decryptor = Some(FileDecryptor::new(
file_decryption_properties.unwrap(),
aad_file_unique,
aad_prefix,
));
// todo get key_metadata etc. Set file decryptor in return value
// todo check signature
if let (Some(algo), Some(file_decryption_properties)) = (
t_file_metadata.encryption_algorithm,
file_decryption_properties,
) {
// File has a plaintext footer but encryption algorithm is set
file_decryptor = Some(get_file_decryptor(algo, file_decryption_properties));
}

let mut row_groups = Vec::new();
// TODO: row group filtering
for rg in t_file_metadata.row_groups {
let r = RowGroupMetaData::from_thrift(
schema_descr.clone(),
rg,
#[cfg(feature = "encryption")]
decryptor.as_ref(),
file_decryptor.as_ref(),
)?;
row_groups.push(r);
}
Expand All @@ -806,7 +783,7 @@ impl ParquetMetaDataReader {
file_metadata,
row_groups,
#[cfg(feature = "encryption")]
decryptor,
file_decryptor,
))
}

Expand Down Expand Up @@ -842,6 +819,23 @@ impl ParquetMetaDataReader {
}
}

/// Builds a [`FileDecryptor`] from the encryption algorithm recorded in the file
/// and the decryption properties supplied by the caller.
///
/// The AAD prefix defaults to an empty byte vector when the algorithm metadata
/// does not carry one.
///
/// # Panics
///
/// * If `encryption_algorithm` is not the `AESGCMV1` variant (not implemented yet).
/// * If the algorithm metadata has no `aad_file_unique` value.
#[cfg(feature = "encryption")]
fn get_file_decryptor(
    encryption_algorithm: EncryptionAlgorithm,
    file_decryption_properties: &FileDecryptionProperties,
) -> FileDecryptor {
    // Only AES-GCM-V1 is handled so far; any other variant hits the `todo!`.
    let EncryptionAlgorithm::AESGCMV1(gcm_params) = encryption_algorithm else {
        todo!("GCMCTRV1 encryption algorithm")
    };

    FileDecryptor::new(
        file_decryption_properties,
        gcm_params.aad_file_unique.unwrap(),
        gcm_params.aad_prefix.unwrap_or_default(),
    )
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 7f94e39

Please sign in to comment.