Skip to content

Commit 0755342

Browse files
Add Cloudflare Worker compatible socket
1 parent 5532ca5 commit 0755342

File tree

10 files changed

+1182
-876
lines changed

10 files changed

+1182
-876
lines changed

packages/pg-cloudflare/README.md

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# pg-cloudflare
2+
3+
A socket implementation that can run on Cloudflare Workers using native TCP connections.
4+
5+
## install
6+
7+
```
8+
npm i --save-dev pg-cloudflare
9+
```
10+
11+
### license
12+
13+
The MIT License (MIT)
14+
15+
Copyright (c) 2023 Brian M. Carlson
16+
17+
Permission is hereby granted, free of charge, to any person obtaining a copy
18+
of this software and associated documentation files (the "Software"), to deal
19+
in the Software without restriction, including without limitation the rights
20+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
21+
copies of the Software, and to permit persons to whom the Software is
22+
furnished to do so, subject to the following conditions:
23+
24+
The above copyright notice and this permission notice shall be included in
25+
all copies or substantial portions of the Software.
26+
27+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
28+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
29+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
30+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
31+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
32+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
33+
THE SOFTWARE.

packages/pg-cloudflare/package.json

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
{
2+
"name": "pg-cloudflare",
3+
"version": "1.0.0",
4+
"description": "A socket implementation that can run on Cloudflare Workers using native TCP connections.",
5+
"main": "dist/index.js",
6+
"types": "dist/index.d.ts",
7+
"license": "MIT",
8+
"devDependencies": {
9+
"ts-node": "^8.5.4",
10+
"typescript": "^4.0.3"
11+
},
12+
"scripts": {
13+
"build": "tsc",
14+
"build:watch": "tsc --watch",
15+
"prepublish": "yarn build",
16+
"test": "echo e2e test in pg package"
17+
},
18+
"repository": {
19+
"type": "git",
20+
"url": "git://github.com/brianc/node-postgres.git",
21+
"directory": "packages/pg-cloudflare"
22+
},
23+
"files": [
24+
"/dist/*{js,ts,map}",
25+
"/src"
26+
]
27+
}

packages/pg-cloudflare/src/index.ts

