-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathlocal.ts
70 lines (56 loc) · 1.68 KB
/
local.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import { createReadStream, createWriteStream, existsSync, mkdirSync } from "fs";
import { dirname } from "path";
import { Readable, Writable } from "stream";
import {
DataSourceOverwriteError,
DataSourceUnreachableError,
} from "../errors";
import { StreamReader, StreamWriter } from "../interface";
class LocalSource {
constructor(readonly path: string) {
// TODO: might need to resolve path so it uses project dir when being executed from node_modules
if (!this.path.endsWith(".ndjson")) this.path += ".ndjson";
}
}
export class LocalReader extends StreamReader {
private source: LocalSource;
protected stream?: Readable;
constructor(path: string) {
super();
this.source = new LocalSource(path);
}
override get path() {
return this.source.path;
}
async open(): Promise<void> {
if (!existsSync(this.path))
throw new DataSourceUnreachableError("file does not exist");
this.stream = createReadStream(this.path);
}
}
export class LocalWriter extends StreamWriter {
private source: LocalSource;
protected stream?: Writable;
constructor(path: string) {
super();
this.source = new LocalSource(path);
}
override get path() {
return this.source.path;
}
async open(overwrite?: boolean) {
// Create directory if it does not exist
if (!existsSync(dirname(this.path))) {
mkdirSync(dirname(this.path), { recursive: true });
}
// Check if file exists
let overwritten = false;
if (existsSync(this.path)) {
if (!overwrite) throw new DataSourceOverwriteError(this.path);
overwritten = true;
}
// Create stream
this.stream = createWriteStream(this.path);
return overwritten;
}
}