Skip to content

Commit 394440d

Browse files
committed
feat: Add timestamp to S3 uploaded JDBC log filenames
- Add automatic timestamp formatting (yyyyMMdd_HHmmss) to uploaded filenames - Example: snowflake_jdbc.log -> snowflake_jdbc_20250630_021500.log - Update README with timestamp feature documentation - Clarify s3_prefix as optional parameter in documentation - Improve S3 path handling for empty prefix cases
1 parent 9ba88f1 commit 394440d

File tree

5 files changed

+107
-26
lines changed

5 files changed

+107
-26
lines changed

README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ Snowflake output plugin for Embulk loads records to Snowflake.
3232
- **match_by_column_name**: specify whether to load semi-structured data into columns in the target table that match corresponding columns represented in the data. ("case_sensitive", "case_insensitive", "none", default: "none")
3333
- **upload_jdbc_log_to_s3**: enable automatic upload of JDBC driver logs to S3 when communication errors occur (boolean, default: false)
3434
- **s3_bucket**: S3 bucket name for JDBC log upload (string, required when upload_jdbc_log_to_s3 is true)
35-
- **s3_prefix**: S3 key prefix for JDBC log upload (string, required when upload_jdbc_log_to_s3 is true)
35+
- **s3_prefix**: S3 key prefix for JDBC log upload (string, optional - if empty, files are uploaded to bucket root)
3636
- **s3_region**: AWS region for S3 bucket (string, required when upload_jdbc_log_to_s3 is true)
3737
- **s3_access_key_id**: AWS access key ID for S3 access (string, optional - uses default AWS credentials provider chain if not specified)
3838
- **s3_secret_access_key**: AWS secret access key for S3 access (string, optional - uses default AWS credentials provider chain if not specified)
@@ -89,7 +89,7 @@ out:
8989
# JDBC log upload configuration
9090
upload_jdbc_log_to_s3: true
9191
s3_bucket: your-log-bucket
92-
s3_prefix: snowflake-jdbc-logs
92+
s3_prefix: snowflake-jdbc-logs # Optional: omit to upload to bucket root
9393
s3_region: us-east-1
9494

9595
# Optional: Explicit AWS credentials (uses IAM role if not specified)
@@ -107,7 +107,8 @@ out:
107107

108108
- JDBC logs are only uploaded when communication errors occur during Snowflake operations
109109
- The plugin automatically finds the latest `snowflake_jdbc*.log` file in the system temp directory
110-
- Logs are uploaded to `s3://{bucket}/{prefix}/{filename}`
110+
- **Automatic timestamping**: Upload timestamp is automatically added to the filename (format: `yyyyMMdd_HHmmss`)
111+
- Example: `snowflake_jdbc0.log.0` → `snowflake_jdbc0.log_20250630_021500.0`
111112
- If S3 upload fails, a warning is logged but the original error is still thrown
112113
- If required S3 configuration is missing, a warning is logged and log upload is skipped
113114