+164
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
import { SocketOptions, Socket, TlsOptions } from 'cloudflare:sockets'
2+
import { EventEmitter } from 'events'
3+
4+
/**
5+
* Wrapper around the Cloudflare built-in socket that can be used by the `Connection`.
6+
*/
7+
export class CloudflareSocket extends EventEmitter {
8+
writable = false
9+
destroyed = false
10+
11+
private _upgrading = false
12+
private _upgraded = false
13+
private _cfSocket: Socket | null = null
14+
private _cfWriter: WritableStreamDefaultWriter | null = null
15+
private _cfReader: ReadableStreamDefaultReader | null = null
16+
17+
constructor(readonly ssl: boolean) {
18+
super()
19+
}
20+
21+
setNoDelay() {
22+
return this
23+
}
24+
setKeepAlive() {
25+
return this
26+
}
27+
ref() {
28+
return this
29+
}
30+
unref() {
31+
return this
32+
}
33+
34+
async connect(port: number, host: string, connectListener?: (...args: unknown[]) => void) {
35+
try {
36+
log('connecting')
37+
if (connectListener) this.once('connect', connectListener)
38+
39+
const options: SocketOptions = this.ssl ? { secureTransport: 'starttls' } : {}
40+
const { connect } = await import('cloudflare:sockets')
41+
this._cfSocket = connect(`${host}:${port}`, options)
42+
this._cfWriter = this._cfSocket.writable.getWriter()
43+
this._addClosedHandler()
44+
45+
this._cfReader = this._cfSocket.readable.getReader()
46+
if (this.ssl) {
47+
this._listenOnce().catch((e) => this.emit('error', e))
48+
} else {
49+
this._listen().catch((e) => this.emit('error', e))
50+
}
51+
52+
await this._cfWriter!.ready
53+
log('socket ready')
54+
this.writable = true
55+
this.emit('connect')
56+
57+
return this
58+
} catch (e) {
59+
this.emit('error', e)
60+
}
61+
}
62+
63+
async _listen() {
64+
while (true) {
65+
log('awaiting receive from CF socket')
66+
const { done, value } = await this._cfReader!.read()
67+
log('CF socket received:', done, value)
68+
if (done) {
69+
log('done')
70+
break
71+
}
72+
this.emit('data', Buffer.from(value))
73+
}
74+
}
75+
76+
async _listenOnce() {
77+
log('awaiting first receive from CF socket')
78+
const { done, value } = await this._cfReader!.read()
79+
log('First CF socket received:', done, value)
80+
this.emit('data', Buffer.from(value))
81+
}
82+
83+
write(
84+
data: Uint8Array | string,
85+
encoding: BufferEncoding = 'utf8',
86+
callback: (...args: unknown[]) => void = () => {}
87+
) {
88+
if (data.length === 0) return callback()
89+
if (typeof data === 'string') data = Buffer.from(data, encoding)
90+
91+
log('sending data direct:', data)
92+
this._cfWriter!.write(data).then(
93+
() => {
94+
log('data sent')
95+
callback()
96+
},
97+
(err) => {
98+
log('send error', err)
99+
callback(err)
100+
}
101+
)
102+
return true
103+
}
104+
105+
end(data = Buffer.alloc(0), encoding: BufferEncoding = 'utf8', callback: (...args: unknown[]) => void = () => {}) {
106+
log('ending CF socket')
107+
this.write(data, encoding, (err) => {
108+
this._cfSocket!.close()
109+
if (callback) callback(err)
110+
})
111+
return this
112+
}
113+
114+
destroy(reason: string) {
115+
log('destroying CF socket', reason)
116+
this.destroyed = true
117+
return this.end()
118+
}
119+
120+
startTls(options: TlsOptions) {
121+
if (this._upgraded) {
122+
// Don't try to upgrade again.
123+
this.emit('error', 'Cannot call `startTls()` more than once on a socket')
124+
return
125+
}
126+
this._cfWriter!.releaseLock()
127+
this._cfReader!.releaseLock()
128+
this._upgrading = true
129+
this._cfSocket = this._cfSocket!.startTls(options)
130+
this._cfWriter = this._cfSocket.writable.getWriter()
131+
this._cfReader = this._cfSocket.readable.getReader()
132+
this._addClosedHandler()
133+
this._listen().catch((e) => this.emit('error', e))
134+
}
135+
136+
_addClosedHandler() {
137+
this._cfSocket!.closed.then(() => {
138+
if (!this._upgrading) {
139+
log('CF socket closed')
140+
this._cfSocket = null
141+
this.emit('close')
142+
} else {
143+
this._upgrading = false
144+
this._upgraded = true
145+
}
146+
}).catch((e) => this.emit('error', e))
147+
}
148+
}
149+
150+
const debug = false
151+
152+
function dump(data: unknown) {
153+
if (data instanceof Uint8Array || data instanceof ArrayBuffer) {
154+
const hex = Buffer.from(data).toString('hex')
155+
const str = new TextDecoder().decode(data)
156+
return `\n>>> STR: "${str.replace(/\n/g, '\\n')}"\n>>> HEX: ${hex}\n`
157+
} else {
158+
return data
159+
}
160+
}
161+
162+
function log(...args: unknown[]) {
163+
debug && console.log(...args.map(dump))
164+
}

