import {Schema} from './Schema'
import {Awaitable, collect, Collection} from '../../util'
import {ConstraintType, TableBuilder} from './TableBuilder'
import {PostgresConnection} from '../connection/PostgresConnection'
import {Builder} from '../builder/Builder'
import {raw} from '../dialect/SQLDialect'
import {QueryRow} from '../types'

/**
 * Render a string as a PostgreSQL string literal that is safe to splice
 * into hand-written SQL.
 *
 * Single quotes are doubled. If the value contains a backslash, backslashes
 * are doubled as well and the literal is emitted in the `E'...'` escape-string
 * form, so the result reads the same whether or not the server has
 * `standard_conforming_strings` enabled.
 * @param value
 */
function quoteSqlLiteral(value: string): string {
    const escaped = value.replace(/\\/g, '\\\\').replace(/'/g, '\'\'')
    return value.includes('\\') ? ` E'${escaped}'` : `'${escaped}'`
}

/**
 * A PostgreSQL-compatible schema implementation.
 *
 * All lookups are scoped to a single database schema (namespace), which
 * defaults to `public`.
 */
export class PostgresSchema extends Schema {
    constructor(
        connection: PostgresConnection,
        public readonly schema: string = 'public',
    ) {
        super(connection)
    }

    /**
     * Check `information_schema.columns` for a column with the given name
     * on the given table in this schema.
     * @param table
     * @param name
     */
    hasColumn(table: string, name: string): Awaitable<boolean> {
        return (new Builder()).connection(this.connection)
            .select(raw('*'))
            .from('information_schema.columns')
            .where('table_schema', '=', this.schema)
            .where('table_name', '=', table)
            .where('column_name', '=', name)
            .exists()
    }

    /**
     * Check that every one of the given columns exists on the given table.
     *
     * Duplicate names are ignored, and an empty list resolves to `true`
     * without querying the database.
     * @param table
     * @param name
     */
    async hasColumns(table: string, name: string[]): Promise<boolean> {
        // The query returns one row per *distinct* matching column, so the
        // comparison must be made against the distinct names requested.
        const uniqueNames = Array.from(new Set(name))
        if ( uniqueNames.length === 0 ) {
            return true
        }

        const num = await (new Builder()).connection(this.connection)
            .select(raw('*'))
            .from('information_schema.columns')
            .where('table_schema', '=', this.schema)
            .where('table_name', '=', table)
            .whereIn('column_name', uniqueNames)
            .get()
            .count()

        return num === uniqueNames.length
    }

    /**
     * Check `information_schema.tables` for a table with the given name
     * in this schema.
     * @param name
     */
    hasTable(name: string): Awaitable<boolean> {
        return (new Builder()).connection(this.connection)
            .select(raw('*'))
            .from('information_schema.tables')
            .where('table_schema', '=', this.schema)
            .where('table_name', '=', name)
            .exists()
    }

    /**
     * Get a TableBuilder for the given table. If the table already exists
     * in the database, the builder is populated from its current definition.
     * @param table
     */
    async table(table: string): Promise<TableBuilder> {
        return this.populateTable(new TableBuilder(table))
    }

