Skip to content

Commit 8c55a90

Browse files
committed
feat: load git from s3 first
Signed-off-by: Tianchu Zhao <[email protected]>
1 parent cfa71ce commit 8c55a90

File tree

2 files changed

+75
-15
lines changed

2 files changed

+75
-15
lines changed

workflow/artifacts/azure/azure.go

+3
Original file line numberDiff line numberDiff line change
@@ -121,16 +121,19 @@ func (azblobDriver *ArtifactDriver) Load(artifact *wfv1.Artifact, path string) e
121121
}
122122
isEmptyFile = true
123123
} else if !bloberror.HasCode(origErr, bloberror.BlobNotFound) {
124+
_ = os.Remove(path)
124125
return fmt.Errorf("unable to download blob %s: %s", artifact.Azure.Blob, origErr)
125126
}
126127

127128
isDir, err := azblobDriver.IsDirectory(artifact)
128129
if err != nil {
130+
_ = os.Remove(path)
129131
return fmt.Errorf("unable to determine if %s is a directory: %s", artifact.Azure.Blob, err)
130132
}
131133

132134
// It's not a directory and the file doesn't exist, Return the original NoSuchKey error.
133135
if !isDir && !isEmptyFile {
136+
_ = os.Remove(path)
134137
return argoerrors.New(argoerrors.CodeNotFound, origErr.Error())
135138
}
136139

workflow/executor/executor.go

+72-15
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"fmt"
1212
"io"
1313
"io/fs"
14+
"math"
1415
"os"
1516
"path"
1617
"path/filepath"
@@ -162,7 +163,6 @@ func (we *WorkflowExecutor) HandleError(ctx context.Context) {
162163
func (we *WorkflowExecutor) LoadArtifacts(ctx context.Context) error {
163164
log.Infof("Start loading input artifacts...")
164165
for _, art := range we.Template.Inputs.Artifacts {
165-
166166
log.Infof("Downloading artifact: %s", art.Name)
167167

168168
if !art.HasLocationOrKey() {
@@ -177,14 +177,6 @@ func (we *WorkflowExecutor) LoadArtifacts(ctx context.Context) error {
177177
if err != nil {
178178
return err
179179
}
180-
driverArt, err := we.newDriverArt(&art)
181-
if err != nil {
182-
return fmt.Errorf("failed to load artifact '%s': %w", art.Name, err)
183-
}
184-
artDriver, err := we.InitDriver(ctx, driverArt)
185-
if err != nil {
186-
return err
187-
}
188180
// Determine the file path of where to load the artifact
189181
var artPath string
190182
mnt := common.FindOverlappingVolume(&we.Template, art.Path)
@@ -204,13 +196,78 @@ func (we *WorkflowExecutor) LoadArtifacts(ctx context.Context) error {
204196
// the file is a tarball or not. If it is, it is first extracted then renamed to
205197
// the desired location. If not, it is simply renamed to the location.
206198
tempArtPath := artPath + ".tmp"
207-
err = artDriver.Load(driverArt, tempArtPath)
208-
if err != nil {
209-
if art.Optional && argoerrs.IsCode(argoerrs.CodeNotFound, err) {
210-
log.Infof("Skipping optional input artifact that was not found: %s", art.Name)
211-
continue
199+
200+
proceed := true
201+
gitLoopCount := 0
202+
if art.Git != nil {
203+
// if git artifact, try s3 first
204+
for {
205+
if gitLoopCount >= 3 || !proceed {
206+
break
207+
}
208+
proceed = true
209+
branch := "master"
210+
if art.Git.Branch != "" {
211+
branch = art.Git.Branch
212+
}
213+
repoString := art.Git.Repo[strings.LastIndex(art.Git.Repo, ":")+1:]
214+
repoStringArray := strings.Split(strings.Replace(repoString, ".git", "", -1), "/")
215+
repoString = repoStringArray[len(repoStringArray)-2] + "/" + repoStringArray[len(repoStringArray)-1]
216+
s3Key := "git-artifacts/workflow/" + we.workflow + "/" + repoString + "/" + branch
217+
218+
artS3 := wfv1.Artifact{
219+
ArtifactLocation: wfv1.ArtifactLocation{
220+
S3: &wfv1.S3Artifact{
221+
Key: s3Key,
222+
},
223+
},
224+
}
225+
log.Info(artS3)
226+
driverArt, err := we.newDriverArt(&artS3)
227+
if err != nil {
228+
log.Warn(err)
229+
} else {
230+
artDriver, err := we.InitDriver(ctx, driverArt)
231+
if err != nil {
232+
log.Warn(err)
233+
} else {
234+
err = artDriver.Load(driverArt, tempArtPath)
235+
if err != nil {
236+
if art.Optional && argoerrs.IsCode(argoerrs.CodeNotFound, err) {
237+
log.Infof("Skipping optional input artifact that was not found: %s", artS3.Name)
238+
continue
239+
}
240+
log.Warn(err)
241+
} else {
242+
proceed = false
243+
}
244+
}
245+
}
246+
baseDelay := 1 * time.Second
247+
secRetry := math.Pow(2, float64(gitLoopCount))
248+
delay := time.Duration(secRetry) * baseDelay
249+
time.Sleep(delay)
250+
gitLoopCount++
251+
}
252+
}
253+
if proceed {
254+
// other artifact
255+
driverArt, err := we.newDriverArt(&art)
256+
if err != nil {
257+
return fmt.Errorf("failed to load artifact '%s': %w", art.Name, err)
258+
}
259+
artDriver, err := we.InitDriver(ctx, driverArt)
260+
if err != nil {
261+
return err
262+
}
263+
err = artDriver.Load(driverArt, tempArtPath)
264+
if err != nil {
265+
if art.Optional && argoerrs.IsCode(argoerrs.CodeNotFound, err) {
266+
log.Infof("Skipping optional input artifact that was not found: %s", art.Name)
267+
continue
268+
}
269+
return fmt.Errorf("artifact %s failed to load: %w", art.Name, err)
212270
}
213-
return fmt.Errorf("artifact %s failed to load: %w", art.Name, err)
214271
}
215272

216273
isTar := false

0 commit comments

Comments
 (0)