Skip to content

Commit

Permalink
Work
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Feb 19, 2025
1 parent 1b895b6 commit 9482bd6
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 24 deletions.
12 changes: 7 additions & 5 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1127,9 +1127,9 @@ mod tests {

use std::fs::File;

use crate::arrow::arrow_reader::{
ArrowReaderOptions, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder,
};
#[cfg(feature = "encryption")]
use crate::arrow::arrow_reader::ArrowReaderOptions;
use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
use crate::arrow::ARROW_SCHEMA_META_KEY;
use arrow::datatypes::ToByteSlice;
use arrow::datatypes::{DataType, Schema};
Expand All @@ -1143,8 +1143,10 @@ mod tests {

use crate::basic::Encoding;
use crate::data_type::AsBytes;
use crate::encryption::decryption::FileDecryptionProperties;
use crate::encryption::encryption::FileEncryptionProperties;
#[cfg(feature = "encryption")]
use crate::encryption::{
decryption::FileDecryptionProperties, encryption::FileEncryptionProperties,
};
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::read_offset_indexes;
Expand Down
5 changes: 4 additions & 1 deletion parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
data_page_boundary_ascending: true,
data_page_boundary_descending: true,
last_non_null_data_page_min_max: None,
// metadata_encryptor: metadata_encryptor,
// data_encryptor: data_encryptor,
}
}

Expand Down Expand Up @@ -3403,7 +3405,8 @@ mod tests {
.with_file_encryption_properties(file_encryption_properties)
.build(),
);
let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
let mut _writer = SerializedFileWriter::new(&file, schema, props).unwrap();
todo!()
}

