Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: json logs from the bootloader #155

Merged
merged 1 commit into from
Jan 6, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/cmd/local/helm/airbyte_values.go
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ func BuildAirbyteValues(ctx context.Context, opts ValuesOpts) (string, error) {
"global.auth.enabled=true",
"global.jobs.resources.limits.cpu=3",
"global.jobs.resources.limits.memory=4Gi",
"airbyte-bootloader.env_vars.PLATFORM_LOG_FORMAT=json",
}

span.SetAttributes(
6 changes: 3 additions & 3 deletions internal/cmd/local/local/install.go
Original file line number Diff line number Diff line change
@@ -442,10 +442,10 @@ func (c *Command) streamPodLogs(ctx context.Context, namespace, podName, prefix

s := newLogScanner(r)
for s.Scan() {
if s.line.level == "ERROR" {
pterm.Error.Printfln("%s: %s", prefix, s.line.msg)
if s.line.Level == "ERROR" {
pterm.Error.Printfln("%s: %s", prefix, s.line.Message)
} else {
pterm.Debug.Printfln("%s: %s", prefix, s.line.msg)
pterm.Debug.Printfln("%s: %s", prefix, s.line.Message)
}
}

103 changes: 75 additions & 28 deletions internal/cmd/local/local/log_utils.go
Original file line number Diff line number Diff line change
@@ -2,19 +2,10 @@ package local

import (
"bufio"
"encoding/json"
"io"
"regexp"
"strings"
)

// 2024-09-10 20:16:24 WARN i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [273....
var logRx = regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \x1b\[(?:1;)?\d+m(?P<level>[A-Z]+)\x1b\[m (?P<msg>\S+ - .*)`)

type logLine struct {
msg string
level string
}

Comment on lines -10 to -17
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we actually remove all of this? Isn't the PLATFORM_LOG_FORMAT envvar only going to work with certain newer versions of airbyte? Do we need to detect which version is running to know if the PLATFORM_LOG_FORMAT envvar is supported?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logRx wasn't really working already. I think shortly after I added that code, the platform tweaked the log format and the regexp stopped working.

type logScanner struct {
scanner *bufio.Scanner
line logLine
@@ -23,10 +14,6 @@ type logScanner struct {
func newLogScanner(r io.Reader) *logScanner {
return &logScanner{
scanner: bufio.NewScanner(r),
line: logLine{
msg: "",
level: "DEBUG",
},
}
}

@@ -36,26 +23,86 @@ func (j *logScanner) Scan() bool {
return false
}

// skip java stacktrace noise
if strings.HasPrefix(j.scanner.Text(), "\tat ") || strings.HasPrefix(j.scanner.Text(), "\t... ") {
continue
}

m := logRx.FindSubmatch(j.scanner.Bytes())

if m != nil {
j.line.msg = string(m[2])
j.line.level = string(m[1])
var data logLine
err := json.Unmarshal(j.scanner.Bytes(), &data)
// not all lines are JSON. don't propogate errors, just include the full line.
if err != nil {
j.line = logLine{Message: j.scanner.Text()}
} else {
// Some logs aren't from java (e.g. temporal) or they have a different format,
// or the log covers multiple lines (e.g. java stack trace). In that case, use the full line
// and reuse the level of the previous line.
j.line.msg = j.scanner.Text()
j.line = data
}

return true
}
}

func (j *logScanner) Err() error {
return j.scanner.Err()
}

/*
{
"timestamp": 1734712334950,
"message": "Unable to bootstrap Airbyte environment.",
"level": "ERROR",
"logSource": "platform",
"caller": {
"className": "io.airbyte.bootloader.Application",
"methodName": "main",
"lineNumber": 28,
"threadName": "main"
},
"throwable": {
"cause": {
"cause": null,
"stackTrace": [
{
"cn": "io.airbyte.bootloader.Application",
"ln": 25,
"mn": "main"
}
],
"message": "Unable to connect to the database.",
"suppressed": [],
"localizedMessage": "Unable to connect to the database."
},
"stackTrace": [
{
"cn": "io.airbyte.bootloader.Application",
"ln": 25,
"mn": "main"
}
],
"message": "Database availability check failed.",
"suppressed": [],
"localizedMessage": "Database availability check failed."
}
}
*/
type logLine struct {
Timestamp int64 `json:"timestamp"`
Message string `json:"message"`
Level string `json:"level"`
LogSource string `json:"logSource"`
Caller *logCaller `json:"caller"`
Throwable *logThrowable `json:throwable`
}

type logCaller struct {
ClassName string `json:"className"`
MethodName string `json:"methodName"`
LineNumber int `json:"lineNumber"`
ThreadName string `json:"threadName"`
}

type logStackElement struct {
ClassName string `json:"cn"`
LineNumber int `json:"ln"`
MethodName string `json:"mn"`
}

type logThrowable struct {
Cause *logThrowable `json:"cause"`
Stacktrace []logStackElement `json:"stackTrace"`
Message string `json:"message"`
}
33 changes: 8 additions & 25 deletions internal/cmd/local/local/log_utils_test.go
Original file line number Diff line number Diff line change
@@ -6,20 +6,8 @@ import (
)

var testLogs = strings.TrimSpace(`
2024-09-12 15:56:25 INFO i.a.d.c.DatabaseAvailabilityCheck(check):49 - Database is not ready yet. Please wait a moment, it might still be initializing...
2024-09-12 15:56:30 WARN i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [54bd6014, L:/127.0.0.1:52991 - R:localhost/127.0.0.1:8125] An exception has been observed post termination, use DEBUG level to see the full stack: java.net.PortUnreachableException: recvAddress(..) failed: Connection refused
2024-09-12 15:56:31 ERROR i.a.b.Application(main):25 - Unable to bootstrap Airbyte environment.
io.airbyte.db.init.DatabaseInitializationException: Database availability check failed.
at io.airbyte.db.init.DatabaseInitializer.initialize(DatabaseInitializer.java:54) ~[io.airbyte.airbyte-db-db-lib-0.64.3.jar:?]
at io.airbyte.bootloader.Bootloader.initializeDatabases(Bootloader.java:229) ~[io.airbyte-airbyte-bootloader-0.64.3.jar:?]
at io.airbyte.bootloader.Bootloader.load(Bootloader.java:104) ~[io.airbyte-airbyte-bootloader-0.64.3.jar:?]
at io.airbyte.bootloader.Application.main(Application.java:22) [io.airbyte-airbyte-bootloader-0.64.3.jar:?]
Caused by: io.airbyte.db.check.DatabaseCheckException: Unable to connect to the database.
at io.airbyte.db.check.DatabaseAvailabilityCheck.check(DatabaseAvailabilityCheck.java:40) ~[io.airbyte.airbyte-db-db-lib-0.64.3.jar:?]
at io.airbyte.db.init.DatabaseInitializer.initialize(DatabaseInitializer.java:45) ~[io.airbyte.airbyte-db-db-lib-0.64.3.jar:?]
... 3 more
2024-09-12 15:56:31 INFO i.m.r.Micronaut(lambda$start$0):118 - Embedded Application shutting down
2024-09-12T15:56:33.125352208Z Thread-4 INFO Loading mask data from '/seed/specs_secrets_mask.yaml
nonjsonline
{"timestamp":1734723317023,"message":"Waiting for database to become available...","level":"WARN","logSource":"platform","caller":{"className":"io.airbyte.db.check.DatabaseAvailabilityCheck","methodName":"check","lineNumber":38,"threadName":"main"},"throwable":null}
`)

func TestJavaLogScanner(t *testing.T) {
@@ -28,22 +16,17 @@ func TestJavaLogScanner(t *testing.T) {
expectLogLine := func(level, msg string) {
s.Scan()

if s.line.level != level {
t.Errorf("expected level %q but got %q", level, s.line.level)
if s.line.Level != level {
t.Errorf("expected level %q but got %q", level, s.line.Level)
}
if s.line.msg != msg {
t.Errorf("expected msg %q but got %q", msg, s.line.msg)
if s.line.Message != msg {
t.Errorf("expected msg %q but got %q", msg, s.line.Message)
}
if s.Err() != nil {
t.Errorf("unexpected error %v", s.Err())
}
}

expectLogLine("INFO", "i.a.d.c.DatabaseAvailabilityCheck(check):49 - Database is not ready yet. Please wait a moment, it might still be initializing...")
expectLogLine("WARN", "i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [54bd6014, L:/127.0.0.1:52991 - R:localhost/127.0.0.1:8125] An exception has been observed post termination, use DEBUG level to see the full stack: java.net.PortUnreachableException: recvAddress(..) failed: Connection refused")
expectLogLine("ERROR", "i.a.b.Application(main):25 - Unable to bootstrap Airbyte environment.")
expectLogLine("ERROR", "io.airbyte.db.init.DatabaseInitializationException: Database availability check failed.")
expectLogLine("ERROR", "Caused by: io.airbyte.db.check.DatabaseCheckException: Unable to connect to the database.")
expectLogLine("INFO", "i.m.r.Micronaut(lambda$start$0):118 - Embedded Application shutting down")
expectLogLine("INFO", "2024-09-12T15:56:33.125352208Z Thread-4 INFO Loading mask data from '/seed/specs_secrets_mask.yaml")
expectLogLine("", "nonjsonline")
expectLogLine("WARN", "Waiting for database to become available...")
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
airbyte-bootloader:
env_vars:
PLATFORM_LOG_FORMAT: json
global:
auth:
enabled: "true"