// You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
// lib/src/orm/schema/PostgresSchema.ts
//
// 342 lines
// 12 KiB

import {Schema} from './Schema'
import {Awaitable, collect, Collection} from '../../util'
import {ConstraintType, TableBuilder} from './TableBuilder'
import {PostgresConnection} from '../connection/PostgresConnection'
import {Builder} from '../builder/Builder'
import {raw} from '../dialect/SQLDialect'
import {QueryRow} from '../types'
/**
 * A PostgreSQL-compatible schema implementation.
 *
 * Table metadata is read from `information_schema` and the `pg_*` system
 * catalogs, restricted to the PostgreSQL schema (namespace) given at
 * construction time.
 */
export class PostgresSchema extends Schema {
    /**
     * @param connection the PostgreSQL connection the introspection queries run on
     * @param schema name of the PostgreSQL schema to inspect (defaults to `public`)
     */
    constructor(
        connection: PostgresConnection,
        public readonly schema: string = 'public',
    ) {
        super(connection)
    }

    /**
     * Check whether a column exists on a table in this schema.
     * @param table name of the table
     * @param name name of the column
     */
    hasColumn(table: string, name: string): Awaitable<boolean> {
        return (new Builder()).connection(this.connection)
            .select(raw('*'))
            .from('information_schema.columns')
            .where('table_schema', '=', this.schema)
            .where('table_name', '=', table)
            .where('column_name', '=', name)
            .exists()
    }

    /**
     * Check whether ALL of the given columns exist on a table in this schema.
     *
     * Duplicate names in the list are counted once, and an empty list resolves
     * to `true` without querying the database.
     * @param table name of the table
     * @param name names of the columns to look for
     */
    async hasColumns(table: string, name: string[]): Promise<boolean> {
        // The catalog holds one row per column, so repeated names in the input
        // must not inflate the number of rows we expect to get back.
        const wanted = Array.from(new Set(name))
        if ( wanted.length === 0 ) {
            // Nothing to look for; also avoids building an empty IN-list.
            return true
        }

        const num = await (new Builder()).connection(this.connection)
            .select(raw('*'))
            .from('information_schema.columns')
            .where('table_schema', '=', this.schema)
            .where('table_name', '=', table)
            .whereIn('column_name', wanted)
            .get()
            .count()

        return num === wanted.length
    }

    /**
     * Check whether a table exists in this schema.
     * @param name name of the table
     */
    hasTable(name: string): Awaitable<boolean> {
        return (new Builder()).connection(this.connection)
            .select(raw('*'))
            .from('information_schema.tables')
            .where('table_schema', '=', this.schema)
            .where('table_name', '=', name)
            .exists()
    }

    /**
     * Get a TableBuilder for the given table. If the table already exists in
     * the database, the builder is pre-filled via `populateTable`.
     * @param table name of the table
     */
    async table(table: string): Promise<TableBuilder> {
        return this.populateTable(new TableBuilder(table))
    }

