import { AssociatedCollectionItem, Collection, CollectionIndex, CollectionItem, ComparisonFunction, DeterminesEquality, KeyFunction, KeyOperator, KeyReducerFunction, MaybeCollectionIndex, MaybeCollectionItem } from './Collection.ts' import {Iterable, StopIteration} from './Iterable.ts' import {applyWhere, WhereOperator} from './Where.ts' type AsyncCollectionComparable = CollectionItem[] | Collection | AsyncCollection type AsyncKeyFunction = (item: CollectionItem, index: number) => CollectionItem | Promise> type AsyncCollectionFunction = (items: AsyncCollection) => T2 /** * Like a collection, but asynchronous. */ export class AsyncCollection { constructor( /** * Iterable of items to base this collction on. * @type Iterable */ private _items: Iterable, /** * Size to use when chunking results for memory-optimization. * @type number */ private _chunk_size: number = 1000, // TODO fix this. It's just for testing ) {} private async _chunk(callback: (items: Collection) => any): Promise { await this._items.chunk(this._chunk_size, async items => { await callback(items) }) await this._items.reset() } private async _chunk_all(key: KeyOperator, callback: (items: Collection) => any): Promise { await this._items.chunk(this._chunk_size, async items => { await callback(items.pluck(key)) }) await this._items.reset() } private async _chunk_all_numbers(key: KeyOperator, callback: (items: number[]) => any): Promise { await this._items.chunk(this._chunk_size, async items => { await callback(items.pluck(key).map(x => Number(x)).all()) }) await this._items.reset() } private async _chunk_all_associate(key: KeyOperator, callback: (items: AssociatedCollectionItem[]) => any): Promise { await this._items.chunk(this._chunk_size, async items => { const assoc_items: AssociatedCollectionItem[] = [] if ( typeof key === 'function' ) { items.map((item, index) => { const key_item = key(item, index) assoc_items.push({ key: key_item, item }) }) } else if ( typeof key === 'string' ) { items.map((item, index) => { assoc_items.push({ key: (item)[key], item }) }) } await callback(assoc_items) }) await this._items.reset() } /** * Get all items in this collection as an array. * @return Promise */ async all(): Promise[]> { return (await this._items.from_range(0, await this._items.count())).all() } /** * Get all items in this collection as a synchronous Collection * @return Promise */ async collect(): Promise> { return this._items.from_range(0, await this._items.count()) } /** * Get the average value of the collection or one of its keys. * @param {KeyOperator} key * @return Promise */ async average(key?: KeyOperator): Promise { let running_total = 0 let running_items = 0 const chunk_helper = (items: number[]) => { running_items += items.length running_total += items.reduce((prev, curr) => prev + curr) } if ( key ) await this._chunk_all_numbers(key, chunk_helper) else await this._chunk((items) => { chunk_helper(items.map(x => Number(x)).all()) }) return running_total / running_items } /** * Get the median value of the collection or one of its keys. * @param {KeyOperator} key * @return Promise */ async median(key?: KeyOperator): Promise { let items: number[] = [] const chunk_helper = (next_items: number[]) => { items = items.concat(next_items) } if ( key ) await this._chunk_all_numbers(key, chunk_helper) else await this._chunk(items => { chunk_helper(items.map(x => Number(x)).all()) }) items = items.sort((a, b) => a - b) const middle = Math.floor((items.length - 1) / 2) if ( items.length % 2 ) return items[middle] else return (items[middle] + items[middle + 1]) / 2 } /** * Get the mode value of the collection or one of its keys. * @param {KeyOperator} key * @return Promise */ async mode(key?: KeyOperator): Promise { let counts: any = {} const chunk_helper = (items: number[]) => { for ( const item of items ) { if ( !counts[item] ) counts[item] = 1 else counts[item] += 1 } } if ( key ) await this._chunk_all_numbers(key, chunk_helper) else await this._chunk(items => { chunk_helper(items.map(x => Number(x)).all()) }) return Number(Object.keys(counts).reduce((a, b) => counts[a] > counts[b] ? a : b)[0]) } /** * If this collection contains nested collections, collapse them to a single level. * @return Promise */ async collapse(): Promise> { const items = await this.collect() return items.collapse() as Collection } /** * Returns true if the collection contains an item satisfying the given collection. * @example * collection.contains('id', '>', 4) * @param {KeyOperator} key * @param {WhereOperator} operator * @param [operand] * @return Promise */ async contains(key: KeyOperator, operator: WhereOperator, operand?: any): Promise { let contains = false await this._chunk_all_associate(key, (items: AssociatedCollectionItem[]) => { const matches = applyWhere(items, operator, operand) if ( matches.length > 0 ) { contains = true throw new StopIteration() } }) return contains } /** * Returns a clean instance of this collection pointing to the same result set of the iterable. * @return Promise */ async clone(): Promise> { return new AsyncCollection(await this._items.clone()) } /** * Returns the elements that are different between the two collections. * @param {AsyncCollectionComparable} items * @return Promise */ async diff(items: AsyncCollectionComparable): Promise> { const matches: T[] = [] await this._chunk(async chunk => { for ( const item of chunk.all() ) { if ( !(await items.includes(item)) ) { matches.push(item) } } }) return new Collection(matches) } /** * Returns the elements that are different between the two collections, using the given function * as a comparator for the elements. * @param {AsyncCollectionComparable} items * @param {DeterminesEquality} compare * @return Promise */ async diffUsing(items: AsyncCollectionComparable, compare: DeterminesEquality): Promise> { const matches: T[] = [] await this._chunk(async chunk => { for ( const item of chunk.all() ) { if ( !(await items.some(exc => compare(item, exc))) ) matches.push(item) } }) return new Collection(matches) } /** * Returns true if the given item is present in the collection. * @param item * @return Promise */ async includes(item: CollectionItem): Promise { let contains = false await this._chunk(items => { if ( items.includes(item) ) { contains = true throw new StopIteration() } }) return contains } /** * Returns true if there is an item in the collection for which the given operator returns true. * @param {function} operator - item => boolean * @return Promise */ async some(operator: (item: T) => boolean): Promise { let contains = false await this._chunk(items => { for ( const item of items.all() ) { if ( operator(item) ) { contains = true throw new StopIteration() } } }) return contains } /** * Applies a callback to each item in the collection. * @param {AsyncKeyFunction} func * @return Promise */ async each(func: AsyncKeyFunction): Promise { let index = 0 await this._chunk(async items => { for ( const item of items.all() ) { await func(item, index) index += 1 } }) } /** * Applies a callback to each item in the collection and returns the results as a collection. * @param {AsyncKeyFunction} func * @return Promise */ async map(func: AsyncKeyFunction): Promise> { const new_items: CollectionItem[] = [] await this.each(async (item, index) => { new_items.push(await func(item, index)) }) return new Collection(new_items) } /** * Returns true if the given operator returns true for every item in the collection. * @param {AsyncKeyFunction} func * @return Promise */ async every(func: AsyncKeyFunction): Promise { let pass = true let index = 0 await this._chunk(async items => { for ( const item of items.all() ) { if ( !(await func(item, index)) ) { pass = false throw new StopIteration() } index += 1 } }) return pass } /** * Returns true if every item in the collection satisfies the given where clause. * @param {KeyOperator} key * @param {WhereOperator} operator * @param [operand] */ async everyWhere(key: KeyOperator, operator: WhereOperator, operand?: any): Promise { let pass = true await this._chunk(async items => { pass = pass && items.everyWhere(key, operator, operand) if ( !pass ) { throw new StopIteration() } }) return pass } /** * Applies a filter to every item in the collection and returns the results that pass the filter. * @param {KeyFunction} func * @return Promise */ async filter(func: KeyFunction): Promise> { let new_items: CollectionItem[] = [] await this._chunk(async items => { new_items = new_items.concat(items.filter(func).all()) }) return new Collection(new_items) } /** * Calls the passed in function if the boolean condition is true. Allows for functional syntax. * @param {boolean} bool * @param {AsyncCollectionFunction} then * @return AsyncCollection */ when(bool: boolean, then: AsyncCollectionFunction): AsyncCollection { if ( bool ) then(this) return this } /** * Calls the passed in function if the boolean condition is false. Allows for functional syntax. * @param {boolean} bool * @param {AsyncCollectionFunction} then * @return AsyncCollection */ unless(bool: boolean, then: AsyncCollectionFunction): AsyncCollection { if ( !bool ) then(this) return this } /** * Applies the given where condition to the collection and returns a new collection of the results. * @param {KeyOperator} key * @param {WhereOperator} operator * @param [operand] * @return Promise */ async where(key: KeyOperator, operator: WhereOperator, operand?: any): Promise> { let new_items: CollectionItem[] = [] await this._chunk(async items => { new_items = new_items.concat(items.where(key, operator, operand).all()) }) return new Collection(new_items) } /** * Applies the given where condition to the collection and returns a new collection of the items * that did not satisfy the condition. * @param {KeyOperator} key * @param {WhereOperator} operator * @param [operand] * @return Promise */ async whereNot(key: KeyOperator, operator: WhereOperator, operand?: any): Promise> { let new_items: CollectionItem[] = [] await this._chunk(async items => { new_items = new_items.concat(items.whereNot(key, operator, operand).all()) }) return new Collection(new_items) } /** * Applies a WHERE ... IN ... condition to the collection an returns a new collection of the results. * @param {KeyOperator} key * @param {AsyncCollectionComparable} items * @return Promise */ async whereIn(key: KeyOperator, items: AsyncCollectionComparable): Promise> { let new_items: CollectionItem[] = [] await this._chunk_all_associate(key,async chunk => { for ( const item of chunk ) { if ( await items.includes(item.key) ) { new_items.push(item.item) } } }) return new Collection(new_items) } /** * Applies a WHERE ... IN ... condition to the collection and returns a new collection of the items * that did not satisfy the condition. * @param {KeyOperator} key * @param {AsyncCollectionComparable} items * @return Promise */ async whereNotIn(key: KeyOperator, items: AsyncCollectionComparable): Promise> { let new_items: CollectionItem[] = [] await this._chunk_all_associate(key,async chunk => { for ( const item of chunk ) { if ( !(await items.includes(item.key)) ) { new_items.push(item.item) } } }) return new Collection(new_items) } /** * Returns the first item in the collection, if one exists. * @return Promise */ async first(): Promise> { if ( await this._items.count() > 0 ) { return this._items.at_index(0) } } /** * Return the first item in the collection that satisfies the given where condition, if one exists. * @param {KeyOperator} key * @param {WhereOperator} [operator = '='] * @param [operand = true] * @return Promise */ async firstWhere(key: KeyOperator, operator: WhereOperator = '=', operand: any = true): Promise> { let item = undefined await this._chunk_all_associate(key, async items => { const matches = applyWhere(items, operator, operand) if ( matches.length > 0 ) { item = matches[0] throw new StopIteration() } }) return item } /** * Return the first item in the collection that does not satisfy the given where condition, if one exists. * @param {KeyOperator} key * @param {WhereOperator} [operator = '='] * @param [operand = true] */ async firstWhereNot(key: KeyOperator, operator: WhereOperator = '=', operand: any = true): Promise> { let item: MaybeCollectionItem = undefined await this._chunk(async items => { const matches = items.whereNot(key, operator, operand) if ( matches.length > 0 ) { item = matches.first() throw new StopIteration() } }) return item } /** * Returns the number of elements in this collection. * @return Promise */ async count() { return this._items.count() } /** * Returns the number of elements in this collection. * @return Promise */ async length() { return this._items.count() } /** * Get the item at the given index of this collection, if one exists. * If none exists and a fallback value is provided, that value will be returned. * @param {number} index * @param [fallback] * @return Promise */ async get(index: number, fallback?: any) { if ( (await this.count()) > index ) return this._items.at_index(index) else return fallback } /** * Get the item at the given index of this collection, if one exists. * @param {number} index */ async at(index: number): Promise> { return this.get(index) } /** * Return an object which maps key values to arrays of items in the collection that satisfy that value. * @param {KeyOperator} key * @return Promise */ async groupBy(key: KeyOperator): Promise { return (await this.collect()).groupBy(key) } /** * Return an object mapping the given key value to items in this collection. * @param {KeyOperator} key * @return Promise */ async associate(key: KeyOperator): Promise { return (await this.collect()).associate(key) } /** * Join the items in this collection with the given delimiter. * @example * await collection.join(',') // => '1,2,3,4' * @param {string} delimiter * @return Promise */ async join(delimiter: string): Promise { let running_strings: string[] = [] await this._chunk(async items => { running_strings.push(items.join(delimiter)) }) return running_strings.join(delimiter) } /** * Join the items in this collection with the given delimiter. * @example * await collection.implode(',') // => '1,2,3,4' * @param {string} delimiter * @return Promise */ async implode(delimiter: string): Promise { return this.join(delimiter) } // TODO intersect /** * Returns true if there are no items in this collection. * @return Promise */ async isEmpty(): Promise { return (await this._items.count()) < 1 } /** * Returns true if there is at least one item in this collection. * @return Promise */ async isNotEmpty(): Promise { return (await this._items.count()) > 0 } /** * Return the last item in this collection, if one exists. * @return Promise */ async last(): Promise> { const length = await this._items.count() if ( length > 0 ) return this._items.at_index(length - 1) } /** * Return the last item in this collection which satisfies the given where condition, if one exists. * @param {KeyOperator} key * @param {WhereOperator} operator * @param [operand] * @return Promise */ async lastWhere(key: KeyOperator, operator: WhereOperator, operand?: any): Promise> { return (await this.where(key, operator, operand)).last() } /** * Return the last item in this collection which does not satisfy the given condition, if one exists. * @param {KeyOperator} key * @param {WhereOperator} operator * @param [operand] * @return Promise */ async lastWhereNot(key: KeyOperator, operator: WhereOperator, operand?: any): Promise> { return (await this.whereNot(key, operator, operand)).last() } /** * Builds a collection of the values of a given key for each item in this collection. * @example * // collection has { a: 1 }, { a: 2 }, { a: 3 } * await collection.pluck('a') // => [1, 2, 3] * @param {KeyOperator} key * @return Promise */ async pluck(key: KeyOperator): Promise> { let new_items: CollectionItem[] = [] await this._chunk_all(key, async items => { new_items = new_items.concat(items.all()) }) return new Collection(new_items) } /** * Return the max value of the given key. * @param {KeyOperator} key * @return Promise */ async max(key: KeyOperator): Promise { let running_max: number await this._chunk_all_numbers(key, async items => { const local_max = Math.max(...items) if ( typeof running_max === 'undefined' ) running_max = local_max else running_max = Math.max(running_max, local_max) }) // @ts-ignore return running_max } /** * Return a collection of items that have the max value of the given key. * @param {KeyOperator} key * @return Promise */ async whereMax(key: KeyOperator): Promise> { return this.where(key, '=', await this.max(key)) } /** * Return the min value of the given key. * @param {KeyOperator} key * @return Promise */ async min(key: KeyOperator): Promise { let running_min: number await this._chunk_all_numbers(key, async items => { const local_min = Math.min(...items) if ( typeof running_min === 'undefined' ) running_min = local_min else running_min = Math.min(running_min, local_min) }) // @ts-ignore return running_min } /** * Return a collection of items that have the min value of the given key. * @param {KeyOperator} key * @return Promise */ async whereMin(key: KeyOperator): Promise> { return this.where(key, '=', await this.min(key)) } /** * Merge the two collections. * @param {AsyncCollectionComparable} merge_with * @return Promise */ async merge(merge_with: AsyncCollectionComparable): Promise> { let items: T2[] if ( merge_with instanceof Collection ) items = await merge_with.all() else if ( merge_with instanceof AsyncCollection ) items = await merge_with.all() else if ( Array.isArray(merge_with) ) items = merge_with // @ts-ignore return new Collection([...items, ...await this.all()]) } /** * Return a collection of every nth item in this collection. * @param {number} n * @return Promise */ async nth(n: number): Promise> { const matches: CollectionItem[] = [] let current = 1 await this._chunk(async chunk => { for ( const item of chunk.all() ) { if ( current === 1 ) matches.push(item) current += 1 if ( current > n ) current = 1 } }) return new Collection(matches) } /** * Return a collection containing the items that would be on the given page, with the given number of items per page. * @param {number} page * @param {number} per_page */ async forPage(page: number, per_page: number): Promise> { const start = page * per_page - per_page const end = page * per_page - 1 return this._items.from_range(start, end) } /** * Return the value of the function, passing this collection to it. * @param {AsyncCollectionFunction} func */ pipe(func: AsyncCollectionFunction): any { return func(this) } /*async pop(): Promise> { const next_item = await this._items.next() if ( !next_item.done ) { return next_item.value } }*/ // TODO Fix this /** * Get n random items from this collection. * @todo add safety check for it loop exceeds max number of items * @param {number} n * @return Promise */ async random(n: number): Promise> { const random_items: CollectionItem[] = [] const fetched_indices: number[] = [] const max_n = await this._items.count() if ( n > max_n ) n = max_n while ( random_items.length < n ) { const index = Math.floor(Math.random() * max_n) if ( !fetched_indices.includes(index) ) { fetched_indices.push(index) const item = await this._items.at_index(index) if ( typeof item !== 'undefined' ) random_items.push(item) } } return new Collection(random_items) } /** * Collapse the collection into a single value using a reducer function. * @param {KeyReducerFunction} reducer * @param [initial_value] * @return Promise */ async reduce(reducer: KeyReducerFunction, initial_value?: T2): Promise { let current_value = initial_value let index = 0 await this._chunk(async items => { for ( const item of items.all() ) { current_value = reducer(current_value, item, index) index += 1 } }) return current_value } /** * Returns a collection of items that fail the truth test. * @param {AsyncKeyFunction} truth_test * @return Promise */ async reject(truth_test: AsyncKeyFunction): Promise> { let rejected: CollectionItem[] = [] await this._chunk(async items => { rejected = rejected.concat(items.all().filter((item, index) => { return !truth_test(item, index) })) }) return new Collection(rejected) } /** * Get a reversed collection of this collection's items. * @return Promise */ async reverse(): Promise> { return (await this.collect()).reverse() } /** * Search the collection and return the index of that item, if one exists. * @param {CollectionItem} item * @return Promise */ async search(item: CollectionItem): Promise { let found_index let index = 0 await this._chunk(async items => { items.some(possible_item => { if ( possible_item === item ) { found_index = index throw new StopIteration() } index += 1 return false }) }) return found_index } /** * Get the next item in the collection and remove it. * @return Promise */ async shift(): Promise> { const next_item = await this._items.next() if ( !next_item.done ) { return next_item.value } } /** * Shuffle the items in the collection to a random order. * @return Promise */ async shuffle(): Promise> { return (await this.collect()).shuffle() } /** * Return a slice of this collection. * @param {number} start - the starting index * @param {number} end - the ending index * @return Promise */ async slice(start: number, end: number): Promise> { return this._items.from_range(start, end - 1) } /** * Sort the collection, optionally with the given comparison function. * @param {ComparisonFunction} compare_func * @return Promise */ async sort(compare_func?: ComparisonFunction): Promise> { return (await this.collect()).sort(compare_func) } /** * Sort the collection by the given key. * @param {KeyOperator} key * @return Promise */ async sortBy(key?: KeyOperator): Promise> { return (await this.collect()).sortBy(key) } /** * Reverse sort the collection, optionally with the given comparison function. * @param {ComparisonFunction} compare_func * @return Promise */ async sortDesc(compare_func?: ComparisonFunction): Promise> { return (await this.collect()).sortDesc(compare_func) } /** * Reverse sort the collection by the given key. * @param {KeyOperator} key * @return Promise */ async sortByDesc(key?: KeyOperator): Promise> { return (await this.collect()).sortByDesc(key) } /** * Splice the collection at the given index. Optionally, removing the given number of items. * @param {CollectionIndex} start * @param {number} [deleteCount] * @return Promise */ async splice(start: CollectionIndex, deleteCount?: number): Promise> { return (await this.collect()).splice(start, deleteCount) } /** * Sum the items in the collection, or the values of the given key. * @param {KeyOperator} key * @return Promise */ async sum(key?: KeyOperator): Promise { let running_sum: number = 0 const chunk_handler = (items: number[]) => { for ( const item of items ) { running_sum += item } } if ( key ) await this._chunk_all_numbers(key, chunk_handler) else await this._chunk(async chunk => { chunk_handler(chunk.map(x => Number(x)).all()) }) return running_sum } /** * Take the first n items from the front or back of the collection. * @param {number} limit * @return Promise */ async take(limit: number): Promise> { if ( limit === 0 ) return new Collection() else if ( limit > 0 ) { return this.slice(0, limit) } else { const cnt = await this._items.count() return this._items.from_range(cnt - (-1 * limit), cnt - 1) } } /** * Call the given function, passing in this collection. Allows functional syntax. * @param {AsyncCollectionFunction} func * @return Promise */ async tap(func: AsyncCollectionFunction): Promise> { await func(this) return this } /** * Return all the unique values in the collection, or the unique values of the given key. * @param {KeyOperator} key * @return Promise */ async unique(key?: KeyOperator): Promise> { const has: CollectionItem[] = [] if ( !key ) { await this._chunk(async items => { for ( const item of items.all() ) { if ( !has.includes(item) ) has.push(item) } }) } else { await this._chunk_all(key, async items => { for ( const item of items.all() ) { if ( !has.includes(item) ) has.push(item) } }) } return new Collection(has) } /** * Cast this collection to an array. * @return Promise */ async toArray(): Promise { const returns: any = [] for ( const item of (await this.all()) ) { if ( item instanceof Collection ) returns.push(item.toArray()) else if ( item instanceof AsyncCollection ) returns.push(await item.toArray()) else returns.push(item) } return returns } /** * Cast this collection to a JSON string. * @param [replacer] - the replacer to use * @param {number} [space = 4] number of indentation spaces to use */ async toJSON(replacer = undefined, space = 4): Promise { return JSON.stringify(this.toArray(), replacer, space) } /** * Iterator to support async iteration: * * @example * for await (const item of collection) {} */ [Symbol.asyncIterator]() { return this._items.clone() } /** * Get a clone of the underlying iterator of this collection. * @return Iterable */ iterator() { return this._items.clone() } }