build.gradle

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,23 +53,36 @@ dependencies {
5353
compile "org.embulk:embulk-output-jdbc:0.10.2"
5454
compile "net.snowflake:snowflake-jdbc:3.13.26"
5555

56-
implementation platform("software.amazon.awssdk:bom:2.25.20")
56+
compile platform("software.amazon.awssdk:bom:2.25.20")
5757

58-
implementation("software.amazon.awssdk:s3") {
58+
compile("software.amazon.awssdk:s3") {
5959
exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations"
6060
exclude group: "com.fasterxml.jackson.core", module: "jackson-core"
6161
exclude group: "com.fasterxml.jackson.core", module: "jackson-databind"
6262
exclude group: "joda-time", module: "joda-time"
6363
exclude group: "commons-logging", module: "commons-logging"
64+
exclude group: "org.slf4j", module: "slf4j-api"
6465
}
6566

66-
implementation("software.amazon.awssdk:regions") {
67+
compile("software.amazon.awssdk:regions") {
6768
exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations"
6869
exclude group: "com.fasterxml.jackson.core", module: "jackson-core"
6970
exclude group: "com.fasterxml.jackson.core", module: "jackson-databind"
7071
exclude group: "joda-time", module: "joda-time"
7172
exclude group: "commons-logging", module: "commons-logging"
73+
exclude group: "org.slf4j", module: "slf4j-api"
7274
}
75+
76+
compile("software.amazon.awssdk:auth") {
77+
exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations"
78+
exclude group: "com.fasterxml.jackson.core", module: "jackson-core"
79+
exclude group: "com.fasterxml.jackson.core", module: "jackson-databind"
80+
exclude group: "joda-time", module: "joda-time"
81+
exclude group: "commons-logging", module: "commons-logging"
82+
exclude group: "org.slf4j", module: "slf4j-api"
83+
}
84+
85+
compile "commons-logging:commons-logging:1.2"
7386
}
7487
embulkPlugin {
7588
mainClass = "org.embulk.output.SnowflakeOutputPlugin"

gradle/dependency-locks/embulkPluginRuntime.lockfile

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,53 @@ com.fasterxml.jackson.core:jackson-annotations:2.6.7
55
com.fasterxml.jackson.core:jackson-core:2.6.7
66
com.fasterxml.jackson.core:jackson-databind:2.6.7
77
com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7
8+
commons-codec:commons-codec:1.15
9+
commons-logging:commons-logging:1.2
10+
io.netty:netty-buffer:4.1.108.Final
11+
io.netty:netty-codec-http2:4.1.108.Final
12+
io.netty:netty-codec-http:4.1.108.Final
13+
io.netty:netty-codec:4.1.108.Final
14+
io.netty:netty-common:4.1.108.Final
15+
io.netty:netty-handler:4.1.108.Final
16+
io.netty:netty-resolver:4.1.108.Final
17+
io.netty:netty-transport-classes-epoll:4.1.108.Final
18+
io.netty:netty-transport-native-unix-common:4.1.108.Final
19+
io.netty:netty-transport:4.1.108.Final
820
javax.validation:validation-api:1.1.0.Final
921
net.snowflake:snowflake-jdbc:3.13.26
22+
org.apache.httpcomponents:httpclient:4.5.13
23+
org.apache.httpcomponents:httpcore:4.4.13
1024
org.embulk:embulk-output-jdbc:0.10.2
1125
org.embulk:embulk-util-config:0.3.0
1226
org.embulk:embulk-util-json:0.1.1
1327
org.embulk:embulk-util-retryhelper:0.8.2
1428
org.embulk:embulk-util-rubytime:0.3.2
1529
org.embulk:embulk-util-timestamp:0.2.1
30+
org.reactivestreams:reactive-streams:1.0.4
31+
software.amazon.awssdk:annotations:2.25.20
32+
software.amazon.awssdk:apache-client:2.25.20
33+
software.amazon.awssdk:arns:2.25.20
34+
software.amazon.awssdk:auth:2.25.20
35+
software.amazon.awssdk:aws-core:2.25.20
36+
software.amazon.awssdk:aws-query-protocol:2.25.20
37+
software.amazon.awssdk:aws-xml-protocol:2.25.20
38+
software.amazon.awssdk:checksums-spi:2.25.20
39+
software.amazon.awssdk:checksums:2.25.20
40+
software.amazon.awssdk:crt-core:2.25.20
41+
software.amazon.awssdk:endpoints-spi:2.25.20
42+
software.amazon.awssdk:http-auth-aws:2.25.20
43+
software.amazon.awssdk:http-auth-spi:2.25.20
44+
software.amazon.awssdk:http-auth:2.25.20
45+
software.amazon.awssdk:http-client-spi:2.25.20
46+
software.amazon.awssdk:identity-spi:2.25.20
47+
software.amazon.awssdk:json-utils:2.25.20
48+
software.amazon.awssdk:metrics-spi:2.25.20
49+
software.amazon.awssdk:netty-nio-client:2.25.20
50+
software.amazon.awssdk:profiles:2.25.20
51+
software.amazon.awssdk:protocol-core:2.25.20
52+
software.amazon.awssdk:regions:2.25.20
53+
software.amazon.awssdk:s3:2.25.20
54+
software.amazon.awssdk:sdk-core:2.25.20
55+
software.amazon.awssdk:third-party-jackson-core:2.25.20
56+
software.amazon.awssdk:utils:2.25.20
57+
software.amazon.eventstream:eventstream:1.0.1

src/main/java/org/embulk/output/SnowflakeOutputPlugin.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ public ConfigDiff transaction(
250250
try {
251251
snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true);
252252
snowflakeCon.runCreateStage(stageIdentifier);
253+
253254
configDiff = super.transaction(config, schema, taskCount, control);
254255
if (t.getDeleteStage()) {
255256
runDropStageWithRecovery(snowflakeCon, stageIdentifier, task);
@@ -265,18 +266,22 @@ public ConfigDiff transaction(
265266
final Optional<String> s3Region = t.getS3Region();
266267
final Optional<String> s3AccessKeyId = t.getS3AccessKeyId();
267268
final Optional<String> s3SecretAccessKey = t.getS3SecretAccessKey();
268-
if (!s3Bucket.isPresent() || !s3Prefix.isPresent() || !s3Region.isPresent()) {
269-
logger.warn("s3_bucket, s3_prefix, and s3_region must be set when upload_jdbc_log_to_s3 is true");
269+
if (!s3Bucket.isPresent() || !s3Region.isPresent()) {
270+
logger.warn("s3_bucket, and s3_region must be set when upload_jdbc_log_to_s3 is true");
270271
} else {
271-
try (JdbcLogUploader jdbcLogUploader = new JdbcLogUploader(s3Bucket.get(), s3Prefix.get(), s3Region.get(), s3AccessKeyId.orElse(null), s3SecretAccessKey.orElse(null))) {
272-
// snowflake_jdbc*.log で最新のファイルを探してアップロード
272+
try (JdbcLogUploader jdbcLogUploader =
273+
new JdbcLogUploader(
274+
s3Bucket.get(),
275+
s3Prefix.orElse(""),
276+
s3Region.get(),
277+
s3AccessKeyId.orElse(null),
278+
s3SecretAccessKey.orElse(null))) {
273279
String tmpDir = System.getProperty("java.io.tmpdir", "/tmp");
274280
File logDir = new File(tmpDir);
275281
File[] logFiles =
276282
logDir.listFiles(
277-
(dir, name) -> name.startsWith("snowflake_jdbc") && name.endsWith(".log"));
283+
(dir, name) -> name.startsWith("snowflake_jdbc"));
278284
if (logFiles != null && logFiles.length > 0) {
279-
// 最終更新日時が新しいファイルを選択
280285
Optional<File> latest =
281286
Arrays.stream(logFiles).max(Comparator.comparingLong(File::lastModified));
282287
if (latest.isPresent()) {

src/main/java/org/embulk/output/s3/JdbcLogUploader.java

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package org.embulk.output.s3;
22

33
import java.io.File;
4+
import java.time.LocalDateTime;
5+
import java.time.format.DateTimeFormatter;
46
import org.slf4j.Logger;
57
import org.slf4j.LoggerFactory;
68
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
7-
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
89
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
10+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
911
import software.amazon.awssdk.core.sync.RequestBody;
1012
import software.amazon.awssdk.regions.Region;
1113
import software.amazon.awssdk.services.s3.S3Client;
@@ -19,24 +21,27 @@ public class JdbcLogUploader implements AutoCloseable {
1921
private final String prefix;
2022
private final Region region;
2123

22-
public JdbcLogUploader(String bucket, String prefix, String region, String accessKeyId, String secretAccessKey) {
24+
public JdbcLogUploader(
25+
String bucket, String prefix, String region, String accessKeyId, String secretAccessKey) {
2326
this.bucket = bucket;
2427
this.prefix = prefix;
2528
this.region = Region.of(region);
26-
29+
2730
if (accessKeyId != null && secretAccessKey != null) {
2831
// Use explicit credentials
2932
AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessKeyId, secretAccessKey);
30-
this.s3Client = S3Client.builder()
31-
.region(this.region)
32-
.credentialsProvider(StaticCredentialsProvider.create(awsCreds))
33-
.build();
33+
this.s3Client =
34+
S3Client.builder()
35+
.region(this.region)
36+
.credentialsProvider(StaticCredentialsProvider.create(awsCreds))
37+
.build();
3438
} else {
3539
// Use default credentials provider (IAM role, environment variables, etc.)
36-
this.s3Client = S3Client.builder()
37-
.region(this.region)
38-
.credentialsProvider(DefaultCredentialsProvider.create())
39-
.build();
40+
this.s3Client =
41+
S3Client.builder()
42+
.region(this.region)
43+
.credentialsProvider(DefaultCredentialsProvider.create())
44+
.build();
4045
}
4146
}
4247

@@ -46,15 +51,30 @@ public void uploadIfExists(File file) {
4651
return;
4752
}
4853

49-
String key = prefix + "/" + file.getName();
54+
// Add timestamp to filename
55+
String timestamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd_HHmmss"));
56+
String originalFileName = file.getName();
57+
String fileNameWithTimestamp;
58+
59+
// Insert timestamp before file extension
60+
int lastDotIndex = originalFileName.lastIndexOf('.');
61+
if (lastDotIndex > 0) {
62+
String nameWithoutExt = originalFileName.substring(0, lastDotIndex);
63+
String extension = originalFileName.substring(lastDotIndex);
64+
fileNameWithTimestamp = nameWithoutExt + "_" + timestamp + extension;
65+
} else {
66+
fileNameWithTimestamp = originalFileName + "_" + timestamp;
67+
}
68+
69+
String key = prefix.isEmpty() ? fileNameWithTimestamp : prefix + "/" + fileNameWithTimestamp;
5070
try {
5171
PutObjectRequest putObjectRequest =
5272
PutObjectRequest.builder().bucket(bucket).key(key).build();
5373

5474
s3Client.putObject(putObjectRequest, RequestBody.fromFile(file));
55-
logger.info("Uploaded {} to s3://{}/{}", file.getAbsolutePath(), bucket, key);
75+
logger.info("Uploaded {}", file.getAbsolutePath());
5676
} catch (Exception e) {
57-
logger.error("Failed to upload {} to S3: {}", file.getAbsolutePath(), e.getMessage(), e);
77+
logger.error("Failed to upload {}", file.getAbsolutePath());
5878
}
5979
}
6080

0 commit comments

Comments
 (0)