Skip to content

Commit d8e7d1a

Browse files
committed
implement the Schema CRD
1 parent 96ac02e commit d8e7d1a

File tree

9 files changed

+628
-29
lines changed

9 files changed

+628
-29
lines changed

operator/src/main/java/it/aboutbits/postgresql/crd/database/DatabaseReconciler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ public class DatabaseReconciler
3131
private final PostgreSQLContextFactory contextFactory;
3232

3333
@Override
34-
public UpdateControl<Database> reconcile(Database resource, Context<Database> context) {
34+
public UpdateControl<Database> reconcile(
35+
Database resource,
36+
Context<Database> context
37+
) {
3538
var spec = resource.getSpec();
3639
var status = initializeStatus(resource);
3740

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package it.aboutbits.postgresql.crd.schema;
2+
3+
import io.fabric8.crd.generator.annotation.AdditionalPrinterColumn;
4+
import io.fabric8.kubernetes.api.model.Namespaced;
5+
import io.fabric8.kubernetes.client.CustomResource;
6+
import io.fabric8.kubernetes.model.annotation.Group;
7+
import io.fabric8.kubernetes.model.annotation.Version;
8+
import it.aboutbits.postgresql.core.CRStatus;
9+
import it.aboutbits.postgresql.core.Named;
10+
import org.jspecify.annotations.NullMarked;
11+
12+
@NullMarked
13+
@Version("v1")
14+
@Group("postgresql.aboutbits.it")
15+
@AdditionalPrinterColumn(
16+
name = "Name",
17+
jsonPath = ".status.name",
18+
type = AdditionalPrinterColumn.Type.STRING
19+
)
20+
@AdditionalPrinterColumn(
21+
name = "Phase",
22+
jsonPath = ".status.phase",
23+
type = AdditionalPrinterColumn.Type.STRING
24+
)
25+
@AdditionalPrinterColumn(
26+
name = "Message",
27+
jsonPath = ".status.message",
28+
type = AdditionalPrinterColumn.Type.STRING
29+
)
30+
@AdditionalPrinterColumn(
31+
name = "Since",
32+
jsonPath = ".status.lastPhaseTransitionTime",
33+
type = AdditionalPrinterColumn.Type.DATE
34+
)
35+
@AdditionalPrinterColumn(
36+
name = "Age",
37+
jsonPath = ".metadata.creationTimestamp",
38+
type = AdditionalPrinterColumn.Type.DATE
39+
)
40+
public class Schema
41+
extends CustomResource<SchemaSpec, CRStatus>
42+
implements Namespaced, Named {
43+
@Override
44+
public String getName() {
45+
return getSpec().getName();
46+
}
47+
}
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
package it.aboutbits.postgresql.crd.schema;
2+
3+
import io.fabric8.kubernetes.client.KubernetesClient;
4+
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
5+
import io.javaoperatorsdk.operator.api.reconciler.Context;
6+
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
7+
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
8+
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
9+
import it.aboutbits.postgresql.core.BaseReconciler;
10+
import it.aboutbits.postgresql.core.CRPhase;
11+
import it.aboutbits.postgresql.core.CRStatus;
12+
import it.aboutbits.postgresql.core.PostgreSQLContextFactory;
13+
import it.aboutbits.postgresql.core.ReclaimPolicy;
14+
import lombok.RequiredArgsConstructor;
15+
import lombok.extern.slf4j.Slf4j;
16+
import org.jooq.DSLContext;
17+
import org.jspecify.annotations.NullMarked;
18+
19+
import java.util.Objects;
20+
import java.util.concurrent.TimeUnit;
21+
22+
@NullMarked
23+
@Slf4j
24+
@RequiredArgsConstructor
25+
public class SchemaReconciler
26+
extends BaseReconciler<Schema, CRStatus>
27+
implements Reconciler<Schema>, Cleaner<Schema> {
28+
private final SchemaService schemaService;
29+
30+
private final KubernetesClient kubernetesClient;
31+
private final PostgreSQLContextFactory contextFactory;
32+
33+
@Override
34+
public UpdateControl<Schema> reconcile(
35+
Schema resource,
36+
Context<Schema> context
37+
) {
38+
var spec = resource.getSpec();
39+
var status = initializeStatus(resource);
40+
41+
var name = resource.getMetadata().getName();
42+
var namespace = resource.getMetadata().getNamespace();
43+
44+
log.info(
45+
"Reconciling Schema [resource={}/{}, status.phase={}]",
46+
namespace,
47+
name,
48+
status.getPhase()
49+
);
50+
51+
var clusterRef = spec.getClusterRef();
52+
53+
var clusterConnectionOptional = getReferencedClusterConnection(
54+
kubernetesClient,
55+
resource,
56+
clusterRef
57+
);
58+
59+
if (clusterConnectionOptional.isEmpty()) {
60+
status.setPhase(CRPhase.PENDING)
61+
.setMessage("The specified ClusterConnection does not exist or is not ready yet [resource=%s/%s]".formatted(
62+
getResourceNamespaceOrOwn(resource, clusterRef.getNamespace()),
63+
clusterRef.getName()
64+
));
65+
66+
return UpdateControl.patchStatus(resource)
67+
.rescheduleAfter(60, TimeUnit.SECONDS);
68+
}
69+
70+
var clusterConnection = clusterConnectionOptional.get();
71+
72+
UpdateControl<Schema> updateControl;
73+
74+
try (var dsl = contextFactory.getDSLContext(clusterConnection)) {
75+
// Run everything in a single transaction
76+
updateControl = dsl.transactionResult(
77+
cfg -> reconcileInTransaction(
78+
cfg.dsl(),
79+
resource,
80+
status
81+
)
82+
);
83+
} catch (Exception e) {
84+
return handleError(
85+
resource,
86+
status,
87+
e
88+
);
89+
}
90+
91+
return updateControl;
92+
}
93+
94+
@Override
95+
public DeleteControl cleanup(
96+
Schema resource,
97+
Context<Schema> context
98+
) {
99+
var spec = resource.getSpec();
100+
var status = initializeStatus(resource);
101+
102+
var name = resource.getMetadata().getName();
103+
var namespace = resource.getMetadata().getNamespace();
104+
105+
log.info(
106+
"{}ing Schema [resource={}/{}, spec.name={}, status.phase={}]",
107+
spec.getReclaimPolicy().toValue(),
108+
namespace,
109+
name,
110+
spec.getName(),
111+
status.getPhase()
112+
);
113+
114+
if (status.getPhase() != CRPhase.DELETING) {
115+
status.setPhase(CRPhase.DELETING);
116+
117+
if (spec.getReclaimPolicy() == ReclaimPolicy.DELETE) {
118+
status.setMessage("Schema deletion in progress");
119+
}
120+
}
121+
122+
// We do not actually delete the schema if the reclaimPolicy is set to RETAIN, we only delete the CR instance
123+
if (spec.getReclaimPolicy() == ReclaimPolicy.RETAIN) {
124+
return DeleteControl.defaultDelete();
125+
}
126+
127+
var clusterRef = spec.getClusterRef();
128+
129+
var clusterConnectionOptional = getReferencedClusterConnection(
130+
kubernetesClient,
131+
resource,
132+
clusterRef
133+
);
134+
135+
if (clusterConnectionOptional.isEmpty()) {
136+
status.setMessage("The specified ClusterConnection no longer exists or is not ready yet [resource=%s/%s]".formatted(
137+
getResourceNamespaceOrOwn(resource, clusterRef.getNamespace()),
138+
clusterRef.getName()
139+
));
140+
141+
return DeleteControl.noFinalizerRemoval()
142+
.rescheduleAfter(60, TimeUnit.SECONDS);
143+
}
144+
145+
var clusterConnection = clusterConnectionOptional.get();
146+
147+
try (var dsl = contextFactory.getDSLContext(clusterConnection)) {
148+
schemaService.dropSchema(dsl, spec);
149+
150+
return DeleteControl.defaultDelete();
151+
} catch (Exception e) {
152+
log.error(
153+
"Failed to delete Schema [resource={}/{}, spec.name={}, status.phase={}]",
154+
namespace,
155+
name,
156+
spec.getName(),
157+
status.getPhase()
158+
);
159+
160+
status.setMessage("Deletion failed: %s".formatted(e.getMessage()));
161+
162+
return DeleteControl.noFinalizerRemoval()
163+
.rescheduleAfter(60, TimeUnit.SECONDS);
164+
}
165+
}
166+
167+
@Override
168+
protected CRStatus newStatus() {
169+
return new CRStatus();
170+
}
171+
172+
private UpdateControl<Schema> reconcileInTransaction(
173+
DSLContext tx,
174+
Schema resource,
175+
CRStatus status
176+
) {
177+
var name = resource.getMetadata().getName();
178+
var namespace = resource.getMetadata().getNamespace();
179+
180+
var spec = resource.getSpec();
181+
182+
// Create and return the role if it doesn't exist yet
183+
if (!schemaService.schemaExists(tx, spec)) {
184+
log.info(
185+
"Creating Schema [resource={}/{}]",
186+
namespace,
187+
name
188+
);
189+
190+
schemaService.createSchema(
191+
tx,
192+
spec
193+
);
194+
195+
status.setPhase(CRPhase.READY)
196+
.setMessage(null);
197+
198+
return UpdateControl.patchStatus(resource);
199+
}
200+
201+
var currentOwner = schemaService.fetchSchemaOwner(tx, spec);
202+
var expectedOwner = spec.getOwner();
203+
204+
if (Objects.equals(currentOwner, expectedOwner)) {
205+
log.info(
206+
"Schema up-to-date [resource={}/{}]",
207+
namespace,
208+
name
209+
);
210+
211+
return UpdateControl.noUpdate();
212+
}
213+
214+
log.info(
215+
"Changing Schema owner [resource={}/{}]",
216+
namespace,
217+
name
218+
);
219+
220+
schemaService.changeSchemaOwner(tx, spec);
221+
222+
status.setPhase(CRPhase.READY)
223+
.setMessage("Schema owner changed [previousOwner=%s, newOwner=%s]".formatted(
224+
currentOwner,
225+
expectedOwner
226+
));
227+
228+
return UpdateControl.patchStatus(resource);
229+
}
230+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package it.aboutbits.postgresql.crd.schema;
2+
3+
import it.aboutbits.postgresql.core.infrastructure.persistence.Routines;
4+
import jakarta.inject.Singleton;
5+
import org.jooq.DSLContext;
6+
import org.jspecify.annotations.NullMarked;
7+
8+
import static it.aboutbits.postgresql.core.infrastructure.persistence.Tables.PG_NAMESPACE;
9+
import static org.jooq.impl.DSL.query;
10+
import static org.jooq.impl.DSL.quotedName;
11+
import static org.jooq.impl.DSL.role;
12+
import static org.jooq.impl.DSL.selectOne;
13+
14+
@NullMarked
15+
@Singleton
16+
public class SchemaService {
17+
public boolean schemaExists(
18+
DSLContext tx,
19+
SchemaSpec spec
20+
) {
21+
return tx.fetchExists(selectOne()
22+
.from(PG_NAMESPACE)
23+
.where(PG_NAMESPACE.NSPNAME.eq(spec.getName()))
24+
);
25+
}
26+
27+
public void createSchema(
28+
DSLContext dsl,
29+
SchemaSpec spec
30+
) {
31+
var name = quotedName(spec.getName());
32+
33+
dsl.createSchema(name).execute();
34+
35+
if (spec.getOwner() != null) {
36+
changeSchemaOwner(dsl, spec);
37+
}
38+
}
39+
40+
public String fetchSchemaOwner(
41+
DSLContext tx,
42+
SchemaSpec spec
43+
) {
44+
return tx
45+
.select(Routines.pgGetUserbyid(
46+
PG_NAMESPACE.NSPOWNER
47+
))
48+
.from(PG_NAMESPACE)
49+
.where(PG_NAMESPACE.NSPNAME.eq(spec.getName()))
50+
.fetchSingleInto(String.class);
51+
}
52+
53+
public void changeSchemaOwner(
54+
DSLContext tx,
55+
SchemaSpec spec
56+
) {
57+
var name = quotedName(spec.getName());
58+
59+
tx.execute(query(
60+
"alter schema {0} owner to {1}",
61+
name,
62+
role(spec.getOwner())
63+
));
64+
}
65+
66+
public void dropSchema(
67+
DSLContext dsl,
68+
SchemaSpec spec
69+
) {
70+
dsl.dropSchemaIfExists(
71+
quotedName(spec.getName())
72+
).execute();
73+
}
74+
}

0 commit comments

Comments
 (0)