#[test]
Expand Down
28 changes: 27 additions & 1 deletion parquet/src/encryption/ciphers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,32 @@ impl RingGcmBlockEncryptor {

impl BlockEncryptor for RingGcmBlockEncryptor {
    /// Encrypts `plaintext`, authenticating `aad` alongside it, and returns
    /// the ciphertext followed by the authentication tag.
    ///
    /// A fresh nonce is taken from the encryptor's nonce sequence on every
    /// call, which is why this method needs `&mut self`.
    ///
    /// # Panics
    ///
    /// Panics if the nonce sequence cannot produce another nonce or if the
    /// underlying seal operation fails.
    ///
    /// NOTE(review): the returned buffer holds only `ciphertext || tag`; the
    /// nonce (and any length prefix) is not written here. Confirm this matches
    /// the layout the decryptor expects.
    fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec<u8> {
        let nonce = self
            .nonce_sequence
            .advance()
            .expect("nonce sequence exhausted");

        // `seal_in_place_append_tag` appends the tag itself, so the buffer must
        // start out holding exactly the plaintext. Padding it with TAG_LEN zero
        // bytes first would encrypt those zeros as if they were payload.
        let mut ciphertext = Vec::with_capacity(plaintext.len() + TAG_LEN);
        ciphertext.extend_from_slice(plaintext);

        self.key
            .seal_in_place_append_tag(nonce, Aad::from(aad), &mut ciphertext)
            .expect("sealing the block failed");
        ciphertext
    }
}


#[cfg(test)]
mod tests {
    use super::*;

    /// Sealing a message and then opening it with a decryptor built from the
    /// same key and AAD must give back the original bytes.
    #[test]
    fn test_round_trip() {
        let key_bytes = [0u8; 16];
        let mut sealer = RingGcmBlockEncryptor::new(&key_bytes);
        let opener = RingGcmBlockDecryptor::new(&key_bytes);

        let message = b"hello, world!";
        let associated_data = b"some aad";

        let sealed = sealer.encrypt(message, associated_data);
        let recovered = opener.decrypt(&sealed, associated_data).unwrap();

        assert_eq!(message, recovered.as_slice());
    }
}
18 changes: 12 additions & 6 deletions parquet/src/encryption/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use crate::encryption::ciphers::{RingGcmBlockEncryptor, BlockEncryptor};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct FileEncryptionProperties {
encrypt_footer: bool,
footer_key: Vec<u8>,
Expand Down Expand Up @@ -62,20 +62,26 @@ impl EncryptionPropertiesBuilder {
/// Bundles the block encryptors and file-level AAD used while writing one
/// encrypted file.
///
/// NOTE(review): `BlockEncryptor::encrypt` takes `&mut self`, but the
/// encryptors below are stored behind `Arc<dyn BlockEncryptor>`, which only
/// hands out shared references. Confirm how callers are meant to obtain
/// mutable access.
pub struct FileEncryptor {
/// The encryption properties this encryptor was constructed from.
encryption_properties: FileEncryptionProperties,
/// Encryptor created from the footer key; `FileEncryptor::new` always sets it to `Some`.
footer_encryptor: Option<Arc<dyn BlockEncryptor>>,
/// Encryptors keyed by column name, one per configured column key;
/// `FileEncryptor::new` always sets it to `Some` (possibly with an empty map).
column_encryptors: Option<HashMap<Vec<u8>, Arc<dyn BlockEncryptor>>>,
/// File-level AAD bytes stored by the constructor.
file_aad: Vec<u8>,
}

impl FileEncryptor {
/// Builds the encryptors for a file from its encryption properties.
///
/// A footer encryptor is always created from the footer key. One column
/// encryptor is created for every entry in the properties' column keys; if
/// no column keys are configured the map is left empty. `file_aad` is
/// stored unchanged.
pub(crate) fn new(
    encryption_properties: FileEncryptionProperties,
    file_aad: Vec<u8>,
) -> Self {
    // Borrow the key material: the encryptor constructors only need a view
    // of the bytes, so cloning the keys first buys nothing.
    let footer_encryptor = RingGcmBlockEncryptor::new(&encryption_properties.footer_key);

    let mut column_encryptors: HashMap<Vec<u8>, Arc<dyn BlockEncryptor>> = HashMap::new();
    if let Some(column_keys) = &encryption_properties.column_keys {
        for (column_name, key) in column_keys.iter() {
            let column_encryptor = Arc::new(RingGcmBlockEncryptor::new(key));
            column_encryptors.insert(column_name.clone(), column_encryptor);
        }
    }

    Self {
        encryption_properties,
        footer_encryptor: Some(Arc::new(footer_encryptor)),
        column_encryptors: Some(column_encryptors),
        file_aad,
    }
}
Expand Down
7 changes: 1 addition & 6 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ pub struct WriterProperties {
statistics_truncate_length: Option<usize>,
coerce_types: bool,
#[cfg(feature = "encryption")]
file_encryption_properties: Option<FileEncryptionProperties>,
pub(crate) file_encryption_properties: Option<FileEncryptionProperties>,
}

impl Default for WriterProperties {
Expand Down Expand Up @@ -374,11 +374,6 @@ impl WriterProperties {
.and_then(|c| c.bloom_filter_properties())
.or_else(|| self.default_column_properties.bloom_filter_properties())
}

/// Returns a reference to the file encryption properties, or `None` when
/// none are set.
///
/// Only compiled when the `encryption` feature is enabled.
#[cfg(feature = "encryption")]
pub fn file_encryption_properties(&self) -> Option<&FileEncryptionProperties> {
self.file_encryption_properties.as_ref()
}
}

/// Builder for [`WriterProperties`] parquet writer configuration.
Expand Down
50 changes: 45 additions & 5 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::column::{
writer::{get_column_writer, ColumnWriter},
};
use crate::data_type::DataType;
use crate::encryption::ciphers::RingGcmBlockEncryptor;
#[cfg(feature = "encryption")]
use crate::encryption::encryption::FileEncryptor;
use crate::errors::{ParquetError, Result};
use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
Expand Down Expand Up @@ -155,6 +155,8 @@ pub struct SerializedFileWriter<W: Write> {
// kv_metadatas will be appended to `props` when `write_metadata`
kv_metadatas: Vec<KeyValue>,
finished: bool,
#[cfg(feature = "encryption")]
file_encryptor: Option<FileEncryptor>,
}

impl<W: Write> Debug for SerializedFileWriter<W> {
Expand All @@ -173,19 +175,45 @@ impl<W: Write + Send> SerializedFileWriter<W> {
/// Creates new file writer.
///
/// Wraps `buf` in a [`TrackedWrite`] and writes the file header to it. When
/// the `encryption` feature is enabled and `properties` carries file
/// encryption properties, a [`FileEncryptor`] is created and the header is
/// written via `start_encrypted_file`; otherwise `start_file` is used.
///
/// # Errors
///
/// Returns an error if writing the file header fails.
pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
    let mut buf = TrackedWrite::new(buf);

    // Only the properties handed to the encryptor are cloned; `properties`
    // itself is moved into the writer below.
    // TODO: the file AAD passed here is still an empty placeholder.
    #[cfg(feature = "encryption")]
    let file_encryptor = properties
        .file_encryption_properties
        .as_ref()
        .map(|fep| FileEncryptor::new(fep.clone(), vec![]));

    #[cfg(feature = "encryption")]
    if file_encryptor.is_some() {
        // todo: check if all columns in properties.file_encryption_properties.column_keys
        // are present in the schema
        Self::start_encrypted_file(&mut buf)?;
    } else {
        Self::start_file(&mut buf)?;
    }
    #[cfg(not(feature = "encryption"))]
    Self::start_file(&mut buf)?;

    Ok(Self {
        buf,
        schema: schema.clone(),
        descr: Arc::new(SchemaDescriptor::new(schema)),
        props: properties,
        row_groups: vec![],
        bloom_filters: vec![],
        column_indexes: Vec::new(),
        offset_indexes: Vec::new(),
        row_group_index: 0,
        kv_metadatas: Vec::new(),
        finished: false,
        #[cfg(feature = "encryption")]
        file_encryptor,
    })
}

Expand Down Expand Up @@ -274,6 +302,11 @@ impl<W: Write + Send> SerializedFileWriter<W> {
Ok(())
}

fn start_encrypted_file(buf: &mut TrackedWrite<W>) -> Result<()> {
buf.write_all(&PARQUET_MAGIC)?;
Ok(())
}

/// Assembles and writes metadata at the end of the file.
fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
self.finished = true;
Expand Down Expand Up @@ -525,9 +558,16 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
) -> Result<C>,
{
self.assert_previous_writer_closed()?;
let file_encryption_properties = self.props.file_encryption_properties();
let file_encryptor =
FileEncryptor::new(file_encryption_properties.unwrap().clone(), vec![], vec![]);
#[cfg(feature = "encryption")]
let file_encryptor = FileEncryptor::new(
self.props
.file_encryption_properties
.as_ref()
.unwrap()
.clone(),
vec![],
);

Ok(match self.next_column_desc() {
Some(column) => {
let props = self.props.clone();
Expand Down

0 comments on commit 9482bd6

Please sign in to comment.