Skip to content

Commit d7ed403

Browse files
committed
Add image pull concurrency limit.
Signed-off-by: Lantao Liu <[email protected]>
1 parent 5abeeff commit d7ed403

File tree

10 files changed

+196
-9
lines changed

10 files changed

+196
-9
lines changed

client.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ import (
6161
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
6262
specs "github.com/opencontainers/runtime-spec/specs-go"
6363
"github.com/pkg/errors"
64+
"golang.org/x/sync/semaphore"
6465
"google.golang.org/grpc"
6566
"google.golang.org/grpc/health/grpc_health_v1"
6667
)
@@ -292,6 +293,9 @@ type RemoteContext struct {
292293
// platforms will be used to create a PlatformMatcher with no ordering
293294
// preference.
294295
Platforms []string
296+
297+
// MaxConcurrentDownloads is the max concurrent content downloads for each pull.
298+
MaxConcurrentDownloads int
295299
}
296300

297301
func defaultRemoteContext() *RemoteContext {
@@ -407,6 +411,7 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim
407411

408412
isConvertible bool
409413
converterFunc func(context.Context, ocispec.Descriptor) (ocispec.Descriptor, error)
414+
limiter *semaphore.Weighted
410415
)
411416

412417
if desc.MediaType == images.MediaTypeDockerSchema1Manifest && rCtx.ConvertSchema1 {
@@ -453,7 +458,10 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim
453458
}
454459
}
455460

456-
if err := images.Dispatch(ctx, handler, desc); err != nil {
461+
if rCtx.MaxConcurrentDownloads > 0 {
462+
limiter = semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads))
463+
}
464+
if err := images.Dispatch(ctx, handler, limiter, desc); err != nil {
457465
return images.Image{}, err
458466
}
459467

client_opts.go

+8
Original file line numberDiff line numberDiff line change
@@ -178,3 +178,11 @@ func WithImageHandler(h images.Handler) RemoteOpt {
178178
return nil
179179
}
180180
}
181+
182+
// WithMaxConcurrentDownloads sets max concurrent download limit.
183+
func WithMaxConcurrentDownloads(max int) RemoteOpt {
184+
return func(client *Client, c *RemoteContext) error {
185+
c.MaxConcurrentDownloads = max
186+
return nil
187+
}
188+
}

client_test.go

+17
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,23 @@ func TestImagePullSchema1(t *testing.T) {
327327
}
328328
}
329329

330+
func TestImagePullWithConcurrencyLimit(t *testing.T) {
331+
client, err := newClient(t, address)
332+
if err != nil {
333+
t.Fatal(err)
334+
}
335+
defer client.Close()
336+
337+
ctx, cancel := testContext()
338+
defer cancel()
339+
_, err = client.Pull(ctx, testImage,
340+
WithPlatformMatcher(platforms.Default()),
341+
WithMaxConcurrentDownloads(2))
342+
if err != nil {
343+
t.Fatal(err)
344+
}
345+
}
346+
330347
func TestClientReconnect(t *testing.T) {
331348
t.Parallel()
332349

images/handlers.go

+14-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
2727
"github.com/pkg/errors"
2828
"golang.org/x/sync/errgroup"
29+
"golang.org/x/sync/semaphore"
2930
)
3031

3132
var (
@@ -108,19 +109,30 @@ func Walk(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) err
108109
// handler may return `ErrSkipDesc` to signal to the dispatcher to not traverse
109110
// any children.
110111
//
112+
// A concurrency limiter can be passed in to limit the number of concurrent
113+
// handlers running. When limiter is nil, there is no limit.
114+
//
111115
// Typically, this function will be used with `FetchHandler`, often composed
112116
// with other handlers.
113117
//
114118
// If any handler returns an error, the dispatch session will be canceled.
115-
func Dispatch(ctx context.Context, handler Handler, descs ...ocispec.Descriptor) error {
119+
func Dispatch(ctx context.Context, handler Handler, limiter *semaphore.Weighted, descs ...ocispec.Descriptor) error {
116120
eg, ctx := errgroup.WithContext(ctx)
117121
for _, desc := range descs {
118122
desc := desc
119123

124+
if limiter != nil {
125+
if err := limiter.Acquire(ctx, 1); err != nil {
126+
return err
127+
}
128+
}
120129
eg.Go(func() error {
121130
desc := desc
122131

123132
children, err := handler.Handle(ctx, desc)
133+
if limiter != nil {
134+
limiter.Release(1)
135+
}
124136
if err != nil {
125137
if errors.Cause(err) == ErrSkipDesc {
126138
return nil // don't traverse the children.
@@ -129,7 +141,7 @@ func Dispatch(ctx context.Context, handler Handler, descs ...ocispec.Descriptor)
129141
}
130142

131143
if len(children) > 0 {
132-
return Dispatch(ctx, handler, children...)
144+
return Dispatch(ctx, handler, limiter, children...)
133145
}
134146

135147
return nil

remotes/handlers.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ func PushContent(ctx context.Context, pusher Pusher, desc ocispec.Descriptor, pr
181181
pushHandler,
182182
)
183183

184-
if err := images.Dispatch(ctx, images.Handlers(handlers...), desc); err != nil {
184+
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
185185
return err
186186
}
187187

vendor.conf

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ github.com/pkg/errors v0.8.0
2929
github.com/opencontainers/go-digest c9281466c8b2f606084ac71339773efd177436e7
3030
golang.org/x/sys 1b2967e3c290b7c545b3db0deeda16e9be4f98a2 https://github.com/golang/sys
3131
github.com/opencontainers/image-spec v1.0.1
32-
golang.org/x/sync 450f422ab23cf9881c94e2db30cac0eb1b7cf80c
32+
golang.org/x/sync 42b317875d0fa942474b76e1b46a6060d720ae6e
3333
github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895
3434
github.com/grpc-ecosystem/go-grpc-prometheus 6b7015e65d366bf3f19b2b2a000a831940f0f7e0
3535
github.com/Microsoft/go-winio v0.4.11

vendor/golang.org/x/sync/README

-2
This file was deleted.

vendor/golang.org/x/sync/README.md

+18
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/golang.org/x/sync/errgroup/errgroup.go

+1-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/golang.org/x/sync/semaphore/semaphore.go

+127
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)