@@ -32,14 +32,10 @@ import (
3232 "time"
3333)
3434
35- var bufferPool = sync.Pool {New : func () interface {} {
36- return & bytes.Buffer {}
37- }}
38-
39- type ApmServerTransportStatusType string
40-
// ApmServerTransportStatusType is the status of the transport to the APM
// server, as tracked by the backoff implementation.
type ApmServerTransportStatusType string

// Constants for the state of the transport used in
// the backoff implementation.
4339const (
4440 Failing ApmServerTransportStatusType = "Failing"
4541 Pending ApmServerTransportStatusType = "Pending"
@@ -48,30 +44,84 @@ const (
4844
4945// A struct to track the state and status of sending
5046// to the APM server. Used in the backoff implementation.
51- type ApmServerTransportStateType struct {
47+ type ApmServerTransport struct {
5248 sync.Mutex
53- Status ApmServerTransportStatusType
54- ReconnectionCount int
55- GracePeriodTimer * time.Timer
49+ bufferPool sync.Pool
50+ config * extensionConfig
51+ AgentDoneSignal chan struct {}
52+ dataChannel chan AgentData
53+ client * http.Client
54+ status ApmServerTransportStatusType
55+ reconnectionCount int
56+ gracePeriodTimer * time.Timer
5657}
5758
58- // The status of transport to the APM server.
59- //
60- // This instance of the ApmServerTransportStateType is public for use in tests.
61- var ApmServerTransportState = ApmServerTransportStateType {
62- Status : Healthy ,
63- ReconnectionCount : - 1 ,
59+ func InitApmServerTransport (config * extensionConfig ) * ApmServerTransport {
60+ var transport ApmServerTransport
61+ transport .bufferPool = sync.Pool {New : func () interface {} {
62+ return & bytes.Buffer {}
63+ }}
64+ transport .dataChannel = make (chan AgentData , 100 )
65+ transport .client = & http.Client {
66+ Timeout : time .Duration (config .DataForwarderTimeoutSeconds ) * time .Second ,
67+ Transport : http .DefaultTransport .(* http.Transport ).Clone (),
68+ }
69+ transport .config = config
70+ transport .status = Healthy
71+ transport .reconnectionCount = - 1
72+ return & transport
73+ }
74+
75+ // StartBackgroundApmDataForwarding Receive agent data as it comes in and post it to the APM server.
76+ // Stop checking for, and sending agent data when the function invocation
77+ // has completed, signaled via a channel.
78+ func (transport * ApmServerTransport ) ForwardApmData (ctx context.Context ) error {
79+ if transport .status == Failing {
80+ return nil
81+ }
82+ for {
83+ select {
84+ case <- ctx .Done ():
85+ Log .Debug ("Invocation context cancelled, not processing any more agent data" )
86+ return nil
87+ case agentData := <- transport .dataChannel :
88+ if err := transport .PostToApmServer (ctx , agentData ); err != nil {
89+ return fmt .Errorf ("error sending to APM server, skipping: %v" , err )
90+ }
91+ }
92+ }
93+ }
94+
95+ // FlushAPMData reads all the apm data in the apm data channel and sends it to the APM server.
96+ func (transport * ApmServerTransport ) FlushAPMData (ctx context.Context ) {
97+ if transport .status == Failing {
98+ Log .Debug ("Flush skipped - Transport failing" )
99+ return
100+ }
101+ Log .Debug ("Flush started - Checking for agent data" )
102+ for {
103+ select {
104+ case agentData := <- transport .dataChannel :
105+ Log .Debug ("Flush in progress - Processing agent data" )
106+ if err := transport .PostToApmServer (ctx , agentData ); err != nil {
107+ Log .Errorf ("Error sending to APM server, skipping: %v" , err )
108+ }
109+ default :
110+ Log .Debug ("Flush ended - No agent data on buffer" )
111+ return
112+ }
113+ }
64114}
65115
66116// PostToApmServer takes a chunk of APM agent data and posts it to the APM server.
67117//
68118// The function compresses the APM agent data, if it's not already compressed.
69119// It sets the APM transport status to failing upon errors, as part of the backoff
70120// strategy.
71- func PostToApmServer ( client * http. Client , agentData AgentData , config * extensionConfig , ctx context.Context ) error {
121+ func ( transport * ApmServerTransport ) PostToApmServer ( ctx context.Context , agentData AgentData ) error {
72122 // todo: can this be a streaming or streaming style call that keeps the
73123 // connection open across invocations?
74- if ! IsTransportStatusHealthyOrPending () {
124+ if transport . status == Failing {
75125 return errors .New ("transport status is unhealthy" )
76126 }
77127
@@ -83,10 +133,10 @@ func PostToApmServer(client *http.Client, agentData AgentData, config *extension
83133 r = bytes .NewReader (agentData .Data )
84134 } else {
85135 encoding = "gzip"
86- buf := bufferPool .Get ().(* bytes.Buffer )
136+ buf := transport . bufferPool .Get ().(* bytes.Buffer )
87137 defer func () {
88138 buf .Reset ()
89- bufferPool .Put (buf )
139+ transport . bufferPool .Put (buf )
90140 }()
91141 gw , err := gzip .NewWriterLevel (buf , gzip .BestSpeed )
92142 if err != nil {
@@ -101,98 +151,90 @@ func PostToApmServer(client *http.Client, agentData AgentData, config *extension
101151 r = buf
102152 }
103153
104- req , err := http .NewRequest ("POST" , config .apmServerUrl + endpointURI , r )
154+ req , err := http .NewRequest ("POST" , transport . config .apmServerUrl + endpointURI , r )
105155 if err != nil {
106156 return fmt .Errorf ("failed to create a new request when posting to APM server: %v" , err )
107157 }
108158 req .Header .Add ("Content-Encoding" , encoding )
109159 req .Header .Add ("Content-Type" , "application/x-ndjson" )
110- if config .apmServerApiKey != "" {
111- req .Header .Add ("Authorization" , "ApiKey " + config .apmServerApiKey )
112- } else if config .apmServerSecretToken != "" {
113- req .Header .Add ("Authorization" , "Bearer " + config .apmServerSecretToken )
160+ if transport . config .apmServerApiKey != "" {
161+ req .Header .Add ("Authorization" , "ApiKey " + transport . config .apmServerApiKey )
162+ } else if transport . config .apmServerSecretToken != "" {
163+ req .Header .Add ("Authorization" , "Bearer " + transport . config .apmServerSecretToken )
114164 }
115165
116- Log .Debug ("Sending data chunk to APM Server " )
117- resp , err := client .Do (req )
166+ Log .Debug ("Sending data chunk to APM server " )
167+ resp , err := transport . client .Do (req )
118168 if err != nil {
119- SetApmServerTransportState (Failing , ctx )
169+ transport . SetApmServerTransportState (ctx , Failing )
120170 return fmt .Errorf ("failed to post to APM server: %v" , err )
121171 }
122172
123173 //Read the response body
124174 defer resp .Body .Close ()
125175 body , err := ioutil .ReadAll (resp .Body )
126176 if err != nil {
127- SetApmServerTransportState (Failing , ctx )
177+ transport . SetApmServerTransportState (ctx , Failing )
128178 return fmt .Errorf ("failed to read the response body after posting to the APM server" )
129179 }
130180
131- SetApmServerTransportState (Healthy , ctx )
181+ transport . SetApmServerTransportState (ctx , Healthy )
132182 Log .Debug ("Transport status set to healthy" )
133183 Log .Debugf ("APM server response body: %v" , string (body ))
134184 Log .Debugf ("APM server response status code: %v" , resp .StatusCode )
135185 return nil
136186}
137187
138- // IsTransportStatusHealthyOrPending returns true if the APM server transport status is
139- // healthy or pending, and false otherwise.
140- //
141- // This function is public for use in tests.
142- func IsTransportStatusHealthyOrPending () bool {
143- return ApmServerTransportState .Status != Failing
144- }
145-
146188// SetApmServerTransportState takes a state of the APM server transport and updates
147189// the current state of the transport. For a change to a failing state, the grace period
148190// is calculated and a go routine is started that waits for that period to complete
149191// before changing the status to "pending". This would allow a subsequent send attempt
150192// to the APM server.
151193//
152194// This function is public for use in tests.
153- func SetApmServerTransportState ( status ApmServerTransportStatusType , ctx context.Context ) {
195+ func ( transport * ApmServerTransport ) SetApmServerTransportState ( ctx context.Context , status ApmServerTransportStatusType ) {
154196 switch status {
155197 case Healthy :
156- ApmServerTransportState .Lock ()
157- ApmServerTransportState . Status = status
158- Log .Debugf ("APM Server Transport status set to %s" , status )
159- ApmServerTransportState . ReconnectionCount = - 1
160- ApmServerTransportState .Unlock ()
198+ transport .Lock ()
199+ transport . status = status
200+ Log .Debugf ("APM server Transport status set to %s" , transport . status )
201+ transport . reconnectionCount = - 1
202+ transport .Unlock ()
161203 case Failing :
162- ApmServerTransportState .Lock ()
163- ApmServerTransportState . Status = status
164- Log .Debugf ("APM Server Transport status set to %s" , status )
165- ApmServerTransportState . ReconnectionCount ++
166- ApmServerTransportState . GracePeriodTimer = time .NewTimer (computeGracePeriod ())
167- Log .Debugf ("Grace period entered, reconnection count : %d" , ApmServerTransportState . ReconnectionCount )
204+ transport .Lock ()
205+ transport . status = status
206+ Log .Debugf ("APM server Transport status set to %s" , transport . status )
207+ transport . reconnectionCount ++
208+ transport . gracePeriodTimer = time .NewTimer (transport . computeGracePeriod ())
209+ Log .Debugf ("Grace period entered, reconnection count : %d" , transport . reconnectionCount )
168210 go func () {
169211 select {
170- case <- ApmServerTransportState . GracePeriodTimer .C :
212+ case <- transport . gracePeriodTimer .C :
171213 Log .Debug ("Grace period over - timer timed out" )
172214 case <- ctx .Done ():
173215 Log .Debug ("Grace period over - context done" )
174216 }
175- ApmServerTransportState . Status = Pending
176- Log .Debugf ("APM Server Transport status set to %s" , status )
177- ApmServerTransportState .Unlock ()
217+ transport . status = Pending
218+ Log .Debugf ("APM server Transport status set to %s" , transport . status )
219+ transport .Unlock ()
178220 }()
179221 default :
180- Log .Errorf ("Cannot set APM Server Transport status to %s" , status )
222+ Log .Errorf ("Cannot set APM server Transport status to %s" , status )
181223 }
182224}
183225
184226// ComputeGracePeriod https://github.com/elastic/apm/blob/main/specs/agents/transport.md#transport-errors
185- func computeGracePeriod () time.Duration {
186- gracePeriodWithoutJitter := math .Pow (math .Min (float64 (ApmServerTransportState . ReconnectionCount ), 6 ), 2 )
227+ func ( transport * ApmServerTransport ) computeGracePeriod () time.Duration {
228+ gracePeriodWithoutJitter := math .Pow (math .Min (float64 (transport . reconnectionCount ), 6 ), 2 )
187229 jitter := rand .Float64 ()/ 5 - 0.1
188230 return time .Duration ((gracePeriodWithoutJitter + jitter * gracePeriodWithoutJitter ) * float64 (time .Second ))
189231}
190232
191233// EnqueueAPMData adds a AgentData struct to the agent data channel, effectively queueing for a send
192234// to the APM server.
193- func EnqueueAPMData ( agentDataChannel chan AgentData , agentData AgentData ) {
235+ func ( transport * ApmServerTransport ) EnqueueAPMData ( agentData AgentData ) {
194236 select {
195- case agentDataChannel <- agentData :
237+ case transport . dataChannel <- agentData :
196238 Log .Debug ("Adding agent data to buffer to be sent to apm server" )
197239 default :
198240 Log .Warn ("Channel full: dropping a subset of agent data" )
0 commit comments