Skip to content

Commit

Permalink
Work
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Feb 17, 2025
1 parent 6d740b0 commit 7b3c30b
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 16 deletions.
12 changes: 7 additions & 5 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1127,9 +1127,9 @@ mod tests {

use std::fs::File;

use crate::arrow::arrow_reader::{
ArrowReaderOptions, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder,
};
#[cfg(feature = "encryption")]
use crate::arrow::arrow_reader::ArrowReaderOptions;
use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
use crate::arrow::ARROW_SCHEMA_META_KEY;
use arrow::datatypes::ToByteSlice;
use arrow::datatypes::{DataType, Schema};
Expand All @@ -1143,8 +1143,10 @@ mod tests {

use crate::basic::Encoding;
use crate::data_type::AsBytes;
use crate::encryption::decryption::FileDecryptionProperties;
use crate::encryption::encryption::FileEncryptionProperties;
#[cfg(feature = "encryption")]
use crate::encryption::{
decryption::FileDecryptionProperties, encryption::FileEncryptionProperties,
};
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::read_offset_indexes;
Expand Down
3 changes: 3 additions & 0 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
data_page_boundary_ascending: true,
data_page_boundary_descending: true,
last_non_null_data_page_min_max: None,
// metadata_encryptor: metadata_encryptor,
// data_encryptor: data_encryptor,
}
}

Expand Down Expand Up @@ -3404,6 +3406,7 @@ mod tests {
.build(),
);
let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
todo!()
}

#[test]
Expand Down
18 changes: 12 additions & 6 deletions parquet/src/encryption/encryption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use crate::encryption::ciphers::{RingGcmBlockEncryptor, BlockEncryptor};

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct FileEncryptionProperties {
encrypt_footer: bool,
footer_key: Vec<u8>,
Expand Down Expand Up @@ -62,20 +62,26 @@ impl EncryptionPropertiesBuilder {
pub struct FileEncryptor {
encryption_properties: FileEncryptionProperties,
footer_encryptor: Option<Arc<dyn BlockEncryptor>>,
column_encryptors: Option<HashMap<Vec<u8>, Arc<dyn BlockEncryptor>>>,
file_aad: Vec<u8>,
}

impl FileEncryptor {
pub(crate) fn new(
encryption_properties: FileEncryptionProperties,
aad_file_unique: Vec<u8>,
aad_prefix: Vec<u8>,
encryption_properties: FileEncryptionProperties, file_aad: Vec<u8>,
) -> Self {
let file_aad = [aad_prefix.as_slice(), aad_file_unique.as_slice()].concat();
let footer_encryptor = RingGcmBlockEncryptor::new(&encryption_properties.footer_key);
let footer_encryptor = RingGcmBlockEncryptor::new(&encryption_properties.footer_key.clone());
let mut column_encryptors: HashMap<Vec<u8>, Arc<dyn BlockEncryptor>> = HashMap::new();
if let Some(column_keys) = encryption_properties.column_keys.clone() {
for (column_name, key) in column_keys.iter() {
let column_encryptor = Arc::new(RingGcmBlockEncryptor::new(key));
column_encryptors.insert(column_name.clone(), column_encryptor);
}
}
Self {
encryption_properties,
footer_encryptor: Some(Arc::new(footer_encryptor)),
column_encryptors: Some(column_encryptors),
file_aad,
}
}
Expand Down
42 changes: 37 additions & 5 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::column::{
writer::{get_column_writer, ColumnWriter},
};
use crate::data_type::DataType;
use crate::encryption::ciphers::RingGcmBlockEncryptor;
#[cfg(feature = "encryption")]
use crate::encryption::encryption::FileEncryptor;
use crate::errors::{ParquetError, Result};
use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
Expand Down Expand Up @@ -155,6 +155,8 @@ pub struct SerializedFileWriter<W: Write> {
// kv_metadatas will be appended to `props` when `write_metadata`
kv_metadatas: Vec<KeyValue>,
finished: bool,
#[cfg(feature = "encryption")]
file_encryptor: Option<FileEncryptor>,
}

impl<W: Write> Debug for SerializedFileWriter<W> {
Expand All @@ -173,19 +175,41 @@ impl<W: Write + Send> SerializedFileWriter<W> {
/// Creates new file writer.
pub fn new(buf: W, schema: TypePtr, properties: WriterPropertiesPtr) -> Result<Self> {
let mut buf = TrackedWrite::new(buf);
#[cfg(feature = "encryption")]
let file_encryptor = if properties.file_encryption_properties().is_some() {
Some(FileEncryptor::new(
properties.file_encryption_properties().unwrap().clone(),
vec![],
))
} else {
None
};

#[cfg(feature = "encryption")]
if properties.file_encryption_properties().is_some() {
// todo: check if all columns in properties.file_encryption_properties().column_keys
// are present in the schema
let fep = properties.file_encryption_properties().clone().unwrap();
Self::start_encrypted_file(&mut buf)?;
} else {
Self::start_file(&mut buf)?;
}
#[cfg(not(feature = "encryption"))]
Self::start_file(&mut buf)?;
Ok(Self {
buf,
schema: schema.clone(),
descr: Arc::new(SchemaDescriptor::new(schema)),
props: properties,
props: properties.clone(),
row_groups: vec![],
bloom_filters: vec![],
column_indexes: Vec::new(),
offset_indexes: Vec::new(),
row_group_index: 0,
kv_metadatas: Vec::new(),
finished: false,
#[cfg(feature = "encryption")]
file_encryptor,
})
}

Expand Down Expand Up @@ -274,6 +298,11 @@ impl<W: Write + Send> SerializedFileWriter<W> {
Ok(())
}

fn start_encrypted_file(buf: &mut TrackedWrite<W>) -> Result<()> {
buf.write_all(&PARQUET_MAGIC)?;
Ok(())
}

/// Assembles and writes metadata at the end of the file.
fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
self.finished = true;
Expand Down Expand Up @@ -525,9 +554,12 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
) -> Result<C>,
{
self.assert_previous_writer_closed()?;
let file_encryption_properties = self.props.file_encryption_properties();
let file_encryptor =
FileEncryptor::new(file_encryption_properties.unwrap().clone(), vec![], vec![]);
#[cfg(feature = "encryption")]
let file_encryptor = FileEncryptor::new(
self.props.file_encryption_properties().unwrap().clone(),
vec![],
);

Ok(match self.next_column_desc() {
Some(column) => {
let props = self.props.clone();
Expand Down

0 comments on commit 7b3c30b

Please sign in to comment.