diff --git a/aws/v4/aws_v4.go b/aws/v4/aws_v4.go index f18b8df43..bed93a66c 100644 --- a/aws/v4/aws_v4.go +++ b/aws/v4/aws_v4.go @@ -34,6 +34,52 @@ func NewV4SigningClientWithHTTPClient(creds *credentials.Credentials, region str } } +// NewV4SigningClientWithOptions returns a configured *http.Client +// that will sign all requests with AWS V4 Signing. +func NewV4SigningClientWithOptions(opts ...SigningClientOption) *http.Client { + tr := &Transport{} + for _, o := range opts { + o(tr) + } + if tr.client == nil { + tr.client = http.DefaultClient + } + return &http.Client{ + Transport: tr, + } +} + +// SigningClientOption specifies options to be used with NewV4SigningClientWithOptions. +type SigningClientOption func(*Transport) + +// WithHTTPClient configures the http.Client to be used in Transport. +func WithHTTPClient(client *http.Client) SigningClientOption { + return func(tr *Transport) { + tr.client = client + } +} + +// WithCredentials configures the AWS credentials to be used in Transport. +func WithCredentials(creds *credentials.Credentials) SigningClientOption { + return func(tr *Transport) { + tr.creds = creds + } +} + +// WithSigner configures the AWS signer to be used in Transport. +func WithSigner(signer *v4.Signer) SigningClientOption { + return func(tr *Transport) { + tr.signer = signer + } +} + +// WithRegion configures the AWS region to be used in Transport, e.g. eu-west-1. +func WithRegion(region string) SigningClientOption { + return func(tr *Transport) { + tr.region = region + } +} + // Transport is a RoundTripper that will sign requests with AWS V4 Signing type Transport struct { client *http.Client @@ -49,6 +95,7 @@ func (st Transport) RoundTrip(req *http.Request) (*http.Response, error) { return st.client.Do(req) } + // TODO(oe) Do we still need this? Can we use signer.DisableURIPathEscaping = true instead? if strings.Contains(req.URL.RawPath, "%2C") { // Escaping path req.URL.RawPath = url.PathEscape(req.URL.RawPath) @@ -57,14 +104,19 @@ func (st Transport) RoundTrip(req *http.Request) (*http.Response, error) { now := time.Now().UTC() req.Header.Set("Date", now.Format(time.RFC3339)) - var err error switch req.Body { case nil: - _, err = st.signer.Sign(req, nil, "es", st.region, now) + _, err := st.signer.Sign(req, nil, "es", st.region, now) + if err != nil { + return nil, err + } default: switch body := req.Body.(type) { case io.ReadSeeker: - _, err = st.signer.Sign(req, body, "es", st.region, now) + _, err := st.signer.Sign(req, body, "es", st.region, now) + if err != nil { + return nil, err + } default: buf, err := ioutil.ReadAll(req.Body) if err != nil { @@ -72,10 +124,10 @@ func (st Transport) RoundTrip(req *http.Request) (*http.Response, error) { } req.Body = ioutil.NopCloser(bytes.NewReader(buf)) _, err = st.signer.Sign(req, bytes.NewReader(buf), "es", st.region, time.Now().UTC()) + if err != nil { + return nil, err + } } } - if err != nil { - return nil, err - } return st.client.Do(req) } diff --git a/client.go b/client.go index d83906c29..eabba12ab 100644 --- a/client.go +++ b/client.go @@ -43,7 +43,7 @@ const ( // for a response from Elasticsearch on startup, i.e. when creating a // client. After the client is started, a shorter timeout is commonly used // (its default is specified in DefaultHealthcheckTimeout). - DefaultHealthcheckTimeoutStartup = 5 * time.Second + DefaultHealthcheckTimeoutStartup = 10 * time.Second // DefaultHealthcheckTimeout specifies the time a running client waits for // a response from Elasticsearch. Notice that the healthcheck timeout @@ -148,6 +148,7 @@ type Client struct { retrier Retrier // strategy for retries retryStatusCodes []int // HTTP status codes where to retry automatically (with retrier) headers http.Header // a list of default headers to add to each request + closeIdleConnsForDeadConn bool // enable to call CloseIdleConnections when we find a dead node } // NewClient creates a new client to work with Elasticsearch. @@ -472,6 +473,18 @@ func configToOptions(cfg *config.Config) ([]ClientOptionFunc, error) { return options, nil } +// SetCloseIdleConnections, when enabled, will call CloseIdleConnections +// whenever we find a dead connection in PerformRequest. This might help +// to fix issues with e.g. AWS Elasticsearch Service that automatically +// changes its configuration and leads Go net/http to use cached HTTP +// connection when it shouldn't. +func SetCloseIdleConnections(enabled bool) ClientOptionFunc { + return func(c *Client) error { + c.closeIdleConnsForDeadConn = enabled + return nil + } +} + // SetHttpClient can be used to specify the http.Client to use when making // HTTP requests to Elasticsearch. func SetHttpClient(httpClient Doer) ClientOptionFunc { @@ -594,7 +607,7 @@ func SetHealthcheck(enabled bool) ClientOptionFunc { } // SetHealthcheckTimeoutStartup sets the timeout for the initial health check. -// The default timeout is 5 seconds (see DefaultHealthcheckTimeoutStartup). +// The default timeout is 10 seconds (see DefaultHealthcheckTimeoutStartup). // Notice that timeouts for subsequent health checks can be modified with // SetHealthcheckTimeout. func SetHealthcheckTimeoutStartup(timeout time.Duration) ClientOptionFunc { @@ -1326,6 +1339,21 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions) retryStatusCodes = opt.RetryStatusCodes } defaultHeaders := c.headers + closeIdleConns := func() {} + if c.closeIdleConnsForDeadConn { + // If we're e.g. on AWS, we should make sure to close idle connections. + // That might happen when the AWS Elasticsearch domain is re-configured. + // Closing idle connections makes sure that net/http creates a + // new HTTP connection instead of re-using one from the cache. + closeIdleConns = func() { + type idleCloser interface { + CloseIdleConnections() + } + if ic, ok := c.c.(idleCloser); ok { + ic.CloseIdleConnections() + } + } + } c.mu.RUnlock() // retry returns true if statusCode indicates the request is to be retried @@ -1434,11 +1462,13 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions) wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err) if rerr != nil { c.errorf("elastic: %s is dead", conn.URL()) + closeIdleConns() conn.MarkAsDead() return nil, rerr } if !ok { c.errorf("elastic: %s is dead", conn.URL()) + closeIdleConns() conn.MarkAsDead() return nil, err } @@ -1451,6 +1481,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions) wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err) if rerr != nil { c.errorf("elastic: %s is dead", conn.URL()) + closeIdleConns() conn.MarkAsDead() return nil, rerr } diff --git a/client_test.go b/client_test.go index 0a59ad30c..9d9415f25 100644 --- a/client_test.go +++ b/client_test.go @@ -20,6 +20,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "testing" "time" @@ -77,6 +78,9 @@ func TestClientDefaults(t *testing.T) { if client.sendGetBodyAs != "GET" { t.Errorf("expected sendGetBodyAs to be GET; got: %q", client.sendGetBodyAs) } + if client.closeIdleConnsForDeadConn != false { + t.Errorf("expected closeIdleConnsForDeadConn to be false; got: %v", client.closeIdleConnsForDeadConn) + } } func TestClientWithoutURL(t *testing.T) { @@ -1430,9 +1434,10 @@ func TestPerformRequestOnNoConnectionsWithHealthcheckRevival(t *testing.T) { // failingTransport will run a fail callback if it sees a given URL path prefix. type failingTransport struct { - path string // path prefix to look for - fail func(*http.Request) (*http.Response, error) // call when path prefix is found - next http.RoundTripper // next round-tripper (use http.DefaultTransport if nil) + path string // path prefix to look for + fail func(*http.Request) (*http.Response, error) // call when path prefix is found + next http.RoundTripper // next round-tripper (use http.DefaultTransport if nil) + closeIdleConns func() // callback for CloseIdleConnections } // RoundTrip implements a failing transport. @@ -1446,6 +1451,12 @@ func (tr *failingTransport) RoundTrip(r *http.Request) (*http.Response, error) { return http.DefaultTransport.RoundTrip(r) } +func (tr *failingTransport) CloseIdleConnections() { + if tr.closeIdleConns != nil { + tr.closeIdleConns() + } +} + func TestPerformRequestRetryOnHttpError(t *testing.T) { var numFailedReqs int fail := func(r *http.Request) (*http.Response, error) { @@ -1556,6 +1567,96 @@ func TestPerformRequestOnSpecifiedHttpStatusCodes(t *testing.T) { } } +func TestPerformRequestCloseIdleConnectionsEnabled(t *testing.T) { + var ( + numCallsRoundTripper int64 + numCallsCloseIdleConns int64 + ) + tr := &failingTransport{ + path: "/fail", + fail: func(r *http.Request) (*http.Response, error) { + // Called with every retry + atomic.AddInt64(&numCallsRoundTripper, 1) + return http.DefaultTransport.RoundTrip(r) + }, + closeIdleConns: func() { + // Called when a connection is marked as dead + atomic.AddInt64(&numCallsCloseIdleConns, 1) + }, + } + httpClient := &http.Client{Transport: tr} + + client, err := NewClient( + SetURL("http://127.0.0.1:9201"), + SetHttpClient(httpClient), + SetMaxRetries(5), + SetSniff(false), + SetHealthcheck(false), + SetCloseIdleConnections(true), // <- call CloseIdleConnections for dead nodes + ) + if err != nil { + t.Fatal(err) + } + + // Make a request, so that the connection is marked as dead. + client.PerformRequest(context.TODO(), PerformRequestOptions{ + Method: "GET", + Path: "/fail", + }) + + if want, have := int64(5), numCallsRoundTripper; want != have { + t.Errorf("expected %d calls to RoundTripper; got: %d", want, have) + } + if want, have := int64(1), numCallsCloseIdleConns; want != have { + t.Errorf("expected %d calls to CloseIdleConns; got: %d", want, have) + } +} + +func TestPerformRequestCloseIdleConnectionsDisabled(t *testing.T) { + var ( + numCallsRoundTripper int64 + numCallsCloseIdleConns int64 + ) + tr := &failingTransport{ + path: "/fail", + fail: func(r *http.Request) (*http.Response, error) { + // Called with every retry + atomic.AddInt64(&numCallsRoundTripper, 1) + return http.DefaultTransport.RoundTrip(r) + }, + closeIdleConns: func() { + // Called when a connection is marked as dead + atomic.AddInt64(&numCallsCloseIdleConns, 1) + }, + } + httpClient := &http.Client{Transport: tr} + + client, err := NewClient( + SetURL("http://127.0.0.1:9201"), + SetHttpClient(httpClient), + SetMaxRetries(5), + SetSniff(false), + SetHealthcheck(false), + SetCloseIdleConnections(false), // <- do NOT call CloseIdleConnections for dead nodes + ) + if err != nil { + t.Fatal(err) + } + + // Make a request, so that the connection is marked as dead. + client.PerformRequest(context.TODO(), PerformRequestOptions{ + Method: "GET", + Path: "/fail", + }) + + if want, have := int64(5), numCallsRoundTripper; want != have { + t.Errorf("expected %d calls to RoundTripper; got: %d", want, have) + } + if want, have := int64(0), numCallsCloseIdleConns; want != have { + t.Errorf("expected %d calls to CloseIdleConns; got: %d", want, have) + } +} + // failingBody will return an error when json.Marshal is called on it. type failingBody struct{} diff --git a/recipes/aws-es-client/.gitignore b/recipes/aws-es-client/.gitignore new file mode 100644 index 000000000..d477975c9 --- /dev/null +++ b/recipes/aws-es-client/.gitignore @@ -0,0 +1 @@ +/aws-es-client diff --git a/recipes/aws-es-client/go.mod b/recipes/aws-es-client/go.mod new file mode 100644 index 000000000..54be6ac82 --- /dev/null +++ b/recipes/aws-es-client/go.mod @@ -0,0 +1,11 @@ +module github.com/olivere/elastic/recipes/aws-es-client + +go 1.16 + +require ( + github.com/aws/aws-sdk-go v1.39.2 + github.com/olivere/elastic/v7 v7.0.26 + github.com/olivere/env v1.1.0 +) + +replace github.com/olivere/elastic/v7 => ../.. diff --git a/recipes/aws-es-client/go.sum b/recipes/aws-es-client/go.sum new file mode 100644 index 000000000..75b4726de --- /dev/null +++ b/recipes/aws-es-client/go.sum @@ -0,0 +1,118 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/aws/aws-sdk-go v1.38.17/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= +github.com/aws/aws-sdk-go v1.39.2 h1:t+n2j0QfAmGqSQVb1VIGulhSMjfaZ/RqSGlcRKGED9Y= +github.com/aws/aws-sdk-go v1.39.2/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/olivere/env v1.1.0 h1:owp/uwMwhru5668JjMDp8UTG3JGT27GTCk4ufYQfaTw= +github.com/olivere/env v1.1.0/go.mod h1:zaoXy53SjZfxqZBGiGrZCkuVLYPdwrc+vArPuUVhJdQ= +github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= +github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM= +github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/recipes/aws-es-client/main.go b/recipes/aws-es-client/main.go new file mode 100644 index 000000000..624877936 --- /dev/null +++ b/recipes/aws-es-client/main.go @@ -0,0 +1,236 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +// Seamlessly connect to an Elasticsearch Service on AWS. +// +// Example +// +// aws-es-client -domain-name=escluster1 -index=tweets -trace=false +// +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "log" + "net/url" + "os" + "time" + + "github.com/aws/aws-sdk-go/aws/session" + v4 "github.com/aws/aws-sdk-go/aws/signer/v4" + "github.com/aws/aws-sdk-go/service/elasticsearchservice" + + "github.com/olivere/elastic/v7" + elasticawsv4 "github.com/olivere/elastic/v7/aws/v4" +) + +const ( + mapping = ` + { + "settings":{ + "number_of_shards":1, + "number_of_replicas":0 + }, + "mappings":{ + "properties":{ + "user":{ + "type":"keyword" + }, + "message":{ + "type":"text" + }, + "retweets":{ + "type":"integer" + }, + "created":{ + "type":"date" + }, + "attributes":{ + "type":"object" + } + } + } + } + ` +) + +// Tweet is just an example document. +type Tweet struct { + User string `json:"user"` + Message string `json:"message"` + Retweets int `json:"retweets"` + Created time.Time `json:"created"` + Attrs map[string]interface{} `json:"attributes,omitempty"` +} + +func main() { + var ( + domainName = flag.String("domain-name", "", "AWS Elasticsearch Service Domain Name") + index = flag.String("index", "", "Index name") + trace = flag.Bool("trace", false, "Enable trace logging") + ) + flag.Parse() + log.SetFlags(log.LstdFlags | log.Lshortfile) + + if *domainName == "" { + log.Fatal("please specify an AWS Elasticsearch Service Domain Name with -domain-name") + } + if *index == "" { + log.Fatal("please specify an index name with -index") + } + + client, err := ConnectToAWS(context.Background(), *domainName, *trace) + if err != nil { + log.Fatal(err) + } + + // Just a status message + fmt.Println("Connection succeeded") + + // Check if index already exists. We'll drop it then. + // Next, we create a fresh index/mapping. + ctx := context.Background() + exists, err := client.IndexExists(*index).Pretty(true).Do(ctx) + if err != nil { + log.Fatal(err) + } + if exists { + _, err := client.DeleteIndex(*index).Pretty(true).Do(ctx) + if err != nil { + log.Fatal(err) + } + } + _, err = client.CreateIndex(*index).Body(mapping).Pretty(true).Do(ctx) + if err != nil { + log.Fatal(err) + } + + // Add a tweet + { + tweet := Tweet{ + User: "olivere", + Message: "Welcome to Go and Elasticsearch.", + Retweets: 0, + Created: time.Now(), + Attrs: map[string]interface{}{ + "views": 17, + "vip": true, + }, + } + _, err := client.Index(). + Index(*index). + Id("1"). + BodyJson(&tweet). + Refresh("true"). + Pretty(true). + Do(context.TODO()) + if err != nil { + log.Fatal(err) + } + } + + // Read the tweet + { + doc, err := client.Get(). + Index(*index). + Id("1"). + Pretty(true). + Do(context.TODO()) + if err != nil { + log.Fatal(err) + } + var tweet Tweet + if err = json.Unmarshal(doc.Source, &tweet); err != nil { + log.Fatal(err) + } + fmt.Printf("%s at %s: %s (%d retweets)\n", + tweet.User, + tweet.Created, + tweet.Message, + tweet.Retweets, + ) + fmt.Printf(" %v\n", tweet.Attrs) + } +} + +// ConnectToAWS creates an elastic.Client that connects to the ES cluster +// specified by esDomainName. +// +// It creates an AWS session first, by using different approaches, as +// documented by the AWS SDK for Go. Notice that for AWS Elasticsearch, +// we also use the region configured in the session. +// +// Next, it creates an ElasticsearchService instance to access the +// configuration settings of the cluster specified by esDomainName. +// For ConnectToAWS, we only use it to lookup the URL endpoint from the +// configuration. +// +// Finally, we configure all settings to be used with AWS ES. That is: +// * Disable sniffing +// * Disable health checks +// * Close idle connections when a dead node is found +// * Use a HTTP transport that automatically signs HTTP requests +// * (optionally) Trace output to stdout +func ConnectToAWS(ctx context.Context, esDomainName string, trace bool) (*elastic.Client, error) { + // Create a new AWS session + sess, err := session.NewSession() + if err != nil { + return nil, err + } + + // Create a new ElasticsearchService instance to dynamically retrieve + // the AWS ES endpoint URL + svc := elasticsearchservice.New(sess) + + // See https://docs.aws.amazon.com/sdk-for-go/api/service/elasticsearchservice/#ElasticsearchDomainStatus + out, err := svc.DescribeElasticsearchDomain(&elasticsearchservice.DescribeElasticsearchDomainInput{ + DomainName: &esDomainName, + }) + if err != nil { + log.Fatal(err) + } + + // fmt.Printf("%+v\n", out.DomainStatus) + // fmt.Printf("AWS Endpoint: %s\n", *out.DomainStatus.Endpoint) + + // Configure the AWS ES Endpoint URL from the ES Domain settings + awsESEndpoint := &url.URL{ + Host: *out.DomainStatus.Endpoint, // e.g. search-..es.amazonaws.com + } + if *out.DomainStatus.DomainEndpointOptions.EnforceHTTPS { + awsESEndpoint.Scheme = "https" + } else { + awsESEndpoint.Scheme = "http" + } + + // We need to sign HTTP requests with AWS + httpClient := elasticawsv4.NewV4SigningClientWithOptions( + elasticawsv4.WithCredentials(sess.Config.Credentials), + elasticawsv4.WithSigner(v4.NewSigner(sess.Config.Credentials, func(s *v4.Signer) { + s.DisableURIPathEscaping = true + })), + elasticawsv4.WithRegion(*sess.Config.Region), // use the AWS region from the session + ) + options := []elastic.ClientOptionFunc{ + elastic.SetSniff(false), // do not sniff with AWS ES + elastic.SetHealthcheck(false), // do not perform healthchecks with AWS ES + elastic.SetCloseIdleConnections(true), // close idle connections when dead nodes are found + elastic.SetURL(awsESEndpoint.String()), // use the dynamically retrieved endpoint URL + elastic.SetHttpClient(httpClient), // use a HTTP client that does the signing + } + if trace { + // Optional: Trace output + options = append(options, elastic.SetTraceLog(log.New(os.Stdout, "", 0))) + } + + // Create a client configured for using with AWS ES + client, err := elastic.NewClient(options...) + if err != nil { + return nil, err + } + return client, nil +} diff --git a/recipes/aws-mapping-v4/go.mod b/recipes/aws-mapping-v4/go.mod new file mode 100644 index 000000000..c9ab58561 --- /dev/null +++ b/recipes/aws-mapping-v4/go.mod @@ -0,0 +1,11 @@ +module github.com/olivere/elastic/recipes/aws-mapping-v4 + +go 1.16 + +require ( + github.com/aws/aws-sdk-go v1.39.2 + github.com/olivere/elastic/v7 v7.0.26 + github.com/olivere/env v1.1.0 +) + +replace github.com/olivere/elastic/v7 => ../.. diff --git a/recipes/aws-mapping-v4/go.sum b/recipes/aws-mapping-v4/go.sum new file mode 100644 index 000000000..75b4726de --- /dev/null +++ b/recipes/aws-mapping-v4/go.sum @@ -0,0 +1,118 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/aws/aws-sdk-go v1.38.17/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= +github.com/aws/aws-sdk-go v1.39.2 h1:t+n2j0QfAmGqSQVb1VIGulhSMjfaZ/RqSGlcRKGED9Y= +github.com/aws/aws-sdk-go v1.39.2/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/olivere/env v1.1.0 h1:owp/uwMwhru5668JjMDp8UTG3JGT27GTCk4ufYQfaTw= +github.com/olivere/env v1.1.0/go.mod h1:zaoXy53SjZfxqZBGiGrZCkuVLYPdwrc+vArPuUVhJdQ= +github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= +github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9/go.mod h1:SnhjPscd9TpLiy1LpzGSKh3bXCfxxXuqd9xmQJy3slM= +github.com/smartystreets/gunit v1.4.2/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+zMHZXV9/bvak= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/recipes/aws-mapping-v4/main.go b/recipes/aws-mapping-v4/main.go index 494a3600a..74b773365 100644 --- a/recipes/aws-mapping-v4/main.go +++ b/recipes/aws-mapping-v4/main.go @@ -17,14 +17,13 @@ import ( "flag" "fmt" "log" - "os" "time" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/olivere/env" + "github.com/aws/aws-sdk-go/aws/session" + v4 "github.com/aws/aws-sdk-go/aws/signer/v4" "github.com/olivere/elastic/v7" - aws "github.com/olivere/elastic/v7/aws/v4" + elasticawsv4 "github.com/olivere/elastic/v7/aws/v4" ) const ( @@ -35,23 +34,21 @@ const ( "number_of_replicas":0 }, "mappings":{ - "_doc":{ - "properties":{ - "user":{ - "type":"keyword" - }, - "message":{ - "type":"text" - }, - "retweets":{ - "type":"integer" - }, - "created":{ - "type":"date" - }, - "attributes":{ - "type":"object" - } + "properties":{ + "user":{ + "type":"keyword" + }, + "message":{ + "type":"text" + }, + "retweets":{ + "type":"integer" + }, + "created":{ + "type":"date" + }, + "attributes":{ + "type":"object" } } } @@ -70,113 +67,156 @@ type Tweet struct { func main() { var ( - accessKey = flag.String("access-key", env.String("", "AWS_ACCESS_KEY", "AWS_ACCESS_KEY_ID"), "Access Key ID") - secretKey = flag.String("secret-key", env.String("", "AWS_SECRET_KEY", "AWS_SECRET_ACCESS_KEY"), "Secret access key") - url = flag.String("url", "", "Elasticsearch URL") - sniff = flag.Bool("sniff", false, "Enable or disable sniffing") - trace = flag.Bool("trace", false, "Enable or disable tracing") - index = flag.String("index", "", "Index name") - region = flag.String("region", "eu-west-1", "AWS Region name") + url = flag.String("url", "", "AWS ES Endpoint URL") + index = flag.String("index", "", "Elasticsearch index name") + loop = flag.Bool("loop", false, "Run in an endless loop") ) flag.Parse() - log.SetFlags(log.LstdFlags | log.Lshortfile) + log.SetFlags(0) if *url == "" { - log.Fatal("please specify a URL with -url") + log.Fatal("please specify an AWS ES Endpoint URL with -url") } if *index == "" { log.Fatal("please specify an index name with -index") } - if *region == "" { - log.Fatal("please specify an AWS region with -region") - } - // Create an Elasticsearch client - signingClient := aws.NewV4SigningClient(credentials.NewStaticCredentials( - *accessKey, - *secretKey, - "", - ), *region) - - // Create an Elasticsearch client - opts := []elastic.ClientOptionFunc{ - elastic.SetURL(*url), - elastic.SetSniff(*sniff), - elastic.SetHealthcheck(*sniff), - elastic.SetHttpClient(signingClient), - } - if *trace { - opts = append(opts, elastic.SetTraceLog(log.New(os.Stdout, "", 0))) - } - client, err := elastic.NewClient(opts...) + // Create a pre-configured client to connect to AWS by the given endpoint + client, err := ConnectToAWS(context.Background(), *url) if err != nil { log.Fatal(err) } - // Check if index already exists. We'll drop it then. - // Next, we create a fresh index/mapping. - ctx := context.Background() - exists, err := client.IndexExists(*index).Pretty(true).Do(ctx) - if err != nil { - log.Fatal(err) - } - if exists { - _, err := client.DeleteIndex(*index).Pretty(true).Do(ctx) + for { + // Check if index already exists. We'll drop it then. + // Next, we create a fresh index/mapping. + ctx := context.Background() + exists, err := client.IndexExists(*index).Pretty(true).Do(ctx) if err != nil { - log.Fatal(err) + if !*loop { + log.Fatal(err) + } + log.Print(err) + continue } - } - _, err = client.CreateIndex(*index).Body(mapping).Pretty(true).Do(ctx) - if err != nil { - log.Fatal(err) - } - - // Add a tweet - { - tweet := Tweet{ - User: "olivere", - Message: "Welcome to Go and Elasticsearch.", - Retweets: 0, - Created: time.Now(), - Attrs: map[string]interface{}{ - "views": 17, - "vip": true, - }, + if exists { + _, err := client.DeleteIndex(*index).Pretty(true).Do(ctx) + if err != nil { + if !*loop { + log.Fatal(err) + } + log.Print(err) + continue + } } - _, err := client.Index(). - Index(*index). - Type("_doc"). - Id("1"). - BodyJson(&tweet). - Refresh("true"). - Pretty(true). - Do(context.TODO()) + _, err = client.CreateIndex(*index).Body(mapping).Pretty(true).Do(ctx) if err != nil { - log.Fatal(err) + if !*loop { + log.Fatal(err) + } + log.Print(err) + continue } - } - // Read the tweet - { - doc, err := client.Get(). - Index(*index). - Type("_doc"). - Id("1"). - Pretty(true). - Do(context.TODO()) - if err != nil { - log.Fatal(err) + // Add a tweet + { + tweet := Tweet{ + User: "olivere", + Message: "Welcome to Go and Elasticsearch.", + Retweets: 0, + Created: time.Now(), + Attrs: map[string]interface{}{ + "views": 17, + "vip": true, + }, + } + _, err := client.Index(). + Index(*index). + Id("1"). + BodyJson(&tweet). + Refresh("true"). + Pretty(true). + Do(context.TODO()) + if err != nil { + if !*loop { + log.Fatal(err) + } + log.Print(err) + continue + } + } + + // Read the tweet + { + doc, err := client.Get(). + Index(*index). + Id("1"). + Pretty(true). + Do(context.TODO()) + if err != nil { + if !*loop { + log.Fatal(err) + } + log.Print(err) + continue + } + var tweet Tweet + if err = json.Unmarshal(doc.Source, &tweet); err != nil { + if !*loop { + log.Fatal(err) + } + log.Print(err) + } + fmt.Printf("%s at %s: %s (%d retweets)\n", + tweet.User, + tweet.Created, + tweet.Message, + tweet.Retweets, + ) + fmt.Printf(" %v\n", tweet.Attrs) } - var tweet Tweet - if err = json.Unmarshal(doc.Source, &tweet); err != nil { - log.Fatal(err) + + if !*loop { + break } - fmt.Printf("%s at %s: %s (%d retweets)\n", - tweet.User, - tweet.Created, - tweet.Message, - tweet.Retweets, - ) - fmt.Printf(" %v\n", tweet.Attrs) } } + +// ConnectToAWS creates an elastic.Client that connects to the ES cluster +// specified by given URL endpoint. +// +// ConnectToAWS ensures we configure all settings to properly use AWS ES with +// this client, e.g.: +// * Disable sniffing +// * Disable health checks +// * Close idle connections when a dead node is found +// * Use a HTTP transport to automatically sign HTTP requests +func ConnectToAWS(ctx context.Context, url string) (*elastic.Client, error) { + sess, err := session.NewSession() + if err != nil { + return nil, err + } + + // We need to sign HTTP requests with AWS + httpClient := elasticawsv4.NewV4SigningClientWithOptions( + elasticawsv4.WithCredentials(sess.Config.Credentials), + elasticawsv4.WithSigner(v4.NewSigner(sess.Config.Credentials, func(s *v4.Signer) { + s.DisableURIPathEscaping = true + })), + elasticawsv4.WithRegion(*sess.Config.Region), // use the AWS region from the session + ) + options := []elastic.ClientOptionFunc{ + elastic.SetSniff(false), // do not sniff with AWS ES + elastic.SetHealthcheck(false), // do not perform healthchecks with AWS ES + elastic.SetCloseIdleConnections(true), // close idle connections when dead nodes are found + elastic.SetURL(url), + elastic.SetHttpClient(httpClient), // use a HTTP client that does the signing + } + + // Create a client configured for using with AWS ES + client, err := elastic.NewClient(options...) + if err != nil { + return nil, err + } + return client, nil +}