|
1 | 1 | #ifndef KIKIMR_DISABLE_S3_OPS
|
2 | 2 |
|
3 | 3 | #include "export_s3_buffer.h"
|
| 4 | +#include "backup_restore_traits.h" |
| 5 | +#include "export_s3.h" |
4 | 6 | #include "type_serialization.h"
|
5 | 7 |
|
6 | 8 | #include <ydb/core/backup/common/checksum.h>
|
| 9 | +#include <ydb/core/protos/s3_settings.pb.h> |
7 | 10 | #include <ydb/core/tablet_flat/flat_row_state.h>
|
8 | 11 | #include <yql/essentials/types/binary_json/read.h>
|
| 12 | +#include <ydb/public/api/protos/ydb_export.pb.h> |
9 | 13 | #include <ydb/public/lib/scheme_types/scheme_type_id.h>
|
10 | 14 |
|
11 | 15 | #include <library/cpp/string_utils/quote/quote.h>
|
@@ -430,6 +434,52 @@ void TZStdCompressionProcessor::Reset() {
|
430 | 434 |
|
431 | 435 | } // anonymous
|
432 | 436 |
|
| 437 | +IExport::IBuffer* TS3Export::CreateBuffer() const { |
| 438 | + using namespace NBackupRestoreTraits; |
| 439 | + |
| 440 | + const auto& scanSettings = Task.GetScanSettings(); |
| 441 | + const ui64 maxRows = scanSettings.GetRowsBatchSize() ? scanSettings.GetRowsBatchSize() : Max<ui64>(); |
| 442 | + const ui64 maxBytes = scanSettings.GetBytesBatchSize(); |
| 443 | + const ui64 minBytes = Task.GetS3Settings().GetLimits().GetMinWriteBatchSize(); |
| 444 | + |
| 445 | + TS3ExportBufferSettings bufferSettings; |
| 446 | + bufferSettings |
| 447 | + .WithColumns(Columns) |
| 448 | + .WithMaxRows(maxRows) |
| 449 | + .WithMaxBytes(maxBytes) |
| 450 | + .WithMinBytes(minBytes); // S3 API returns EntityTooSmall error if file part is smaller that 5MB: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html |
| 451 | + if (Task.GetEnableChecksums()) { |
| 452 | + bufferSettings.WithChecksum(TS3ExportBufferSettings::Sha256Checksum()); |
| 453 | + } |
| 454 | + |
| 455 | + switch (CodecFromTask(Task)) { |
| 456 | + case ECompressionCodec::None: |
| 457 | + break; |
| 458 | + case ECompressionCodec::Zstd: |
| 459 | + bufferSettings |
| 460 | + .WithCompression(TS3ExportBufferSettings::ZstdCompression(Task.GetCompression().GetLevel())); |
| 461 | + break; |
| 462 | + case ECompressionCodec::Invalid: |
| 463 | + Y_ENSURE(false, "unreachable"); |
| 464 | + } |
| 465 | + |
| 466 | + if (Task.HasEncryptionSettings()) { |
| 467 | + NBackup::TEncryptionIV iv = NBackup::TEncryptionIV::Combine( |
| 468 | + NBackup::TEncryptionIV::FromBinaryString(Task.GetEncryptionSettings().GetIV()), |
| 469 | + NBackup::EBackupFileType::TableData, |
| 470 | + 0, // already combined |
| 471 | + Task.GetShardNum()); |
| 472 | + bufferSettings.WithEncryption( |
| 473 | + TS3ExportBufferSettings::TEncryptionSettings() |
| 474 | + .WithAlgorithm(Task.GetEncryptionSettings().GetEncryptionAlgorithm()) |
| 475 | + .WithKey(NBackup::TEncryptionKey(Task.GetEncryptionSettings().GetSymmetricKey().key())) |
| 476 | + .WithIV(iv) |
| 477 | + ); |
| 478 | + } |
| 479 | + |
| 480 | + return CreateS3ExportBuffer(std::move(bufferSettings)); |
| 481 | +} |
| 482 | + |
433 | 483 | NExportScan::IBuffer* CreateS3ExportBuffer(TS3ExportBufferSettings&& settings) {
|
434 | 484 | return new TS3Buffer(std::move(settings));
|
435 | 485 | }
|
|
0 commit comments