diff --git a/internal/cmd/local/helm/airbyte_values.go b/internal/cmd/local/helm/airbyte_values.go index 9e5571a..22ccee2 100644 --- a/internal/cmd/local/helm/airbyte_values.go +++ b/internal/cmd/local/helm/airbyte_values.go @@ -25,6 +25,7 @@ func BuildAirbyteValues(ctx context.Context, opts ValuesOpts) (string, error) { "global.auth.enabled=true", "global.jobs.resources.limits.cpu=3", "global.jobs.resources.limits.memory=4Gi", + "airbyte-bootloader.env_vars.PLATFORM_LOG_FORMAT=json", } span.SetAttributes( diff --git a/internal/cmd/local/local/install.go b/internal/cmd/local/local/install.go index d3ce08d..7282cda 100644 --- a/internal/cmd/local/local/install.go +++ b/internal/cmd/local/local/install.go @@ -442,10 +442,10 @@ func (c *Command) streamPodLogs(ctx context.Context, namespace, podName, prefix s := newLogScanner(r) for s.Scan() { - if s.line.level == "ERROR" { - pterm.Error.Printfln("%s: %s", prefix, s.line.msg) + if s.line.Level == "ERROR" { + pterm.Error.Printfln("%s: %s", prefix, s.line.Message) } else { - pterm.Debug.Printfln("%s: %s", prefix, s.line.msg) + pterm.Debug.Printfln("%s: %s", prefix, s.line.Message) } } diff --git a/internal/cmd/local/local/log_utils.go b/internal/cmd/local/local/log_utils.go index 0d491b1..4dc2356 100644 --- a/internal/cmd/local/local/log_utils.go +++ b/internal/cmd/local/local/log_utils.go @@ -2,19 +2,10 @@ package local import ( "bufio" + "encoding/json" "io" - "regexp" - "strings" ) -// 2024-09-10 20:16:24 WARN i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [273.... -var logRx = regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \x1b\[(?:1;)?\d+m(?P[A-Z]+)\x1b\[m (?P\S+ - .*)`) - -type logLine struct { - msg string - level string -} - type logScanner struct { scanner *bufio.Scanner line logLine @@ -23,10 +14,6 @@ type logScanner struct { func newLogScanner(r io.Reader) *logScanner { return &logScanner{ scanner: bufio.NewScanner(r), - line: logLine{ - msg: "", - level: "DEBUG", - }, } } @@ -36,22 +23,15 @@ func (j *logScanner) Scan() bool { return false } - // skip java stacktrace noise - if strings.HasPrefix(j.scanner.Text(), "\tat ") || strings.HasPrefix(j.scanner.Text(), "\t... ") { - continue - } - - m := logRx.FindSubmatch(j.scanner.Bytes()) - - if m != nil { - j.line.msg = string(m[2]) - j.line.level = string(m[1]) + var data logLine + err := json.Unmarshal(j.scanner.Bytes(), &data) + // not all lines are JSON. don't propogate errors, just include the full line. + if err != nil { + j.line = logLine{Message: j.scanner.Text()} } else { - // Some logs aren't from java (e.g. temporal) or they have a different format, - // or the log covers multiple lines (e.g. java stack trace). In that case, use the full line - // and reuse the level of the previous line. - j.line.msg = j.scanner.Text() + j.line = data } + return true } } @@ -59,3 +39,70 @@ func (j *logScanner) Scan() bool { func (j *logScanner) Err() error { return j.scanner.Err() } + +/* + { + "timestamp": 1734712334950, + "message": "Unable to bootstrap Airbyte environment.", + "level": "ERROR", + "logSource": "platform", + "caller": { + "className": "io.airbyte.bootloader.Application", + "methodName": "main", + "lineNumber": 28, + "threadName": "main" + }, + "throwable": { + "cause": { + "cause": null, + "stackTrace": [ + { + "cn": "io.airbyte.bootloader.Application", + "ln": 25, + "mn": "main" + } + ], + "message": "Unable to connect to the database.", + "suppressed": [], + "localizedMessage": "Unable to connect to the database." + }, + "stackTrace": [ + { + "cn": "io.airbyte.bootloader.Application", + "ln": 25, + "mn": "main" + } + ], + "message": "Database availability check failed.", + "suppressed": [], + "localizedMessage": "Database availability check failed." + } + } +*/ +type logLine struct { + Timestamp int64 `json:"timestamp"` + Message string `json:"message"` + Level string `json:"level"` + LogSource string `json:"logSource"` + Caller *logCaller `json:"caller"` + Throwable *logThrowable `json:throwable` +} + +type logCaller struct { + ClassName string `json:"className"` + MethodName string `json:"methodName"` + LineNumber int `json:"lineNumber"` + ThreadName string `json:"threadName"` +} + +type logStackElement struct { + ClassName string `json:"cn"` + LineNumber int `json:"ln"` + MethodName string `json:"mn"` +} + +type logThrowable struct { + Cause *logThrowable `json:"cause"` + Stacktrace []logStackElement `json:"stackTrace"` + Message string `json:"message"` +} diff --git a/internal/cmd/local/local/log_utils_test.go b/internal/cmd/local/local/log_utils_test.go index a16f646..9e96768 100644 --- a/internal/cmd/local/local/log_utils_test.go +++ b/internal/cmd/local/local/log_utils_test.go @@ -6,20 +6,8 @@ import ( ) var testLogs = strings.TrimSpace(` -2024-09-12 15:56:25 INFO i.a.d.c.DatabaseAvailabilityCheck(check):49 - Database is not ready yet. Please wait a moment, it might still be initializing... -2024-09-12 15:56:30 WARN i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [54bd6014, L:/127.0.0.1:52991 - R:localhost/127.0.0.1:8125] An exception has been observed post termination, use DEBUG level to see the full stack: java.net.PortUnreachableException: recvAddress(..) failed: Connection refused -2024-09-12 15:56:31 ERROR i.a.b.Application(main):25 - Unable to bootstrap Airbyte environment. -io.airbyte.db.init.DatabaseInitializationException: Database availability check failed. - at io.airbyte.db.init.DatabaseInitializer.initialize(DatabaseInitializer.java:54) ~[io.airbyte.airbyte-db-db-lib-0.64.3.jar:?] - at io.airbyte.bootloader.Bootloader.initializeDatabases(Bootloader.java:229) ~[io.airbyte-airbyte-bootloader-0.64.3.jar:?] - at io.airbyte.bootloader.Bootloader.load(Bootloader.java:104) ~[io.airbyte-airbyte-bootloader-0.64.3.jar:?] - at io.airbyte.bootloader.Application.main(Application.java:22) [io.airbyte-airbyte-bootloader-0.64.3.jar:?] -Caused by: io.airbyte.db.check.DatabaseCheckException: Unable to connect to the database. - at io.airbyte.db.check.DatabaseAvailabilityCheck.check(DatabaseAvailabilityCheck.java:40) ~[io.airbyte.airbyte-db-db-lib-0.64.3.jar:?] - at io.airbyte.db.init.DatabaseInitializer.initialize(DatabaseInitializer.java:45) ~[io.airbyte.airbyte-db-db-lib-0.64.3.jar:?] - ... 3 more -2024-09-12 15:56:31 INFO i.m.r.Micronaut(lambda$start$0):118 - Embedded Application shutting down -2024-09-12T15:56:33.125352208Z Thread-4 INFO Loading mask data from '/seed/specs_secrets_mask.yaml +nonjsonline +{"timestamp":1734723317023,"message":"Waiting for database to become available...","level":"WARN","logSource":"platform","caller":{"className":"io.airbyte.db.check.DatabaseAvailabilityCheck","methodName":"check","lineNumber":38,"threadName":"main"},"throwable":null} `) func TestJavaLogScanner(t *testing.T) { @@ -28,22 +16,17 @@ func TestJavaLogScanner(t *testing.T) { expectLogLine := func(level, msg string) { s.Scan() - if s.line.level != level { - t.Errorf("expected level %q but got %q", level, s.line.level) + if s.line.Level != level { + t.Errorf("expected level %q but got %q", level, s.line.Level) } - if s.line.msg != msg { - t.Errorf("expected msg %q but got %q", msg, s.line.msg) + if s.line.Message != msg { + t.Errorf("expected msg %q but got %q", msg, s.line.Message) } if s.Err() != nil { t.Errorf("unexpected error %v", s.Err()) } } - expectLogLine("INFO", "i.a.d.c.DatabaseAvailabilityCheck(check):49 - Database is not ready yet. Please wait a moment, it might still be initializing...") - expectLogLine("WARN", "i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [54bd6014, L:/127.0.0.1:52991 - R:localhost/127.0.0.1:8125] An exception has been observed post termination, use DEBUG level to see the full stack: java.net.PortUnreachableException: recvAddress(..) failed: Connection refused") - expectLogLine("ERROR", "i.a.b.Application(main):25 - Unable to bootstrap Airbyte environment.") - expectLogLine("ERROR", "io.airbyte.db.init.DatabaseInitializationException: Database availability check failed.") - expectLogLine("ERROR", "Caused by: io.airbyte.db.check.DatabaseCheckException: Unable to connect to the database.") - expectLogLine("INFO", "i.m.r.Micronaut(lambda$start$0):118 - Embedded Application shutting down") - expectLogLine("INFO", "2024-09-12T15:56:33.125352208Z Thread-4 INFO Loading mask data from '/seed/specs_secrets_mask.yaml") + expectLogLine("", "nonjsonline") + expectLogLine("WARN", "Waiting for database to become available...") } diff --git a/internal/cmd/local/local/testdata/expected-default.values.yaml b/internal/cmd/local/local/testdata/expected-default.values.yaml index 5c28fc8..d2e812b 100644 --- a/internal/cmd/local/local/testdata/expected-default.values.yaml +++ b/internal/cmd/local/local/testdata/expected-default.values.yaml @@ -1,3 +1,6 @@ +airbyte-bootloader: + env_vars: + PLATFORM_LOG_FORMAT: json global: auth: enabled: "true"