From aef71752dbc6bce8d3108dc4546ef0bf4e908cfa Mon Sep 17 00:00:00 2001 From: smartass Date: Thu, 11 Jun 2026 23:39:16 +0500 Subject: [PATCH] feat: add postgres driver --- src/db/postgres.ts | 245 ++++++++++++++++++++++++++++++++++ test/unit/db/postgres.test.ts | 185 +++++++++++++++++++++++++ 2 files changed, 430 insertions(+) create mode 100644 src/db/postgres.ts create mode 100644 test/unit/db/postgres.test.ts diff --git a/src/db/postgres.ts b/src/db/postgres.ts new file mode 100644 index 0000000..7966f7e --- /dev/null +++ b/src/db/postgres.ts @@ -0,0 +1,245 @@ +import pg from 'pg' +import { normalizeCell, truncateRows } from '../format.js' +import { + type DatabaseInfo, + type Driver, + type DriverTarget, + type QueryArgs, + type QueryResult, + resolveDatabase, + type TableDescription, + type TableInfo +} from './driver.js' + +const MAINTENANCE_DB = 'postgres' + +export const createPostgresDriver = (target: DriverTarget): Driver => { + const pools = new Map() + + const getPool = (database: string): pg.Pool => { + const existing = pools.get(database) + if (existing) { + return existing + } + const pool = new pg.Pool({ + host: target.host, + port: target.port, + user: target.config.user, + password: target.config.password, + database, + max: 4, + ...(target.config.readonly ? { options: '-c default_transaction_read_only=on' } : {}) + }) + pools.set(database, pool) + return pool + } + + const maintenancePool = (): pg.Pool => getPool(target.config.database ?? MAINTENANCE_DB) + + const runArray = async ( + database: string, + sql: string, + params: unknown[] = [] + ): Promise<{ columns: string[]; rows: unknown[][]; rowCount: number }> => { + const result = await getPool(database).query({ + text: sql, + values: params, + rowMode: 'array' + }) + if (Array.isArray(result)) { + throw new Error( + 'one SQL statement per call: split the statements into separate execute_sql calls' + ) + } + return { + columns: (result.fields ?? []).map((field) => field.name), + rows: (result.rows ?? []) as unknown[][], + rowCount: result.rowCount ?? result.rows?.length ?? 0 + } + } + + const query = async ({ + sql, + params = [], + database, + rowLimit + }: QueryArgs): Promise => { + const db = resolveDatabase(target.config, database) + const result = await runArray(db, sql, params) + const { rows, truncated } = truncateRows(result.rows, rowLimit) + return { + columns: result.columns, + rows: rows.map((row) => row.map(normalizeCell)), + rowCount: result.rowCount, + truncated + } + } + + const listDatabases = async (): Promise => { + const result = await maintenancePool().query({ + text: ` + SELECT datname, pg_database_size(datname) + FROM pg_database + WHERE NOT datistemplate + ORDER BY datname + `, + rowMode: 'array' + }) + return result.rows.map((row) => ({ + name: String(row[0]), + sizeBytes: row[1] === null ? null : Number(row[1]) + })) + } + + const listTables = async (args: { + database?: string + schema?: string + }): Promise => { + const db = resolveDatabase(target.config, args.database) + const result = await runArray( + db, + ` + SELECT n.nspname, c.relname, c.reltuples::bigint + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE c.relkind IN ('r', 'p') + AND n.nspname NOT IN ('pg_catalog', 'information_schema') + AND ($1::text IS NULL OR n.nspname = $1) + ORDER BY n.nspname, c.relname + `, + [args.schema ?? null] + ) + return result.rows.map((row) => { + const estimate = Number(row[2]) + return { + schema: String(row[0]), + name: String(row[1]), + rowEstimate: estimate < 0 ? null : estimate + } + }) + } + + const describeTable = async (args: { + table: string + database?: string + schema?: string + }): Promise => { + const db = resolveDatabase(target.config, args.database) + const schema = args.schema ?? 'public' + + const columns = await runArray( + db, + ` + SELECT column_name, data_type, is_nullable, column_default + FROM information_schema.columns + WHERE table_schema = $1 AND table_name = $2 + ORDER BY ordinal_position + `, + [schema, args.table] + ) + if (columns.rows.length === 0) { + throw new Error("table '" + schema + '.' + args.table + "' not found") + } + + const primaryKey = await runArray( + db, + ` + SELECT kcu.column_name + FROM information_schema.table_constraints tc + JOIN information_schema.key_column_usage kcu + ON kcu.constraint_name = tc.constraint_name + AND kcu.table_schema = tc.table_schema + WHERE tc.constraint_type = 'PRIMARY KEY' + AND tc.table_schema = $1 AND tc.table_name = $2 + ORDER BY kcu.ordinal_position + `, + [schema, args.table] + ) + + const indexes = await runArray( + db, + ` + SELECT i.relname, ix.indisunique, a.attname, + array_position(ix.indkey, a.attnum) + FROM pg_index ix + JOIN pg_class i ON i.oid = ix.indexrelid + JOIN pg_class t ON t.oid = ix.indrelid + JOIN pg_namespace n ON n.oid = t.relnamespace + JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) + WHERE n.nspname = $1 AND t.relname = $2 AND NOT ix.indisprimary + ORDER BY i.relname, array_position(ix.indkey, a.attnum) + `, + [schema, args.table] + ) + + const foreignKeys = await runArray( + db, + ` + SELECT tc.constraint_name, kcu.column_name, ccu.table_name, ccu.column_name + FROM information_schema.table_constraints tc + JOIN information_schema.key_column_usage kcu + ON kcu.constraint_name = tc.constraint_name + AND kcu.table_schema = tc.table_schema + JOIN information_schema.constraint_column_usage ccu + ON ccu.constraint_name = tc.constraint_name + AND ccu.table_schema = tc.table_schema + WHERE tc.constraint_type = 'FOREIGN KEY' + AND tc.table_schema = $1 AND tc.table_name = $2 + ORDER BY tc.constraint_name, kcu.ordinal_position + `, + [schema, args.table] + ) + + const indexMap = new Map() + for (const row of indexes.rows) { + const name = String(row[0]) + const entry = indexMap.get(name) ?? { unique: Boolean(row[1]), columns: [] } + entry.columns.push(String(row[2])) + indexMap.set(name, entry) + } + + const fkMap = new Map< + string, + { columns: string[]; referencedTable: string; referencedColumns: string[] } + >() + for (const row of foreignKeys.rows) { + const name = String(row[0]) + const entry = fkMap.get(name) ?? { + columns: [], + referencedTable: String(row[2]), + referencedColumns: [] + } + entry.columns.push(String(row[1])) + entry.referencedColumns.push(String(row[3])) + fkMap.set(name, entry) + } + + return { + columns: columns.rows.map((row) => ({ + name: String(row[0]), + type: String(row[1]), + nullable: row[2] === 'YES', + default: row[3] === null ? null : String(row[3]) + })), + primaryKey: primaryKey.rows.map((row) => String(row[0])), + indexes: [...indexMap.entries()].map(([name, entry]) => ({ name, ...entry })), + foreignKeys: [...fkMap.entries()].map(([name, entry]) => ({ name, ...entry })) + } + } + + const serverVersion = async (): Promise => { + const result = await maintenancePool().query({ + text: 'SHOW server_version', + rowMode: 'array' + }) + return String(result.rows[0]?.[0] ?? 'unknown') + } + + const dispose = async (): Promise => { + const all = [...pools.values()] + pools.clear() + await Promise.all(all.map((pool) => pool.end().catch(() => {}))) + } + + return { query, listDatabases, listTables, describeTable, serverVersion, dispose } +} diff --git a/test/unit/db/postgres.test.ts b/test/unit/db/postgres.test.ts new file mode 100644 index 0000000..eca0db6 --- /dev/null +++ b/test/unit/db/postgres.test.ts @@ -0,0 +1,185 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const pgState = vi.hoisted(() => { + const state = { + pools: [] as FakePool[], + nextResult: undefined as unknown + } + + class FakePool { + options: Record + queries: unknown[] = [] + ended = false + + constructor(options: Record) { + this.options = options + state.pools.push(this) + } + + async query(args: unknown) { + this.queries.push(args) + return state.nextResult ?? { rows: [], fields: [], rowCount: 0, command: 'SELECT' } + } + + async end() { + this.ended = true + } + } + + return { state, FakePool } +}) + +vi.mock('pg', () => ({ + default: { Pool: pgState.FakePool } +})) + +import type { ConnectionConfig } from '../../../src/config/types.js' +import { createPostgresDriver } from '../../../src/db/postgres.js' + +const config = (extra: Partial = {}): ConnectionConfig => ({ + name: 'pg', + type: 'postgres', + host: 'real-host', + user: 'postgres', + password: 'pw', + database: 'main', + readonly: false, + ...extra +}) + +const target = (extra: Partial = {}) => ({ + config: config(extra), + host: '127.0.0.1', + port: 15432 +}) + +describe('createPostgresDriver', () => { + beforeEach(() => { + pgState.state.pools.length = 0 + pgState.state.nextResult = undefined + }) + + it('creates one pool per database and reuses it', async () => { + const driver = createPostgresDriver(target()) + await driver.query({ sql: 'select 1', rowLimit: 10 }) + await driver.query({ sql: 'select 2', rowLimit: 10 }) + await driver.query({ sql: 'select 3', database: 'other', rowLimit: 10 }) + expect(pgState.state.pools).toHaveLength(2) + expect(pgState.state.pools[0].options).toMatchObject({ + host: '127.0.0.1', + port: 15432, + database: 'main', + max: 4 + }) + expect(pgState.state.pools[1].options).toMatchObject({ database: 'other' }) + }) + + it('passes readonly as a startup option', async () => { + const driver = createPostgresDriver(target({ readonly: true })) + await driver.query({ sql: 'select 1', rowLimit: 10 }) + expect(pgState.state.pools[0].options.options).toBe('-c default_transaction_read_only=on') + }) + + it('omits startup options when not readonly', async () => { + const driver = createPostgresDriver(target()) + await driver.query({ sql: 'select 1', rowLimit: 10 }) + expect(pgState.state.pools[0].options.options).toBeUndefined() + }) + + it('maps array-mode results with column names and normalized cells', async () => { + pgState.state.nextResult = { + rows: [[1n, new Date('2026-01-01T00:00:00Z')]], + fields: [{ name: 'id' }, { name: 'created' }], + rowCount: 1, + command: 'SELECT' + } + const driver = createPostgresDriver(target()) + const result = await driver.query({ sql: 'select * from t', rowLimit: 10 }) + expect(result).toEqual({ + columns: ['id', 'created'], + rows: [['1', '2026-01-01T00:00:00.000Z']], + rowCount: 1, + truncated: false + }) + }) + + it('truncates rows beyond the limit', async () => { + pgState.state.nextResult = { + rows: [[1], [2], [3]], + fields: [{ name: 'n' }], + rowCount: 3, + command: 'SELECT' + } + const driver = createPostgresDriver(target()) + const result = await driver.query({ sql: 'select n', rowLimit: 2 }) + expect(result.rows).toEqual([[1], [2]]) + expect(result.truncated).toBe(true) + expect(result.rowCount).toBe(3) + }) + + it('sends positional params through', async () => { + const driver = createPostgresDriver(target()) + await driver.query({ sql: 'select $1', params: [42], rowLimit: 10 }) + expect(pgState.state.pools[0].queries[0]).toMatchObject({ + text: 'select $1', + values: [42], + rowMode: 'array' + }) + }) + + it('rejects multi-statement results', async () => { + pgState.state.nextResult = [ + { rows: [], fields: [], rowCount: 0 }, + { rows: [], fields: [], rowCount: 0 } + ] + const driver = createPostgresDriver(target()) + await expect(driver.query({ sql: 'select 1; select 2', rowLimit: 10 })).rejects.toThrow( + /one SQL statement/ + ) + }) + + it('lists databases against the maintenance db and maps sizes', async () => { + pgState.state.nextResult = { + rows: [ + ['app', 1024], + ['postgres', 2048] + ], + fields: [{ name: 'name' }, { name: 'size_bytes' }], + rowCount: 2, + command: 'SELECT' + } + const driver = createPostgresDriver(target({ database: undefined })) + const databases = await driver.listDatabases() + expect(databases).toEqual([ + { name: 'app', sizeBytes: 1024 }, + { name: 'postgres', sizeBytes: 2048 } + ]) + expect(pgState.state.pools[0].options.database).toBe('postgres') + }) + + it('maps reltuples=-1 to null in listTables', async () => { + pgState.state.nextResult = { + rows: [ + ['public', 'fresh', '-1'], + ['public', 'analyzed', '42'] + ], + fields: [{ name: 'schema' }, { name: 'name' }, { name: 'row_estimate' }], + rowCount: 2, + command: 'SELECT' + } + const driver = createPostgresDriver(target()) + const tables = await driver.listTables({}) + expect(tables).toEqual([ + { schema: 'public', name: 'fresh', rowEstimate: null }, + { schema: 'public', name: 'analyzed', rowEstimate: 42 } + ]) + }) + + it('dispose ends every pool', async () => { + const driver = createPostgresDriver(target()) + await driver.query({ sql: 'select 1', rowLimit: 1 }) + await driver.query({ sql: 'select 1', database: 'other', rowLimit: 1 }) + await driver.dispose() + expect(pgState.state.pools.every((pool) => pool.ended)).toBe(true) + }) +})