Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion operators/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::util::statistics::StatisticsError;
use bb8_postgres::bb8;
use geoengine_datatypes::dataset::{DataId, NamedData};
use geoengine_datatypes::dataset::{DataId, DatasetId, NamedData};
use geoengine_datatypes::error::ErrorSource;
use geoengine_datatypes::primitives::{FeatureDataType, TimeInterval};
use geoengine_datatypes::raster::RasterDataType;
Expand Down Expand Up @@ -472,6 +472,11 @@ pub enum Error {
Bb8Postgres {
source: bb8::RunError<tokio_postgres::Error>,
},

#[snafu(display("Dataset {} cannot be accessed, because it was deleted", id))]
DatasetDeleted {
id: DatasetId,
},
}

impl From<crate::adapters::SparseTilesFillAdapterError> for Error {
Expand Down
1 change: 1 addition & 0 deletions services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ walkdir = "2.4"
zip = "2.1"
assert-json-diff = "2.0.2"
sha2 = "0.10.8"
strum_macros = "0.26.1"

[target.'cfg(target_os = "linux")'.dependencies]
nix = { version = "0.29", features = ["socket"] }
Expand Down
4 changes: 3 additions & 1 deletion services/src/api/handlers/datasets.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::datasets::storage::{check_reserved_tags, ReservedTags};
use crate::{
api::model::{
operators::{GdalLoadingInfoTemporalSlice, GdalMetaDataList},
Expand Down Expand Up @@ -635,7 +636,8 @@ pub async fn create_upload_dataset<C: ApplicationContext>(
let db = app_ctx.session_context(session).db();
let upload = db.load_upload(upload_id).await.context(UploadNotFound)?;

add_tag(&mut definition.properties, "upload".to_owned());
check_reserved_tags(&definition.properties.tags);
add_tag(&mut definition.properties, ReservedTags::Upload.to_string());

adjust_meta_data_path(&mut definition.meta_data, &upload)
.context(CannotResolveUploadFilePath)?;
Expand Down
47 changes: 5 additions & 42 deletions services/src/api/handlers/upload.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
use crate::api::model::responses::IdResponse;
use crate::contexts::{ApplicationContext, SessionContext};
use crate::datasets::upload::{FileId, FileUpload, Upload, UploadDb, UploadId, UploadRootPath};
use crate::datasets::upload::{create_upload, Upload, UploadDb, UploadId, UploadRootPath};
use crate::error::Error;
use crate::error::Result;
use crate::error::{self, Error};
use crate::util::path_with_base_path;
use actix_multipart::Multipart;
use actix_web::{web, FromRequest, Responder};
use futures::StreamExt;
use gdal::vector::LayerAccess;
use geoengine_datatypes::util::Identifier;
use geoengine_operators::util::gdal::gdal_open_dataset;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use std::path::Path;
use tokio::{fs, io::AsyncWriteExt};
use tokio::fs;
use utoipa::{ToResponse, ToSchema};

pub(crate) fn init_upload_routes<C>(cfg: &mut web::ServiceConfig)
Expand Down Expand Up @@ -70,43 +67,9 @@ impl<'a> ToSchema<'a> for FileUploadRequest {
async fn upload_handler<C: ApplicationContext>(
session: C::Session,
app_ctx: web::Data<C>,
mut body: Multipart,
body: Multipart,
) -> Result<web::Json<IdResponse<UploadId>>> {
let upload_id = UploadId::new();

let root = upload_id.root_path()?;

fs::create_dir_all(&root).await.context(error::Io)?;

let mut files: Vec<FileUpload> = vec![];
while let Some(item) = body.next().await {
let mut field = item?;
let file_name = field
.content_disposition()
.ok_or(error::Error::UploadFieldMissingFileName)?
.get_filename()
.ok_or(error::Error::UploadFieldMissingFileName)?
.to_owned();

let file_id = FileId::new();
let mut file = fs::File::create(root.join(&file_name))
.await
.context(error::Io)?;

let mut byte_size = 0_u64;
while let Some(chunk) = field.next().await {
let bytes = chunk?;
file.write_all(&bytes).await.context(error::Io)?;
byte_size += bytes.len() as u64;
}
file.flush().await.context(error::Io)?;

files.push(FileUpload {
id: file_id,
name: file_name,
byte_size,
});
}
let (upload_id, files) = create_upload(body).await?;

app_ctx
.session_context(session)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use async_trait::async_trait;
use tokio_postgres::Transaction;

use crate::error::Result;

use super::database_migration::{DatabaseVersion, Migration};

/// This migration adds new delete options for uploaded user datasets
pub struct Migration0012FairUploadDeletion;

#[async_trait]
impl Migration for Migration0012FairUploadDeletion {
fn prev_version(&self) -> Option<DatabaseVersion> {
Some("0011_remove_xgb".into())
}

fn version(&self) -> DatabaseVersion {
"0012_fair_upload_deletion".into()
}

async fn migrate(&self, _tx: &Transaction<'_>) -> Result<()> {
Ok(())
}
}
3 changes: 3 additions & 0 deletions services/src/contexts/migrations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub use crate::contexts::migrations::{
migration_0009_oidc_tokens::Migration0009OidcTokens,
migration_0010_s2_stack_time_buffers::Migration0010S2StacTimeBuffers,
migration_0011_remove_xgb::Migration0011RemoveXgb,
migration_0012_fair_upload_deletion::Migration0012FairUploadDeletion,
};
pub use database_migration::{
initialize_database, migrate_database, DatabaseVersion, Migration, MigrationResult,
Expand All @@ -30,6 +31,7 @@ pub mod migration_0008_band_names;
pub mod migration_0009_oidc_tokens;
pub mod migration_0010_s2_stack_time_buffers;
pub mod migration_0011_remove_xgb;
pub mod migration_0012_fair_upload_deletion;

#[cfg(test)]
mod schema_info;
Expand All @@ -55,6 +57,7 @@ pub fn all_migrations() -> Vec<Box<dyn Migration>> {
Box::new(Migration0009OidcTokens),
Box::new(Migration0010S2StacTimeBuffers),
Box::new(Migration0011RemoveXgb),
Box::new(Migration0012FairUploadDeletion),
]
}

Expand Down
2 changes: 1 addition & 1 deletion services/src/contexts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub use migrations::{
Migration0004DatasetListingProviderPrio, Migration0005GbifColumnSelection,
Migration0006EbvProvider, Migration0007OwnerRole, Migration0008BandNames,
Migration0009OidcTokens, Migration0010S2StacTimeBuffers, Migration0011RemoveXgb,
MigrationResult,
Migration0012FairUploadDeletion, MigrationResult,
};
pub use postgres::{PostgresContext, PostgresDb, PostgresSessionContext};
pub use session::{MockableSession, Session, SessionId, SimpleSession};
Expand Down
24 changes: 24 additions & 0 deletions services/src/datasets/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ use geoengine_operators::{mock::MockDatasetDataSourceLoadingInfo, source::GdalMe
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use std::fmt::Debug;
use std::str::FromStr;
use strum_macros;
use strum_macros::{Display, EnumString};
use utoipa::ToSchema;
use uuid::Uuid;
use validator::{Validate, ValidationError};
Expand Down Expand Up @@ -95,6 +98,13 @@ pub struct AutoCreateDataset {
pub tags: Option<Vec<String>>,
}

#[derive(Display, EnumString)]
pub enum ReservedTags {
#[strum(serialize = "upload")]
Upload,
Deleted,
}

fn validate_main_file(main_file: &str) -> Result<(), ValidationError> {
if main_file.is_empty() || main_file.contains('/') || main_file.contains("..") {
return Err(ValidationError::new("Invalid upload file name"));
Expand All @@ -114,6 +124,20 @@ pub fn validate_tags(tags: &Vec<String>) -> Result<(), ValidationError> {
Ok(())
}

pub fn check_reserved_tags(tags: &Option<Vec<String>>) {
if let Some(tags) = tags {
for tag in tags {
let conversion = ReservedTags::from_str(tag.as_str());
if let Ok(reserved) = conversion {
log::warn!(
"Adding a new dataset with a reserved tag: {}",
reserved.to_string()
);
}
}
}
}

#[derive(Deserialize, Serialize, Debug, Clone, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct SuggestMetaData {
Expand Down
53 changes: 53 additions & 0 deletions services/src/datasets/upload.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use actix_multipart::Multipart;
use std::fmt::{Display, Formatter};
use std::path::{Path, PathBuf};

Expand All @@ -9,7 +10,12 @@ use crate::{
util::config::{self, get_config_element},
};
use async_trait::async_trait;
use futures_util::StreamExt;
use geoengine_datatypes::util::Identifier;
use serde::{Deserialize, Deserializer, Serialize};
use snafu::ResultExt;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use utoipa::ToSchema;

identifier!(UploadId);
Expand Down Expand Up @@ -118,6 +124,53 @@ pub struct UploadListing {
pub num_files: usize,
}

pub async fn create_upload(mut body: Multipart) -> Result<(UploadId, Vec<FileUpload>)> {
let upload_id = UploadId::new();

let root = upload_id.root_path()?;

fs::create_dir_all(&root).await.context(error::Io)?;

let mut files: Vec<FileUpload> = vec![];
while let Some(item) = body.next().await {
let mut field = item?;
let file_name = field
.content_disposition()
.ok_or(error::Error::UploadFieldMissingFileName)?
.get_filename()
.ok_or(error::Error::UploadFieldMissingFileName)?
.to_owned();

let file_id = FileId::new();
let mut file = fs::File::create(root.join(&file_name))
.await
.context(error::Io)?;

let mut byte_size = 0_u64;
while let Some(chunk) = field.next().await {
let bytes = chunk?;
file.write_all(&bytes).await.context(error::Io)?;
byte_size += bytes.len() as u64;
}
file.flush().await.context(error::Io)?;

files.push(FileUpload {
id: file_id,
name: file_name,
byte_size,
});
}

Ok((upload_id, files))
}

pub async fn delete_upload(upload_id: UploadId) -> Result<()> {
let root = upload_id.root_path()?;
log::debug!("Deleting {upload_id}");
fs::remove_dir_all(&root).await.context(error::Io)?;
Ok(())
}

#[async_trait]
pub trait UploadDb {
async fn load_upload(&self, upload: UploadId) -> Result<Upload>;
Expand Down
15 changes: 15 additions & 0 deletions services/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,21 @@ pub enum Error {
UnknownVolumeName {
volume_name: String,
},

#[snafu(display("Trying to set an expiration timestamp for Dataset {dataset} in the past"))]
ExpirationTimestampInPast {
dataset: DatasetId,
},
#[snafu(display("Illegal expiration update for Dataset {dataset}: {reason}"))]
IllegalExpirationUpdate {
dataset: DatasetId,
reason: String,
},
#[snafu(display("Illegal status for Dataset {dataset}: {status}"))]
IllegalDatasetStatus {
dataset: DatasetId,
status: String,
},
}

impl actix_web::error::ResponseError for Error {
Expand Down
12 changes: 11 additions & 1 deletion services/src/pro/api/apidoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ use crate::layers::listing::{
};
use crate::pro;
use crate::pro::api::handlers::users::{Quota, UpdateQuota};
use crate::pro::datasets::{
DatasetAccessStatusResponse, DatasetDeletionType, Expiration, ExpirationChange,
};
use crate::pro::permissions::{
Permission, PermissionListing, ResourceId, Role, RoleDescription, RoleId,
};
Expand Down Expand Up @@ -157,7 +160,10 @@ use utoipa::{Modify, OpenApi};
handlers::upload::upload_handler,
pro::api::handlers::permissions::add_permission_handler,
pro::api::handlers::permissions::remove_permission_handler,
pro::api::handlers::permissions::get_resource_permissions_handler
pro::api::handlers::permissions::get_resource_permissions_handler,
pro::api::handlers::datasets::set_dataset_expiration,
pro::api::handlers::datasets::get_dataset_status,
pro::api::handlers::datasets::gc_expired_datasets
),
components(
responses(
Expand Down Expand Up @@ -371,6 +377,10 @@ use utoipa::{Modify, OpenApi};
Volume,
VolumeName,
DataPath,
Expiration,
ExpirationChange,
DatasetDeletionType,
DatasetAccessStatusResponse,

PlotOutputFormat,
WrappedPlotOutput,
Expand Down
Loading