import { Connection, ConnectionNotReadyError } from './Connection.ts'
import { Client } from '../../../lib/src/external/db.ts'
import {collect, Collection} from '../../../lib/src/collection/Collection.ts'
import { QueryResult, QueryRow } from './types.ts'
import { logger } from '../../../lib/src/service/logging/global.ts'
import {Database} from "../schema/tree/Database.ts";
import {escape} from "../builder/types.ts";
import {Table} from "../schema/tree/Table.ts";
import {Builder} from "../builder/Builder.ts";

/**
 * Database connection class for PostgreSQL connections.
 *
 * Wraps a single PostgreSQL client: opens it, runs raw SQL strings through it,
 * and exposes lookups for the databases and tables visible to the connection.
 * @extends Connection
 */
export default class PostgresConnection extends Connection {
    /**
     * The underlying PostgreSQL client.
     * Only set while the connection is open: assigned once `connect()` has
     * succeeded and cleared again by `close()`.
     * @type Client
     */
    private _client?: Client

    /**
     * Create the PostgreSQL client from this connection's config and connect it.
     *
     * The client is stored only after `connect()` resolves, so a failed
     * connection attempt never leaves a half-initialized client behind that
     * `query()` would mistake for a ready one.
     */
    public async init() {
        const client = new Client(this.config)
        logger.info(`Opening PostgreSQL database for connection: ${this.name}`)
        await client.connect()
        this._client = client
    }

    /**
     * Execute a raw SQL string and convert the positional result rows into
     * objects keyed by column name.
     * @param query - the SQL to execute
     * @returns the keyed rows and the row count reported by the client
     * @throws ConnectionNotReadyError if the connection is not currently open
     */
    public async query(query: string) {
        if ( !this._client ) throw new ConnectionNotReadyError(this.name)
        logger.verbose(`Executing query: \n${query}`)

        const result = await this._client.query(query)

        // Column names in result order. Each row is indexed by position, so the
        // array index here doubles as the row index. The driver's own column
        // descriptors are deliberately left unmodified.
        const column_names: string[] = []
        for ( const col of result?.rowDescription?.columns || [] ) {
            column_names.push(col.name)
        }

        const rows = new Collection()
        for ( const row of result.rows ) {
            const row_obj: { [key: string]: any } = {}
            column_names.forEach((column_name, index) => {
                row_obj[column_name] = row[index]
            })
            rows.push(row_obj)
        }

        logger.verbose(`Query result returned ${result.rowCount} row(s).`)

        return {
            rows,
            row_count: result.rowCount,
        } as QueryResult
    }

    /**
     * End the underlying client, if there is one.
     *
     * The stored client is cleared before it is ended so that later `query()`
     * calls throw ConnectionNotReadyError instead of using a closed client,
     * and so that calling `close()` twice is a no-op the second time.
     */
    public async close() {
        const client = this._client
        if ( !client ) return

        this._client = undefined
        await client.end()
    }

    /**
     * Build a Database instance for every name listed in `pg_database`,
     * introspecting each one before it is added to the result.
     * @returns a collection of introspected Database instances
     */
    public async databases() {
        const database_names = await this._database_names()
        const databases: Collection = new Collection()

        for ( const name of database_names ) {
            const db = this.make(Database, this, name)
            await db.introspect()
            databases.push(db)
        }

        return databases
    }

    /**
     * Look up a single database by name.
     * @param name - the database name to look for in `pg_database`
     * @returns the introspected Database, or undefined if the name is not listed
     */
    public async database(name: string) {
        const database_names = await this._database_names()
        if ( !database_names.includes(name) ) return undefined

        const db = this.make(Database, this, name)
        await db.introspect()
        return db
    }

    /**
     * Get a Database instance for the given name whether or not it exists.
     * @param name - the database name
     * @returns the introspected Database when the name is listed in
     * `pg_database`, otherwise a fresh Database instance that has not been
     * introspected
     */
    public async database_as_schema(name: string) {
        const db = await this.database(name)
        if ( db ) return db

        return this.make(Database, this, name)
    }

    /**
     * Build a Table instance for every table name found for the given database.
     * @param database_name - the name the table lookup is filtered by
     * @returns a collection of Table instances
     */
    public async tables(database_name: string) {
        const table_names = await this._table_names(database_name)
        const tables: Collection = new Collection()

        for ( const name of table_names ) {
            tables.push(this.make(Table, this, database_name, name))
        }

        return tables
    }

    /**
     * Look up a single table by database and table name.
     *
     * The database is resolved through `database()` first, which also
     * introspects it.
     * @param database_name - the database the table should belong to
     * @param table_name - the table name to look for
     * @returns the Table, or undefined if either the database or the table is
     * not found
     */
    public async table(database_name: string, table_name: string) {
        const database = await this.database(database_name)
        if ( !database ) return undefined

        const table_names = await this._table_names(database_name)
        if ( !table_names.includes(table_name) ) return undefined

        return this.make(Table, this, database_name, table_name)
    }

    /**
     * Fetch the `datname` column of every row in `pg_database`.
     * Shared by `databases()` and `database()`.
     */
    private async _database_names(): Promise<Collection> {
        const query = (new Builder).select('datname')
            .from('pg_database')
            .target_connection(this)

        return (await query.execute()).rows.pluck('datname')
    }

    /**
     * Fetch the `tablename` column of the `pg_catalog.pg_tables` rows whose
     * `tableowner` equals the given name. Shared by `tables()` and `table()`.
     *
     * NOTE(review): the filter compares the table *owner* to the database
     * name, which only matches when the owning role is named after the
     * database - confirm this is the intended lookup.
     */
    private async _table_names(database_name: string): Promise<Collection> {
        const query = (new Builder).select('tablename')
            .from('pg_catalog.pg_tables')
            .where('pg_tables.tableowner', '=', database_name)
            .target_connection(this)

        return (await query.execute()).rows.pluck('tablename')
    }
}