Skip to content

Commit b2d1180

Browse files
committed
feat: load git from s3 first
Signed-off-by: Tianchu Zhao <[email protected]>
1 parent cfa71ce commit b2d1180

File tree

2 files changed

+71
-15
lines changed

2 files changed

+71
-15
lines changed

workflow/artifacts/azure/azure.go

+3
Original file line numberDiff line numberDiff line change
@@ -121,16 +121,19 @@ func (azblobDriver *ArtifactDriver) Load(artifact *wfv1.Artifact, path string) e
121121
}
122122
isEmptyFile = true
123123
} else if !bloberror.HasCode(origErr, bloberror.BlobNotFound) {
124+
_ = os.Remove(path)
124125
return fmt.Errorf("unable to download blob %s: %s", artifact.Azure.Blob, origErr)
125126
}
126127

127128
isDir, err := azblobDriver.IsDirectory(artifact)
128129
if err != nil {
130+
_ = os.Remove(path)
129131
return fmt.Errorf("unable to determine if %s is a directory: %s", artifact.Azure.Blob, err)
130132
}
131133

132134
// It's not a directory and the file doesn't exist, Return the original NoSuchKey error.
133135
if !isDir && !isEmptyFile {
136+
_ = os.Remove(path)
134137
return argoerrors.New(argoerrors.CodeNotFound, origErr.Error())
135138
}
136139

workflow/executor/executor.go

+68-15
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"fmt"
1212
"io"
1313
"io/fs"
14+
"math"
1415
"os"
1516
"path"
1617
"path/filepath"
@@ -162,7 +163,6 @@ func (we *WorkflowExecutor) HandleError(ctx context.Context) {
162163
func (we *WorkflowExecutor) LoadArtifacts(ctx context.Context) error {
163164
log.Infof("Start loading input artifacts...")
164165
for _, art := range we.Template.Inputs.Artifacts {
165-
166166
log.Infof("Downloading artifact: %s", art.Name)
167167

168168
if !art.HasLocationOrKey() {
@@ -177,14 +177,6 @@ func (we *WorkflowExecutor) LoadArtifacts(ctx context.Context) error {
177177
if err != nil {
178178
return err
179179
}
180-
driverArt, err := we.newDriverArt(&art)
181-
if err != nil {
182-
return fmt.Errorf("failed to load artifact '%s': %w", art.Name, err)
183-
}
184-
artDriver, err := we.InitDriver(ctx, driverArt)
185-
if err != nil {
186-
return err
187-
}
188180
// Determine the file path of where to load the artifact
189181
var artPath string
190182
mnt := common.FindOverlappingVolume(&we.Template, art.Path)
@@ -204,13 +196,74 @@ func (we *WorkflowExecutor) LoadArtifacts(ctx context.Context) error {
204196
// the file is a tarball or not. If it is, it is first extracted then renamed to
205197
// the desired location. If not, it is simply renamed to the location.
206198
tempArtPath := artPath + ".tmp"
207-
err = artDriver.Load(driverArt, tempArtPath)
208-
if err != nil {
209-
if art.Optional && argoerrs.IsCode(argoerrs.CodeNotFound, err) {
210-
log.Infof("Skipping optional input artifact that was not found: %s", art.Name)
211-
continue
199+
200+
proceed := true
201+
gitLoopCount := 0
202+
if art.Git != nil {
203+
// if git artifact, try s3 first
204+
for {
205+
if gitLoopCount >= 3 || !proceed {
206+
break
207+
}
208+
proceed = true
209+
repoString := art.Git.Repo[strings.LastIndex(art.Git.Repo, ":")+1:]
210+
repoStringArray := strings.Split(strings.Replace(repoString, ".git", "", -1), "/")
211+
repoString = repoStringArray[len(repoStringArray)-2] + "/" + repoStringArray[len(repoStringArray)-1]
212+
s3Key := "git-artifacts/workflow/" + we.workflow + "/" + repoString
213+
214+
artS3 := wfv1.Artifact{
215+
ArtifactLocation: wfv1.ArtifactLocation{
216+
S3: &wfv1.S3Artifact{
217+
Key: s3Key,
218+
},
219+
},
220+
}
221+
log.Info(artS3)
222+
driverArt, err := we.newDriverArt(&artS3)
223+
if err != nil {
224+
log.Warnf("failed to load artifact '%s': %w", artS3.Name, err)
225+
} else {
226+
artDriver, err := we.InitDriver(ctx, driverArt)
227+
if err != nil {
228+
log.Warn(err)
229+
} else {
230+
err = artDriver.Load(driverArt, tempArtPath)
231+
if err != nil {
232+
if art.Optional && argoerrs.IsCode(argoerrs.CodeNotFound, err) {
233+
log.Infof("Skipping optional input artifact that was not found: %s", artS3.Name)
234+
continue
235+
}
236+
log.Warnf("artifact %s failed to load: %w", artS3.Name, err)
237+
} else {
238+
proceed = false
239+
}
240+
}
241+
}
242+
baseDelay := 1 * time.Second
243+
secRetry := math.Pow(2, float64(gitLoopCount))
244+
delay := time.Duration(secRetry) * baseDelay
245+
time.Sleep(delay)
246+
gitLoopCount++
247+
}
248+
}
249+
if proceed {
250+
// other artifact
251+
driverArt, err := we.newDriverArt(&art)
252+
if err != nil {
253+
return fmt.Errorf("failed to load artifact '%s': %w", art.Name, err)
254+
}
255+
artDriver, err := we.InitDriver(ctx, driverArt)
256+
if err != nil {
257+
return err
258+
}
259+
err = artDriver.Load(driverArt, tempArtPath)
260+
if err != nil {
261+
if art.Optional && argoerrs.IsCode(argoerrs.CodeNotFound, err) {
262+
log.Infof("Skipping optional input artifact that was not found: %s", art.Name)
263+
continue
264+
}
265+
return fmt.Errorf("artifact %s failed to load: %w", art.Name, err)
212266
}
213-
return fmt.Errorf("artifact %s failed to load: %w", art.Name, err)
214267
}
215268

216269
isTar := false

0 commit comments

Comments
 (0)