    /**
     * If the table for the given TableBuilder already exists in the
     * database, fill in the columns, constraints, and indexes.
     *
     * If the table does not exist, the builder is returned untouched.
     * @param table
     * @protected
     */
    protected async populateTable(table: TableBuilder): Promise<TableBuilder> {
        if ( await this.hasTable(table.name) ) {
            // Load the existing columns
            const cols = await this.getColumns(table.name)

            cols.each(col => {
                table.column(col.column_name)
                    .type(col.data_type)
                    .pipe()
                    // is_nullable is the string 'YES' or 'NO'. Both are truthy,
                    // so it has to be compared explicitly.
                    .when(col.is_nullable === 'YES', builder => {
                        builder.isNullable()
                        return builder
                    })
                    .when(col.column_default, builder => {
                        builder.default(raw(col.column_default))
                        return builder
                    })
            })

            // Load the existing constraints
            const constraints = await this.getConstraints(table.name)

            // Apply the unique constraints
            const uniques = constraints.where('constraint_type', '=', 'u')
                .sortBy('constraint_name')
                .groupBy('constraint_name')

            for ( const key in uniques ) {
                if ( !Object.prototype.hasOwnProperty.call(uniques, key) ) {
                    continue
                }

                table.constraint(key)
                    .type(ConstraintType.Unique)
                    .pipe()
                    .peek(constraint => {
                        collect<{column_name: string}>(uniques[key]) // eslint-disable-line camelcase
                            .pluck('column_name')
                            .each(column => constraint.field(column))
                    })
                    .get()
                    .flagAsExistingInSchema()
            }

            // Apply the primary key constraints
            constraints.where('constraint_type', '=', 'p')
                .pipe()
                .when(c => c.count() > 0, pk => {
                    pk.each(constraint => {
                        table.column(constraint.column_name)
                            .primary()
                    })

                    return pk
                })

            // Apply the non-null constraints
            // Builder columns are non-null by default, so mark the others as nullable
            // (Rows without a constraint_type are the plain column rows
            // from the second half of the getConstraints UNION.)
            const nonNullable = constraints.filter(x => !x.constraint_type)
                .where('is_nullable', '=', 'NO')

            collect<string>(Object.keys(table.getColumns()))
                .map(column => {
                    return {
                        column,
                    }
                })
                .whereNotIn('column', nonNullable.pluck('column_name'))
                .pluck('column')
                .each(column => {
                    table.column(column)
                        .nullable()
                })

            // Look up and apply the check constraints
            const checkConstraints = await this.getCheckConstraints(table.name)

            checkConstraints.each(constraint => {
                table.constraint(constraint.constraint_name)
                    .type(ConstraintType.Check)
                    .expression(constraint.check_clause)
                    .flagAsExistingInSchema()
            })

            // Mark the columns as existing in the database
            cols.each(col => {
                table.column(col.column_name)
                    .flagAsExistingInSchema()
            })

            // Look up table indexes
            const indexes = await this.getIndexes(table.name)
            const groupedIndexes = indexes.groupBy('index_name')

            for ( const key in groupedIndexes ) {
                if ( !Object.prototype.hasOwnProperty.call(groupedIndexes, key) ) {
                    continue
                }

                table.index(key)
                    .pipe()
                    .peek(idx => {
                        collect<{column_name: string}>(groupedIndexes[key]) // eslint-disable-line camelcase
                            .pluck('column_name')
                            .each(col => idx.field(col))
                    })
                    // The pg_index flags are repeated on every row of an index,
                    // so reading them from the first row is sufficient.
                    .when(groupedIndexes[key]?.[0]?.indisprimary, idx => idx.primary())
                    .when(groupedIndexes[key]?.[0]?.indisunique, idx => idx.unique())
                    .get()
                    .flagAsExistingInSchema()
            }

            table.flagAsExistingInSchema()
        }

        return table
    }

    /**
     * Query the database to look up all indexes on a table, by column.
     *
     * Yields one row per (index, column) pair, carrying `table_name`,
     * `index_name`, `column_name` and every column of `pg_index`.
     * @see https://stackoverflow.com/a/2213199/4971138
     * @param table
     * @protected
     */
    protected async getIndexes(table: string): Promise<Collection<QueryRow>> {
        // Escape the interpolated names so that a quote in a table or schema
        // name cannot terminate the SQL string literal.
        const tableLiteral = quoteSqlLiteral(table)
        const schemaLiteral = quoteSqlLiteral(this.schema)

        const rawQuery = `
            select
                t.relname as table_name,
                i.relname as index_name,
                a.attname as column_name,
                ix.*
            from pg_class t
            left join pg_attribute a
                on a.attrelid = t.oid
            left join pg_index ix
                on t.oid = ix.indrelid
            left join pg_class i
                on i.oid = ix.indexrelid
            left join pg_namespace n
                on n.oid = i.relnamespace
            where
                a.attnum = any(ix.indkey)
                and t.relkind = 'r'
                and t.relname = ${tableLiteral}
                and n.nspname = ${schemaLiteral}
            order by
                t.relname,
                i.relname;
        `

        return (new Builder()).connection(this.connection)
            .raw(rawQuery)
            .get()
            .collect()
    }

