
Commit 9239ca7 (1 parent: f612567)

feat: Add JDBC log S3 upload with IAM role support

- Add JdbcLogUploader class for automatic JDBC log upload to S3
- Support both IAM role and explicit AWS credentials authentication
- Add configuration options for S3 upload (bucket, prefix, region, credentials)
- Enable JDBC tracing when log upload is enabled
- Upload logs only when communication errors occur
- Implement proper resource management with AutoCloseable
- Add cross-platform support using the system temp directory
- Update README with configuration examples and usage instructions
- Add AWS SDK v2 dependencies with conflict exclusions

This feature helps with debugging Snowflake connection issues by automatically uploading JDBC driver logs to S3 when errors occur.

File tree

4 files changed: +203 -0 lines changed

README.md

Lines changed: 49 additions & 0 deletions

New configuration options, added to the options list after **match_by_column_name**:

- **upload_jdbc_log_to_s3**: enable automatic upload of JDBC driver logs to S3 when communication errors occur (boolean, default: false)
- **s3_bucket**: S3 bucket name for JDBC log upload (string, required when `upload_jdbc_log_to_s3` is true)
- **s3_prefix**: S3 key prefix for JDBC log upload (string, required when `upload_jdbc_log_to_s3` is true)
- **s3_region**: AWS region of the S3 bucket (string, required when `upload_jdbc_log_to_s3` is true)
- **s3_access_key_id**: AWS access key ID for S3 access (string, optional; uses the IAM role if not specified)
- **s3_secret_access_key**: AWS secret access key for S3 access (string, optional; uses the IAM role if not specified)

New documentation section:

## JDBC Log Upload to S3

This plugin supports automatic upload of JDBC driver logs to S3 when communication errors occur. This feature is useful for debugging connection issues with Snowflake.

### Configuration Example

```yaml
out:
  type: snowflake
  host: your-account.snowflakecomputing.com
  user: your_user
  password: your_password
  warehouse: your_warehouse
  database: your_database
  schema: your_schema
  table: your_table
  mode: insert

  # JDBC log upload configuration
  upload_jdbc_log_to_s3: true
  s3_bucket: your-log-bucket
  s3_prefix: snowflake-jdbc-logs
  s3_region: us-east-1

  # Optional: explicit AWS credentials (uses the IAM role if not specified)
  s3_access_key_id: YOUR_ACCESS_KEY_ID
  s3_secret_access_key: YOUR_SECRET_ACCESS_KEY
```

### Authentication Methods

1. **IAM Role (recommended)**: Leave `s3_access_key_id` and `s3_secret_access_key` unspecified. The plugin uses the default AWS credentials provider chain (IAM role, environment variables, etc.).

2. **Explicit credentials**: Specify both `s3_access_key_id` and `s3_secret_access_key` for explicit authentication.

### Behavior

- JDBC logs are uploaded only when communication errors occur during Snowflake operations
- The plugin automatically finds the latest `snowflake_jdbc*.log` file in the system temp directory
- Logs are uploaded to `s3://{bucket}/{prefix}/(unknown)`
- If the S3 upload fails, a warning is logged but the original error is still thrown
- If required S3 configuration is missing, a warning is logged and log upload is skipped
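Given the key construction described in the Behavior section (the configured prefix followed by the log file name), the resulting object URL can be sketched as follows. This is an illustration only: `buildKey` is a hypothetical helper, not part of the plugin, and the bucket and file names are placeholders.

```java
public class S3KeyLayoutExample {
  // Hypothetical helper mirroring the uploader's key construction:
  // the object key is the configured prefix plus "/" plus the log file name.
  public static String buildKey(String prefix, String fileName) {
    return prefix + "/" + fileName;
  }

  public static void main(String[] args) {
    String key = buildKey("snowflake-jdbc-logs", "snowflake_jdbc0.log");
    // prints s3://your-log-bucket/snowflake-jdbc-logs/snowflake_jdbc0.log
    System.out.println("s3://your-log-bucket/" + key);
  }
}
```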

build.gradle

Lines changed: 18 additions & 0 deletions

New AWS SDK v2 dependencies, with Jackson, Joda-Time, and commons-logging excluded to avoid conflicts with the Embulk runtime:

```groovy
dependencies {
  compile "org.embulk:embulk-output-jdbc:0.10.2"
  compile "net.snowflake:snowflake-jdbc:3.13.26"

  implementation platform("software.amazon.awssdk:bom:2.25.20")

  implementation("software.amazon.awssdk:s3") {
    exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations"
    exclude group: "com.fasterxml.jackson.core", module: "jackson-core"
    exclude group: "com.fasterxml.jackson.core", module: "jackson-databind"
    exclude group: "joda-time", module: "joda-time"
    exclude group: "commons-logging", module: "commons-logging"
  }

  implementation("software.amazon.awssdk:regions") {
    exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations"
    exclude group: "com.fasterxml.jackson.core", module: "jackson-core"
    exclude group: "com.fasterxml.jackson.core", module: "jackson-databind"
    exclude group: "joda-time", module: "joda-time"
    exclude group: "commons-logging", module: "commons-logging"
  }
}
```

src/main/java/org/embulk/output/SnowflakeOutputPlugin.java

Lines changed: 69 additions & 0 deletions

New imports:

```java
import java.io.File;
import org.embulk.output.s3.JdbcLogUploader;
```

New getters on `SnowflakePluginTask` for the S3 upload options:

```java
@Config("upload_jdbc_log_to_s3")
@ConfigDefault("false")
public boolean getUploadJdbcLogToS3();

@Config("s3_bucket")
@ConfigDefault("null")
public String getS3Bucket();

@Config("s3_prefix")
@ConfigDefault("null")
public String getS3Prefix();

@Config("s3_region")
@ConfigDefault("null")
public String getS3Region();

@Config("s3_access_key_id")
@ConfigDefault("null")
public String getS3AccessKeyId();

@Config("s3_secret_access_key")
@ConfigDefault("null")
public String getS3SecretAccessKey();
```

A new constant matches the driver's communication-error message:

```java
private static final String ENCOUNTERED_COMMUNICATION_ERROR_MESSAGE =
    "JDBC driver encountered communication error";
```

In `getConnector`, JDBC tracing is enabled whenever log upload is on:

```java
if (t.getUploadJdbcLogToS3()) {
  props.setProperty("tracing", "ALL");
}
```

In `transaction`, the catch block now uploads the latest JDBC log before the existing stage-cleanup and rethrow logic runs:

```java
} catch (Exception e) {
  if (e instanceof SQLException) {
    String message = e.getMessage();
    if (message != null
        && message.contains(ENCOUNTERED_COMMUNICATION_ERROR_MESSAGE)
        && t.getUploadJdbcLogToS3()) {
      final String s3Bucket = t.getS3Bucket();
      final String s3Prefix = t.getS3Prefix();
      final String s3Region = t.getS3Region();
      final String s3AccessKeyId = t.getS3AccessKeyId();
      final String s3SecretAccessKey = t.getS3SecretAccessKey();
      if (s3Bucket == null || s3Prefix == null || s3Region == null) {
        logger.warn(
            "s3_bucket, s3_prefix, and s3_region must be set when upload_jdbc_log_to_s3 is true");
      } else {
        try (JdbcLogUploader jdbcLogUploader =
            new JdbcLogUploader(s3Bucket, s3Prefix, s3Region, s3AccessKeyId, s3SecretAccessKey)) {
          // Find the latest snowflake_jdbc*.log file and upload it
          String tmpDir = System.getProperty("java.io.tmpdir", "/tmp");
          File logDir = new File(tmpDir);
          File[] logFiles =
              logDir.listFiles(
                  (dir, name) -> name.startsWith("snowflake_jdbc") && name.endsWith(".log"));
          if (logFiles != null && logFiles.length > 0) {
            // Pick the most recently modified file
            Optional<File> latest =
                Arrays.stream(logFiles).max(Comparator.comparingLong(File::lastModified));
            if (latest.isPresent()) {
              jdbcLogUploader.uploadIfExists(latest.get());
            }
          } else {
            logger.warn("No snowflake_jdbc*.log file found in {} for upload", tmpDir);
          }
        } catch (Exception uploadException) {
          logger.warn("Failed to upload JDBC log to S3: {}", uploadException.getMessage());
        }
      }
    }
  }
  // existing stage-cleanup (runDropStageWithRecovery) and rethrow logic follows
```
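The latest-log selection above can be exercised in isolation. The sketch below uses a hypothetical `LatestLogPicker` class with the same filter and comparator as the catch block; only the JDK is needed, so no AWS credentials or Snowflake connection are involved.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;

public class LatestLogPicker {
  // Same selection logic as the plugin: filter snowflake_jdbc*.log,
  // then take the most recently modified file.
  public static Optional<File> pickLatest(File logDir) {
    File[] logFiles =
        logDir.listFiles(
            (dir, name) -> name.startsWith("snowflake_jdbc") && name.endsWith(".log"));
    if (logFiles == null || logFiles.length == 0) {
      return Optional.empty();
    }
    return Arrays.stream(logFiles).max(Comparator.comparingLong(File::lastModified));
  }

  public static void main(String[] args) throws IOException {
    File dir = Files.createTempDirectory("jdbc-log-test").toFile();
    File old = new File(dir, "snowflake_jdbc0.log");
    File recent = new File(dir, "snowflake_jdbc1.log");
    old.createNewFile();
    recent.createNewFile();
    old.setLastModified(1_000_000L); // far in the past
    recent.setLastModified(System.currentTimeMillis());
    System.out.println(pickLatest(dir).get().getName()); // prints snowflake_jdbc1.log
  }
}
```

Note that `listFiles` returns `null` when the directory does not exist, which is why the plugin code checks for `null` before checking the array length.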
src/main/java/org/embulk/output/s3/JdbcLogUploader.java (new file)

Lines changed: 67 additions & 0 deletions

```java
package org.embulk.output.s3;

import java.io.File;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class JdbcLogUploader implements AutoCloseable {
  private final Logger logger = LoggerFactory.getLogger(JdbcLogUploader.class);

  private final S3Client s3Client;
  private final String bucket;
  private final String prefix;
  private final Region region;

  public JdbcLogUploader(
      String bucket, String prefix, String region, String accessKeyId, String secretAccessKey) {
    this.bucket = bucket;
    this.prefix = prefix;
    this.region = Region.of(region);

    if (accessKeyId != null && secretAccessKey != null) {
      // Use explicit credentials
      AwsBasicCredentials awsCreds = AwsBasicCredentials.create(accessKeyId, secretAccessKey);
      this.s3Client =
          S3Client.builder()
              .region(this.region)
              .credentialsProvider(StaticCredentialsProvider.create(awsCreds))
              .build();
    } else {
      // Use the default credentials provider chain (IAM role, environment variables, etc.)
      this.s3Client =
          S3Client.builder()
              .region(this.region)
              .credentialsProvider(DefaultCredentialsProvider.create())
              .build();
    }
  }

  public void uploadIfExists(File file) {
    if (!file.exists()) {
      logger.warn("File not found: {}", file.getAbsolutePath());
      return;
    }

    String key = prefix + "/" + file.getName();
    try {
      PutObjectRequest putObjectRequest =
          PutObjectRequest.builder().bucket(bucket).key(key).build();

      s3Client.putObject(putObjectRequest, RequestBody.fromFile(file));
      logger.info("Uploaded {} to s3://{}/{}", file.getAbsolutePath(), bucket, key);
    } catch (Exception e) {
      logger.error("Failed to upload {} to S3: {}", file.getAbsolutePath(), e.getMessage(), e);
    }
  }

  @Override
  public void close() {
    if (s3Client != null) {
      s3Client.close();
    }
  }
}
```
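The constructor's credential branching reduces to a simple rule. The sketch below illustrates it with a hypothetical `chooseProvider` helper; strings stand in for the AWS SDK provider types, which are not needed to show the decision.

```java
public class CredentialsChoiceExample {
  // Mirrors JdbcLogUploader's constructor: static credentials are used
  // only when BOTH keys are present; otherwise the default provider chain.
  public static String chooseProvider(String accessKeyId, String secretAccessKey) {
    if (accessKeyId != null && secretAccessKey != null) {
      return "StaticCredentialsProvider";
    }
    return "DefaultCredentialsProvider";
  }

  public static void main(String[] args) {
    System.out.println(chooseProvider("AKIA_EXAMPLE", "secret")); // prints StaticCredentialsProvider
    System.out.println(chooseProvider(null, null)); // prints DefaultCredentialsProvider
    // A lone access key without a secret also falls back to the chain:
    System.out.println(chooseProvider("AKIA_EXAMPLE", null)); // prints DefaultCredentialsProvider
  }
}
```

This fallback means a half-specified credential pair silently uses the IAM role rather than failing, which is worth keeping in mind when debugging authentication.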
