-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathutil.ts
288 lines (242 loc) · 7.72 KB
/
util.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
import {
connect,
credsAuthenticator,
JetStreamSubscription,
ObjectStore,
StreamInfo,
StringCodec,
} from "nats";
import { connect as wsConnect } from "natsws";
import { Database } from "sqlite3";
import { NatsConf, NatsInit, NatsRes } from "./types.ts";
/**
 * Connects to NATS and makes sure the JetStream resources for `app` exist.
 *
 * Steps, in order:
 * 1. Build the connection options from `conf` (optional token and/or
 *    credentials file).
 * 2. Connect with the WebSocket client when `url` starts with "ws", otherwise
 *    with the default client.
 * 3. Create the stream named `app` (subjects `${app}.*`) when no stream with
 *    that name is listed, then try to raise it to 3 replicas.
 * 4. Open the object store named `app` and try to raise its `OBJ_${app}`
 *    stream to 3 replicas.
 *
 * Both replica updates are best-effort: a failure is logged and ignored.
 *
 * @param conf Connection settings: `app`, `url`, and optional `creds` (path
 *   to a credentials file) and `token`.
 * @returns The connection, string codec, JetStream client, object store and
 *   JetStream manager.
 */
export async function setupNats(conf: NatsInit): Promise<NatsRes> {
  const { app, creds, token, url } = conf;

  const natsOpts: NatsConf = { servers: url, maxReconnectAttempts: -1 };
  if (token) natsOpts.token = token;
  if (creds) {
    natsOpts.authenticator = credsAuthenticator(Deno.readFileSync(creds));
  }

  console.log("Connecting to NATS");
  const nc = url.startsWith("ws")
    ? await wsConnect(natsOpts)
    : await connect(natsOpts);
  console.log("Connected to NATS Server:", nc.getServer());

  // Create a jetstream manager
  const jsm = await nc.jetstreamManager();
  const sc = StringCodec();

  // Walk the whole (paged) stream listing. Calling `.next()` once only
  // returns the first page, so a stream listed on a later page would be
  // treated as missing.
  let stream: StreamInfo | undefined;
  for await (const candidate of jsm.streams.list()) {
    if (candidate.config.name === app) {
      stream = candidate;
      break;
    }
  }

  // Create stream if it doesn't exist
  if (!stream) {
    console.log("Creating stream");
    stream = await jsm.streams.add({ name: app, subjects: [`${app}.*`] });

    // Try to update the stream to 3 replicas
    try {
      await jsm.streams.update(app, { num_replicas: 3 });
    } catch (e) {
      const reason = e instanceof Error ? e.message : String(e);
      console.log("Could not update stream to 3 replicas:", reason);
    }
  }

  // Create a jetstream client
  const js = nc.jetstream();

  console.log("Creating object store if it don't exist");
  const os = await js.views.os(app);

  // Try to update the object store to 3 replicas
  try {
    await jsm.streams.update(`OBJ_${app}`, { num_replicas: 3 });
  } catch (e) {
    const reason = e instanceof Error ? e.message : String(e);
    console.log("Could not update object store to 3 replicas:", reason);
  }

  console.log("NATS initialized");
  return { nc, sc, js, os, jsm };
}
/**
 * Recreates `dataDir` from scratch: removes it recursively, then creates it
 * again (including missing parents).
 *
 * Both steps are best-effort. A failure in either one is logged and does not
 * reject, so the directory is still created when the removal fails (for
 * example because it did not exist yet).
 *
 * @param dataDir Path of the data directory to reset.
 */
export async function bootstrapDataDir(dataDir: string) {
  console.log("Bootstrapping data directory:", dataDir);

  try {
    await Deno.remove(dataDir, { recursive: true });
  } catch (e) {
    // Narrow before reading `.message`: the thrown value is `unknown` and
    // reading a property of null/undefined would throw out of this handler.
    console.log(e instanceof Error ? e.message : String(e));
  }

  try {
    await Deno.mkdir(dataDir, { recursive: true });
  } catch (e) {
    console.log(e instanceof Error ? e.message : String(e));
  }
}
/**
 * Opens the SQLite database stored at `file` and prepares it for use.
 *
 * The connection pragmas are applied in a fixed order, the SQLite version is
 * logged, and the `_nqlite_` table is created with its initial row
 * (id 1, seq 0) when either is missing.
 *
 * @param file Path of the database file.
 * @returns The opened database handle.
 */
