Skip to content

Commit

Permalink
refactor: Make input feeder's partition ID as a constant
Browse files Browse the repository at this point in the history
  • Loading branch information
therne committed Sep 13, 2021
1 parent 81ed1c9 commit 46f058d
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
6 changes: 3 additions & 3 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (m *Remote) createJob(ctx context.Context) error {
return wg.Wait()
}

func (m *Remote) feedInput(ctx context.Context, j *job.Job, input input.Feeder) (err error) {
func (m *Remote) feedInput(ctx context.Context, j *job.Job, in input.Feeder) (err error) {
// feed input to first stage (ignore stage #0 because it is input stage itself)
firstStage, targets := j.Stages[1], j.Partitions[1]
partitioner := j.Stages[0].Output.Partitioner
Expand Down Expand Up @@ -302,10 +302,10 @@ func (m *Remote) feedInput(ctx context.Context, j *job.Job, input input.Feeder)
if err := wg.Wait(); err != nil {
return err
}
writer := output.NewWriter("0", partitioner, outputsToFirstStage)
writer := output.NewWriter(input.FeederPartitionID, partitioner, outputsToFirstStage)
defer errorist.CloseWithErrCapture(writer, &err, errorist.Wrapf("close"))

if err := input.FeedInput(writer); err != nil {
if err := in.FeedInput(writer); err != nil {
return errors.Wrap(err, "write data")
}
return nil
Expand Down
2 changes: 2 additions & 0 deletions input/feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"github.com/ab180/lrmr/output"
)

const FeederPartitionID = "__input"

type Feeder interface {
FeedInput(out output.Output) error
}

0 comments on commit 46f058d

Please sign in to comment.