Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Jan 9, 2025
1 parent e9d79c8 commit 3e7646d
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 26 deletions.
10 changes: 5 additions & 5 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -710,18 +710,18 @@ impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {

#[cfg(feature = "encryption")]
let crypto_context = if self.metadata.file_decryptor().is_some() {
let file_decryptor = Arc::new(self.metadata.file_decryptor().clone().unwrap());
let metadata_decryptor = Arc::new(self.metadata.file_decryptor().clone().unwrap());
let column_name = self
.metadata
.file_metadata()
.schema_descr()
.column(self.column_idx);
let data_decryptor =
Arc::new(file_decryptor.get_column_decryptor(column_name.name().as_bytes()));

let file_decryptor = self.metadata.file_decryptor().clone().unwrap().get_column_decryptor(column_name.name().as_bytes());
let data_decryptor = Arc::new(file_decryptor.clone());
let metadata_decryptor = Arc::new(file_decryptor.clone());

let crypto_context =
CryptoContext::new(rg_idx, self.column_idx, metadata_decryptor, data_decryptor);
CryptoContext::new(rg_idx, self.column_idx, data_decryptor, metadata_decryptor);
Some(Arc::new(crypto_context))
} else {
None
Expand Down
26 changes: 16 additions & 10 deletions parquet/src/encryption/ciphers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,23 +329,29 @@ impl FileDecryptor {
RingGcmBlockDecryptor::new(self.decryption_properties.footer_key.as_ref().unwrap())
}

pub(crate) fn has_column_key(&self, column_name: &[u8]) -> bool {
self.decryption_properties.column_keys.clone().unwrap().contains_key(column_name)
}

pub(crate) fn get_column_decryptor(&self, column_name: &[u8]) -> FileDecryptor {
if self.decryption_properties.column_keys.is_none() {
if self.decryption_properties.column_keys.is_none() || !self.has_column_key(column_name) {
return self.clone();
}
let column_keys = &self.decryption_properties.column_keys.clone().unwrap();
let decryptor = if let Some(column_key) = column_keys.get(column_name) {
Some(RingGcmBlockDecryptor::new(&column_key))
let decryption_properties = if let Some(column_key) = column_keys.get(column_name) {
DecryptionPropertiesBuilder::with_defaults()
.with_footer_key(column_key.clone())
.with_aad_prefix(self.aad_prefix.clone())
.build()
} else {
None
self.decryption_properties.clone()
};

FileDecryptor {
decryption_properties: self.decryption_properties.clone(),
footer_decryptor: decryptor,
aad_file_unique: self.aad_file_unique.clone(),
aad_prefix: self.aad_prefix.clone(),
}
FileDecryptor::new(
&decryption_properties,
self.aad_file_unique.clone(),
self.aad_prefix.clone(),
)
}

pub(crate) fn decryption_properties(&self) -> &FileDecryptionProperties {
Expand Down
4 changes: 4 additions & 0 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,10 @@ impl RowGroupMetaData {
cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
} else {
let column_name = crypto_metadata.path_in_schema.join(".");
if !decryptor.unwrap().has_column_key(&column_name.as_bytes()) {
cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
break;
}
let aad_file_unique = decryptor.unwrap().aad_file_unique();
let aad_prefix = decryptor
.unwrap()
Expand Down
20 changes: 9 additions & 11 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,15 @@ pub(crate) fn read_page_header<T: Read>(
#[cfg(feature = "encryption")]
if let Some(crypto_context) = crypto_context {
let decryptor = &crypto_context.data_decryptor();
// todo: in case of per-column key, decryptor should be column decryptor
if !decryptor.has_footer_key() || !decryptor.footer_decryptor().is_some() {
let mut prot = TCompactInputProtocol::new(input);
let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
return Ok(page_header)
};

let file_decryptor = decryptor.column_decryptor();
// let file_decryptor = decryptor.column_decryptor();
let data_decryptor = &crypto_context.data_decryptor();
let aad_file_unique = decryptor.aad_file_unique();
let aad_prefix = decryptor.aad_prefix();

Expand All @@ -370,19 +377,10 @@ pub(crate) fn read_page_header<T: Read>(
crypto_context.page_ordinal,
)?;

// let mut len_bytes = [0; 4];
// input.read_exact(&mut len_bytes)?;
// let ciphertext_len = u32::from_le_bytes(len_bytes) as usize;
// let mut ciphertext = vec![0; 4 + ciphertext_len];
// input.read_exact(&mut ciphertext[4..])?;
// let mut ciphertext = Vec::new();
// input.read_to_end(&mut ciphertext)?;

let mut ciphertext: Vec<u8> = vec![];
input.read_to_end(&mut ciphertext)?;

// let ciphertext = input.read_to_end();
let buf = file_decryptor.decrypt(&ciphertext, aad.as_ref())?;
let buf = data_decryptor.footer_decryptor().unwrap().decrypt(&ciphertext, aad.as_ref())?;

let mut prot = TCompactSliceInputProtocol::new(buf.as_slice());
let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
Expand Down

0 comments on commit 3e7646d

Please sign in to comment.