Skip to content

Commit ffec1a5

Browse files
authored
Update servers improvements (#448)
* Add Content-Type header to NGINX client PATCH requests * Update to reduce unnecessary API calls and sanitize input Update UpdateHTTPServers and UpdateStreamServers: - No longer make extra GET requests for each PUT and DELETE request. - Removes identical duplicate servers. - Returns errors for duplicate servers with different parameters.
1 parent 15b12a8 commit ffec1a5

File tree

3 files changed

+643
-184
lines changed

3 files changed

+643
-184
lines changed

client/nginx.go

+157-50
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ var (
4848
ErrServerExists = errors.New("server already exists")
4949
ErrNotSupported = errors.New("not supported")
5050
ErrInvalidTimeout = errors.New("invalid timeout")
51+
ErrParameterMismatch = errors.New("encountered duplicate server with different parameters")
5152
ErrPlusVersionNotFound = errors.New("plus version not found in the input string")
5253
)
5354

@@ -775,9 +776,13 @@ func (client *NginxClient) AddHTTPServer(ctx context.Context, upstream string, s
775776
if id != -1 {
776777
return fmt.Errorf("failed to add %v server to %v upstream: %w", server.Server, upstream, ErrServerExists)
777778
}
779+
err = client.addHTTPServer(ctx, upstream, server)
780+
return err
781+
}
778782

783+
func (client *NginxClient) addHTTPServer(ctx context.Context, upstream string, server UpstreamServer) error {
779784
path := fmt.Sprintf("http/upstreams/%v/servers/", upstream)
780-
err = client.post(ctx, path, &server)
785+
err := client.post(ctx, path, &server)
781786
if err != nil {
782787
return fmt.Errorf("failed to add %v server to %v upstream: %w", server.Server, upstream, err)
783788
}
@@ -794,9 +799,13 @@ func (client *NginxClient) DeleteHTTPServer(ctx context.Context, upstream string
794799
if id == -1 {
795800
return fmt.Errorf("failed to remove %v server from %v upstream: %w", server, upstream, ErrServerNotFound)
796801
}
802+
err = client.deleteHTTPServer(ctx, upstream, server, id)
803+
return err
804+
}
797805

798-
path := fmt.Sprintf("http/upstreams/%v/servers/%v", upstream, id)
799-
err = client.delete(ctx, path, http.StatusOK)
806+
func (client *NginxClient) deleteHTTPServer(ctx context.Context, upstream, server string, serverID int) error {
807+
path := fmt.Sprintf("http/upstreams/%v/servers/%v", upstream, serverID)
808+
err := client.delete(ctx, path, http.StatusOK)
800809
if err != nil {
801810
return fmt.Errorf("failed to remove %v server from %v upstream: %w", server, upstream, err)
802811
}
@@ -809,6 +818,8 @@ func (client *NginxClient) DeleteHTTPServer(ctx context.Context, upstream string
809818
// Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX.
810819
// Servers that are in the slice and exist in NGINX, but have different parameters, will be updated.
811820
// The client will attempt to update all servers, returning all the errors that occurred.
821+
// If there are duplicate servers with equivalent parameters, the duplicates will be ignored.
822+
// If there are duplicate servers with different parameters, those server entries will be ignored and an error returned.
812823
func (client *NginxClient) UpdateHTTPServers(ctx context.Context, upstream string, servers []UpstreamServer) (added []UpstreamServer, deleted []UpstreamServer, updated []UpstreamServer, err error) {
813824
serversInNginx, err := client.GetHTTPServers(ctx, upstream)
814825
if err != nil {
@@ -822,10 +833,12 @@ func (client *NginxClient) UpdateHTTPServers(ctx context.Context, upstream strin
822833
formattedServers = append(formattedServers, server)
823834
}
824835

836+
formattedServers, err = deduplicateServers(upstream, formattedServers)
837+
825838
toAdd, toDelete, toUpdate := determineUpdates(formattedServers, serversInNginx)
826839

827840
for _, server := range toAdd {
828-
addErr := client.AddHTTPServer(ctx, upstream, server)
841+
addErr := client.addHTTPServer(ctx, upstream, server)
829842
if addErr != nil {
830843
err = errors.Join(err, addErr)
831844
continue
@@ -834,7 +847,7 @@ func (client *NginxClient) UpdateHTTPServers(ctx context.Context, upstream strin
834847
}
835848

836849
for _, server := range toDelete {
837-
deleteErr := client.DeleteHTTPServer(ctx, upstream, server.Server)
850+
deleteErr := client.deleteHTTPServer(ctx, upstream, server.Server, server.ID)
838851
if deleteErr != nil {
839852
err = errors.Join(err, deleteErr)
840853
continue
@@ -858,46 +871,82 @@ func (client *NginxClient) UpdateHTTPServers(ctx context.Context, upstream strin
858871
return added, deleted, updated, err
859872
}
860873

861-
// haveSameParameters checks if a given server has the same parameters as a server already present in NGINX. Order matters.
862-
func haveSameParameters(newServer UpstreamServer, serverNGX UpstreamServer) bool {
863-
newServer.ID = serverNGX.ID
874+
func deduplicateServers(upstream string, servers []UpstreamServer) ([]UpstreamServer, error) {
875+
type serverCheck struct {
876+
server UpstreamServer
877+
valid bool
878+
}
864879

865-
if serverNGX.MaxConns != nil && newServer.MaxConns == nil {
866-
newServer.MaxConns = &defaultMaxConns
880+
serverMap := make(map[string]*serverCheck, len(servers))
881+
var err error
882+
for _, server := range servers {
883+
if prev, ok := serverMap[server.Server]; ok {
884+
if !prev.valid {
885+
continue
886+
}
887+
if !server.hasSameParametersAs(prev.server) {
888+
prev.valid = false
889+
err = errors.Join(err, fmt.Errorf(
890+
"failed to update %s server to %s upstream: %w",
891+
server.Server, upstream, ErrParameterMismatch))
892+
}
893+
continue
894+
}
895+
serverMap[server.Server] = &serverCheck{server, true}
867896
}
897+
retServers := make([]UpstreamServer, 0, len(serverMap))
898+
for _, server := range servers {
899+
if check, ok := serverMap[server.Server]; ok && check.valid {
900+
retServers = append(retServers, server)
901+
delete(serverMap, server.Server)
902+
}
903+
}
904+
return retServers, err
905+
}
868906

869-
if serverNGX.MaxFails != nil && newServer.MaxFails == nil {
870-
newServer.MaxFails = &defaultMaxFails
907+
// hasSameParametersAs checks if a given server has the same parameters.
908+
func (s UpstreamServer) hasSameParametersAs(compareServer UpstreamServer) bool {
909+
s.ID = compareServer.ID
910+
s.applyDefaults()
911+
compareServer.applyDefaults()
912+
return reflect.DeepEqual(s, compareServer)
913+
}
914+
915+
func (s *UpstreamServer) applyDefaults() {
916+
if s.MaxConns == nil {
917+
s.MaxConns = &defaultMaxConns
871918
}
872919

873-
if serverNGX.FailTimeout != "" && newServer.FailTimeout == "" {
874-
newServer.FailTimeout = defaultFailTimeout
920+
if s.MaxFails == nil {
921+
s.MaxFails = &defaultMaxFails
875922
}
876923

877-
if serverNGX.SlowStart != "" && newServer.SlowStart == "" {
878-
newServer.SlowStart = defaultSlowStart
924+
if s.FailTimeout == "" {
925+
s.FailTimeout = defaultFailTimeout
879926
}
880927

881-
if serverNGX.Backup != nil && newServer.Backup == nil {
882-
newServer.Backup = &defaultBackup
928+
if s.SlowStart == "" {
929+
s.SlowStart = defaultSlowStart
883930
}
884931

885-
if serverNGX.Down != nil && newServer.Down == nil {
886-
newServer.Down = &defaultDown
932+
if s.Backup == nil {
933+
s.Backup = &defaultBackup
887934
}
888935

889-
if serverNGX.Weight != nil && newServer.Weight == nil {
890-
newServer.Weight = &defaultWeight
936+
if s.Down == nil {
937+
s.Down = &defaultDown
891938
}
892939

893-
return reflect.DeepEqual(newServer, serverNGX)
940+
if s.Weight == nil {
941+
s.Weight = &defaultWeight
942+
}
894943
}
895944

896945
func determineUpdates(updatedServers []UpstreamServer, nginxServers []UpstreamServer) (toAdd []UpstreamServer, toRemove []UpstreamServer, toUpdate []UpstreamServer) {
897946
for _, server := range updatedServers {
898947
updateFound := false
899948
for _, serverNGX := range nginxServers {
900-
if server.Server == serverNGX.Server && !haveSameParameters(server, serverNGX) {
949+
if server.Server == serverNGX.Server && !server.hasSameParametersAs(serverNGX) {
901950
server.ID = serverNGX.ID
902951
updateFound = true
903952
break
@@ -1046,6 +1095,7 @@ func (client *NginxClient) patch(ctx context.Context, path string, input interfa
10461095
if err != nil {
10471096
return fmt.Errorf("failed to create a patch request: %w", err)
10481097
}
1098+
req.Header.Set("Content-Type", "application/json")
10491099

10501100
resp, err := client.httpClient.Do(req)
10511101
if err != nil {
@@ -1088,9 +1138,13 @@ func (client *NginxClient) AddStreamServer(ctx context.Context, upstream string,
10881138
if id != -1 {
10891139
return fmt.Errorf("failed to add %v stream server to %v upstream: %w", server.Server, upstream, ErrServerExists)
10901140
}
1141+
err = client.addStreamServer(ctx, upstream, server)
1142+
return err
1143+
}
10911144

1145+
func (client *NginxClient) addStreamServer(ctx context.Context, upstream string, server StreamUpstreamServer) error {
10921146
path := fmt.Sprintf("stream/upstreams/%v/servers/", upstream)
1093-
err = client.post(ctx, path, &server)
1147+
err := client.post(ctx, path, &server)
10941148
if err != nil {
10951149
return fmt.Errorf("failed to add %v stream server to %v upstream: %w", server.Server, upstream, err)
10961150
}
@@ -1106,9 +1160,13 @@ func (client *NginxClient) DeleteStreamServer(ctx context.Context, upstream stri
11061160
if id == -1 {
11071161
return fmt.Errorf("failed to remove %v stream server from %v upstream: %w", server, upstream, ErrServerNotFound)
11081162
}
1163+
err = client.deleteStreamServer(ctx, upstream, server, id)
1164+
return err
1165+
}
11091166

1110-
path := fmt.Sprintf("stream/upstreams/%v/servers/%v", upstream, id)
1111-
err = client.delete(ctx, path, http.StatusOK)
1167+
func (client *NginxClient) deleteStreamServer(ctx context.Context, upstream, server string, serverID int) error {
1168+
path := fmt.Sprintf("stream/upstreams/%v/servers/%v", upstream, serverID)
1169+
err := client.delete(ctx, path, http.StatusOK)
11121170
if err != nil {
11131171
return fmt.Errorf("failed to remove %v stream server from %v upstream: %w", server, upstream, err)
11141172
}
@@ -1120,6 +1178,8 @@ func (client *NginxClient) DeleteStreamServer(ctx context.Context, upstream stri
11201178
// Servers that aren't in the slice, but exist in NGINX, will be removed from NGINX.
11211179
// Servers that are in the slice and exist in NGINX, but have different parameters, will be updated.
11221180
// The client will attempt to update all servers, returning all the errors that occurred.
1181+
// If there are duplicate servers with equivalent parameters, the duplicates will be ignored.
1182+
// If there are duplicate servers with different parameters, those server entries will be ignored and an error returned.
11231183
func (client *NginxClient) UpdateStreamServers(ctx context.Context, upstream string, servers []StreamUpstreamServer) (added []StreamUpstreamServer, deleted []StreamUpstreamServer, updated []StreamUpstreamServer, err error) {
11241184
serversInNginx, err := client.GetStreamServers(ctx, upstream)
11251185
if err != nil {
@@ -1132,10 +1192,12 @@ func (client *NginxClient) UpdateStreamServers(ctx context.Context, upstream str
11321192
formattedServers = append(formattedServers, server)
11331193
}
11341194

1195+
formattedServers, err = deduplicateStreamServers(upstream, formattedServers)
1196+
11351197
toAdd, toDelete, toUpdate := determineStreamUpdates(formattedServers, serversInNginx)
11361198

11371199
for _, server := range toAdd {
1138-
addErr := client.AddStreamServer(ctx, upstream, server)
1200+
addErr := client.addStreamServer(ctx, upstream, server)
11391201
if addErr != nil {
11401202
err = errors.Join(err, addErr)
11411203
continue
@@ -1144,7 +1206,7 @@ func (client *NginxClient) UpdateStreamServers(ctx context.Context, upstream str
11441206
}
11451207

11461208
for _, server := range toDelete {
1147-
deleteErr := client.DeleteStreamServer(ctx, upstream, server.Server)
1209+
deleteErr := client.deleteStreamServer(ctx, upstream, server.Server, server.ID)
11481210
if deleteErr != nil {
11491211
err = errors.Join(err, deleteErr)
11501212
continue
@@ -1183,45 +1245,82 @@ func (client *NginxClient) getIDOfStreamServer(ctx context.Context, upstream str
11831245
return -1, nil
11841246
}
11851247

1186-
// haveSameParametersForStream checks if a given server has the same parameters as a server already present in NGINX. Order matters.
1187-
func haveSameParametersForStream(newServer StreamUpstreamServer, serverNGX StreamUpstreamServer) bool {
1188-
newServer.ID = serverNGX.ID
1189-
if serverNGX.MaxConns != nil && newServer.MaxConns == nil {
1190-
newServer.MaxConns = &defaultMaxConns
1248+
func deduplicateStreamServers(upstream string, servers []StreamUpstreamServer) ([]StreamUpstreamServer, error) {
1249+
type serverCheck struct {
1250+
server StreamUpstreamServer
1251+
valid bool
11911252
}
11921253

1193-
if serverNGX.MaxFails != nil && newServer.MaxFails == nil {
1194-
newServer.MaxFails = &defaultMaxFails
1254+
serverMap := make(map[string]*serverCheck, len(servers))
1255+
var err error
1256+
for _, server := range servers {
1257+
if prev, ok := serverMap[server.Server]; ok {
1258+
if !prev.valid {
1259+
continue
1260+
}
1261+
if !server.hasSameParametersAs(prev.server) {
1262+
prev.valid = false
1263+
err = errors.Join(err, fmt.Errorf(
1264+
"failed to update stream %s server to %s upstream: %w",
1265+
server.Server, upstream, ErrParameterMismatch))
1266+
}
1267+
continue
1268+
}
1269+
serverMap[server.Server] = &serverCheck{server, true}
11951270
}
1271+
retServers := make([]StreamUpstreamServer, 0, len(serverMap))
1272+
for _, server := range servers {
1273+
if check, ok := serverMap[server.Server]; ok && check.valid {
1274+
retServers = append(retServers, server)
1275+
delete(serverMap, server.Server)
1276+
}
1277+
}
1278+
return retServers, err
1279+
}
11961280

1197-
if serverNGX.FailTimeout != "" && newServer.FailTimeout == "" {
1198-
newServer.FailTimeout = defaultFailTimeout
1281+
// hasSameParametersAs checks if a given server has the same parameters.
1282+
func (s StreamUpstreamServer) hasSameParametersAs(compareServer StreamUpstreamServer) bool {
1283+
s.ID = compareServer.ID
1284+
s.applyDefaults()
1285+
compareServer.applyDefaults()
1286+
return reflect.DeepEqual(s, compareServer)
1287+
}
1288+
1289+
func (s *StreamUpstreamServer) applyDefaults() {
1290+
if s.MaxConns == nil {
1291+
s.MaxConns = &defaultMaxConns
1292+
}
1293+
1294+
if s.MaxFails == nil {
1295+
s.MaxFails = &defaultMaxFails
11991296
}
12001297

1201-
if serverNGX.SlowStart != "" && newServer.SlowStart == "" {
1202-
newServer.SlowStart = defaultSlowStart
1298+
if s.FailTimeout == "" {
1299+
s.FailTimeout = defaultFailTimeout
12031300
}
12041301

1205-
if serverNGX.Backup != nil && newServer.Backup == nil {
1206-
newServer.Backup = &defaultBackup
1302+
if s.SlowStart == "" {
1303+
s.SlowStart = defaultSlowStart
12071304
}
12081305

1209-
if serverNGX.Down != nil && newServer.Down == nil {
1210-
newServer.Down = &defaultDown
1306+
if s.Backup == nil {
1307+
s.Backup = &defaultBackup
12111308
}
12121309

1213-
if serverNGX.Weight != nil && newServer.Weight == nil {
1214-
newServer.Weight = &defaultWeight
1310+
if s.Down == nil {
1311+
s.Down = &defaultDown
12151312
}
12161313

1217-
return reflect.DeepEqual(newServer, serverNGX)
1314+
if s.Weight == nil {
1315+
s.Weight = &defaultWeight
1316+
}
12181317
}
12191318

12201319
func determineStreamUpdates(updatedServers []StreamUpstreamServer, nginxServers []StreamUpstreamServer) (toAdd []StreamUpstreamServer, toRemove []StreamUpstreamServer, toUpdate []StreamUpstreamServer) {
12211320
for _, server := range updatedServers {
12221321
updateFound := false
12231322
for _, serverNGX := range nginxServers {
1224-
if server.Server == serverNGX.Server && !haveSameParametersForStream(server, serverNGX) {
1323+
if server.Server == serverNGX.Server && !server.hasSameParametersAs(serverNGX) {
12251324
server.ID = serverNGX.ID
12261325
updateFound = true
12271326
break
@@ -1949,9 +2048,13 @@ func (client *NginxClient) deleteKeyValPairs(ctx context.Context, zone string, s
19492048
return nil
19502049
}
19512050

1952-
// UpdateHTTPServer updates the server of the upstream.
2051+
// UpdateHTTPServer updates the server of the upstream with the matching server ID.
19532052
func (client *NginxClient) UpdateHTTPServer(ctx context.Context, upstream string, server UpstreamServer) error {
19542053
path := fmt.Sprintf("http/upstreams/%v/servers/%v", upstream, server.ID)
2054+
// The server ID is expected in the URI, but not expected in the body.
2055+
// The NGINX API will return
2056+
// {"error":{"status":400,"text":"unknown parameter \"id\"","code":"UpstreamConfFormatError"}
2057+
// if the ID field is present.
19552058
server.ID = 0
19562059
err := client.patch(ctx, path, &server, http.StatusOK)
19572060
if err != nil {
@@ -1961,9 +2064,13 @@ func (client *NginxClient) UpdateHTTPServer(ctx context.Context, upstream string
19612064
return nil
19622065
}
19632066

1964-
// UpdateStreamServer updates the stream server of the upstream.
2067+
// UpdateStreamServer updates the stream server of the upstream with the matching server ID.
19652068
func (client *NginxClient) UpdateStreamServer(ctx context.Context, upstream string, server StreamUpstreamServer) error {
19662069
path := fmt.Sprintf("stream/upstreams/%v/servers/%v", upstream, server.ID)
2070+
// The server ID is expected in the URI, but not expected in the body.
2071+
// The NGINX API will return
2072+
// {"error":{"status":400,"text":"unknown parameter \"id\"","code":"UpstreamConfFormatError"}
2073+
// if the ID field is present.
19672074
server.ID = 0
19682075
err := client.patch(ctx, path, &server, http.StatusOK)
19692076
if err != nil {

0 commit comments

Comments
 (0)