export function setupDb(file: string): Database {
  const db = new Database(file);

  // Connection settings, executed one statement at a time in this order.
  const pragmas = [
    "pragma locking_mode = exclusive",
    "pragma auto_vacuum = none",
    "pragma journal_mode = wal",
    "pragma synchronous = normal",
    "pragma temp_store = memory",
  ];
  for (const statement of pragmas) {
    db.exec(statement);
  }

  const version = db.prepare("select sqlite_version()").value<[string]>()!;
  console.log(`SQLite version: ${version}`);

  // Sequence bookkeeping: table first, then the seed row.
  console.log("Creating sequence table if it doesn't exist");
  const createTable =
    `CREATE TABLE IF NOT EXISTS _nqlite_ (id INTEGER PRIMARY KEY, seq NOT NULL)`;
  const seedRow = `INSERT OR IGNORE INTO _nqlite_ (id, seq) VALUES (1,0)`;
  db.exec(createTable);
  db.exec(seedRow);

  return db;
}
/**
 * Restores the database file from the "snapshot" object of the object store.
 *
 * @param os Object store that may hold the "snapshot" object.
 * @param db Path of the file the snapshot data is written to.
 * @returns `false` when the store has no "snapshot" object, otherwise `true`
 *   once the object's data has been written to `db`.
 */
export async function restore(os: ObjectStore, db: string): Promise<boolean> {
  const found = await os.get("snapshot");

  if (found) {
    console.log(`Restoring from snapshot taken: ${found.info.mtime}`);

    // Stream the object's bytes into the database file.
    await fromReadableStream(found.data, db);

    // Report the restored size in megabytes.
    const mb = (found.info.size / 1024 / 1024).toFixed(2);
    console.log(`Restored from snapshot: ${mb}Mb`);
    return true;
  }

  console.log("No snapshot object to restore");
  return false;
}
/**
 * Writes the whole content of `rs` to `file`.
 *
 * The file is opened once, created when missing and truncated when it
 * already exists, so the result is exactly the stream's bytes. The previous
 * implementation re-opened the file in append mode for every chunk, which
 * concatenated the stream onto any pre-existing content.
 *
 * The stream is piped into the file's writable side; the pipe takes care of
 * closing the destination and of releasing the lock on `rs`, also when the
 * transfer fails.
 *
 * @param rs Source of the bytes to store.
 * @param file Path of the destination file.
 */
async function fromReadableStream(
  rs: ReadableStream<Uint8Array>,
  file: string,
): Promise<void> {
  const target = await Deno.open(file, {
    write: true,
    create: true,
    truncate: true,
  });
  await rs.pipeTo(target.writable);
}
/**
 * Wraps one byte buffer in a ReadableStream.
 *
 * When the stream is first pulled it emits `data` as a single chunk and then
 * closes.
 *
 * @param data Bytes to expose through the stream.
 * @returns A stream yielding `data` once.
 */
function readableStreamFrom(data: Uint8Array): ReadableStream<Uint8Array> {
  return new ReadableStream<Uint8Array>({
    pull: (ctrl) => {
      // Single chunk, then end of stream.
      ctrl.enqueue(data);
      ctrl.close();
    },
  });
}
/**
 * Stores the database file as the "snapshot" object in the object store.
 *
 * The file is read synchronously in one go and handed to the store as a
 * single-chunk stream. Any failure is logged and reported through the return
 * value; the function does not reject.
 *
 * @param os Object store receiving the snapshot.
 * @param db Path of the database file to upload.
 * @returns `true` when the object was stored, `false` on any error.
 */
export async function snapshot(
  os: ObjectStore,
  db: string,
): Promise<boolean> {
  try {
    // Put the sqlite file in the object store
    const info = await os.put(
      { name: "snapshot" },
      readableStreamFrom(Deno.readFileSync(db)),
    );

    // Convert bytes to megabytes
    const mb = (info.size / 1024 / 1024).toFixed(2);
    console.log(
      `Snapshot stored in object store: ${mb}Mb`,
    );
    return true;
  } catch (e) {
    // The thrown value is `unknown`; narrow it so that a non-Error (or
    // null/undefined) rejection still yields `false` instead of throwing
    // from inside this handler.
    const reason = e instanceof Error ? e.message : String(e);
    console.log("Error during snapshot:", reason);
    return false;
  }
}
/**
 * Decides whether a new snapshot should be taken now.
 *
 * Rules:
 * - No "snapshot" object in the store: snapshot only when `seq` has reached
 *   `threshold`.
 * - A snapshot exists: its `description` is read as the sequence number of
 *   that snapshot. Snapshot only when at least `threshold` messages were
 *   processed since then AND the existing snapshot is at least 60 seconds
 *   old (according to its `mtime`).
 * - When the description is missing or not a number, the last sequence is
 *   taken to be 0, i.e. `seq` itself is compared with `threshold`.
 *   Previously the resulting NaN made the threshold comparison always fail
 *   open, so the threshold was silently ignored.
 * - Any error while querying the store: logged, no snapshot.
 *
 * @param os Object store holding the "snapshot" object.
 * @param seq Current sequence number.
 * @param threshold Minimum number of processed messages between snapshots.
 * @returns `true` when a snapshot should be taken, otherwise `false`.
 */
