You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

179 lines
5.8 KiB

import {QueryResult} from '../../db/types.ts'
import {make} from '../../../../di/src/global.ts'
import Database from '../../service/Database.ts'
import {logger} from '../../../../lib/src/service/logging/global.ts'
import {Connection} from '../../db/Connection.ts'
import {ResultCollection} from './result/ResultCollection.ts'
import {ResultIterable} from './result/ResultIterable.ts'
import ResultOperator from './result/ResultOperator.ts'
import {collect, Collection} from '../../../../lib/src/collection/Collection.ts'
import NoTargetOperatorError from '../../error/NoTargetOperatorError.ts'
/**
* Base class for a query that can be executed in a database connection.
* @abstract
*/
export default abstract class ConnectionExecutable<T> {
/**
* Render the query to raw SQL, starting with the base indentation level.
* @param {number} level
* @return string
*/
abstract sql(level: number): string
/**
* Cast the query to an SQL statement which counts the incoming rows.
* @return string
*/
to_count(): string {
return `SELECT COUNT(*) AS to_count FROM (${this.sql(0)}) AS target_query`
}
/**
* Get the result row for this query at index i.
* @param {number} i
* @return Promise<any>
*/
async get_row(i: number): Promise<T | undefined> {
if ( !(this.__target_connection instanceof Connection) ) {
throw new Error('Unable to execute database item: no target connection.')
}
const query = `SELECT * FROM (${this.sql(0)}) AS target_query OFFSET ${i} LIMIT 1`
const result = await this.__target_connection.query(query)
const row = result.rows.first()
if ( row ) {
if ( !this.__target_operator ) throw new NoTargetOperatorError()
const inflated = this.__target_operator.inflate_row(row)
await this.__target_operator.process_eager_loads(this, collect([inflated]))
return inflated
}
}
/**
* Get a range of resultant rows for this query between the start and end indices.
* @param {string} start
* @param {string} end
* @return Promise<Collection>
*/
async get_range(start: number, end: number): Promise<Collection<T>> {
if ( !(this.__target_connection instanceof Connection) ) {
throw new Error('Unable to execute database item: no target connection.')
}
const query = `SELECT * FROM (${this.sql(0)}) AS target_query OFFSET ${start} LIMIT ${(end - start) + 1}`
const result = await this.__target_connection.query(query)
const inflated = result.rows.map(row => {
if ( !this.__target_operator ) throw new NoTargetOperatorError()
return this.__target_operator.inflate_row(row)
})
if ( !this.__target_operator ) throw new NoTargetOperatorError()
await this.__target_operator.process_eager_loads(this, inflated)
return inflated
}
/**
* Get an iterator for this result set.
* @return ResultIterable
*/
iterator(): ResultIterable<T> {
return new ResultIterable<T>(this)
}
/**
* Get the results as an async collection, with the processing chunk size.
* @param {number} chunk_size
* @return ResultCollection
*/
results(chunk_size = 1000) {
return new ResultCollection<T>(this.iterator(), chunk_size)
}
/**
* The database connection to execute the statement in.
* @type Connection
*/
__target_connection?: Connection
/**
* The result operator to use to process the incoming rows.
* @type ResultOperator
*/
__target_operator?: ResultOperator<T>
/**
* Set the target connection.
* @param {string|Connection} connection - the connection or connection name
* @return ConnectionExecutable
*/
target_connection(connection: string | Connection) {
this.__target_connection = typeof connection === 'string' ? make(Database).connection(connection) : connection
return this
}
/**
* Set the target operator.
* @param {ResultOperator} operator
* @return ConnectionExecutable
*/
target_operator(operator: ResultOperator<T>) {
this.__target_operator = operator
return this
}
/**
* Execute the query and get back the raw result.
* @return Promise<QueryResult>
*/
async execute(): Promise<QueryResult> {
if ( !(this.__target_connection instanceof Connection) ) {
throw new Error('Unable to execute database item: no target connection.')
}
return this.execute_in_connection(this.__target_connection)
}
/**
* Count the number of returned rows.
* @return Promise<number>
*/
async count(): Promise<number> {
if ( !(this.__target_connection instanceof Connection) ) {
throw new Error('Unable to execute database item: no target connection.')
}
const result = await this.__target_connection.query(this.to_count())
const row = result.rows.first()
if ( row ) return Number(row.to_count)
return 0
}
/**
* True if the number of rows returned is greater than 0.
* @return Promise<boolean>
*/
async exists(): Promise<boolean> {
return (await this.count()) > 0
}
/**
* Execute the query in the given connection and return the raw result.
* @param {string|Connection} connection - the connection or connection name
* @return Promise<QueryResult>
*/
async execute_in_connection(connection: string | Connection): Promise<QueryResult> {
const conn = typeof connection === 'string' ? make(Database).connection(connection) : connection
logger.debug(`Executing statement in connection: ${conn.name}`)
const sql = this.sql(0)
logger.verbose(sql)
return conn.query(sql)
}
}