diff --git a/artifact_source/artifact_source_impl.go b/artifact_source/artifact_source_impl.go index c558478..73e56f7 100644 --- a/artifact_source/artifact_source_impl.go +++ b/artifact_source/artifact_source_impl.go @@ -18,6 +18,7 @@ import ( "github.com/turbot/tailpipe-plugin-sdk/collection_state" "github.com/turbot/tailpipe-plugin-sdk/context_values" "github.com/turbot/tailpipe-plugin-sdk/events" + "github.com/turbot/tailpipe-plugin-sdk/filepaths" "github.com/turbot/tailpipe-plugin-sdk/helpers" "github.com/turbot/tailpipe-plugin-sdk/parse" "github.com/turbot/tailpipe-plugin-sdk/rate_limiter" @@ -61,7 +62,7 @@ type ArtifactSourceImpl[S artifact_source_config.ArtifactSourceConfig, T parse.C // temporary directory for storing downloaded artifacts - this is initialised in the Init function // to be a subdirectory of the collection directory - TempDir string + TempArtifactDir string // shadow the row_source.RowSourceImpl Source property, but using ArtifactSource interface Source ArtifactSource @@ -95,8 +96,12 @@ func (a *ArtifactSourceImpl[S, T]) Init(ctx context.Context, params *row_source. a.NewCollectionStateFunc = collection_state.NewArtifactCollectionStateImpl } - // set the temp directory - a.TempDir = filepath.Join(params.CollectionTempDir, "artifacts") + // set the artifact directory + artifactDir, err := filepaths.EnsureArtifactPath(params.CollectionTempDir) + if err != nil { + return err + } + a.TempArtifactDir = artifactDir // call base to apply options and parse config if err := a.RowSourceImpl.Init(ctx, params, opts...); err != nil { @@ -215,6 +220,11 @@ func (a *ArtifactSourceImpl[S, T]) OnArtifactDiscovered(ctx context.Context, inf a.artifactDownloadLimiter.Release() slog.Debug("ArtifactDiscovered - rate limiter released", "artifact", info.Name) }() + + // as this is called from the file walking code, rather than as a result of an event, + // we need to check for pausing here to avoid downloading artifacts when paused + a.BlockWhilePaused(ctx) + // cast the source to an ArtifactSource and download the artifact err = a.Source.DownloadArtifact(ctx, info) if err != nil { @@ -254,15 +264,6 @@ func (a *ArtifactSourceImpl[S, T]) OnArtifactDownloaded(ctx context.Context, inf return fmt.Errorf("error updating collection state: %w", err) } - // TODO verify if this condition can still occur - // we have a race condition - if the processArtifact completes before we have time to handle the ArtifactDownloadedEvent - // ArtifactSourceImpl.Collect may return before this function is complete - // this may lead to sending a completion event before the artifact has been processed - // we need to ensure the wait group is not closed before we leave this function - //so increment the wait group again and - //a.artifactExtractWg.Add(1) - //defer a.artifactExtractWg.Done() - // if we DO NOT have a null loader, start the go routine to process the artifact // (if we have a null loader, we must have a ArtifactConversionCollector which will do the processing) if !nullLoader { diff --git a/artifact_source/plugin_source_wrapper.go b/artifact_source/plugin_source_wrapper.go index 01260d5..fe7a37f 100644 --- a/artifact_source/plugin_source_wrapper.go +++ b/artifact_source/plugin_source_wrapper.go @@ -192,6 +192,24 @@ func (w *PluginSourceWrapper) Collect(ctx context.Context) error { return nil } +// Pause is called to pause collection of source data +func (w *PluginSourceWrapper) Pause() error { + _, err := w.client.SourcePause() + if err != nil { + return err + } + return nil +} + +// Resume is called to 
resume collection of source data +func (w *PluginSourceWrapper) Resume() error { + _, err := w.client.SourceResume() + if err != nil { + return err + } + return nil +} + func (w *PluginSourceWrapper) readSourceEvents(ctx context.Context, pluginStream proto.TailpipePlugin_AddObserverClient) { pluginEventChan := make(chan *proto.Event) errChan := make(chan error) @@ -254,7 +272,7 @@ default: // pass all other events onwards // convert to a observable event - ev, err := events.SourceEventFromProto(protoEvent) + ev, err := events.EventFromProto(protoEvent) if err != nil { w.NotifyError(ctx, w.executionId, err) continue diff --git a/events/chunk.go b/events/chunk.go index 68a10af..1613ea5 100644 --- a/events/chunk.go +++ b/events/chunk.go @@ -17,6 +17,14 @@ func NewChunkEvent(executionId string, chunkNumber int32) *Chunk { } } +func ChunkWrittenFromProto(e *proto.Event) Event { + event := e.GetChunkWrittenEvent() + return &Chunk{ + ExecutionId: event.ExecutionId, + ChunkNumber: event.ChunkNumber, + } +} + // ToProto converts the event to a proto.Event func (r *Chunk) ToProto() *proto.Event { return &proto.Event{ diff --git a/events/complete.go b/events/complete.go index bc0793a..ddd905a 100644 --- a/events/complete.go +++ b/events/complete.go @@ -1,6 +1,7 @@ package events import ( + "errors" "github.com/turbot/tailpipe-plugin-sdk/grpc/proto" ) @@ -21,6 +22,20 @@ func NewCompletedEvent(executionId string, rowCount int64, chunksWritten int32, } } +func CompleteFromProto(e *proto.Event) Event { + event := e.GetCompleteEvent() + + res := &Complete{ + ExecutionId: event.ExecutionId, + RowCount: event.RowCount, + ChunksWritten: event.ChunkCount, + } + if event.Error != "" { + res.Err = errors.New(event.Error) + } + return res +} + func (c *Complete) ToProto() *proto.Event { errString := "" if c.Err != nil { diff --git a/events/proto.go b/events/proto.go index 9d00654..3e513e9 100644 --- a/events/proto.go +++ b/events/proto.go @@ -5,9 +5,9 @@ import ( "github.com/turbot/tailpipe-plugin-sdk/grpc/proto" ) -// SourceEventFromProto converts a proto.Event to an Event +// EventFromProto converts a proto.Event to an Event // NOTE: this function is used for sources implemented in external plugins so handles source events ONLY -func SourceEventFromProto(e *proto.Event) (Event, error) { +func EventFromProto(e *proto.Event) (Event, error) { switch e.Event.(type) { case *proto.Event_ArtifactDiscoveredEvent: return ArtifactDiscoveredFromProto(e), nil @@ -21,6 +21,16 @@ return SourceCompleteFromProto(e), nil case *proto.Event_ErrorEvent: return ErrorFromProto(e), nil + case *proto.Event_StartedEvent: + return StartedFromProto(e), nil + + case *proto.Event_StatusEvent: + return StatusFromProto(e), nil + case *proto.Event_ChunkWrittenEvent: + return ChunkWrittenFromProto(e), nil + case *proto.Event_CompleteEvent: + return CompleteFromProto(e), nil + default: return nil, fmt.Errorf("event %s not expected from source", e) } diff --git a/events/started.go b/events/started.go index 2712be2..62421ed 100644 --- a/events/started.go +++ b/events/started.go @@ -13,6 +13,12 @@ func NewStartedEvent(executionId string) *Started { } } +func StartedFromProto(e *proto.Event) Event { + return &Started{ + ExecutionId: e.GetStartedEvent().ExecutionId, + } +} + func (s *Started) ToProto() *proto.Event { return &proto.Event{ Event: &proto.Event_StartedEvent{ diff --git a/events/status.go
b/events/status.go index 03748be..46437b9 100644 --- a/events/status.go +++ b/events/status.go @@ -40,7 +40,9 @@ func NewStatusEvent(executionId string) *Status { mut: &sync.Mutex{}, } } -func StatusFromProto(event *proto.EventStatus) *Status { + +func StatusFromProto(e *proto.Event) *Status { + event := e.GetStatusEvent() s := &Status{ LatestArtifactLocation: event.LatestArtifactPath, ArtifactsDiscovered: event.ArtifactsDiscovered, diff --git a/filepaths/json.go b/filepaths/json.go index 7300815..161f2de 100644 --- a/filepaths/json.go +++ b/filepaths/json.go @@ -20,3 +20,18 @@ func EnsureJSONLPath(baseDir string) (string, error) { return sourceFilePath, nil } + +// EnsureArtifactPath ensures the artifact temp dir path exists - this is the folder where the artifact source writes downloaded files +func EnsureArtifactPath(baseDir string) (string, error) { + artifactPath := filepath.Join(baseDir, "artifacts") + + // ensure it exists + if _, err := os.Stat(artifactPath); os.IsNotExist(err) { + err = os.MkdirAll(artifactPath, 0755) + if err != nil { + return "", fmt.Errorf("could not create artifact directory %s: %w", artifactPath, err) + } + } + + return artifactPath, nil +} diff --git a/formats/jsonl.go b/formats/jsonl.go index 448c4a0..23e7e77 100644 --- a/formats/jsonl.go +++ b/formats/jsonl.go @@ -76,7 +76,7 @@ func (d *JsonLines) GetMapper() (mappers.Mapper[*types.DynamicRow], error) { return nil, fmt.Errorf("JsonLines format does not support a mapper") } -// GetCsvOpts converts the Delimited configuration into a slice of CSV options strings +// GetReadJsonOpts converts the Delimited configuration into a slice of CSV options strings // in the format expected by DuckDb read_csv function func (d *JsonLines) GetReadJsonOpts() []string { var opts []string diff --git a/go.mod b/go.mod index cc8a66a..f5c4e2f 100644 --- a/go.mod +++ b/go.mod @@ -15,9 +15,10 @@ require ( github.com/marcboeker/go-duckdb/v2 v2.1.0 github.com/rs/xid v1.5.0 github.com/satyrius/gonx v1.4.0 + github.com/sethvargo/go-retry v0.3.0 github.com/stretchr/testify v1.10.0 - github.com/turbot/go-kit v1.2.0 - github.com/turbot/pipe-fittings/v2 v2.3.3 + github.com/turbot/go-kit v1.3.0 + github.com/turbot/pipe-fittings/v2 v2.4.0-rc.4 github.com/zclconf/go-cty v1.14.4 golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c golang.org/x/sync v0.12.0 diff --git a/go.sum b/go.sum index 6a1efe0..aef8add 100644 --- a/go.sum +++ b/go.sum @@ -649,6 +649,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/satyrius/gonx v1.4.0 h1:F3uxif5Yx6FBzdQAh79bHQK6CTJugOcN0w0Z8azQuQg= github.com/satyrius/gonx v1.4.0/go.mod h1:+r8KNe5d2tjkZU+DfhERo0G6KxkGih+1qYF6tqLHwvk= +github.com/sethvargo/go-retry v0.3.0 h1:EEt31A35QhrcRZtrYFDTBg91cqZVnFL2navjDrah2SE= +github.com/sethvargo/go-retry v0.3.0/go.mod h1:mNX17F0C/HguQMyMyJxcnU471gOZGxCLyYaFyAZraas= github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 h1:v9ezJDHA1XGxViAUSIoO/Id7Fl63u6d0YmsAm+/p2hs= github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02/go.mod h1:RF16/A3L0xSa0oSERcnhd8Pu3IXSDZSK2gmGIMsttFE= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= @@ -699,10 +701,10 @@ github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2bi github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8= github.com/tkrajina/go-reflector v0.5.8 
h1:yPADHrwmUbMq4RGEyaOUpz2H90sRsETNVpjzo3DLVQQ= github.com/tkrajina/go-reflector v0.5.8/go.mod h1:ECbqLgccecY5kPmPmXg1MrHW585yMcDkVl6IvJe64T4= -github.com/turbot/go-kit v1.2.0 h1:4pBDu2LGoqF2y6ypL4xJjqlDW0BkUw3IIDlyHkU0O88= -github.com/turbot/go-kit v1.2.0/go.mod h1:1xmRuQ0cn/10QUMNLNOAFIqN8P6Rz5s3VLT8mkN3nF8= -github.com/turbot/pipe-fittings/v2 v2.3.3 h1:7nz5MQah4++qL4zLP7eRNyhiwwm9eXhdwbiyGyYSqfA= -github.com/turbot/pipe-fittings/v2 v2.3.3/go.mod h1:wEoN4EseMTXophNlpOe740rAC9Jg0JhGRt5QM5R2ss8= +github.com/turbot/go-kit v1.3.0 h1:6cIYPAO5hO9fG7Zd5UBC4Ch3+C6AiiyYS0UQnrUlTV0= +github.com/turbot/go-kit v1.3.0/go.mod h1:piKJMYCF8EYmKf+D2B78Csy7kOHGmnQVOWingtLKWWQ= +github.com/turbot/pipe-fittings/v2 v2.4.0-rc.4 h1:tiRlC8XTf9PH+KV/4wcz6uCog93zd3qNn4pZLwDcR8c= +github.com/turbot/pipe-fittings/v2 v2.4.0-rc.4/go.mod h1:szte433cBDCaZcGe5zMVGG7uTl9HMaEYaQmuvzZRYIQ= github.com/turbot/pipes-sdk-go v0.12.0 h1:esbbR7bALa5L8n/hqroMPaQSSo3gNM/4X0iTmHa3D6U= github.com/turbot/pipes-sdk-go v0.12.0/go.mod h1:Mb+KhvqqEdRbz/6TSZc2QWDrMa5BN3E4Xw+gPt2TRkc= github.com/turbot/terraform-components v0.0.0-20231213122222-1f3526cab7a7 h1:qDMxFVd8Zo0rIhnEBdCIbR+T6WgjwkxpFZMN8zZmmjg= diff --git a/grpc/proto/plugin.pb.go b/grpc/proto/plugin.pb.go index f58fc82..8e7b152 100644 --- a/grpc/proto/plugin.pb.go +++ b/grpc/proto/plugin.pb.go @@ -79,9 +79,11 @@ type CollectRequest struct { // optional: the reattach config to connect to the plugin providing the source SourcePlugin *SourcePluginReattach `protobuf:"bytes,10,opt,name=source_plugin,json=sourcePlugin,proto3" json:"source_plugin,omitempty"` // optional: the collection start time - FromTime *timestamppb.Timestamp `protobuf:"bytes,11,opt,name=from_time,json=fromTime,proto3" json:"from_time,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + FromTime *timestamppb.Timestamp `protobuf:"bytes,11,opt,name=from_time,json=fromTime,proto3" json:"from_time,omitempty"` + // the max space to take with temp files + MaxTempCacheSizeMb int64 `protobuf:"varint,12,opt,name=max_temp_cache_size_mb,json=maxTempCacheSizeMb,proto3" json:"max_temp_cache_size_mb,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *CollectRequest) Reset() { @@ -191,6 +193,13 @@ func (x *CollectRequest) GetFromTime() *timestamppb.Timestamp { return nil } +func (x *CollectRequest) GetMaxTempCacheSizeMb() int64 { + if x != nil { + return x.MaxTempCacheSizeMb + } + return 0 +} + type UpdateCollectionStateRequest struct { state protoimpl.MessageState `protogen:"open.v1"` // the path to the collection state file @@ -2764,7 +2773,7 @@ var File_plugin_proto protoreflect.FileDescriptor const file_plugin_proto_rawDesc = "" + "\n" + "\fplugin.proto\x12\x05proto\x1a\x1fgoogle/protobuf/timestamp.proto\"\a\n" + - "\x05Empty\"\xbf\x04\n" + + "\x05Empty\"\xf3\x04\n" + "\x0eCollectRequest\x12\x1d\n" + "\n" + "table_name\x18\x01 \x01(\tR\ttableName\x12%\n" + @@ -2779,7 +2788,8 @@ const file_plugin_proto_rawDesc = "" + "\rsource_format\x18\t \x01(\v2\x11.proto.FormatDataR\fsourceFormat\x12@\n" + "\rsource_plugin\x18\n" + " \x01(\v2\x1b.proto.SourcePluginReattachR\fsourcePlugin\x127\n" + - "\tfrom_time\x18\v \x01(\v2\x1a.google.protobuf.TimestampR\bfromTime\"\xbf\x01\n" + + "\tfrom_time\x18\v \x01(\v2\x1a.google.protobuf.TimestampR\bfromTime\x122\n" + + "\x16max_temp_cache_size_mb\x18\f \x01(\x03R\x12maxTempCacheSizeMb\"\xbf\x01\n" + "\x1cUpdateCollectionStateRequest\x122\n" + "\x15collection_state_path\x18\x01 \x01(\tR\x13collectionStatePath\x122\n" + 
"\vsource_data\x18\x02 \x01(\v2\x11.proto.ConfigDataR\n" + @@ -3005,7 +3015,7 @@ const file_plugin_proto_rawDesc = "" + "\x0emissing_fields\x18\x01 \x03(\tR\rmissingFields\x12%\n" + "\x0einvalid_fields\x18\x02 \x03(\tR\rinvalidFields\x12\x1a\n" + "\bmessages\x18\x03 \x03(\tR\bmessages\x12\x14\n" + - "\x05count\x18\x04 \x01(\x03R\x05count2\xdd\x03\n" + + "\x05count\x18\x04 \x01(\x03R\x05count2\xb4\x04\n" + "\x0eTailpipePlugin\x12;\n" + "\bDescribe\x12\x16.proto.DescribeRequest\x1a\x17.proto.DescribeResponse\x12+\n" + "\vAddObserver\x12\f.proto.Empty\x1a\f.proto.Event0\x01\x128\n" + @@ -3015,7 +3025,9 @@ const file_plugin_proto_rawDesc = "" + "\x15UpdateCollectionState\x12#.proto.UpdateCollectionStateRequest\x1a\f.proto.Empty\x12)\n" + "\vCloseSource\x12\f.proto.Empty\x1a\f.proto.Empty\x121\n" + "\x13SaveCollectionState\x12\f.proto.Empty\x1a\f.proto.Empty\x12:\n" + - "\rSourceCollect\x12\x1b.proto.SourceCollectRequest\x1a\f.proto.EmptyB\tZ\a.;protob\x06proto3" + "\rSourceCollect\x12\x1b.proto.SourceCollectRequest\x1a\f.proto.Empty\x12)\n" + + "\vSourcePause\x12\f.proto.Empty\x1a\f.proto.Empty\x12*\n" + + "\fSourceResume\x12\f.proto.Empty\x1a\f.proto.EmptyB\tZ\a.;protob\x06proto3" var ( file_plugin_proto_rawDescOnce sync.Once @@ -3153,16 +3165,20 @@ var file_plugin_proto_depIdxs = []int32{ 0, // 65: proto.TailpipePlugin.CloseSource:input_type -> proto.Empty 0, // 66: proto.TailpipePlugin.SaveCollectionState:input_type -> proto.Empty 36, // 67: proto.TailpipePlugin.SourceCollect:input_type -> proto.SourceCollectRequest - 4, // 68: proto.TailpipePlugin.Describe:output_type -> proto.DescribeResponse - 14, // 69: proto.TailpipePlugin.AddObserver:output_type -> proto.Event - 6, // 70: proto.TailpipePlugin.Collect:output_type -> proto.CollectResponse - 33, // 71: proto.TailpipePlugin.InitSource:output_type -> proto.InitSourceResponse - 0, // 72: proto.TailpipePlugin.UpdateCollectionState:output_type -> proto.Empty - 0, // 73: proto.TailpipePlugin.CloseSource:output_type -> proto.Empty - 0, // 74: proto.TailpipePlugin.SaveCollectionState:output_type -> proto.Empty - 0, // 75: proto.TailpipePlugin.SourceCollect:output_type -> proto.Empty - 68, // [68:76] is the sub-list for method output_type - 60, // [60:68] is the sub-list for method input_type + 0, // 68: proto.TailpipePlugin.SourcePause:input_type -> proto.Empty + 0, // 69: proto.TailpipePlugin.SourceResume:input_type -> proto.Empty + 4, // 70: proto.TailpipePlugin.Describe:output_type -> proto.DescribeResponse + 14, // 71: proto.TailpipePlugin.AddObserver:output_type -> proto.Event + 6, // 72: proto.TailpipePlugin.Collect:output_type -> proto.CollectResponse + 33, // 73: proto.TailpipePlugin.InitSource:output_type -> proto.InitSourceResponse + 0, // 74: proto.TailpipePlugin.UpdateCollectionState:output_type -> proto.Empty + 0, // 75: proto.TailpipePlugin.CloseSource:output_type -> proto.Empty + 0, // 76: proto.TailpipePlugin.SaveCollectionState:output_type -> proto.Empty + 0, // 77: proto.TailpipePlugin.SourceCollect:output_type -> proto.Empty + 0, // 78: proto.TailpipePlugin.SourcePause:output_type -> proto.Empty + 0, // 79: proto.TailpipePlugin.SourceResume:output_type -> proto.Empty + 70, // [70:80] is the sub-list for method output_type + 60, // [60:70] is the sub-list for method input_type 60, // [60:60] is the sub-list for extension type_name 60, // [60:60] is the sub-list for extension extendee 0, // [0:60] is the sub-list for field type_name diff --git a/grpc/proto/plugin.proto b/grpc/proto/plugin.proto index c06e5f0..737337d 100644 --- 
a/grpc/proto/plugin.proto +++ b/grpc/proto/plugin.proto @@ -16,11 +16,12 @@ service TailpipePlugin { rpc CloseSource(Empty) returns (Empty); rpc SaveCollectionState(Empty) returns (Empty); rpc SourceCollect(SourceCollectRequest) returns (Empty); + rpc SourcePause(Empty) returns (Empty); + rpc SourceResume(Empty) returns (Empty); } message Empty{} - message CollectRequest { string table_name = 1; string partition_name = 2; @@ -42,6 +43,8 @@ message CollectRequest { SourcePluginReattach source_plugin = 10; // optional: the collection start time google.protobuf.Timestamp from_time = 11; + // the max space to take with temp files + int64 max_temp_cache_size_mb= 12; } message UpdateCollectionStateRequest { diff --git a/grpc/proto/plugin_grpc.pb.go b/grpc/proto/plugin_grpc.pb.go index 5804b38..cd1f330 100644 --- a/grpc/proto/plugin_grpc.pb.go +++ b/grpc/proto/plugin_grpc.pb.go @@ -27,6 +27,8 @@ const ( TailpipePlugin_CloseSource_FullMethodName = "/proto.TailpipePlugin/CloseSource" TailpipePlugin_SaveCollectionState_FullMethodName = "/proto.TailpipePlugin/SaveCollectionState" TailpipePlugin_SourceCollect_FullMethodName = "/proto.TailpipePlugin/SourceCollect" + TailpipePlugin_SourcePause_FullMethodName = "/proto.TailpipePlugin/SourcePause" + TailpipePlugin_SourceResume_FullMethodName = "/proto.TailpipePlugin/SourceResume" ) // TailpipePluginClient is the client API for TailpipePlugin service. @@ -41,6 +43,8 @@ type TailpipePluginClient interface { CloseSource(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) SaveCollectionState(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) SourceCollect(ctx context.Context, in *SourceCollectRequest, opts ...grpc.CallOption) (*Empty, error) + SourcePause(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) + SourceResume(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) } type tailpipePluginClient struct { @@ -140,6 +144,26 @@ func (c *tailpipePluginClient) SourceCollect(ctx context.Context, in *SourceColl return out, nil } +func (c *tailpipePluginClient) SourcePause(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(Empty) + err := c.cc.Invoke(ctx, TailpipePlugin_SourcePause_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *tailpipePluginClient) SourceResume(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(Empty) + err := c.cc.Invoke(ctx, TailpipePlugin_SourceResume_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // TailpipePluginServer is the server API for TailpipePlugin service. // All implementations must embed UnimplementedTailpipePluginServer // for forward compatibility. 
@@ -152,6 +176,8 @@ type TailpipePluginServer interface { CloseSource(context.Context, *Empty) (*Empty, error) SaveCollectionState(context.Context, *Empty) (*Empty, error) SourceCollect(context.Context, *SourceCollectRequest) (*Empty, error) + SourcePause(context.Context, *Empty) (*Empty, error) + SourceResume(context.Context, *Empty) (*Empty, error) mustEmbedUnimplementedTailpipePluginServer() } @@ -186,6 +212,12 @@ func (UnimplementedTailpipePluginServer) SaveCollectionState(context.Context, *E func (UnimplementedTailpipePluginServer) SourceCollect(context.Context, *SourceCollectRequest) (*Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method SourceCollect not implemented") } +func (UnimplementedTailpipePluginServer) SourcePause(context.Context, *Empty) (*Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method SourcePause not implemented") +} +func (UnimplementedTailpipePluginServer) SourceResume(context.Context, *Empty) (*Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method SourceResume not implemented") +} func (UnimplementedTailpipePluginServer) mustEmbedUnimplementedTailpipePluginServer() {} func (UnimplementedTailpipePluginServer) testEmbeddedByValue() {} @@ -344,6 +376,42 @@ func _TailpipePlugin_SourceCollect_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _TailpipePlugin_SourcePause_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TailpipePluginServer).SourcePause(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: TailpipePlugin_SourcePause_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TailpipePluginServer).SourcePause(ctx, req.(*Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _TailpipePlugin_SourceResume_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TailpipePluginServer).SourceResume(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: TailpipePlugin_SourceResume_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TailpipePluginServer).SourceResume(ctx, req.(*Empty)) + } + return interceptor(ctx, in, info, handler) +} + // TailpipePlugin_ServiceDesc is the grpc.ServiceDesc for TailpipePlugin service. 
// It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -379,6 +447,14 @@ var TailpipePlugin_ServiceDesc = grpc.ServiceDesc{ MethodName: "SourceCollect", Handler: _TailpipePlugin_SourceCollect_Handler, }, + { + MethodName: "SourcePause", + Handler: _TailpipePlugin_SourcePause_Handler, + }, + { + MethodName: "SourceResume", + Handler: _TailpipePlugin_SourceResume_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/grpc/shared/grpc.go b/grpc/shared/grpc.go index c15a073..eebbbbf 100644 --- a/grpc/shared/grpc.go +++ b/grpc/shared/grpc.go @@ -40,6 +40,14 @@ func (c TailpipePluginClientWrapper) SourceCollect(req *proto.SourceCollectReque return c.client.SourceCollect(context.Background(), req) } +func (c TailpipePluginClientWrapper) SourcePause() (*proto.Empty, error) { + return c.client.SourcePause(context.Background(), &proto.Empty{}) +} + +func (c TailpipePluginClientWrapper) SourceResume() (*proto.Empty, error) { + return c.client.SourceResume(context.Background(), &proto.Empty{}) +} + // TailpipePluginServerWrapper is the gRPC server that TailpipePluginClient talks to. type TailpipePluginServerWrapper struct { proto.UnimplementedTailpipePluginServer @@ -84,3 +92,11 @@ func (s TailpipePluginServerWrapper) CloseSource(_ context.Context, req *proto.E func (s TailpipePluginServerWrapper) SourceCollect(_ context.Context, req *proto.SourceCollectRequest) (*proto.Empty, error) { return s.Impl.SourceCollect(context.Background(), req) } + +func (s TailpipePluginServerWrapper) SourcePause(_ context.Context, req *proto.Empty) (*proto.Empty, error) { + return s.Impl.SourcePause(context.Background(), req) +} + +func (s TailpipePluginServerWrapper) SourceResume(_ context.Context, req *proto.Empty) (*proto.Empty, error) { + return s.Impl.SourceResume(context.Background(), req) +} diff --git a/grpc/shared/interface.go b/grpc/shared/interface.go index 8402944..af6bc55 100644 --- a/grpc/shared/interface.go +++ b/grpc/shared/interface.go @@ -26,6 +26,8 @@ type TailpipePluginServer interface { SaveCollectionState(context.Context, *proto.Empty) (*proto.Empty, error) CloseSource(context.Context, *proto.Empty) (*proto.Empty, error) SourceCollect(context.Context, *proto.SourceCollectRequest) (*proto.Empty, error) + SourcePause(context.Context, *proto.Empty) (*proto.Empty, error) + SourceResume(context.Context, *proto.Empty) (*proto.Empty, error) } // TailpipePluginClient is the client interface that we're exposing as a plugin. @@ -38,6 +40,8 @@ type TailpipePluginClient interface { SaveCollectionState(context.Context, *proto.Empty) (*proto.Empty, error) CloseSource(context.Context, *proto.Empty) (*proto.Empty, error) SourceCollect(context.Context, *proto.SourceCollectRequest) (*proto.Empty, error) + SourcePause(context.Context, *proto.Empty) (*proto.Empty, error) + SourceResume(context.Context, *proto.Empty) (*proto.Empty, error) } // TailpipeGRPCPlugin is the implementation of plugin.GRPCPlugin so we can serve/consume this.
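
For orientation, here is a minimal sketch (not part of the diff) of how a host process might drive the two new RPCs once it holds a connected source plugin client. Only `proto.TailpipePluginClient`, `proto.Empty`, `SourcePause` and `SourceResume` come from the generated code above; `pauseWhileBusy` and the `busy` callback are illustrative names.

```go
package example

import (
	"context"
	"time"

	"github.com/turbot/tailpipe-plugin-sdk/grpc/proto"
)

// pauseWhileBusy asks the plugin to stop producing source events, waits until the
// host-side backpressure has cleared (as reported by the hypothetical busy callback),
// then resumes collection on the plugin side.
func pauseWhileBusy(ctx context.Context, client proto.TailpipePluginClient, busy func() bool) error {
	// pause the remote source
	if _, err := client.SourcePause(ctx, &proto.Empty{}); err != nil {
		return err
	}
	// wait until the host has drained its backlog, or the context is cancelled
	for busy() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(5 * time.Second):
		}
	}
	// resume the remote source
	_, err := client.SourceResume(ctx, &proto.Empty{})
	return err
}
```

The host pauses the source while it is under backpressure and resumes it once the backlog has drained, which is the flow the collector changes further down implement against their JSONL temp directory.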
diff --git a/helpers/disk_usage.go b/helpers/disk_usage.go new file mode 100644 index 0000000..feeaf09 --- /dev/null +++ b/helpers/disk_usage.go @@ -0,0 +1,35 @@ +package helpers + +import ( + "fmt" + "os" +) + +// GetFolderFileSizeMb returns the total size of all files in the given folder in Mb (single level only) +func GetFolderFileSizeMb(folderPath string) (int64, error) { + size, err := GetFolderFileSize(folderPath) + if err != nil { + return 0, fmt.Errorf("error getting folder size: %w", err) + } + return size / (1024 * 1024), nil +} + +// GetFolderFileSize returns the total size of all files in the given folder in bytes (single level only) +func GetFolderFileSize(folderPath string) (int64, error) { + entries, err := os.ReadDir(folderPath) + if err != nil { + return 0, fmt.Errorf("error reading directory %s: %w", folderPath, err) + } + + var totalSize int64 + for _, entry := range entries { + if !entry.IsDir() { + info, err := entry.Info() + if err != nil { + return 0, fmt.Errorf("error getting file info: %w", err) + } + totalSize += info.Size() + } + } + return totalSize, nil +} diff --git a/observable/observable.go b/observable/observable.go index ce19fe0..014bed2 100644 --- a/observable/observable.go +++ b/observable/observable.go @@ -9,6 +9,13 @@ type Observable interface { AddObserver(Observer) error } +// PausableObservable is an interface that extends Observable, providing Pause and Resume methods +type PausableObservable interface { + Observable + Pause() error + Resume() error +} + // Observer is the interface that all observers must implement type Observer interface { Notify(context.Context, events.Event) error diff --git a/observable/pauseable_observable.go b/observable/pauseable_observable.go new file mode 100644 index 0000000..1d73069 --- /dev/null +++ b/observable/pauseable_observable.go @@ -0,0 +1,102 @@ +package observable + +import ( + "context" + "github.com/turbot/tailpipe-plugin-sdk/events" + "log/slog" + "sync" + "sync/atomic" +) + +// PausableObservableImpl is an implementation of the PausableObservable interface - +// it extends ObservableImpl and adds Pause and Resume methods +// NotifyObservers will NOT proceed if pause has been called - it will block until Resume is called +// Thus, when the observer is paused, no events are sent +type PausableObservableImpl struct { + ObservableImpl + pausedAtomic atomic.Bool + mu sync.Mutex + cond *sync.Cond +} + +// Pause pauses the observable, preventing any events from being sent to observers +func (p *PausableObservableImpl) Pause() error { + slog.Info("PausableObservableImpl.Pause() called") + + p.mu.Lock() + defer p.mu.Unlock() + + if p.pausedAtomic.Load() { + slog.Info("PausableObservableImpl.Pause - already paused") + return nil // fast path, skip mutex + } + + // create the condition variable if it doesn't exist + p.ensureCond() + p.pausedAtomic.Store(true) + return nil +} + +// Resume resumes the observable, allowing events to be sent to observers +func (p *PausableObservableImpl) Resume() error { + slog.Info("PausableObservableImpl.Resume() called") + + p.mu.Lock() + defer p.mu.Unlock() + + p.pausedAtomic.Store(false) + + if p.cond != nil { + p.cond.Broadcast() + } + return nil +} + +// create the condition variable if it doesn't exist +func (p *PausableObservableImpl) ensureCond() { + if p.cond == nil { + p.cond = sync.NewCond(&p.mu) + } +} + +// NotifyObservers overrides the base implementation +// It will block if Pause has been called until Resume is called +func (p *PausableObservableImpl) 
NotifyObservers(ctx context.Context, e events.Event) error { + // if paused, block until resumed + p.BlockWhilePaused(ctx) + + // call base implementation + return p.ObservableImpl.NotifyObservers(ctx, e) +} + +func (p *PausableObservableImpl) BlockWhilePaused(ctx context.Context) { + // fast path, skip mutex + if !p.pausedAtomic.Load() { + return + } + + // ensure the condition variable exists before waiting on it + p.mu.Lock() + p.ensureCond() + p.mu.Unlock() + + // waitDone is closed once the waiting goroutine has stopped waiting + waitDone := make(chan struct{}) + // cancelled is protected by p.mu and tells the waiting goroutine to give up + cancelled := false + + go func() { + p.mu.Lock() + // keep waiting while paused and not cancelled + // Wait releases the mutex while waiting and reacquires it when woken + for p.pausedAtomic.Load() && !cancelled { + p.cond.Wait() + } + p.mu.Unlock() + close(waitDone) + }() + + select { + case <-waitDone: + // normal flow - pause was lifted via Resume() + case <-ctx.Done(): + // context cancelled - wake the waiting goroutine so it does not leak, + // then return and let the caller handle the cancellation + p.mu.Lock() + cancelled = true + p.mu.Unlock() + p.cond.Broadcast() + <-waitDone + } +} diff --git a/plugin/interfaces.go b/plugin/interfaces.go index cf47c25..052433a 100644 --- a/plugin/interfaces.go +++ b/plugin/interfaces.go @@ -37,6 +37,8 @@ type TailpipePlugin interface { CloseSource(context.Context) error SaveCollectionState(context.Context) error SourceCollect(context.Context, *proto.SourceCollectRequest) error + SourcePause(context.Context) error + SourceResume(context.Context) error // Other interface functions diff --git a/plugin/plugin_impl.go b/plugin/plugin_impl.go index dc05963..a69a6e6 100644 --- a/plugin/plugin_impl.go +++ b/plugin/plugin_impl.go @@ -230,6 +230,20 @@ func (p *PluginImpl) SourceCollect(ctx context.Context, req *proto.SourceCollect return p.NotifyObservers(ctx, events.NewSourceCompleteEvent(req.ExecutionId, err)) } +func (p *PluginImpl) SourcePause(_ context.Context) error { + if p.source == nil { + return nil + } + return p.source.Pause() +} + +func (p *PluginImpl) SourceResume(_ context.Context) error { + if p.source == nil { + return nil + } + return p.source.Resume() +} + // Shutdown is called by Serve when the plugin exits func (p *PluginImpl) Shutdown(context.Context) error { return nil diff --git a/plugin/plugin_server.go b/plugin/plugin_server.go index 4718eff..b00abd9 100644 --- a/plugin/plugin_server.go +++ b/plugin/plugin_server.go @@ -108,6 +108,22 @@ func (s PluginServer) SourceCollect(ctx context.Context, req *proto.SourceCollec return &proto.Empty{}, nil } +func (s PluginServer) SourcePause(ctx context.Context, _ *proto.Empty) (*proto.Empty, error) { + err := s.impl.SourcePause(ctx) + if err != nil { + return nil, err + } + return &proto.Empty{}, nil +} + +func (s PluginServer) SourceResume(ctx context.Context, _ *proto.Empty) (*proto.Empty, error) { + err := s.impl.SourceResume(ctx) + if err != nil { + return nil, err + } + return &proto.Empty{}, nil +} + func (s PluginServer) Serve() error { // use plugin provided in opts ctx := context.Background() diff --git a/row_source/interfaces.go b/row_source/interfaces.go index fa44665..a64f42a 100644 --- a/row_source/interfaces.go +++ b/row_source/interfaces.go @@ -15,8 +15,7 @@ import ( // - Webhook source // Sources may be configured with data transfo type RowSource interface { - // Observable must be implemented by row sourceFuncs (it is implemented by row_source.RowSourceImpl) - observable.Observable + observable.PausableObservable // Init is called when the row source is created // it is responsible for parsing the source config and configuring the source diff --git a/row_source/row_source_impl.go
b/row_source/row_source_impl.go index b39d45e..9eed198 100644 --- a/row_source/row_source_impl.go +++ b/row_source/row_source_impl.go @@ -25,7 +25,7 @@ import ( // S is the type of the source config struct // T is the type of the connection struct type RowSourceImpl[S, T parse.Config] struct { - observable.ObservableImpl + observable.PausableObservableImpl Config S Connection T // store a reference to the derived RowSource type so we can call its methods diff --git a/table/artifact_conversion_collector.go b/table/artifact_conversion_collector.go index 3716be4..40c6d8f 100644 --- a/table/artifact_conversion_collector.go +++ b/table/artifact_conversion_collector.go @@ -33,8 +33,7 @@ type ArtifactConversionCollector struct { table CustomTable - destPath string - db *sql.DB + db *sql.DB // we only convert one artifact at a time // TODO be a bit smarter about this - we could just avoid sending multiple events concurrently) conversionMut sync.Mutex @@ -76,7 +75,7 @@ func (c *ArtifactConversionCollector) Init(ctx context.Context, req *types.Colle if err != nil { return fmt.Errorf("error getting JSONL path: %w", err) } - c.destPath = jsonPath + c.jsonPath = jsonPath return nil } @@ -154,7 +153,7 @@ func (c *ArtifactConversionCollector) handleArtifactDownloaded(ctx context.Conte // load the current chunk count chunkCount := atomic.LoadInt32(&c.chunkCount) // generate the filename - destFile := filepath.Join(c.destPath, ExecutionIdToJsonlFileName(c.req.ExecutionId, chunkCount)) + destFile := filepath.Join(c.jsonPath, ExecutionIdToJsonlFileName(c.req.ExecutionId, chunkCount)) rowCount, err := c.executeConversionQuery(e, destFile) if err != nil { diff --git a/table/collector_impl.go b/table/collector_impl.go index ff479b0..6cd1be8 100644 --- a/table/collector_impl.go +++ b/table/collector_impl.go @@ -7,15 +7,17 @@ import ( "sync" "time" + "github.com/sethvargo/go-retry" "github.com/turbot/tailpipe-plugin-sdk/context_values" "github.com/turbot/tailpipe-plugin-sdk/events" + "github.com/turbot/tailpipe-plugin-sdk/helpers" "github.com/turbot/tailpipe-plugin-sdk/observable" "github.com/turbot/tailpipe-plugin-sdk/row_source" "github.com/turbot/tailpipe-plugin-sdk/types" ) type CollectorImpl[R types.RowStruct] struct { - observable.ObservableImpl + observable.PausableObservableImpl source row_source.RowSource req *types.CollectRequest @@ -25,20 +27,47 @@ type CollectorImpl[R types.RowStruct] struct { // for RowEnrichmentCollector this is incremented when a row is received and decremented when it is processed // for ArtifactConversionCollector this is incremented when an artifact is downloaded and decremented when it is processed collectionWg sync.WaitGroup + // the location to write JSON files + jsonPath string rowCount int64 chunkCount int32 + + pollMutex sync.Mutex + // the maximum allowable size of the JSONL files - set to 75% of the max temp cache size + maxJsonSize int64 } func (c *CollectorImpl[R]) Close() { } +// PauseCollection pauses the source and pauses our own event handling +func (c *CollectorImpl[R]) PauseCollection() error { + slog.Info("CollectorImpl: PauseCollection called") + if err := c.PausableObservableImpl.Pause(); err != nil { + return fmt.Errorf("error pausing observable: %w", err) + } + return c.source.Pause() +} + +// ResumeCollection resumes the source and resumes our own event handling +func (c *CollectorImpl[R]) ResumeCollection() error { + slog.Info("CollectorImpl: ResumeCollection called") + if err := c.PausableObservableImpl.Resume(); err != nil { + return fmt.Errorf("error 
resuming observable: %w", err) + } + return c.source.Resume() +} + // Collect executes the collection process. Tell our source to start collection func (c *CollectorImpl[R]) Collect(ctx context.Context) (int64, int32, error) { // create empty status event# c.status = events.NewStatusEvent(c.req.ExecutionId) + // set the max allowable size of the JSONL files to 75% of the max temp cache size + c.maxJsonSize = int64(float64(c.req.MaxTempCacheSizeMb) * 0.75) + // tell our source to Collect // this is a blocking call, but we will receive and process row events during the execution err := c.source.Collect(ctx) @@ -103,9 +132,71 @@ func (c *CollectorImpl[R]) onChunk(ctx context.Context, chunkNumber int32) error if err := c.source.SaveCollectionState(); err != nil { return fmt.Errorf("error saving collection state: %w", err) } + + // now check the size of the json destination folder and pause the source if it is too large + return c.checkJsonlSize(ctx) +} + +// check the size of the json destination folder and pause the source if it is too large +func (c *CollectorImpl[R]) checkJsonlSize(ctx context.Context) error { + // Try to acquire the lock, return immediately if already locked + if !c.pollMutex.TryLock() { + slog.Debug("JSONL sizeMb polling already in progress") + return nil + } + defer c.pollMutex.Unlock() + + sizeMb, err := helpers.GetFolderFileSizeMb(c.jsonPath) + if err != nil { + return fmt.Errorf("error getting jsonl folder sizeMb: %w", err) + } + if sizeMb > c.req.MaxTempCacheSizeMb { + slog.Info("JSONL folder max size exceeded - pausing source", "sizeMb", sizeMb, "maxSizeMb", c.req.MaxTempCacheSizeMb) + // pause our own event handling and our sources + if err := c.PauseCollection(); err != nil { + return fmt.Errorf("error pausing source: %w", err) + } + // periodically check the sizeMb of the folder and resume the source when it has shrunk to 75% of the max size + if err := c.pollJsonlSize(ctx); err != nil { + return err + } + + } return nil } +func (c *CollectorImpl[R]) pollJsonlSize(ctx context.Context) error { + err := retry.Do(ctx, retry.NewConstant(5*time.Second), func(ctx context.Context) error { + // check if context is cancelled + if err := ctx.Err(); err != nil { + return err + } + + // get the size of the json folder + sizeMb, err := helpers.GetFolderFileSizeMb(c.jsonPath) + if err != nil { + return retry.RetryableError(fmt.Errorf("error getting jsonl folder size: %w", err)) + } + + // do not resume the source until the size is below 75% of the allowable jsonl folder size + // (add some hysteresis to avoid flapping) + if sizeMb <= int64(float64(c.maxJsonSize)*0.75) { + slog.Info("JSONL folder size is below threshold - resuming source", "sizeMb", sizeMb, "maxSizeMb", c.maxJsonSize) + return nil + } + + return retry.RetryableError(fmt.Errorf("folder size still above threshold")) + }) + + if err != nil { + return err + } + + slog.Info("JSONL folder size is below threshold - resuming source") + return c.ResumeCollection() +} + +// if it is too large, we will delete the oldest file // updateStatus updates the status counters with the latest event // it also sends raises status event periodically (determined by statusUpdateInterval) // note: we will send a final status event when the collection completes diff --git a/table/interfaces.go b/table/interfaces.go index da76c57..2ec3ebb 100644 --- a/table/interfaces.go +++ b/table/interfaces.go @@ -35,7 +35,7 @@ type Table[R types.RowStruct] interface { // Collector is an interface which provides a methods for collecting table data 
from a source // This is implemented by the generic CollectorImpl struct type Collector interface { - observable.Observable + observable.PausableObservable Init(ctx context.Context, request *types.CollectRequest) error Identifier() string diff --git a/table/row_enrichment_collector.go b/table/row_enrichment_collector.go index c8af7ce..401ba43 100644 --- a/table/row_enrichment_collector.go +++ b/table/row_enrichment_collector.go @@ -83,6 +83,9 @@ func (c *RowEnrichmentCollector[R]) Init(ctx context.Context, req *types.Collect if err != nil { return fmt.Errorf("error getting JSONL path: %w", err) } + // set the path to write JSONL files to + c.jsonPath = jsonPath + // create the writer c.writer = NewJSONLWriter(jsonPath) slog.Info("Initialise collector", "table", c.table.Identifier(), "partition", req.PartitionName, "jsonPath", jsonPath) @@ -335,6 +338,11 @@ func (c *CollectorImpl[R]) onRowError(_ context.Context, source string, operatio // writeChunk writes a chunk of rows to a JSONL file func (c *RowEnrichmentCollector[R]) writeChunk(ctx context.Context, rowsToWrite []any, chunkNumber int32) error { + // wait while paused + // we call this in addition to blocking events, so that row events sent _before_ the pause do not cause + // us to write JSONL files _after_ we are paused (allowing for enrichment time) + c.BlockWhilePaused(ctx) + slog.Debug("writing chunk to JSONL file", "chunk", chunkNumber, "rows", len(rowsToWrite)) // convert row to a JSONL file diff --git a/types/collect_request.go b/types/collect_request.go index 51d8886..1c88eea 100644 --- a/types/collect_request.go +++ b/types/collect_request.go @@ -28,6 +28,8 @@ type CollectRequest struct { From time.Time // the custom table definition, if specified CustomTableSchema *schema.TableSchema + // the max space to take with temp files + MaxTempCacheSizeMb int64 } func CollectRequestFromProto(pr *proto.CollectRequest) (*CollectRequest, error) { @@ -52,6 +54,7 @@ func CollectRequestFromProto(pr *proto.CollectRequest) (*CollectRequest, error) CollectionStatePath: pr.CollectionStatePath, SourceData: sourceData, From: pr.FromTime.AsTime(), + MaxTempCacheSizeMb: pr.MaxTempCacheSizeMb, } if pr.SourceFormat != nil {
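
To summarise the backpressure loop added to `CollectorImpl`: when a chunk is written, the collector checks the JSONL temp folder size against `MaxTempCacheSizeMb`, pauses the source if it is exceeded, then polls until the folder has shrunk below the resume threshold and resumes. A minimal standalone sketch of that polling step follows, reusing the same `helpers.GetFolderFileSizeMb` and `sethvargo/go-retry` calls as the diff; `waitForDiskHeadroom` and `resumeThresholdMb` are illustrative names, not SDK APIs.

```go
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/sethvargo/go-retry"
	"github.com/turbot/tailpipe-plugin-sdk/helpers"
)

// waitForDiskHeadroom polls the JSONL temp folder every 5 seconds and returns nil
// once its size has dropped to resumeThresholdMb or below; it returns an error if
// the context is cancelled first. It mirrors the pollJsonlSize loop in collector_impl.go.
func waitForDiskHeadroom(ctx context.Context, jsonPath string, resumeThresholdMb int64) error {
	return retry.Do(ctx, retry.NewConstant(5*time.Second), func(ctx context.Context) error {
		sizeMb, err := helpers.GetFolderFileSizeMb(jsonPath)
		if err != nil {
			// transient errors are retried on the next tick
			return retry.RetryableError(fmt.Errorf("error getting folder size: %w", err))
		}
		if sizeMb > resumeThresholdMb {
			return retry.RetryableError(fmt.Errorf("folder size %dMb still above %dMb", sizeMb, resumeThresholdMb))
		}
		// below the threshold - stop retrying so the caller can resume the source
		return nil
	})
}
```

The caller would invoke this after pausing the source and, on a nil return, resume it - the equivalent of the `PauseCollection`/`ResumeCollection` pair in the diff.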