export async function snapshotCheck(
  os: ObjectStore,
  seq: number,
  threshold: number,
): Promise<boolean> {
  console.log(
    `Checking if we need to snapshot (seq: ${seq}, threshold: ${threshold})`,
  );

  try {
    const snapInfo = await os.info("snapshot");

    if (!snapInfo) {
      console.log("No snapshot found in object store");
      // Check if no snapshot exists and we are below the threshold
      if (seq < threshold) {
        console.log(
          `Skipping snapshot, threshold not met: ${seq} < ${threshold}`,
        );
        return false;
      }
      return true;
    }

    // Guard against a missing / malformed description (Number() -> NaN).
    const recorded = Number(snapInfo.description);
    const lastSeq = Number.isFinite(recorded) ? recorded : 0;
    if (lastSeq !== recorded) {
      console.log("Snapshot has no usable sequence number, assuming 0");
    }

    const processed = seq - lastSeq;
    console.log("Messages processed since last snapshot ->", processed);
    if (processed < threshold) {
      console.log(
        `Skipping snapshot, threshold not met: ${processed} < ${threshold}`,
      );
      return false;
    }

    // Check if another is in progress or created in the last minute
    const now = new Date().getTime();
    const last = new Date(snapInfo.mtime).getTime();
    if (now - last < 60 * 1000) {
      const diff = Math.floor((now - last) / 1000);
      console.log(`Skipping snapshot, latest snapshot ${diff} seconds ago`);
      return false;
    }
  } catch (e) {
    const reason = e instanceof Error ? e.message : String(e);
    console.log("Error during snapshot check:", reason);
    return false;
  }

  return true;
}
/**
 * Uploads the database file to `url` with an HTTP POST.
 *
 * The file is read synchronously in one go and sent as the request body.
 * Any 2xx response counts as success (upload endpoints commonly answer
 * 201 or 204, which the previous `status !== 200` check rejected). Failures
 * are logged and reported through the return value; the function does not
 * reject.
 *
 * @param db Path of the database file to upload.
 * @param url Destination URL.
 * @returns `true` when the server answered with a 2xx status, else `false`.
 */
export async function httpBackup(db: string, url: string): Promise<boolean> {
  try {
    const payload = Deno.readFileSync(db);
    const res = await fetch(url, {
      method: "POST",
      body: payload,
    });
    console.log("HTTP backup response:", res.status, res.statusText);

    // The response body is never read; discard it so it does not stay
    // pending. A failure to discard must not turn a good upload into an
    // error.
    await res.body?.cancel().catch(() => {});

    if (!res.ok) return false;

    // Report the size of what was actually sent.
    const mb = (payload.byteLength / 1024 / 1024).toFixed(2);
    console.log(`Snapshot stored via http: ${mb}Mb`);
    return true;
  } catch (e) {
    const reason = e instanceof Error ? e.message : String(e);
    console.log("Error during http backup:", reason);
    return false;
  }
}
/**
 * Downloads a snapshot from `url` with an HTTP GET and writes it to `db`.
 *
 * Only a 200 response with a body is accepted. The destination file is
 * created when missing and truncated when it exists, so a download that is
 * shorter than a pre-existing file cannot leave stale bytes behind (the
 * previous open call overwrote from offset 0 without truncating). The file
 * is only opened once a usable response is available, so a rejected
 * response leaves the existing file untouched and no file handle open.
 *
 * Failures are logged and reported through the return value; the function
 * does not reject.
 *
 * @param db Path of the file to write the snapshot to.
 * @param url URL to download the snapshot from.
 * @returns `true` when the body was written to `db`, else `false`.
 */
export async function httpRestore(db: string, url: string): Promise<boolean> {
  try {
    const res = await fetch(url);
    console.log("HTTP restore response:", res.status, res.statusText);

    if (res.status !== 200) {
      // Discard the unread body of the rejected response.
      await res.body?.cancel().catch(() => {});
      return false;
    }
    if (!res.body) {
      console.log("HTTP restore response has no body");
      return false;
    }

    const file = await Deno.open(db, {
      write: true,
      create: true,
      truncate: true,
    });
    // Piping closes the file's writable side when the transfer ends.
    await res.body.pipeTo(file.writable);

    const mb = (Deno.statSync(db).size / 1024 / 1024).toFixed(2);
    console.log(`Restored from http snapshot: ${mb}Mb`);
    return true;
  } catch (e) {
    const reason = e instanceof Error ? e.message : String(e);
    console.log("Error during http restore:", reason);
    return false;
  }
}
/**
 * Shutdown routine: drains and destroys the subscription, closes the
 * database and exits the process.
 *
 * When `inSnap` is true the routine first waits 10 seconds before doing
 * anything else.
 *
 * @param inSnap Whether a snapshot was in progress when the handler fired.
 * @param sub Subscription to drain and destroy.
 * @param db Database handle to close.
 */
export async function sigHandler(
  inSnap: boolean,
  sub: JetStreamSubscription,
  db: Database,
): Promise<void> {
  const graceMs = 10000;
  const pause = (ms: number) =>
    new Promise<void>((done) => setTimeout(() => done(), ms));

  // Leave a running snapshot some time before tearing things down.
  if (inSnap) {
    console.log("SIGINT received while in snapshot. Waiting 10 seconds...");
    await pause(graceMs);
  }

  console.log("About to die! Draining subscription...");
  await sub.drain();
  await sub.destroy();

  console.log("Closing the database");
  db.close();

  Deno.exit();
}