    /**
     * Query the database to look up all constraints on a table, by column.
     *
     * The result is a UNION of two kinds of rows: one row per constrained
     * column (with `constraint_type` set to the `pg_constraint.contype`
     * code), and one row per column of the table (with `constraint_type`
     * and `constraint_name` set to NULL). Both halves are restricted to
     * this schema.
     * @see https://dba.stackexchange.com/a/290854
     * @param table
     * @protected
     */
    protected async getConstraints(table: string): Promise<Collection<QueryRow>> {
        // Escape the interpolated names so that a quote in a table or schema
        // name cannot terminate the SQL string literal.
        const tableLiteral = quoteSqlLiteral(table)
        const schemaLiteral = quoteSqlLiteral(this.schema)

        const rawQuery = `
            SELECT * FROM (
                SELECT
                    pgc.contype AS constraint_type,
                    pgc.conname AS constraint_name,
                    ccu.table_schema AS table_schema,
                    kcu.table_name AS table_name,
                    CASE WHEN (pgc.contype = 'f') THEN kcu.COLUMN_NAME ELSE ccu.COLUMN_NAME END AS column_name,
                    CASE WHEN (pgc.contype = 'f') THEN ccu.TABLE_NAME ELSE (null) END AS reference_table,
                    CASE WHEN (pgc.contype = 'f') THEN ccu.COLUMN_NAME ELSE (null) END AS reference_col,
                    CASE WHEN (pgc.contype = 'p') THEN 'yes' ELSE 'no' END AS auto_inc,
                    CASE WHEN (pgc.contype = 'p') THEN 'NO' ELSE 'YES' END AS is_nullable,
                    'integer' AS data_type,
                    '0' AS numeric_scale,
                    '32' AS numeric_precision
                FROM pg_constraint AS pgc
                JOIN pg_namespace nsp
                    ON nsp.oid = pgc.connamespace
                JOIN pg_class cls
                    ON pgc.conrelid = cls.oid
                JOIN information_schema.key_column_usage kcu
                    ON kcu.constraint_name = pgc.conname
                    AND kcu.constraint_schema = nsp.nspname
                LEFT JOIN information_schema.constraint_column_usage ccu
                    ON pgc.conname = ccu.CONSTRAINT_NAME
                    AND nsp.nspname = ccu.CONSTRAINT_SCHEMA
                WHERE
                    kcu.table_schema = ${schemaLiteral}
                    AND kcu.table_name = ${tableLiteral}
                UNION
                SELECT
                    NULL AS constraint_type,
                    NULL AS constraint_name,
                    table_schema,
                    table_name,
                    column_name,
                    NULL AS refrence_table,
                    NULL AS refrence_col,
                    'no' AS auto_inc,
                    is_nullable,
                    data_type,
                    numeric_scale,
                    numeric_precision
                FROM information_schema.columns cols
                WHERE
                    table_schema = ${schemaLiteral}
                    AND table_name = ${tableLiteral}
            ) AS child
            ORDER BY table_name DESC
        `

        return (new Builder()).connection(this.connection)
            .raw(rawQuery)
            .get()
            .collect()
    }

    /**
     * Query the database to look up the CHECK constraints on a table.
     *
     * Yields one row per constraint, carrying its name, its check clause
     * and the aggregated list of columns it covers.
     * @see https://dataedo.com/kb/query/postgresql/list-table-check-constraints
     * @param table
     * @protected
     */
    protected async getCheckConstraints(table: string): Promise<Collection<QueryRow>> {
        // Escape the interpolated names so that a quote in a table or schema
        // name cannot terminate the SQL string literal.
        const tableLiteral = quoteSqlLiteral(table)
        const schemaLiteral = quoteSqlLiteral(this.schema)

        const rawQuery = `
            SELECT
                tc.table_schema,
                tc.table_name,
                ARRAY_AGG(col.column_name) AS columns,
                tc.constraint_name,
                cc.check_clause
            FROM information_schema.table_constraints tc
            JOIN information_schema.check_constraints cc
                ON tc.constraint_schema = cc.constraint_schema
                AND tc.constraint_name = cc.constraint_name
            JOIN pg_namespace nsp
                ON nsp.nspname = cc.constraint_schema
            JOIN pg_constraint pgc
                ON pgc.conname = cc.constraint_name
                AND pgc.connamespace = nsp.oid
                AND pgc.contype = 'c'
            JOIN information_schema.columns col
                ON col.table_schema = tc.table_schema
                AND col.table_name = tc.table_name
                AND col.ordinal_position = ANY(pgc.conkey)
            WHERE
                tc.constraint_schema NOT IN ('pg_catalog', 'information_schema')
                AND tc.table_schema = ${schemaLiteral}
                AND tc.table_name = ${tableLiteral}
            GROUP BY
                tc.table_schema,
                tc.table_name,
                tc.constraint_name,
                cc.check_clause
            ORDER BY
                tc.table_schema,
                tc.table_name
        `

        return (new Builder()).connection(this.connection)
            .raw(rawQuery)
            .get()
            .collect()
    }

    /**
     * Query the database to look up all columns on a table.
     *
     * Yields the matching rows of `information_schema.columns` for this
     * schema, unmodified.
     * @param table
     * @protected
     */
    protected async getColumns(table: string): Promise<Collection<QueryRow>> {
        return (new Builder()).connection(this.connection)
            .select(raw('*'))
            .from('information_schema.columns')
            .where('table_schema', '=', this.schema)
            .where('table_name', '=', table)
            .get()
            .collect()
    }
}