    /**
     * If the table for the given TableBuilder already exists in the
     * database, fill in the columns, constraints, and indexes.
     * The builder is returned unchanged when the table does not exist.
     * @param table
     * @protected
     */
    protected async populateTable(table: TableBuilder): Promise<TableBuilder> {
        if ( !(await this.hasTable(table.name)) ) {
            return table
        }

        // Load the existing columns
        const cols = await this.getColumns(table.name)
        cols.each(col => {
            table.column(col.column_name)
                .type(col.data_type)
                .pipe(line => {
                    return line
                        // information_schema reports nullability as the strings
                        // 'YES'/'NO'. Both are truthy, so the value has to be
                        // compared explicitly (assuming `when` tests truthiness);
                        // otherwise every column would be flagged nullable.
                        .when(col.is_nullable === 'YES', builder => {
                            return builder.nullable()
                        })
                        .when(col.column_default, builder => {
                            return builder.default(raw(col.column_default))
                        })
                })
        })

        // Load the existing constraints
        const constraints = await this.getConstraints(table.name)

        // Apply the unique constraints ('u' is pg_constraint.contype for UNIQUE)
        const uniques = constraints.where('constraint_type', '=', 'u')
            .sortBy('constraint_name')
            .groupBy('constraint_name')

        for ( const key in uniques ) {
            if ( !Object.prototype.hasOwnProperty.call(uniques, key) ) {
                continue
            }

            table.constraint(key)
                .type(ConstraintType.Unique)
                .tap(constraint => {
                    collect<{column_name: string}>(uniques[key]) // eslint-disable-line camelcase
                        .pluck('column_name')
                        .each(column => constraint.field(column))
                })
                .flagAsExistingInSchema()
        }

        // Apply the primary key constraints ('p' is pg_constraint.contype for PRIMARY KEY)
        constraints.where('constraint_type', '=', 'p')
            .pipe(line => {
                return line.when(c => c.count() > 0, pk => {
                    pk.each(constraint => {
                        table.column(constraint.column_name)
                            .primary()
                    })

                    return pk
                })
            })

        // Apply the non-null constraints
        // Builder columns are non-null by default, so mark the others as nullable.
        // Rows without a constraint_type come from the plain column listing half
        // of the getConstraints() query.
        const nonNullable = constraints.filter(x => !x.constraint_type)
            .where('is_nullable', '=', 'NO')

        collect<string>(Object.keys(table.getColumns()))
            .map(column => {
                return {
                    column,
                }
            })
            .whereNotIn('column', nonNullable.pluck('column_name'))
            .pluck('column')
            .each(column => {
                table.column(column)
                    .nullable()
            })

        // Look up and apply the check constraints
        const checkConstraints = await this.getCheckConstraints(table.name)
        checkConstraints.each(constraint => {
            table.constraint(constraint.constraint_name)
                .type(ConstraintType.Check)
                .expression(constraint.check_clause)
                .flagAsExistingInSchema()
        })

        // Mark the columns as existing in the database
        cols.each(col => {
            table.column(col.column_name)
                .flagAsExistingInSchema()
        })

        // Look up table indexes (one row per index column, grouped by index name)
        const indexes = await this.getIndexes(table.name)
        const groupedIndexes = indexes.groupBy('index_name')

        for ( const key in groupedIndexes ) {
            if ( !Object.prototype.hasOwnProperty.call(groupedIndexes, key) ) {
                continue
            }

            table.index(key)
                .pipe(builder => {
                    return builder
                        .peek(idx => {
                            collect<{column_name: string}>(groupedIndexes[key]) // eslint-disable-line camelcase
                                .pluck('column_name')
                                .each(col => idx.field(col))
                        })
                        // The pg_index flags are per-index, so the first row of the group is enough
                        .when(groupedIndexes[key]?.[0]?.indisprimary, idx => idx.primary())
                        .when(groupedIndexes[key]?.[0]?.indisunique, idx => idx.unique())
                })
                .flagAsExistingInSchema()
        }

        table.flagAsExistingInSchema()
        return table
    }

    /**
     * Render a string as a PostgreSQL string literal, INCLUDING the surrounding
     * quotes, so it can be embedded in a hand-written SQL statement.
     *
     * Single quotes are doubled. If the value contains a backslash, the
     * escape-string form (`E'...'`) is used with backslashes doubled, so the
     * result does not depend on the server's `standard_conforming_strings` setting.
     * @param value the raw string to embed
     * @protected
     */
    protected quoteLiteral(value: string): string {
        if ( !value.includes('\\') ) {
            return `'${value.replace(/'/g, '\'\'')}'`
        }

        const escaped = value.replace(/\\/g, '\\\\').replace(/'/g, '\'\'')

        // Leading space keeps the E prefix from fusing with a preceding token.
        return ` E'${escaped}'`
    }

    /**
     * Query the database to look up all indexes on a table, by column.
     * Each row carries `table_name`, `index_name`, `column_name` and every
     * column of the matching `pg_index` entry.
     * @see https://stackoverflow.com/a/2213199/4971138
     * @param table
     * @protected
     */
    protected async getIndexes(table: string): Promise<Collection<QueryRow>> {
        // Names are embedded as escaped literals, never as raw text.
        const tableLiteral = this.quoteLiteral(table)
        const schemaLiteral = this.quoteLiteral(this.schema)

        const rawQuery = `
            select
                t.relname as table_name,
                i.relname as index_name,
                a.attname as column_name,
                ix.*
            from pg_class t
            left join pg_attribute a
                on a.attrelid = t.oid
            left join pg_index ix
                on t.oid = ix.indrelid
            left join pg_class i
                on i.oid = ix.indexrelid
            left join pg_namespace n
                on n.oid = i.relnamespace
            where
                a.attnum = any(ix.indkey)
                and t.relkind = 'r'
                and t.relname = ${tableLiteral}
                and n.nspname = ${schemaLiteral}
            order by
                t.relname,
                i.relname;
        `

        return (new Builder()).connection(this.connection)
            .raw(rawQuery)
            .get()
            .collect()
    }