packages/pg-cloudflare/src/types.d.ts

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
declare module 'cloudflare:sockets' {
2+
export class Socket {
3+
public readonly readable: any
4+
public readonly writable: any
5+
public readonly closed: Promise<void>
6+
public close(): Promise<void>
7+
public startTls(options: TlsOptions): Socket
8+
}
9+
10+
export type TlsOptions = {
11+
expectedServerHostname?: string
12+
}
13+
14+
export type SocketAddress = {
15+
hostname: string
16+
port: number
17+
}
18+
19+
export type SocketOptions = {
20+
secureTransport?: 'off' | 'on' | 'starttls'
21+
allowHalfOpen?: boolean
22+
}
23+
24+
export function connect(address: string | SocketAddress, options?: SocketOptions): Socket
25+
}

packages/pg-cloudflare/tsconfig.json

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"compilerOptions": {
3+
"module": "ES2020",
4+
"esModuleInterop": true,
5+
"allowSyntheticDefaultImports": true,
6+
"strict": true,
7+
"target": "ES2020",
8+
"noImplicitAny": true,
9+
"moduleResolution": "node",
10+
"sourceMap": true,
11+
"outDir": "dist",
12+
"incremental": true,
13+
"baseUrl": ".",
14+
"declaration": true,
15+
"paths": {
16+
"*": [
17+
"node_modules/*",
18+
"src/types/*"
19+
]
20+
}
21+
},
22+
"include": [
23+
"src/**/*"
24+
]
25+
}

packages/pg/lib/connection.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ var net = require('net')
44
var EventEmitter = require('events').EventEmitter
55

66
const { parse, serialize } = require('pg-protocol')
7+
const { getStream, getSecureStream } = require('./stream')
78

89
const flushBuffer = serialize.flush()
910
const syncBuffer = serialize.sync()
@@ -15,7 +16,7 @@ class Connection extends EventEmitter {
1516
super()
1617
config = config || {}
1718

18-
this.stream = config.stream || new net.Socket()
19+
this.stream = config.stream || getStream(config.ssl)
1920
if (typeof this.stream === 'function') {
2021
this.stream = this.stream(config)
2122
}
@@ -79,7 +80,6 @@ class Connection extends EventEmitter {
7980
self.stream.end()
8081
return self.emit('error', new Error('There was an error establishing an SSL connection'))
8182
}
82-
var tls = require('tls')
8383
const options = {
8484
socket: self.stream,
8585
}
@@ -97,7 +97,7 @@ class Connection extends EventEmitter {
9797
options.servername = host
9898
}
9999
try {
100-
self.stream = tls.connect(options)
100+
self.stream = getSecureStream(options)
101101
} catch (err) {
102102
return self.emit('error', err)
103103
}

packages/pg/lib/stream.js

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Get a socket stream compatible with the current runtime environment.
3+
* @returns {Duplex}
4+
*/
5+
module.exports.getStream = function getStream(ssl) {
6+
const net = require('net')
7+
if (typeof net.Socket === 'function') {
8+
return new net.Socket()
9+
} else {
10+
const { CloudflareSocket } = require('pg-cloudflare')
11+
return new CloudflareSocket(ssl)
12+
}
13+
}
14+
15+
/**
16+
* Get a TLS secured socket, compatible with the current environment,
17+
* using the socket and other settings given in `options`.
18+
* @returns {Duplex}
19+
*/
20+
module.exports.getSecureStream = function getSecureStream(options) {
21+
var tls = require('tls')
22+
if (tls.connect) {
23+
return tls.connect(options)
24+
} else {
25+
options.socket.startTls(options)
26+
return options.socket
27+
}
28+
}

packages/pg/package.json

+3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
"co": "4.6.0",
3535
"pg-copy-streams": "0.3.0"
3636
},
37+
"optionalDependencies": {
38+
"pg-cloudflare": "1.x"
39+
},
3740
"peerDependencies": {
3841
"pg-native": ">=3.0.1"
3942
},

tsconfig.json

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
},
77
"include": [],
88
"references": [
9+
{"path": "./packages/pg-cloudflare"},
910
{"path": "./packages/pg-query-stream"},
1011
{"path": "./packages/pg-protocol"}
1112
]

0 commit comments

Comments
 (0)