fix: dispose races, tunnel guards, retry on stale
This commit is contained in:
@@ -13,12 +13,17 @@ const ssh2State = vi.hoisted(() => {
|
||||
|
||||
const state = {
|
||||
instances: [] as FakeSshClient[],
|
||||
failConnect: false
|
||||
failConnect: false,
|
||||
// When true, forwardOut stashes its callback instead of invoking it, so
|
||||
// a test can deliver the stream LATE (after close()).
|
||||
deferForward: false,
|
||||
streams: [] as PassThrough[]
|
||||
}
|
||||
|
||||
class FakeSshClient extends EventEmitter {
|
||||
connectConfig: Record<string, unknown> | undefined
|
||||
forwardCalls: unknown[][] = []
|
||||
deferred: Array<() => void> = []
|
||||
ended = false
|
||||
|
||||
connect(config: Record<string, unknown>) {
|
||||
@@ -40,7 +45,14 @@ const ssh2State = vi.hoisted(() => {
|
||||
callback: (err: Error | null, stream?: PassThrough) => void
|
||||
) {
|
||||
this.forwardCalls.push([srcHost, srcPort, dstHost, dstPort])
|
||||
callback(null, new PassThrough())
|
||||
const stream = new PassThrough()
|
||||
state.streams.push(stream)
|
||||
const deliver = () => callback(null, stream)
|
||||
if (state.deferForward) {
|
||||
this.deferred.push(deliver)
|
||||
return
|
||||
}
|
||||
deliver()
|
||||
}
|
||||
|
||||
end() {
|
||||
@@ -139,6 +151,8 @@ describe('openTunnel', () => {
|
||||
beforeEach(() => {
|
||||
ssh2State.state.instances.length = 0
|
||||
ssh2State.state.failConnect = false
|
||||
ssh2State.state.deferForward = false
|
||||
ssh2State.state.streams.length = 0
|
||||
})
|
||||
|
||||
it('forwards local TCP traffic through ssh forwardOut', async () => {
|
||||
@@ -217,4 +231,56 @@ describe('openTunnel', () => {
|
||||
expect(tunnel.isClosed()).toBe(true)
|
||||
await tunnel.close()
|
||||
})
|
||||
|
||||
it('flips isClosed to true when the ssh client ends', async () => {
|
||||
const tunnel = await openTunnel(sshConfig({ password: 'pw' }), 'db-host', 5432)
|
||||
const client = ssh2State.state.instances[0]
|
||||
client.emit('end')
|
||||
expect(tunnel.isClosed()).toBe(true)
|
||||
await tunnel.close()
|
||||
})
|
||||
|
||||
it('flips isClosed to true on a late listener error', async () => {
|
||||
const created: net.Server[] = []
|
||||
const realCreateServer = net.createServer.bind(net)
|
||||
const spy = vi.spyOn(net, 'createServer').mockImplementation((handler) => {
|
||||
const server = realCreateServer(handler as (socket: net.Socket) => void)
|
||||
created.push(server)
|
||||
return server
|
||||
})
|
||||
const tunnel = await openTunnel(sshConfig({ password: 'pw' }), 'db-host', 5432)
|
||||
// A listener error after listen() resolved must not be swallowed: it
|
||||
// marks the tunnel closed so the manager can rebuild it.
|
||||
created[0].emit('error', new Error('EADDRNOTAVAIL'))
|
||||
expect(tunnel.isClosed()).toBe(true)
|
||||
spy.mockRestore()
|
||||
await tunnel.close()
|
||||
})
|
||||
|
||||
it('destroys a late forwardOut stream delivered after close, wiring no pipes', async () => {
|
||||
ssh2State.state.deferForward = true
|
||||
const tunnel = await openTunnel(sshConfig({ password: 'pw' }), 'db-host', 5432)
|
||||
const client = ssh2State.state.instances[0]
|
||||
|
||||
// Connect a socket; the server's forwardOut callback is held back.
|
||||
await new Promise<net.Socket>((resolve, reject) => {
|
||||
const s = net.connect(tunnel.localPort, tunnel.localHost, () => resolve(s))
|
||||
s.once('error', reject)
|
||||
})
|
||||
// Wait until the deferred forwardOut callback has been registered.
|
||||
await vi.waitFor(() => {
|
||||
expect(client.deferred).toHaveLength(1)
|
||||
})
|
||||
|
||||
const stream = ssh2State.state.streams[0]
|
||||
const pipeSpy = vi.spyOn(stream, 'pipe')
|
||||
|
||||
// Tear the tunnel down, THEN deliver the forwarded stream late.
|
||||
await tunnel.close()
|
||||
client.deferred[0]()
|
||||
|
||||
// The late stream is destroyed and never piped.
|
||||
expect(stream.destroyed).toBe(true)
|
||||
expect(pipeSpy).not.toHaveBeenCalled()
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user