Skip to content

Implement Pause and Resume for RowSource #191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions artifact_source/artifact_source_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/turbot/tailpipe-plugin-sdk/collection_state"
"github.com/turbot/tailpipe-plugin-sdk/context_values"
"github.com/turbot/tailpipe-plugin-sdk/events"
"github.com/turbot/tailpipe-plugin-sdk/filepaths"
"github.com/turbot/tailpipe-plugin-sdk/helpers"
"github.com/turbot/tailpipe-plugin-sdk/parse"
"github.com/turbot/tailpipe-plugin-sdk/rate_limiter"
Expand Down Expand Up @@ -61,7 +62,7 @@ type ArtifactSourceImpl[S artifact_source_config.ArtifactSourceConfig, T parse.C

// temporary directory for storing downloaded artifacts - this is initialised in the Init function
// to be a subdirectory of the collection directory
TempDir string
TempArtifactDir string

// shadow the row_source.RowSourceImpl Source property, but using ArtifactSource interface
Source ArtifactSource
Expand Down Expand Up @@ -95,8 +96,12 @@ func (a *ArtifactSourceImpl[S, T]) Init(ctx context.Context, params *row_source.
a.NewCollectionStateFunc = collection_state.NewArtifactCollectionStateImpl
}

// set the temp directory
a.TempDir = filepath.Join(params.CollectionTempDir, "artifacts")
// set the artifact directory
artifactDir, err := filepaths.EnsureArtifactPath(params.CollectionTempDir)
if err != nil {
return err
}
a.TempArtifactDir = artifactDir

// call base to apply options and parse config
if err := a.RowSourceImpl.Init(ctx, params, opts...); err != nil {
Expand Down Expand Up @@ -215,6 +220,11 @@ func (a *ArtifactSourceImpl[S, T]) OnArtifactDiscovered(ctx context.Context, inf
a.artifactDownloadLimiter.Release()
slog.Debug("ArtifactDiscovered - rate limiter released", "artifact", info.Name)
}()

// as this is called from the file walking code, rather than as a result of an event,
// we need to check for pausing here to avoid downloading artifacts when paused
a.BlockWhilePaused(ctx)

// cast the source to an ArtifactSource and download the artifact
err = a.Source.DownloadArtifact(ctx, info)
if err != nil {
Expand Down Expand Up @@ -254,15 +264,6 @@ func (a *ArtifactSourceImpl[S, T]) OnArtifactDownloaded(ctx context.Context, inf
return fmt.Errorf("error updating collection state: %w", err)
}

// TODO verify if this condition can still occur
// we have a race condition - if the processArtifact completes before we have time to handle the ArtifactDownloadedEvent
// ArtifactSourceImpl.Collect may return before this function is complete
// this may lead to sending a completion event before the artifact has been processed
// we need to ensure the wait group is not closed before we leave this function
//so increment the wait group again and
//a.artifactExtractWg.Add(1)
//defer a.artifactExtractWg.Done()

// if we DO NOT have a null loader, start the go routine to process the artifact
// (if we have a null loader, we must have a ArtifactConversionCollector which will do the processing)
if !nullLoader {
Expand Down
20 changes: 19 additions & 1 deletion artifact_source/plugin_source_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,24 @@ func (w *PluginSourceWrapper) Collect(ctx context.Context) error {
return nil
}

// Pause requests that the source plugin pause collection of source data.
// The response payload of the client's SourcePause call is discarded; only the
// error (if any) is handed back to the caller, unchanged.
func (w *PluginSourceWrapper) Pause() error {
	_, pauseErr := w.client.SourcePause()
	return pauseErr
}

// Resume requests that the source plugin resume collection of source data
// after a previous Pause.
//
// It returns any error reported by the plugin client; the response payload is
// discarded.
//
// NOTE(review): assumes the plugin client exposes SourceResume with the same
// shape as SourcePause, mirroring the SourceResume RPC on the TailpipePlugin
// service - confirm against the client wrapper.
func (w *PluginSourceWrapper) Resume() error {
	// this must call SourceResume - calling SourcePause here would leave the
	// source paused forever when the caller asks for it to be resumed
	if _, err := w.client.SourceResume(); err != nil {
		return err
	}
	return nil
}

func (w *PluginSourceWrapper) readSourceEvents(ctx context.Context, pluginStream proto.TailpipePlugin_AddObserverClient) {
pluginEventChan := make(chan *proto.Event)
errChan := make(chan error)
Expand Down Expand Up @@ -254,7 +272,7 @@ func (w *PluginSourceWrapper) readSourceEvents(ctx context.Context, pluginStream
default:
// pass all other events onwards
// convert to a observable event
ev, err := events.SourceEventFromProto(protoEvent)
ev, err := events.EventFromProto(protoEvent)
if err != nil {
w.NotifyError(ctx, w.executionId, err)
continue
Expand Down
8 changes: 8 additions & 0 deletions events/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ func NewChunkEvent(executionId string, chunkNumber int32) *Chunk {
}
}

// ChunkWrittenFromProto builds a Chunk event from the chunk written payload of
// the given proto.Event, copying across the execution id and the chunk number.
func ChunkWrittenFromProto(e *proto.Event) Event {
	payload := e.GetChunkWrittenEvent()

	chunk := new(Chunk)
	chunk.ExecutionId = payload.ExecutionId
	chunk.ChunkNumber = payload.ChunkNumber
	return chunk
}

// ToProto converts the event to a proto.Event
func (r *Chunk) ToProto() *proto.Event {
return &proto.Event{
Expand Down
15 changes: 15 additions & 0 deletions events/complete.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package events

import (
"errors"
"github.com/turbot/tailpipe-plugin-sdk/grpc/proto"
)

Expand All @@ -21,6 +22,20 @@ func NewCompletedEvent(executionId string, rowCount int64, chunksWritten int32,
}
}

// CompleteFromProto builds a Complete event from the complete payload of the
// given proto.Event. The execution id, row count and chunk count are copied
// across; Err is only populated when the proto carries a non-empty error string.
func CompleteFromProto(e *proto.Event) Event {
	payload := e.GetCompleteEvent()

	// the proto transports the error as a plain string - rebuild an error from it
	var completionErr error
	if msg := payload.Error; msg != "" {
		completionErr = errors.New(msg)
	}

	return &Complete{
		ExecutionId:   payload.ExecutionId,
		RowCount:      payload.RowCount,
		ChunksWritten: payload.ChunkCount,
		Err:           completionErr,
	}
}

func (c *Complete) ToProto() *proto.Event {
errString := ""
if c.Err != nil {
Expand Down
14 changes: 12 additions & 2 deletions events/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"github.com/turbot/tailpipe-plugin-sdk/grpc/proto"
)

// SourceEventFromProto converts a proto.Event to an Event
// EventFromProto converts a proto.Event to an Event
// NOTE: this function is used for sources implemented in external plugins - as well as source events it also converts the started, status, chunk written and complete events
func SourceEventFromProto(e *proto.Event) (Event, error) {
func EventFromProto(e *proto.Event) (Event, error) {
switch e.Event.(type) {
case *proto.Event_ArtifactDiscoveredEvent:
return ArtifactDiscoveredFromProto(e), nil
Expand All @@ -21,6 +21,16 @@ func SourceEventFromProto(e *proto.Event) (Event, error) {
return SourceCompleteFromProto(e), nil
case *proto.Event_ErrorEvent:
return ErrorFromProto(e), nil
case *proto.Event_StartedEvent:
return StartedFromProto(e), nil

case *proto.Event_StatusEvent:
return StatusFromProto(e), nil
case *proto.Event_ChunkWrittenEvent:
return ChunkWrittenFromProto(e), nil
case *proto.Event_CompleteEvent:
return CompleteFromProto(e), nil

default:
return nil, fmt.Errorf("event %s not expected from source", e)
}
Expand Down
6 changes: 6 additions & 0 deletions events/started.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ func NewStartedEvent(executionId string) *Started {
}
}

// StartedFromProto builds a Started event from the started payload of the
// given proto.Event, copying across the execution id.
func StartedFromProto(e *proto.Event) Event {
	payload := e.GetStartedEvent()
	started := &Started{ExecutionId: payload.ExecutionId}
	return started
}

func (s *Started) ToProto() *proto.Event {
return &proto.Event{
Event: &proto.Event_StartedEvent{
Expand Down
4 changes: 3 additions & 1 deletion events/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ func NewStatusEvent(executionId string) *Status {
mut: &sync.Mutex{},
}
}
func StatusFromProto(event *proto.EventStatus) *Status {

func StatusFromProto(e *proto.Event) *Status {
event := e.GetStatusEvent()
s := &Status{
LatestArtifactLocation: event.LatestArtifactPath,
ArtifactsDiscovered: event.ArtifactsDiscovered,
Expand Down
15 changes: 15 additions & 0 deletions filepaths/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,18 @@ func EnsureJSONLPath(baseDir string) (string, error) {

return sourceFilePath, nil
}

// EnsureArtifactPath ensures the artifact temp dir path exists - this is the folder where the artifact source writes downloaded files
//
// The directory is the "artifacts" subdirectory of baseDir. It (and any missing
// parents) is created with mode 0755 if it does not already exist.
//
// Returns the artifact directory path, or an empty path and a wrapped error if
// the directory could not be created (including when the path already exists
// but is not a directory).
func EnsureArtifactPath(baseDir string) (string, error) {
	artifactPath := filepath.Join(baseDir, "artifacts")

	// MkdirAll does nothing and returns nil if the directory already exists, so
	// there is no need to stat first - a stat-then-create sequence is racy,
	// silently ignores stat failures other than "not exist", and reports success
	// when the path exists but is a regular file
	if err := os.MkdirAll(artifactPath, 0755); err != nil {
		return "", fmt.Errorf("could not create artifact directory %s: %w", artifactPath, err)
	}

	return artifactPath, nil
}
2 changes: 1 addition & 1 deletion formats/jsonl.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (d *JsonLines) GetMapper() (mappers.Mapper[*types.DynamicRow], error) {
return nil, fmt.Errorf("JsonLines format does not support a mapper")
}

// GetCsvOpts converts the Delimited configuration into a slice of CSV options strings
// GetReadJsonOpts converts the JsonLines configuration into a slice of JSON options strings
// in the format expected by the DuckDb read_json function
func (d *JsonLines) GetReadJsonOpts() []string {
var opts []string
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ require (
github.com/marcboeker/go-duckdb/v2 v2.1.0
github.com/rs/xid v1.5.0
github.com/satyrius/gonx v1.4.0
github.com/sethvargo/go-retry v0.3.0
github.com/stretchr/testify v1.10.0
github.com/turbot/go-kit v1.2.0
github.com/turbot/pipe-fittings/v2 v2.3.3
github.com/turbot/go-kit v1.3.0
github.com/turbot/pipe-fittings/v2 v2.4.0-rc.4
github.com/zclconf/go-cty v1.14.4
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c
golang.org/x/sync v0.12.0
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g
github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ=
github.com/satyrius/gonx v1.4.0 h1:F3uxif5Yx6FBzdQAh79bHQK6CTJugOcN0w0Z8azQuQg=
github.com/satyrius/gonx v1.4.0/go.mod h1:+r8KNe5d2tjkZU+DfhERo0G6KxkGih+1qYF6tqLHwvk=
github.com/sethvargo/go-retry v0.3.0 h1:EEt31A35QhrcRZtrYFDTBg91cqZVnFL2navjDrah2SE=
github.com/sethvargo/go-retry v0.3.0/go.mod h1:mNX17F0C/HguQMyMyJxcnU471gOZGxCLyYaFyAZraas=
github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 h1:v9ezJDHA1XGxViAUSIoO/Id7Fl63u6d0YmsAm+/p2hs=
github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02/go.mod h1:RF16/A3L0xSa0oSERcnhd8Pu3IXSDZSK2gmGIMsttFE=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
Expand Down Expand Up @@ -699,10 +701,10 @@ github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2bi
github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8=
github.com/tkrajina/go-reflector v0.5.8 h1:yPADHrwmUbMq4RGEyaOUpz2H90sRsETNVpjzo3DLVQQ=
github.com/tkrajina/go-reflector v0.5.8/go.mod h1:ECbqLgccecY5kPmPmXg1MrHW585yMcDkVl6IvJe64T4=
github.com/turbot/go-kit v1.2.0 h1:4pBDu2LGoqF2y6ypL4xJjqlDW0BkUw3IIDlyHkU0O88=
github.com/turbot/go-kit v1.2.0/go.mod h1:1xmRuQ0cn/10QUMNLNOAFIqN8P6Rz5s3VLT8mkN3nF8=
github.com/turbot/pipe-fittings/v2 v2.3.3 h1:7nz5MQah4++qL4zLP7eRNyhiwwm9eXhdwbiyGyYSqfA=
github.com/turbot/pipe-fittings/v2 v2.3.3/go.mod h1:wEoN4EseMTXophNlpOe740rAC9Jg0JhGRt5QM5R2ss8=
github.com/turbot/go-kit v1.3.0 h1:6cIYPAO5hO9fG7Zd5UBC4Ch3+C6AiiyYS0UQnrUlTV0=
github.com/turbot/go-kit v1.3.0/go.mod h1:piKJMYCF8EYmKf+D2B78Csy7kOHGmnQVOWingtLKWWQ=
github.com/turbot/pipe-fittings/v2 v2.4.0-rc.4 h1:tiRlC8XTf9PH+KV/4wcz6uCog93zd3qNn4pZLwDcR8c=
github.com/turbot/pipe-fittings/v2 v2.4.0-rc.4/go.mod h1:szte433cBDCaZcGe5zMVGG7uTl9HMaEYaQmuvzZRYIQ=
github.com/turbot/pipes-sdk-go v0.12.0 h1:esbbR7bALa5L8n/hqroMPaQSSo3gNM/4X0iTmHa3D6U=
github.com/turbot/pipes-sdk-go v0.12.0/go.mod h1:Mb+KhvqqEdRbz/6TSZc2QWDrMa5BN3E4Xw+gPt2TRkc=
github.com/turbot/terraform-components v0.0.0-20231213122222-1f3526cab7a7 h1:qDMxFVd8Zo0rIhnEBdCIbR+T6WgjwkxpFZMN8zZmmjg=
Expand Down
50 changes: 33 additions & 17 deletions grpc/proto/plugin.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion grpc/proto/plugin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ service TailpipePlugin {
rpc CloseSource(Empty) returns (Empty);
rpc SaveCollectionState(Empty) returns (Empty);
rpc SourceCollect(SourceCollectRequest) returns (Empty);
rpc SourcePause(Empty) returns (Empty);
rpc SourceResume(Empty) returns (Empty);
}

message Empty{}


message CollectRequest {
string table_name = 1;
string partition_name = 2;
Expand All @@ -42,6 +43,8 @@ message CollectRequest {
SourcePluginReattach source_plugin = 10;
// optional: the collection start time
google.protobuf.Timestamp from_time = 11;
// the max space to take with temp files
int64 max_temp_cache_size_mb= 12;
}

message UpdateCollectionStateRequest {
Expand Down
Loading