    /**
     * Query the database to look up all constraints on a table, by column.
     *
     * The result is the UNION of two row sets: one row per constrained column
     * (with `constraint_type` set to the `pg_constraint.contype` code), and one
     * row per table column with `constraint_type` NULL, carrying `is_nullable`.
     * @see https://dba.stackexchange.com/a/290854
     * @param table
     * @protected
     */
    protected async getConstraints(table: string): Promise<Collection<QueryRow>> {
        // Names are embedded as escaped literals, never as raw text.
        const tableLiteral = this.quoteLiteral(table)
        const schemaLiteral = this.quoteLiteral(this.schema)

        // The first SELECT is restricted to the configured schema and to the
        // inspected table: constraint names are matched by name only in the
        // joins, so without these filters same-named tables/constraints
        // elsewhere would leak into the result.
        const rawQuery = `
            SELECT * FROM (
                SELECT
                    pgc.contype AS constraint_type,
                    pgc.conname AS constraint_name,
                    ccu.table_schema AS table_schema,
                    kcu.table_name AS table_name,
                    CASE WHEN (pgc.contype = 'f') THEN kcu.COLUMN_NAME ELSE ccu.COLUMN_NAME END AS column_name,
                    CASE WHEN (pgc.contype = 'f') THEN ccu.TABLE_NAME ELSE (null) END AS reference_table,
                    CASE WHEN (pgc.contype = 'f') THEN ccu.COLUMN_NAME ELSE (null) END AS reference_col,
                    CASE WHEN (pgc.contype = 'p') THEN 'yes' ELSE 'no' END AS auto_inc,
                    CASE WHEN (pgc.contype = 'p') THEN 'NO' ELSE 'YES' END AS is_nullable,
                    'integer' AS data_type,
                    '0' AS numeric_scale,
                    '32' AS numeric_precision
                FROM
                    pg_constraint AS pgc
                    JOIN pg_namespace nsp
                        ON nsp.oid = pgc.connamespace
                    JOIN pg_class cls
                        ON pgc.conrelid = cls.oid
                    JOIN information_schema.key_column_usage kcu
                        ON kcu.constraint_name = pgc.conname
                    LEFT JOIN information_schema.constraint_column_usage ccu
                        ON pgc.conname = ccu.CONSTRAINT_NAME
                        AND nsp.nspname = ccu.CONSTRAINT_SCHEMA
                WHERE
                    kcu.table_name = ${tableLiteral}
                    AND kcu.table_schema = ${schemaLiteral}
                    AND nsp.nspname = ${schemaLiteral}
                    AND cls.relname = ${tableLiteral}
                UNION
                SELECT
                    NULL AS constraint_type,
                    NULL AS constraint_name,
                    table_schema,
                    table_name,
                    column_name,
                    NULL AS refrence_table,
                    NULL AS refrence_col,
                    'no' AS auto_inc,
                    is_nullable,
                    data_type,
                    numeric_scale,
                    numeric_precision
                FROM information_schema.columns cols
                WHERE
                    table_schema = ${schemaLiteral}
                    AND table_name = ${tableLiteral}
            ) AS child
            ORDER BY table_name DESC
        `

        return (new Builder()).connection(this.connection)
            .raw(rawQuery)
            .get()
            .collect()
    }

    /**
     * Query the database to look up the CHECK constraints on a table.
     * Each row carries the constraint name, its check clause, and the
     * aggregated list of column names it applies to.
     * @see https://dataedo.com/kb/query/postgresql/list-table-check-constraints
     * @param table
     * @protected
     */
    protected async getCheckConstraints(table: string): Promise<Collection<QueryRow>> {
        // Names are embedded as escaped literals, never as raw text.
        const tableLiteral = this.quoteLiteral(table)
        const schemaLiteral = this.quoteLiteral(this.schema)

        const rawQuery = `
            SELECT
                tc.table_schema,
                tc.table_name,
                ARRAY_AGG(col.column_name) AS columns,
                tc.constraint_name,
                cc.check_clause
            FROM information_schema.table_constraints tc
            JOIN information_schema.check_constraints cc
                ON tc.constraint_schema = cc.constraint_schema
                AND tc.constraint_name = cc.constraint_name
            JOIN pg_namespace nsp
                ON nsp.nspname = cc.constraint_schema
            JOIN pg_constraint pgc
                ON pgc.conname = cc.constraint_name
                AND pgc.connamespace = nsp.oid
                AND pgc.contype = 'c'
            JOIN information_schema.columns col
                ON col.table_schema = tc.table_schema
                AND col.table_name = tc.table_name
                AND col.ordinal_position = ANY(pgc.conkey)
            WHERE
                tc.constraint_schema NOT IN ('pg_catalog', 'information_schema')
                AND tc.table_schema = ${schemaLiteral}
                AND tc.table_name = ${tableLiteral}
            GROUP BY
                tc.table_schema,
                tc.table_name,
                tc.constraint_name,
                cc.check_clause
            ORDER BY
                tc.table_schema,
                tc.table_name
        `

        return (new Builder()).connection(this.connection)
            .raw(rawQuery)
            .get()
            .collect()
    }

    /**
     * Query the database to look up all columns on a table.
     * Rows come straight from `information_schema.columns`.
     * @param table
     * @protected
     */
    protected async getColumns(table: string): Promise<Collection<QueryRow>> {
        return (new Builder()).connection(this.connection)
            .select(raw('*'))
            .from('information_schema.columns')
            .where('table_schema', '=', this.schema)
            .where('table_name', '=', table)
            .get()
            .collect()
    }
}