Skip to content

Feature/send logs to mongo #53

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: feature/filter_header
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions db/logs_dao.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package db

import (
"context"
"fmt"
"log"
"sync"
"time"

"go.mongodb.org/mongo-driver/mongo"
)

type LogDocument struct {
Log string `bson:"log"`
Key string `bson:"key"`
Timestamp int64 `bson:"timestamp"`
}

var (
logBuffer []LogDocument
bufferLock sync.Mutex
batchSize = 1000
logCollection *mongo.Collection
insertInterval = time.Second * 60
)

func init() {
collection, err := logsInstance()
if err != nil {
log.Fatalf("Error while getting mongo client for logs: %v", err)
}
logCollection = collection

go periodicInsert()
}

func logsInstance() (*mongo.Collection, error) {
client, err := GetMongoClient()
if err != nil {
fmt.Println("Error while getting mongo client for logs: " + err.Error())
return nil, err
}

return client.Database(AccountID).Collection(LogsCollectionName), nil
}

func InsertLog(logString string, key string) {
log.Println(logString)

logString = "MIRRORING: " + logString

logDoc := LogDocument{
Log: logString,
Key: key,
Timestamp: time.Now().Unix(),
}

bufferLock.Lock()
logBuffer = append(logBuffer, logDoc)
if len(logBuffer) >= batchSize {
flushLogs()
}
bufferLock.Unlock()
}

func flushLogs() {
if len(logBuffer) == 0 {
return
}

_, err := logCollection.InsertMany(context.Background(), toInterfaceSlice(logBuffer))
if err != nil {
fmt.Println("Error while inserting logs: " + err.Error())
} else {
fmt.Println("Logs inserted successfully")
}
logBuffer = logBuffer[:0] // reset the buffer
}

func toInterfaceSlice(logs []LogDocument) []interface{} {
interfaceSlice := make([]interface{}, len(logs))
for i, d := range logs {
interfaceSlice[i] = d
}
return interfaceSlice
}

func periodicInsert() {
for {
time.Sleep(insertInterval)
bufferLock.Lock()
flushLogs()
bufferLock.Unlock()
}
}
6 changes: 4 additions & 2 deletions db/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package db

import (
"context"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"os"
"strconv"
"sync"

"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

var (
Expand All @@ -18,6 +19,7 @@ var (
var AccountID = strconv.Itoa(1_000_000)
var TrafficMetricsCollectionName = "traffic_metrics"
var AccountSettingsCollectionName = "accounts_settings"
var LogsCollectionName = "logs_runtime"

func GetMongoClient() (*mongo.Client, error) {
once.Do(func() {
Expand Down
Loading
Loading