/*
Copyright 2021 - 2022 Crunchy Data Solutions, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package postgrescluster
import (
"context"
"fmt"
"io"
"time"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"github.com/crunchydata/postgres-operator/internal/initialize"
"github.com/crunchydata/postgres-operator/internal/logging"
"github.com/crunchydata/postgres-operator/internal/naming"
"github.com/crunchydata/postgres-operator/internal/patroni"
"github.com/crunchydata/postgres-operator/internal/pki"
"github.com/crunchydata/postgres-operator/internal/postgres"
"github.com/crunchydata/postgres-operator/pkg/apis/postgres-operator.crunchydata.com/v1beta1"
)
// +kubebuilder:rbac:groups="",resources=endpoints,verbs=deletecollection
func (r *Reconciler) deletePatroniArtifacts(
ctx context.Context, cluster *v1beta1.PostgresCluster,
) error {
// TODO(cbandy): This could also be accomplished by adopting the Endpoints
// as Patroni creates them. Would their events cause too many reconciles?
// Foreground deletion may force us to adopt and set finalizers anyway.
selector, err := naming.AsSelector(naming.ClusterPatronis(cluster))
if err == nil {
err = errors.WithStack(
r.Client.DeleteAllOf(ctx, &corev1.Endpoints{},
client.InNamespace(cluster.Namespace),
client.MatchingLabelsSelector{Selector: selector},
))
}
return err
}
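// handlePatroniRestarts looks for Patroni members that report a pending
// restart and restarts them through the Patroni API: the primary first when
// it needs one, otherwise any replicas that still need it.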
func (r *Reconciler) handlePatroniRestarts(
ctx context.Context, cluster *v1beta1.PostgresCluster, instances *observedInstances,
) error {
const container = naming.ContainerDatabase
var primaryNeedsRestart, replicaNeedsRestart *Instance
// Look for one primary and one replica that need to restart. Ignore
// containers that are terminating or not running; Kubernetes will start
// them again, and calls to their Patroni API will likely be interrupted anyway.
for _, instance := range instances.forCluster {
if len(instance.Pods) > 0 && patroni.PodRequiresRestart(instance.Pods[0]) {
if terminating, known := instance.IsTerminating(); terminating || !known {
continue
}
if running, known := instance.IsRunning(container); !running || !known {
continue
}
if primary, _ := instance.IsPrimary(); primary {
primaryNeedsRestart = instance
} else {
replicaNeedsRestart = instance
}
if primaryNeedsRestart != nil && replicaNeedsRestart != nil {
break
}
}
}
// When the primary instance needs to restart, restart it and return early.
// Some PostgreSQL settings must be changed on the primary before any
// progress can be made on the replicas, e.g. decreasing "max_connections".
// Another reconcile will trigger when an instance with pending restarts
// updates its status in DCS. See [Reconciler.watchPods].
//
// NOTE: In Patroni v2.1.1, regardless of the PostgreSQL parameter, the
// primary indicates it needs to restart one "loop_wait" *after* the
// replicas indicate it. So, even though we consider the primary ahead of
// replicas here, replicas will typically restart first because we see them
// first.
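// The RestartPendingMembers calls below are roughly the API equivalent of
// "patronictl restart --pending --role <master|replica> <scope>"; treat the
// CLI form as an analogy for what the executor performs, not the exact
// command it runs.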
if primaryNeedsRestart != nil {
exec := patroni.Executor(func(
ctx context.Context, stdin io.Reader, stdout, stderr io.Writer, command ...string,
) error {
pod := primaryNeedsRestart.Pods[0]
return r.PodExec(pod.Namespace, pod.Name, container, stdin, stdout, stderr, command...)
})
return errors.WithStack(exec.RestartPendingMembers(ctx, "master", naming.PatroniScope(cluster)))
}
// When the primary does not need to restart but a replica does, restart all
// replicas that still need it.
//
// NOTE: This does not always clear the "needs restart" indicator on a replica.
// Patroni sets that when a parameter must be increased to match the minimum
// required of data on disk. When that happens, restarts occur (i.e. downtime)
// but the affected parameter cannot change until the replica has replayed
// the new minimum from the primary, e.g. decreasing "max_connections".
// - https://github.com/zalando/patroni/blob/v2.1.1/patroni/postgresql/config.py#L1069
//
// TODO(cbandy): The above could interact badly with delayed replication.
// When we offer per-instance PostgreSQL configuration, we may need to revisit
// how we decide when to restart.
// - https://www.postgresql.org/docs/current/runtime-config-replication.html
if replicaNeedsRestart != nil {
exec := patroni.Executor(func(
ctx context.Context, stdin io.Reader, stdout, stderr io.Writer, command ...string,
) error {
pod := replicaNeedsRestart.Pods[0]
return r.PodExec(pod.Namespace, pod.Name, container, stdin, stdout, stderr, command...)
})
return errors.WithStack(exec.RestartPendingMembers(ctx, "replica", naming.PatroniScope(cluster)))
}
// Nothing needs to restart.
return nil
}
// +kubebuilder:rbac:groups="",resources=services,verbs=create;patch
// reconcilePatroniDistributedConfiguration sets labels and ownership on the
// objects Patroni creates for its distributed configuration.
func (r *Reconciler) reconcilePatroniDistributedConfiguration(
ctx context.Context, cluster *v1beta1.PostgresCluster,
) error {
// When using Endpoints for DCS, Patroni needs a Service to ensure that the
// Endpoints object is not removed by Kubernetes at startup. Patroni will
// create this object if it has permission to do so, but it won't set any
// ownership.
// - https://releases.k8s.io/v1.16.0/pkg/controller/endpoint/endpoints_controller.go#L547
// - https://releases.k8s.io/v1.20.0/pkg/controller/endpoint/endpoints_controller.go#L580
// - https://github.com/zalando/patroni/blob/v2.0.1/patroni/dcs/kubernetes.py#L865-L881
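// A minimal sketch of the Service built below (the name comes from
// naming.PatroniDistributedConfiguration; the label keys shown are assumed):
//
//   apiVersion: v1
//   kind: Service
//   metadata:
//     labels:
//       postgres-operator.crunchydata.com/cluster: <cluster-name>
//       postgres-operator.crunchydata.com/patroni: <patroni-scope>
//   spec:
//     clusterIP: None   # headless; no selector, so Kubernetes manages no Endpoints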
dcsService := &corev1.Service{ObjectMeta: naming.PatroniDistributedConfiguration(cluster)}
dcsService.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Service"))
err := errors.WithStack(r.setControllerReference(cluster, dcsService))
dcsService.Annotations = naming.Merge(
cluster.Spec.Metadata.GetAnnotationsOrNil())
dcsService.Labels = naming.Merge(
cluster.Spec.Metadata.GetLabelsOrNil(),
map[string]string{
naming.LabelCluster: cluster.Name,
naming.LabelPatroni: naming.PatroniScope(cluster),
})
// Allocate no IP address (headless) and create no Endpoints.
// - https://docs.k8s.io/concepts/services-networking/service/#headless-services
dcsService.Spec.ClusterIP = corev1.ClusterIPNone
dcsService.Spec.Selector = nil
if err == nil {
err = errors.WithStack(r.apply(ctx, dcsService))
}
// TODO(cbandy): DCS "failover_path"; `failover` and `switchover` create "{scope}-failover" endpoints.
// TODO(cbandy): DCS "sync_path"; `synchronous_mode` uses "{scope}-sync" endpoints.
return err
}
// +kubebuilder:rbac:groups="",resources=pods,verbs=get;list
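// reconcilePatroniDynamicConfiguration assembles the desired Patroni dynamic
// configuration from the cluster spec, pg_hba rules, and PostgreSQL parameters
// and applies it through the Patroni API of a running instance Pod. It does
// nothing until Patroni has bootstrapped the cluster.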
func (r *Reconciler) reconcilePatroniDynamicConfiguration(
ctx context.Context, cluster *v1beta1.PostgresCluster, instances *observedInstances,
pgHBAs postgres.HBAs, pgParameters postgres.Parameters,
) error {
if !patroni.ClusterBootstrapped(cluster) {
// Patroni has not yet bootstrapped. Dynamic configuration happens through
// configuration files during bootstrap, so there's nothing to do here.
return nil
}
var pod *corev1.Pod
for _, instance := range instances.forCluster {
if terminating, known := instance.IsTerminating(); !terminating && known {
running, known := instance.IsRunning(naming.ContainerDatabase)
if running && known && len(instance.Pods) > 0 {
pod = instance.Pods[0]
break
}
}
}
if pod == nil {
// There are no running Patroni containers; nothing to do.
return nil
}
// NOTE(cbandy): Despite the guards above, calling PodExec may still fail
// due to a missing or stopped container.
exec := func(_ context.Context, stdin io.Reader, stdout, stderr io.Writer, command ...string) error {
return r.PodExec(pod.Namespace, pod.Name, naming.ContainerDatabase, stdin, stdout, stderr, command...)
}
var configuration map[string]interface{}
if cluster.Spec.Patroni != nil {
configuration = cluster.Spec.Patroni.DynamicConfiguration
}
configuration = patroni.DynamicConfiguration(cluster, configuration, pgHBAs, pgParameters)
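// Applying this configuration is comparable to "patronictl edit-config" or a
// PUT to Patroni's /config endpoint; treat that as an analogy for what
// patroni.Executor.ReplaceConfiguration does rather than its exact mechanism.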
return errors.WithStack(
patroni.Executor(exec).ReplaceConfiguration(ctx, configuration))
}
// generatePatroniLeaderLeaseService returns a v1.Service that exposes the
// Patroni leader when Patroni is using Endpoints for its leader elections.
func (r *Reconciler) generatePatroniLeaderLeaseService(
cluster *v1beta1.PostgresCluster,
) (*corev1.Service, error) {
service := &corev1.Service{ObjectMeta: naming.PatroniLeaderEndpoints(cluster)}
service.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Service"))
service.Annotations = naming.Merge(
cluster.Spec.Metadata.GetAnnotationsOrNil())
service.Labels = naming.Merge(
cluster.Spec.Metadata.GetLabelsOrNil())
if spec := cluster.Spec.Service; spec != nil {
service.Annotations = naming.Merge(service.Annotations,
spec.Metadata.GetAnnotationsOrNil())
service.Labels = naming.Merge(service.Labels,
spec.Metadata.GetLabelsOrNil())
}
// add our labels last so they aren't overwritten
service.Labels = naming.Merge(service.Labels,
map[string]string{
naming.LabelCluster: cluster.Name,
naming.LabelPatroni: naming.PatroniScope(cluster),
})
// Allocate an IP address and/or node port and let Patroni manage the Endpoints.
// Patroni will ensure that they always route to the elected leader.
// - https://docs.k8s.io/concepts/services-networking/service/#services-without-selectors
service.Spec.Selector = nil
// The TargetPort must be the name (not the number) of the PostgreSQL
// ContainerPort. This name allows the port number to differ between
// instances, which can happen during a rolling update.
servicePort := corev1.ServicePort{
Name: naming.PortPostgreSQL,
Port: *cluster.Spec.Port,
Protocol: corev1.ProtocolTCP,
TargetPort: intstr.FromString(naming.PortPostgreSQL),
AppProtocol: initialize.String(postgres.IANAServiceName),
}
if spec := cluster.Spec.Service; spec == nil {
service.Spec.Type = corev1.ServiceTypeClusterIP
} else {
service.Spec.Type = corev1.ServiceType(spec.Type)
if spec.NodePort != nil {
if service.Spec.Type == corev1.ServiceTypeClusterIP {
// The NodePort can only be set when the Service type is NodePort or
// LoadBalancer. However, due to a known issue prior to Kubernetes
// 1.20, we clear these errors during our apply. To preserve the
// appropriate behavior, we log an Event and return an error.
// TODO(tjmoore4): Once Validation Rules are available, this check
// and event could potentially be removed in favor of that validation
r.Recorder.Eventf(cluster, corev1.EventTypeWarning, "MisconfiguredClusterIP",
"NodePort cannot be set with type ClusterIP on Service %q", service.Name)
return nil, fmt.Errorf("NodePort cannot be set with type ClusterIP on Service %q", service.Name)
}
servicePort.NodePort = *spec.NodePort
}
}
service.Spec.Ports = []corev1.ServicePort{servicePort}
err := errors.WithStack(r.setControllerReference(cluster, service))
return service, err
}
// +kubebuilder:rbac:groups="",resources="services",verbs={create,patch}
// reconcilePatroniLeaderLease sets labels and ownership on the objects Patroni
// creates for its leader elections. When Patroni is using Endpoints for this,
// the returned Service resolves to the elected leader. Otherwise, it is nil.
func (r *Reconciler) reconcilePatroniLeaderLease(
ctx context.Context, cluster *v1beta1.PostgresCluster,
) (*corev1.Service, error) {
// When using Endpoints for DCS, Patroni needs a Service to ensure that the
// Endpoints object is not removed by Kubernetes at startup.
// - https://releases.k8s.io/v1.16.0/pkg/controller/endpoint/endpoints_controller.go#L547
// - https://releases.k8s.io/v1.20.0/pkg/controller/endpoint/endpoints_controller.go#L580
service, err := r.generatePatroniLeaderLeaseService(cluster)
if err == nil {
err = errors.WithStack(r.apply(ctx, service))
}
return service, err
}
// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get
// reconcilePatroniStatus populates cluster.Status.Patroni with observations.
func (r *Reconciler) reconcilePatroniStatus(
ctx context.Context, cluster *v1beta1.PostgresCluster,
observedInstances *observedInstances,
) (reconcile.Result, error) {
result := reconcile.Result{}
log := logging.FromContext(ctx)
var readyInstance bool
for _, instance := range observedInstances.forCluster {
if ready, _ := instance.IsReady(); ready {
readyInstance = true
}
}
dcs := &corev1.Endpoints{ObjectMeta: naming.PatroniDistributedConfiguration(cluster)}
err := errors.WithStack(client.IgnoreNotFound(
r.Client.Get(ctx, client.ObjectKeyFromObject(dcs), dcs)))
if err == nil {
if dcs.Annotations["initialize"] != "" {
// After bootstrap, Patroni writes the cluster system identifier to DCS.
cluster.Status.Patroni.SystemIdentifier = dcs.Annotations["initialize"]
} else if readyInstance {
// We typically expect the "initialize" key to be present in the Endpoints
// above by the time the StatefulSet for any instance indicates "ready", since
// Patroni writes the value after a successful cluster bootstrap, at which
// point the initial primary should transition to "ready". Sometimes the key
// is not there yet, so when a "ready" instance is detected without it, simply
// log a message and requeue in order to try again until the expected value is found.
log.Info("detected ready instance but no initialize value")
result.RequeueAfter = 1 * time.Second
return result, nil
}
}
return result, err
}
// reconcileReplicationSecret creates a Secret containing the TLS certificate,
// key, and CA certificate for use with the replication and pg_rewind accounts
// in Postgres.
// TODO: As part of future work we will use this secret to set up a superuser
// account and enable certificate authentication for that user.
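// The reconciled Secret carries three entries keyed by the naming package;
// the literal key names in parentheses are assumptions for illustration only:
//
//   naming.ReplicationCert       ("tls.crt") - client certificate
//   naming.ReplicationPrivateKey ("tls.key") - client private key
//   naming.ReplicationCACert     ("ca.crt")  - certificate authority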
func (r *Reconciler) reconcileReplicationSecret(
ctx context.Context, cluster *v1beta1.PostgresCluster,
root *pki.RootCertificateAuthority,
) (*corev1.Secret, error) {
// if a custom postgrescluster secret is provided, just return it
if cluster.Spec.CustomReplicationClientTLSSecret != nil {
custom := &corev1.Secret{ObjectMeta: metav1.ObjectMeta{
Name: cluster.Spec.CustomReplicationClientTLSSecret.Name,
Namespace: cluster.Namespace,
}}
err := errors.WithStack(r.Client.Get(ctx,
client.ObjectKeyFromObject(custom), custom))
return custom, err
}
existing := &corev1.Secret{ObjectMeta: naming.ReplicationClientCertSecret(cluster)}
err := errors.WithStack(client.IgnoreNotFound(
r.Client.Get(ctx, client.ObjectKeyFromObject(existing), existing)))
leaf := &pki.LeafCertificate{}
commonName := postgres.ReplicationUser
dnsNames := []string{commonName}
if err == nil {
// Unmarshal and validate the stored leaf. These first errors can
// be ignored because they result in an invalid leaf which is then
// correctly regenerated.
_ = leaf.Certificate.UnmarshalText(existing.Data[naming.ReplicationCert])
_ = leaf.PrivateKey.UnmarshalText(existing.Data[naming.ReplicationPrivateKey])
leaf, err = root.RegenerateLeafWhenNecessary(leaf, commonName, dnsNames)
err = errors.WithStack(err)
}
intent := &corev1.Secret{ObjectMeta: naming.ReplicationClientCertSecret(cluster)}
intent.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Secret"))
intent.Data = make(map[string][]byte)
// set labels and annotations
intent.Annotations = naming.Merge(
cluster.Spec.Metadata.GetAnnotationsOrNil())
intent.Labels = naming.Merge(
cluster.Spec.Metadata.GetLabelsOrNil(),
map[string]string{
naming.LabelCluster: cluster.Name,
naming.LabelClusterCertificate: "replication-client-tls",
})
if err := errors.WithStack(r.setControllerReference(cluster, intent)); err != nil {
return nil, err
}
if err == nil {
intent.Data[naming.ReplicationCert], err = leaf.Certificate.MarshalText()
err = errors.WithStack(err)
}
if err == nil {
intent.Data[naming.ReplicationPrivateKey], err = leaf.PrivateKey.MarshalText()
err = errors.WithStack(err)
}
if err == nil {
intent.Data[naming.ReplicationCACert], err = root.Certificate.MarshalText()
err = errors.WithStack(err)
}
if err == nil {
err = errors.WithStack(r.apply(ctx, intent))
}
return intent, err
}
// replicationCertSecretProjection returns a secret projection of the postgrescluster's
// client certificate and key to include in the instance configuration volume.
func replicationCertSecretProjection(certificate *corev1.Secret) *corev1.SecretProjection {
return &corev1.SecretProjection{
LocalObjectReference: corev1.LocalObjectReference{
Name: certificate.Name,
},
Items: []corev1.KeyToPath{
{
Key: naming.ReplicationCert,
Path: naming.ReplicationCertPath,
},
{
Key: naming.ReplicationPrivateKey,
Path: naming.ReplicationPrivateKeyPath,
},
{
Key: naming.ReplicationCACert,
Path: naming.ReplicationCACertPath,
},
},
}
}
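// reconcilePatroniSwitchover arranges a Patroni switchover or failover when
// one is requested through the cluster's trigger annotation and
// spec.patroni.switchover, using the timeline recorded in the status to keep
// the operation idempotent across reconciles.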
func (r *Reconciler) reconcilePatroniSwitchover(ctx context.Context,
cluster *v1beta1.PostgresCluster, instances *observedInstances) error {
log := logging.FromContext(ctx)
// If switchover is not enabled, clear out the Patroni switchover status fields,
// which might have been set by previous switchovers. This also gives the user
// an easy way to recover and try again: if the operator runs into a problem
// with a switchover, setting `cluster.Spec.Patroni.Switchover.Enabled` to
// `false` will clear the fields before another attempt.
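// A minimal sketch of a manifest that requests a switchover; the annotation
// key is whatever naming.PatroniSwitchover resolves to, shown here with an
// assumed value:
//
//   metadata:
//     annotations:
//       postgres-operator.crunchydata.com/trigger-switchover: "<any new value>"
//   spec:
//     patroni:
//       switchover:
//         enabled: true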
if cluster.Spec.Patroni == nil ||
cluster.Spec.Patroni.Switchover == nil ||
!cluster.Spec.Patroni.Switchover.Enabled {
cluster.Status.Patroni.Switchover = nil
cluster.Status.Patroni.SwitchoverTimeline = nil
return nil
}
annotation := cluster.GetAnnotations()[naming.PatroniSwitchover]
spec := cluster.Spec.Patroni.Switchover
status := cluster.Status.Patroni.Switchover
// If no switchover has been requested (the annotation is empty), or the status
// already matches the trigger annotation (the requested switchover succeeded),
// the `SwitchoverTimeline` field can be cleared.
if annotation == "" || (status != nil && *status == annotation) {
cluster.Status.Patroni.SwitchoverTimeline = nil
return nil
}
// If we've reached this point, we assume a switchover has been requested or is
// in progress and need to make sure the prerequisites are met, e.g., more than
// one instance, a running Pod to issue the switchover command to, etc.
if len(instances.forCluster) <= 1 {
// TODO: event
// TODO: Possible webhook validation
return errors.New("Need more than one instance to switchover")
}
// TODO: Add webhook validation that requires a targetInstance when requesting failover
if spec.Type == v1beta1.PatroniSwitchoverTypeFailover {
if spec.TargetInstance == nil || *spec.TargetInstance == "" {
// TODO: event
return errors.New("TargetInstance required when running failover")
}
}
// Determine if user is specifying a target instance. Validate the
// provided instance has been observed in the cluster.
var targetInstance *Instance
if spec.TargetInstance != nil && *spec.TargetInstance != "" {
for _, instance := range instances.forCluster {
if *spec.TargetInstance == instance.Name {
targetInstance = instance
}
}
if targetInstance == nil {
// TODO: event
return errors.New("TargetInstance was specified but not found in the cluster")
}
if len(targetInstance.Pods) != 1 {
// We expect that a target instance should have one associated pod.
return errors.Errorf(
"TargetInstance should have one pod. Pods (%d)", len(targetInstance.Pods))
}
} else {
log.V(1).Info("TargetInstance not provided")
}
// Find a running Pod that can be used to define a PodExec function.
var runningPod *corev1.Pod
for _, instance := range instances.forCluster {
if running, known := instance.IsRunning(naming.ContainerDatabase); running &&
known && len(instance.Pods) == 1 {
runningPod = instance.Pods[0]
break
}
}
if runningPod == nil {
return errors.New("Could not find a running pod when attempting switchover.")
}
exec := func(_ context.Context, stdin io.Reader, stdout, stderr io.Writer,
command ...string) error {
return r.PodExec(runningPod.Namespace, runningPod.Name, naming.ContainerDatabase, stdin,
stdout, stderr, command...)
}
// To ensure idempotency, the operator verifies that the timeline reported by Patroni
// matches the timeline that was present when the switchover was first requested.
// TODO(benjb): consider pulling the timeline from the pod annotation; manual experiments
// have shown that the annotation on the Leader pod is up to date during a switchover, but
// missing from the Replica pods.
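// GetTimeline is expected to return the leader's current PostgreSQL timeline
// (the TL shown by "patronictl list"); a zero value is treated below as a
// failure to obtain or parse it.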
timeline, err := patroni.Executor(exec).GetTimeline(ctx)
if err != nil {
return err
}
if timeline == 0 {
return errors.New("error getting and parsing current timeline")
}
statusTimeline := cluster.Status.Patroni.SwitchoverTimeline
// If the `SwitchoverTimeline` field is empty, this is the first reconcile after
// a switchover has been requested, and we need to fill in the field with the
// current timeline as reported by Patroni.
// We return from here without calling for an explicit requeue, but since we're updating
// the object, we will reconcile this again for the actual switchover/failover action.
if statusTimeline == nil || *statusTimeline == 0 {
log.V(1).Info("Setting SwitchoverTimeline", "timeline", timeline)
cluster.Status.Patroni.SwitchoverTimeline = &timeline
return nil
}
// If the `SwitchoverTimeline` field does not match the current timeline as reported by Patroni,
// then we assume a switchover has been completed, and we have reached this point because the
// cache does not yet have the updated `cluster.Status.Patroni.Switchover` field.
if statusTimeline != nil && *statusTimeline != timeline {
log.V(1).Info("SwitchoverTimeline does not match current timeline, assuming already completed switchover")
cluster.Status.Patroni.Switchover = initialize.String(annotation)
cluster.Status.Patroni.SwitchoverTimeline = nil
return nil
}
// We have the pod executor; now we need to figure out which API call to use.
// In the default case we use SwitchoverAndWait. This API call uses a
// patronictl switchover to move to the target instance.
action := func(ctx context.Context, exec patroni.Executor, next string) (bool, error) {
success, err := exec.SwitchoverAndWait(ctx, next)
return success, errors.WithStack(err)
}
if spec.Type == v1beta1.PatroniSwitchoverTypeFailover {
// When a failover has been requested we use FailoverAndWait to change the primary.
action = func(ctx context.Context, exec patroni.Executor, next string) (bool, error) {
success, err := exec.FailoverAndWait(ctx, next)
return success, errors.WithStack(err)
}
}
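// Either action is comparable to "patronictl switchover --candidate <pod>" or
// "patronictl failover --candidate <pod>"; the CLI forms are an analogy for
// what the executor performs.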
// If a target instance has not been provided, we pass an empty string to patronictl.
nextPrimary := ""
if targetInstance != nil {
nextPrimary = targetInstance.Pods[0].Name
}
success, err := action(ctx, exec, nextPrimary)
if err = errors.WithStack(err); err == nil && !success {
err = errors.New("unable to switchover")
}
// If the action succeeded, a switchover (or failover) has been triggered, so
// record the trigger annotation in the status.
if err == nil {
cluster.Status.Patroni.Switchover = initialize.String(annotation)
cluster.Status.Patroni.SwitchoverTimeline = nil
}
return err
}