Skip to content

Commit aa14fdb

Browse files
authored
Add support for enforcing max_temp_cache_mb by limiting total JSONL disk usage. Closes #192
• Set max JSON size to 75% of the configured max_temp_cache_mb • Implement Pause and Resume functionality for RowSource conversion, collection to be paused to allow JSON to be processed and removed from disk. • Add GetFolderFileSizeMb to support conversion-time file size assessments.
1 parent a4ec4ca commit aa14fdb

29 files changed

+519
-50
lines changed

artifact_source/artifact_source_impl.go

+13-12
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/turbot/tailpipe-plugin-sdk/collection_state"
1919
"github.com/turbot/tailpipe-plugin-sdk/context_values"
2020
"github.com/turbot/tailpipe-plugin-sdk/events"
21+
"github.com/turbot/tailpipe-plugin-sdk/filepaths"
2122
"github.com/turbot/tailpipe-plugin-sdk/helpers"
2223
"github.com/turbot/tailpipe-plugin-sdk/parse"
2324
"github.com/turbot/tailpipe-plugin-sdk/rate_limiter"
@@ -61,7 +62,7 @@ type ArtifactSourceImpl[S artifact_source_config.ArtifactSourceConfig, T parse.C
6162

6263
// temporary directory for storing downloaded artifacts - this is initialised in the Init function
6364
// to be a subdirectory of the collection directory
64-
TempDir string
65+
TempArtifactDir string
6566

6667
// shadow the row_source.RowSourceImpl Source property, but using ArtifactSource interface
6768
Source ArtifactSource
@@ -95,8 +96,12 @@ func (a *ArtifactSourceImpl[S, T]) Init(ctx context.Context, params *row_source.
9596
a.NewCollectionStateFunc = collection_state.NewArtifactCollectionStateImpl
9697
}
9798

98-
// set the temp directory
99-
a.TempDir = filepath.Join(params.CollectionTempDir, "artifacts")
99+
// set the artifact directory
100+
artifactDir, err := filepaths.EnsureArtifactPath(params.CollectionTempDir)
101+
if err != nil {
102+
return err
103+
}
104+
a.TempArtifactDir = artifactDir
100105

101106
// call base to apply options and parse config
102107
if err := a.RowSourceImpl.Init(ctx, params, opts...); err != nil {
@@ -215,6 +220,11 @@ func (a *ArtifactSourceImpl[S, T]) OnArtifactDiscovered(ctx context.Context, inf
215220
a.artifactDownloadLimiter.Release()
216221
slog.Debug("ArtifactDiscovered - rate limiter released", "artifact", info.Name)
217222
}()
223+
224+
// as this is called from the file walking code, rather than as a result of an event,
225+
// we need to check for pausing here to avoid downloading artifacts when paused
226+
a.BlockWhilePaused(ctx)
227+
218228
// cast the source to an ArtifactSource and download the artifact
219229
err = a.Source.DownloadArtifact(ctx, info)
220230
if err != nil {
@@ -254,15 +264,6 @@ func (a *ArtifactSourceImpl[S, T]) OnArtifactDownloaded(ctx context.Context, inf
254264
return fmt.Errorf("error updating collection state: %w", err)
255265
}
256266

257-
// TODO verify if this condition can still occur
258-
// we have a race condition - if the processArtifact completes before we have time to handle the ArtifactDownloadedEvent
259-
// ArtifactSourceImpl.Collect may return before this function is complete
260-
// this may lead to sending a completion event before the artifact has been processed
261-
// we need to ensure the wait group is not closed before we leave this function
262-
//so increment the wait group again and
263-
//a.artifactExtractWg.Add(1)
264-
//defer a.artifactExtractWg.Done()
265-
266267
// if we DO NOT have a null loader, start the go routine to process the artifact
267268
// (if we have a null loader, we must have a ArtifactConversionCollector which will do the processing)
268269
if !nullLoader {

artifact_source/plugin_source_wrapper.go

+19-1
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,24 @@ func (w *PluginSourceWrapper) Collect(ctx context.Context) error {
192192
return nil
193193
}
194194

195+
// Pause is called to pause collection of source data
196+
func (w *PluginSourceWrapper) Pause() error {
197+
_, err := w.client.SourcePause()
198+
if err != nil {
199+
return err
200+
}
201+
return nil
202+
}
203+
204+
// Resume is called to resume collection of source data
205+
func (w *PluginSourceWrapper) Resume() error {
206+
_, err := w.client.SourcePause()
207+
if err != nil {
208+
return err
209+
}
210+
return nil
211+
}
212+
195213
func (w *PluginSourceWrapper) readSourceEvents(ctx context.Context, pluginStream proto.TailpipePlugin_AddObserverClient) {
196214
pluginEventChan := make(chan *proto.Event)
197215
errChan := make(chan error)
@@ -254,7 +272,7 @@ func (w *PluginSourceWrapper) readSourceEvents(ctx context.Context, pluginStream
254272
default:
255273
// pass all other events onwards
256274
// convert to a observable event
257-
ev, err := events.SourceEventFromProto(protoEvent)
275+
ev, err := events.EventFromProto(protoEvent)
258276
if err != nil {
259277
w.NotifyError(ctx, w.executionId, err)
260278
continue

events/chunk.go

+8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@ func NewChunkEvent(executionId string, chunkNumber int32) *Chunk {
1717
}
1818
}
1919

20+
func ChunkWrittenFromProto(e *proto.Event) Event {
21+
event := e.GetChunkWrittenEvent()
22+
return &Chunk{
23+
ExecutionId: event.ExecutionId,
24+
ChunkNumber: event.ChunkNumber,
25+
}
26+
}
27+
2028
// ToProto converts the event to a proto.Event
2129
func (r *Chunk) ToProto() *proto.Event {
2230
return &proto.Event{

events/complete.go

+15
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package events
22

33
import (
4+
"errors"
45
"github.com/turbot/tailpipe-plugin-sdk/grpc/proto"
56
)
67

@@ -21,6 +22,20 @@ func NewCompletedEvent(executionId string, rowCount int64, chunksWritten int32,
2122
}
2223
}
2324

25+
func CompleteFromProto(e *proto.Event) Event {
26+
event := e.GetCompleteEvent()
27+
28+
res := &Complete{
29+
ExecutionId: event.ExecutionId,
30+
RowCount: event.RowCount,
31+
ChunksWritten: event.ChunkCount,
32+
}
33+
if event.Error != "" {
34+
res.Err = errors.New(event.Error)
35+
}
36+
return res
37+
}
38+
2439
func (c *Complete) ToProto() *proto.Event {
2540
errString := ""
2641
if c.Err != nil {

events/proto.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ import (
55
"github.com/turbot/tailpipe-plugin-sdk/grpc/proto"
66
)
77

8-
// SourceEventFromProto converts a proto.Event to an Event
8+
// EventFromProto converts a proto.Event to an Event
99
// NOTE: this function is used for sources implemented in external plugins so handles source events ONLY
10-
func SourceEventFromProto(e *proto.Event) (Event, error) {
10+
func EventFromProto(e *proto.Event) (Event, error) {
1111
switch e.Event.(type) {
1212
case *proto.Event_ArtifactDiscoveredEvent:
1313
return ArtifactDiscoveredFromProto(e), nil
@@ -21,6 +21,16 @@ func SourceEventFromProto(e *proto.Event) (Event, error) {
2121
return SourceCompleteFromProto(e), nil
2222
case *proto.Event_ErrorEvent:
2323
return ErrorFromProto(e), nil
24+
case *proto.Event_StartedEvent:
25+
return StartedFromProto(e), nil
26+
27+
case *proto.Event_StatusEvent:
28+
return StatusFromProto(e), nil
29+
case *proto.Event_ChunkWrittenEvent:
30+
return ChunkWrittenFromProto(e), nil
31+
case *proto.Event_CompleteEvent:
32+
return CompleteFromProto(e), nil
33+
2434
default:
2535
return nil, fmt.Errorf("event %s not expected from source", e)
2636
}

events/started.go

+6
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ func NewStartedEvent(executionId string) *Started {
1313
}
1414
}
1515

16+
func StartedFromProto(e *proto.Event) Event {
17+
return &Started{
18+
ExecutionId: e.GetStartedEvent().ExecutionId,
19+
}
20+
}
21+
1622
func (s *Started) ToProto() *proto.Event {
1723
return &proto.Event{
1824
Event: &proto.Event_StartedEvent{

events/status.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ func NewStatusEvent(executionId string) *Status {
4040
mut: &sync.Mutex{},
4141
}
4242
}
43-
func StatusFromProto(event *proto.EventStatus) *Status {
43+
44+
func StatusFromProto(e *proto.Event) *Status {
45+
event := e.GetStatusEvent()
4446
s := &Status{
4547
LatestArtifactLocation: event.LatestArtifactPath,
4648
ArtifactsDiscovered: event.ArtifactsDiscovered,

filepaths/json.go

+15
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,18 @@ func EnsureJSONLPath(baseDir string) (string, error) {
2020

2121
return sourceFilePath, nil
2222
}
23+
24+
// EnsureArtifactPath ensures the artifact temp dir path exists - this is the folder where the artifact source writes downloaded files
25+
func EnsureArtifactPath(baseDir string) (string, error) {
26+
artifactPath := filepath.Join(baseDir, "artifacts")
27+
28+
// ensure it exists
29+
if _, err := os.Stat(artifactPath); os.IsNotExist(err) {
30+
err = os.MkdirAll(artifactPath, 0755)
31+
if err != nil {
32+
return "", fmt.Errorf("could not create artifact directory %s: %w", artifactPath, err)
33+
}
34+
}
35+
36+
return artifactPath, nil
37+
}

formats/jsonl.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (d *JsonLines) GetMapper() (mappers.Mapper[*types.DynamicRow], error) {
7676
return nil, fmt.Errorf("JsonLines format does not support a mapper")
7777
}
7878

79-
// GetCsvOpts converts the Delimited configuration into a slice of CSV options strings
79+
// GetReadJsonOpts converts the Delimited configuration into a slice of CSV options strings
8080
// in the format expected by DuckDb read_csv function
8181
func (d *JsonLines) GetReadJsonOpts() []string {
8282
var opts []string

go.mod

+3-2
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ require (
1515
github.com/marcboeker/go-duckdb/v2 v2.1.0
1616
github.com/rs/xid v1.5.0
1717
github.com/satyrius/gonx v1.4.0
18+
github.com/sethvargo/go-retry v0.3.0
1819
github.com/stretchr/testify v1.10.0
19-
github.com/turbot/go-kit v1.2.0
20-
github.com/turbot/pipe-fittings/v2 v2.3.3
20+
github.com/turbot/go-kit v1.3.0
21+
github.com/turbot/pipe-fittings/v2 v2.4.0-rc.4
2122
github.com/zclconf/go-cty v1.14.4
2223
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c
2324
golang.org/x/sync v0.12.0

go.sum

+6-4
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g
649649
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
650650
github.com/satyrius/gonx v1.4.0 h1:F3uxif5Yx6FBzdQAh79bHQK6CTJugOcN0w0Z8azQuQg=
651651
github.com/satyrius/gonx v1.4.0/go.mod h1:+r8KNe5d2tjkZU+DfhERo0G6KxkGih+1qYF6tqLHwvk=
652+
github.com/sethvargo/go-retry v0.3.0 h1:EEt31A35QhrcRZtrYFDTBg91cqZVnFL2navjDrah2SE=
653+
github.com/sethvargo/go-retry v0.3.0/go.mod h1:mNX17F0C/HguQMyMyJxcnU471gOZGxCLyYaFyAZraas=
652654
github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 h1:v9ezJDHA1XGxViAUSIoO/Id7Fl63u6d0YmsAm+/p2hs=
653655
github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02/go.mod h1:RF16/A3L0xSa0oSERcnhd8Pu3IXSDZSK2gmGIMsttFE=
654656
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
@@ -699,10 +701,10 @@ github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2bi
699701
github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8=
700702
github.com/tkrajina/go-reflector v0.5.8 h1:yPADHrwmUbMq4RGEyaOUpz2H90sRsETNVpjzo3DLVQQ=
701703
github.com/tkrajina/go-reflector v0.5.8/go.mod h1:ECbqLgccecY5kPmPmXg1MrHW585yMcDkVl6IvJe64T4=
702-
github.com/turbot/go-kit v1.2.0 h1:4pBDu2LGoqF2y6ypL4xJjqlDW0BkUw3IIDlyHkU0O88=
703-
github.com/turbot/go-kit v1.2.0/go.mod h1:1xmRuQ0cn/10QUMNLNOAFIqN8P6Rz5s3VLT8mkN3nF8=
704-
github.com/turbot/pipe-fittings/v2 v2.3.3 h1:7nz5MQah4++qL4zLP7eRNyhiwwm9eXhdwbiyGyYSqfA=
705-
github.com/turbot/pipe-fittings/v2 v2.3.3/go.mod h1:wEoN4EseMTXophNlpOe740rAC9Jg0JhGRt5QM5R2ss8=
704+
github.com/turbot/go-kit v1.3.0 h1:6cIYPAO5hO9fG7Zd5UBC4Ch3+C6AiiyYS0UQnrUlTV0=
705+
github.com/turbot/go-kit v1.3.0/go.mod h1:piKJMYCF8EYmKf+D2B78Csy7kOHGmnQVOWingtLKWWQ=
706+
github.com/turbot/pipe-fittings/v2 v2.4.0-rc.4 h1:tiRlC8XTf9PH+KV/4wcz6uCog93zd3qNn4pZLwDcR8c=
707+
github.com/turbot/pipe-fittings/v2 v2.4.0-rc.4/go.mod h1:szte433cBDCaZcGe5zMVGG7uTl9HMaEYaQmuvzZRYIQ=
706708
github.com/turbot/pipes-sdk-go v0.12.0 h1:esbbR7bALa5L8n/hqroMPaQSSo3gNM/4X0iTmHa3D6U=
707709
github.com/turbot/pipes-sdk-go v0.12.0/go.mod h1:Mb+KhvqqEdRbz/6TSZc2QWDrMa5BN3E4Xw+gPt2TRkc=
708710
github.com/turbot/terraform-components v0.0.0-20231213122222-1f3526cab7a7 h1:qDMxFVd8Zo0rIhnEBdCIbR+T6WgjwkxpFZMN8zZmmjg=

grpc/proto/plugin.pb.go

+33-17
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

grpc/proto/plugin.proto

+4-1
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@ service TailpipePlugin {
1616
rpc CloseSource(Empty) returns (Empty);
1717
rpc SaveCollectionState(Empty) returns (Empty);
1818
rpc SourceCollect(SourceCollectRequest) returns (Empty);
19+
rpc SourcePause(Empty) returns (Empty);
20+
rpc SourceResume(Empty) returns (Empty);
1921
}
2022

2123
message Empty{}
2224

23-
2425
message CollectRequest {
2526
string table_name = 1;
2627
string partition_name = 2;
@@ -42,6 +43,8 @@ message CollectRequest {
4243
SourcePluginReattach source_plugin = 10;
4344
// optional: the collection start time
4445
google.protobuf.Timestamp from_time = 11;
46+
// the max space to take with temp files
47+
int64 max_temp_cache_size_mb= 12;
4548
}
4649

4750
message UpdateCollectionStateRequest {

0 commit comments

Comments
 (0)