feat: add manager with hash cache, tunnel rebuild
This commit is contained in:
@@ -0,0 +1,108 @@
|
||||
import type { Registry } from '../config/registry.js'
|
||||
import { type ConnectionConfig, type ConnectionSource, defaultPort } from '../config/types.js'
|
||||
import { openTunnel, type Tunnel } from '../net/tunnel.js'
|
||||
import type { Driver, DriverTarget } from './driver.js'
|
||||
import { createMysqlDriver } from './mysql.js'
|
||||
import { createPostgresDriver } from './postgres.js'
|
||||
|
||||
type Entry = {
|
||||
driver: Driver
|
||||
tunnel?: Tunnel
|
||||
}
|
||||
|
||||
type CacheSlot = {
|
||||
hash: string
|
||||
promise: Promise<Entry>
|
||||
}
|
||||
|
||||
export type ManagerDeps = {
|
||||
createDriver?: (target: DriverTarget) => Driver
|
||||
createTunnel?: typeof openTunnel
|
||||
}
|
||||
|
||||
export type ManagedConnection = {
|
||||
driver: Driver
|
||||
config: ConnectionConfig
|
||||
source: ConnectionSource
|
||||
}
|
||||
|
||||
export type Manager = {
|
||||
get: (name: string) => Promise<ManagedConnection>
|
||||
invalidate: (name: string) => Promise<void>
|
||||
disposeAll: () => Promise<void>
|
||||
}
|
||||
|
||||
const defaultCreateDriver = (target: DriverTarget): Driver =>
|
||||
target.config.type === 'postgres' ? createPostgresDriver(target) : createMysqlDriver(target)
|
||||
|
||||
export const createManager = (registry: Registry, deps: ManagerDeps = {}): Manager => {
|
||||
const createDriver = deps.createDriver ?? defaultCreateDriver
|
||||
const createTunnel = deps.createTunnel ?? openTunnel
|
||||
const cache = new Map<string, CacheSlot>()
|
||||
|
||||
const disposeEntry = async (entry: Entry): Promise<void> => {
|
||||
// Order matters: pg/mysql2 pool.end() waits for in-flight queries to
|
||||
// finish, so the driver drains BEFORE the tunnel closes. New work
|
||||
// lands on the rebuilt entry, not this one.
|
||||
await entry.driver.dispose().catch(() => {})
|
||||
await entry.tunnel?.close().catch(() => {})
|
||||
}
|
||||
|
||||
const disposeSlot = (slot: CacheSlot): void => {
|
||||
slot.promise.then(disposeEntry).catch(() => {})
|
||||
}
|
||||
|
||||
const build = async (config: ConnectionConfig): Promise<Entry> => {
|
||||
const port = config.port ?? defaultPort(config.type)
|
||||
const tunnel = config.ssh ? await createTunnel(config.ssh, config.host, port) : undefined
|
||||
const target: DriverTarget = {
|
||||
config,
|
||||
host: tunnel ? tunnel.localHost : config.host,
|
||||
port: tunnel ? tunnel.localPort : port
|
||||
}
|
||||
return { driver: createDriver(target), tunnel }
|
||||
}
|
||||
|
||||
const get = async (name: string): Promise<ManagedConnection> => {
|
||||
const resolved = registry.resolve(name)
|
||||
const cached = cache.get(name)
|
||||
if (cached && cached.hash === resolved.hash) {
|
||||
const entry = await cached.promise
|
||||
if (!entry.tunnel?.isClosed()) {
|
||||
return { driver: entry.driver, config: resolved.config, source: resolved.source }
|
||||
}
|
||||
}
|
||||
if (cached) {
|
||||
cache.delete(name)
|
||||
disposeSlot(cached)
|
||||
}
|
||||
const slot: CacheSlot = { hash: resolved.hash, promise: build(resolved.config) }
|
||||
cache.set(name, slot)
|
||||
try {
|
||||
const entry = await slot.promise
|
||||
return { driver: entry.driver, config: resolved.config, source: resolved.source }
|
||||
} catch (error) {
|
||||
if (cache.get(name) === slot) {
|
||||
cache.delete(name)
|
||||
}
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
const invalidate = async (name: string): Promise<void> => {
|
||||
const slot = cache.get(name)
|
||||
if (!slot) {
|
||||
return
|
||||
}
|
||||
cache.delete(name)
|
||||
await slot.promise.then(disposeEntry).catch(() => {})
|
||||
}
|
||||
|
||||
const disposeAll = async (): Promise<void> => {
|
||||
const slots = [...cache.values()]
|
||||
cache.clear()
|
||||
await Promise.all(slots.map((slot) => slot.promise.then(disposeEntry).catch(() => {})))
|
||||
}
|
||||
|
||||
return { get, invalidate, disposeAll }
|
||||
}
|
||||
Reference in New Issue
Block a user