import { AssociatedCollectionItem, Collection, CollectionIndex, CollectionItem, ComparisonFunction, DeterminesEquality, KeyFunction, KeyOperator, KeyReducerFunction, MaybeCollectionIndex, MaybeCollectionItem, } from './Collection' import {Iterable, StopIteration} from './Iterable' import {applyWhere, WhereOperator} from './where' import {AsyncPipe, Pipeline} from '../support/Pipe' import {Awaitable} from '../support/types' type AsyncCollectionComparable = CollectionItem[] | Collection | AsyncCollection type AsyncKeyFunction = (item: CollectionItem, index: number) => CollectionItem | Promise> type AsyncCollectionFunction = (items: AsyncCollection) => T2 /** * Like a collection, but asynchronous. */ export class AsyncCollection { constructor( /** * Iterable of items to base this collction on. * @type Iterable */ private storedItems: Iterable, /** * Size to use when chunking results for memory-optimization. * @type number */ private iteratorChunkSize: number = 1000, // TODO fix this. It's just for testing ) {} private async inChunks(callback: (items: Collection) => any): Promise { await this.storedItems.chunk(this.iteratorChunkSize, async items => { await callback(items) }) await this.storedItems.reset() } private async inChunksAll(key: KeyOperator, callback: (items: Collection) => any): Promise { await this.storedItems.chunk(this.iteratorChunkSize, async items => { if ( typeof key !== 'function' ) { // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore await callback(items.map(x => x[key])) return } await callback(items.map(key)) }) await this.storedItems.reset() } private async inChunksAllNumbers(key: KeyOperator, callback: (items: number[]) => any): Promise { await this.storedItems.chunk(this.iteratorChunkSize, async items => { if ( typeof key !== 'function' ) { // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore await callback(items.map(x => x[key]).map(x => Number(x)) .all()) return } await callback(items.map(key).map(x => Number(x)) .all()) }) await this.storedItems.reset() } private async inChunksAllAssociated(key: KeyOperator, callback: (items: AssociatedCollectionItem[]) => any): Promise { await this.storedItems.chunk(this.iteratorChunkSize, async items => { const assocItems: AssociatedCollectionItem[] = [] if ( typeof key === 'function' ) { items.map((item, index) => { const keyItem = key(item, index) assocItems.push({ key: keyItem, item }) }) } else { items.map(item => { assocItems.push({key: (item)[key], item}) }) } await callback(assocItems) }) await this.storedItems.reset() } /** * Get all items in this collection as an array. * @return Promise */ async all(): Promise[]> { return (await this.storedItems.all()).toArray() } /** * Get all items in this collection as a synchronous Collection * @return Promise */ async collect(): Promise> { return this.storedItems.all() } /** * Get the average value of the collection or one of its keys. * @param {KeyOperator} key * @return Promise */ async average(key?: KeyOperator): Promise { let runningTotal = 0 let runningItems = 0 const chunkHelper = (items: number[]) => { runningItems += items.length runningTotal += items.reduce((prev, curr) => prev + curr) } if ( key ) { await this.inChunksAllNumbers(key, chunkHelper) } else { await this.inChunks((items) => { chunkHelper(items.map(x => Number(x)).all()) }) } return runningTotal / runningItems } /** * Get the median value of the collection or one of its keys. * @param {KeyOperator} key * @return Promise */ async median(key?: KeyOperator): Promise { let items: number[] = [] const chunkHelper = (nextItems: number[]) => { items = items.concat(nextItems) } if ( key ) { await this.inChunksAllNumbers(key, chunkHelper) } else { await this.inChunks(chunkItems => { chunkHelper(chunkItems.map(x => Number(x)).all()) }) } items = items.sort((a, b) => a - b) const middle = Math.floor((items.length - 1) / 2) if ( items.length % 2 ) { return items[middle] } else { return (items[middle] + items[middle + 1]) / 2 } } /** * Get the mode value of the collection or one of its keys. * @param {KeyOperator} key * @return Promise */ async mode(key?: KeyOperator): Promise { const counts: any = {} const chunkHelper = (items: number[]) => { for ( const item of items ) { if ( !counts[item] ) { counts[item] = 1 } else { counts[item] += 1 } } } if ( key ) { await this.inChunksAllNumbers(key, chunkHelper) } else { await this.inChunks(items => { chunkHelper(items.map(x => Number(x)).all()) }) } return Number(Object.keys(counts).reduce((a, b) => counts[a] > counts[b] ? a : b)[0]) } /** * If this collection contains nested collections, collapse them to a single level. * @return Promise */ async collapse(): Promise> { const items = await this.collect() return items.collapse() as Collection } /** * Returns true if the collection contains an item satisfying the given collection. * @example * collection.contains('id', '>', 4) * @param {KeyOperator} key * @param {WhereOperator} operator * @param [operand] * @return Promise */ async contains(key: KeyOperator, operator: WhereOperator, operand?: unknown): Promise { let contains = false await this.inChunksAllAssociated(key, (items: AssociatedCollectionItem[]) => { const matches = applyWhere(items, operator, operand) if ( matches.length > 0 ) { contains = true throw new StopIteration() } }) return contains } /** * Returns a clean instance of this collection pointing to the same result set of the iterable. * @return Promise */ async clone(): Promise> { return new AsyncCollection(await this.storedItems.clone()) } /** * Returns the elements that are different between the two collections. * @param {AsyncCollectionComparable} items * @return Promise */ async diff(items: AsyncCollectionComparable): Promise> { const matches: T[] = [] await this.inChunks(async chunk => { for ( const item of chunk.all() ) { if ( !(await items.includes(item)) ) { matches.push(item) } } }) return new Collection(matches) } /** * Returns the elements that are different between the two collections, using the given function * as a comparator for the elements. * @param {AsyncCollectionComparable} items * @param {DeterminesEquality} compare * @return Promise */ async diffUsing(items: AsyncCollectionComparable, compare: DeterminesEquality): Promise> { const matches: T[] = [] await this.inChunks(async chunk => { for ( const item of chunk.all() ) { if ( !(await items.some(exc => compare(item, exc))) ) { matches.push(item) } } }) return new Collection(matches) } /** * Returns true if the given item is present in the collection. * @param item * @return Promise */ async includes(item: CollectionItem): Promise { let contains = false await this.inChunks(items => { if ( items.includes(item) ) { contains = true throw new StopIteration() } }) return contains } /** * Returns true if there is an item in the collection for which the given operator returns true. * @param {function} operator - item => boolean * @return Promise */ async some(operator: (item: T) => Awaitable): Promise { let contains = false await this.inChunks(async items => { for ( const item of items.all() ) { if ( await operator(item) ) { contains = true throw new StopIteration() } } }) return contains } /** * Applies a callback to each item in the collection. * @param {AsyncKeyFunction} func * @return Promise */ async each(func: AsyncKeyFunction): Promise { let index = 0 await this.inChunks(async items => { for ( const item of items.all() ) { await func(item, index) index += 1 } }) } /** * Applies a callback to each item in the collection and returns the results as a collection. * @param {AsyncKeyFunction} func * @return Promise */ async map(func: AsyncKeyFunction): Promise> { const newItems: CollectionItem[] = [] await this.each(async (item, index) => { newItems.push(await func(item, index)) }) return new Collection(newItems) } /** * Create a new collection by mapping the items in this collection using the given function, * excluding any for which the function resolves undefined. * @param func */ async partialMap(func: AsyncKeyFunction): Promise>> { const newItems: CollectionItem>[] = [] await this.each(async (item, index) => { const result = await func(item, index) if ( typeof result !== 'undefined' ) { newItems.push(result as unknown as NonNullable) } }) return new Collection>(newItems) } /** * Returns true if the given operator returns true for every item in the collection. * @param {AsyncKeyFunction} func * @return Promise */ async every(func: AsyncKeyFunction): Promise { let pass = true let index = 0 await this.inChunks(async items => { for ( const item of items.all() ) { if ( !(await func(item, index)) ) { pass = false throw new StopIteration() } index += 1 } }) return pass } /** * Returns true if every item in the collection satisfies the given where clause. * @param {KeyOperator} key * @param {WhereOperator} operator * @param [operand] */ async everyWhere(key: KeyOperator, operator: WhereOperator, operand?: unknown): Promise { let pass = true await this.inChunks(async items => { pass = pass && items.everyWhere(key, operator, operand) if ( !pass ) { throw new StopIteration() } }) return pass } /** * Applies a filter to every item in the collection and returns the results that pass the filter. * @param {KeyFunction} func * @return Promise */ async filter(func: KeyFunction): Promise> { let newItems: CollectionItem[] = [] await this.inChunks(async items => { const filterItems: CollectionItem[] = [] for ( let i = 0; i < items.length; i += 1 ) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const item = items.get(i)! if ( await func(item, i) ) { filterItems.push(item) } } newItems = newItems.concat(filterItems) }) return new Collection(newItems) } /** * Like filter, but inverted. That is, filters out items that DO match the criterion. * @param func */ async filterOut(func: KeyFunction): Promise> { return this.filter(async (...args) => !(await func(...args))) } /** * Calls the passed in function if the boolean condition is true. Allows for functional syntax. * @param {boolean} bool * @param {AsyncCollectionFunction} then * @return AsyncCollection */ when(bool: boolean, then: AsyncCollectionFunction): AsyncCollection { if ( bool ) { then(this) } return this } /** * Calls the passed in function if the boolean condition is false. Allows for functional syntax. * @param {boolean} bool * @param {AsyncCollectionFunction} then * @return AsyncCollection */ unless(bool: boolean, then: AsyncCollectionFunction): AsyncCollection { if ( !bool ) { then(this) } return this } /** * Applies the given where condition to the collection and returns a new collection of the results. * @param {KeyOperator} key * @param {WhereOperator} operator * @param [operand] * @return Promise */ async where(key: KeyOperator, operator: WhereOperator, operand?: unknown): Promise> { let newItems: CollectionItem[] = [] await this.inChunks(async items => { newItems = newItems.concat(items.where(key, operator, operand).all()) }) return new Collection(newItems) } /** * Applies the given where condition to the collection and returns a new collection of the items * that did not satisfy the condition. * @param {KeyOperator} key * @param {WhereOperator} operator * @param [operand] * @return Promise */ async whereNot(key: KeyOperator, operator: WhereOperator, operand?: unknown): Promise> { let newItems: CollectionItem[] = [] await this.inChunks(async items => { newItems = newItems.concat(items.whereNot(key, operator, operand).all()) }) return new Collection(newItems) } /** * Applies a WHERE ... IN ... condition to the collection an returns a new collection of the results. * @param {KeyOperator} key * @param {AsyncCollectionComparable} items * @return Promise */ async whereIn(key: KeyOperator, items: AsyncCollectionComparable): Promise> { const newItems: CollectionItem[] = [] await this.inChunksAllAssociated(key, async chunk => { for ( const item of chunk ) { if ( await items.includes(item.key) ) { newItems.push(item.item) } } }) return new Collection(newItems) } /** * Applies a WHERE ... IN ... condition to the collection and returns a new collection of the items * that did not satisfy the condition. * @param {KeyOperator} key * @param {AsyncCollectionComparable} items * @return Promise */ async whereNotIn(key: KeyOperator, items: AsyncCollectionComparable): Promise> { const newItems: CollectionItem[] = [] await this.inChunksAllAssociated(key, async chunk => { for ( const item of chunk ) { if ( !(await items.includes(item.key)) ) { newItems.push(item.item) } } }) return new Collection(newItems) } /** * Returns the first item in the collection, if one exists. * @return Promise */ async first(): Promise> { return this.storedItems.at(0) } /** * Return the first item in the collection that satisfies the given where condition, if one exists. * @param {KeyOperator} key * @param {WhereOperator} [operator = '='] * @param [operand = true] * @return Promise */ async firstWhere(key: KeyOperator, operator: WhereOperator = '=', operand: any = true): Promise> { let item = undefined await this.inChunksAllAssociated(key, async items => { const matches = applyWhere(items, operator, operand) if ( matches.length > 0 ) { item = matches[0] throw new StopIteration() } }) return item } /** * Return the first item in the collection that does not satisfy the given where condition, if one exists. * @param {KeyOperator} key * @param {WhereOperator} [operator = '='] * @param [operand = true] */ async firstWhereNot(key: KeyOperator, operator: WhereOperator = '=', operand: any = true): Promise> { let item: MaybeCollectionItem = undefined await this.inChunks(async items => { const matches = items.whereNot(key, operator, operand) if ( matches.length > 0 ) { item = matches.first() throw new StopIteration() } }) return item } /** * Returns the number of elements in this collection. * @return Promise */ async count(): Promise { return this.storedItems.count() } /** * Returns the number of elements in this collection. * @return Promise */ async length(): Promise { return this.storedItems.count() } /** * Get the item at the given index of this collection, if one exists. * If none exists and a fallback value is provided, that value will be returned. * @param {number} index * @param [fallback] * @return Promise */ async get(index: number, fallback?: T): Promise { if ( (await this.count()) > index ) { return this.storedItems.at(index) } else { return fallback } } /** * Get the item at the given index of this collection, if one exists. * @param {number} index */ async at(index: number): Promise> { return this.get(index) } /** * Return an object which maps key values to arrays of items in the collection that satisfy that value. * @param {KeyOperator} key * @return Promise */ async groupBy(key: KeyOperator): Promise { return (await this.collect()).groupBy(key) } /** * Return an object mapping the given key value to items in this collection. * @param {KeyOperator} key * @return Promise */ async associate(key: KeyOperator): Promise { return (await this.collect()).associate(key) } /** * Join the items in this collection with the given delimiter. * @example * await collection.join(',') // => '1,2,3,4' * @param {string} delimiter * @return Promise */ async join(delimiter: string): Promise { const runningStrings: string[] = [] await this.inChunks(async items => { runningStrings.push(items.join(delimiter)) }) return runningStrings.join(delimiter) } /** * Join the items in this collection with the given delimiter. * @example * await collection.implode(',') // => '1,2,3,4' * @param {string} delimiter * @return Promise */ async implode(delimiter: string): Promise { return this.join(delimiter) } // TODO intersect /** * Returns true if there are no items in this collection. * @return Promise */ async isEmpty(): Promise { return (await this.storedItems.count()) < 1 } /** * Returns true if there is at least one item in this collection. * @return Promise */ async isNotEmpty(): Promise { return (await this.storedItems.count()) > 0 } /** * Return the last item in this collection, if one exists. * @return Promise */ async last(): Promise> { const length = await this.storedItems.count() if ( length > 0 ) { return this.storedItems.at(length - 1) } } /** * Return the last item in this collection which satisfies the given where condition, if one exists. * @param {KeyOperator} key * @param {WhereOperator} operator * @param [operand] * @return Promise */ async lastWhere(key: KeyOperator, operator: WhereOperator, operand?: unknown): Promise> { return (await this.where(key, operator, operand)).last() } /** * Return the last item in this collection which does not satisfy the given condition, if one exists. * @param {KeyOperator} key * @param {WhereOperator} operator * @param [operand] * @return Promise */ async lastWhereNot(key: KeyOperator, operator: WhereOperator, operand?: unknown): Promise> { return (await this.whereNot(key, operator, operand)).last() } /** * Builds a collection of the values of a given key for each item in this collection. * @example * // collection has { a: 1 }, { a: 2 }, { a: 3 } * await collection.pluck('a') // => [1, 2, 3] * @param {KeyOperator} key * @return Promise */ async pluck(key: T2): Promise> { let newItems: CollectionItem[] = [] await this.inChunksAll(key, async items => { // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore newItems = newItems.concat(items.all()) }) return new Collection(newItems) } /** * Return the max value of the given key. * @param {KeyOperator} key * @return Promise */ async max(key: KeyOperator): Promise { let runningMax: number | undefined = undefined await this.inChunksAllNumbers(key, async items => { const localMax = Math.max(...items) if ( typeof runningMax === 'undefined' ) { runningMax = localMax } else { runningMax = Math.max(runningMax, localMax) } }) return runningMax } /** * Return a collection of items that have the max value of the given key. * @param {KeyOperator} key * @return Promise */ async whereMax(key: KeyOperator): Promise> { return this.where(key, '=', await this.max(key)) } /** * Return the min value of the given key. * @param {KeyOperator} key * @return Promise */ async min(key: KeyOperator): Promise { let runningMin: number | undefined = undefined await this.inChunksAllNumbers(key, async items => { const localMin = Math.min(...items) if ( typeof runningMin === 'undefined' ) { runningMin = localMin } else { runningMin = Math.min(runningMin, localMin) } }) return runningMin } /** * Return a collection of items that have the min value of the given key. * @param {KeyOperator} key * @return Promise */ async whereMin(key: KeyOperator): Promise> { return this.where(key, '=', await this.min(key)) } /** * Merge the two collections. * @param {AsyncCollectionComparable} mergeWith * @return Promise */ async merge(mergeWith: AsyncCollectionComparable): Promise> { let items: T2[] = [] if ( mergeWith instanceof Collection ) { items = await mergeWith.all() } else if ( mergeWith instanceof AsyncCollection ) { items = await mergeWith.all() } else if ( Array.isArray(mergeWith) ) { items = mergeWith } return new Collection([...items, ...await this.all()]) } /** * Return a collection of every nth item in this collection. * @param {number} n * @return Promise */ async nth(n: number): Promise> { const matches: CollectionItem[] = [] let current = 1 await this.inChunks(async chunk => { for ( const item of chunk.all() ) { if ( current === 1 ) { matches.push(item) } current += 1 if ( current > n ) { current = 1 } } }) return new Collection(matches) } /** * Return a collection containing the items that would be on the given page, with the given number of items per page. * @param {number} page * @param {number} perPage */ async forPage(page: number, perPage: number): Promise> { const start = page * perPage - perPage const end = page * perPage - 1 return this.storedItems.range(start, end) } /** * Return a new Pipe of this collection. */ pipeTo(pipeline: Pipeline): TOut { return pipeline.apply(this) } /** Build and apply a pipeline. */ pipe(builder: (pipeline: Pipeline) => Pipeline): TOut { return builder(Pipeline.id()).apply(this) } /** * Return a new AsyncPipe of this collection. */ asyncPipe(): AsyncPipe> { return AsyncPipe.wrap(this) } /* async pop(): Promise> { const nextItem = await this.storedItems.next() if ( !nextItem.done ) { return nextItem.value } }*/ // TODO Fix this /** * Get n random items from this collection. * @todo add safety check for it loop exceeds max number of items * @param {number} n * @return Promise */ async random(n: number): Promise> { const randomItems: CollectionItem[] = [] const fetchedIndices: number[] = [] const maxN = await this.storedItems.count() if ( n > maxN ) { n = maxN } while ( randomItems.length < n ) { const index = Math.floor(Math.random() * maxN) if ( !fetchedIndices.includes(index) ) { fetchedIndices.push(index) const item = await this.storedItems.at(index) if ( typeof item !== 'undefined' ) { randomItems.push(item) } } } return new Collection(randomItems) } /** * Collapse the collection into a single value using a reducer function. * @param {KeyReducerFunction} reducer * @param [initialValue] * @return Promise */ async reduce(reducer: KeyReducerFunction, initialValue?: T2): Promise { let currentValue = initialValue let index = 0 await this.inChunks(async items => { for ( const item of items.all() ) { currentValue = reducer(currentValue, item, index) index += 1 } }) return currentValue } /** * Returns a collection of items that fail the truth test. * @param {AsyncKeyFunction} truthTestFunction * @return Promise */ async reject(truthTestFunction: AsyncKeyFunction): Promise> { let rejected: CollectionItem[] = [] await this.inChunks(async items => { rejected = rejected.concat(items.all().filter((item, index) => { return !truthTestFunction(item, index) })) }) return new Collection(rejected) } /** * Get a reversed collection of this collection's items. * @return Promise */ async reverse(): Promise> { return (await this.collect()).reverse() } /** * Search the collection and return the index of that item, if one exists. * @param {CollectionItem} item * @return Promise */ async search(item: CollectionItem): Promise { let foundIndex let index = 0 await this.inChunks(async items => { items.some(possibleItem => { if ( possibleItem === item ) { foundIndex = index throw new StopIteration() } index += 1 return false }) }) return foundIndex } /** * Get the next item in the collection and remove it. * @return Promise */ async shift(): Promise> { const nextItem = await this.storedItems.next() if ( !nextItem.done ) { return nextItem.value } } /** * Shuffle the items in the collection to a random order. * @return Promise */ async shuffle(): Promise> { return (await this.collect()).shuffle() } /** * Return a slice of this collection. * @param {number} start - the starting index * @param {number} end - the ending index * @return Promise */ async slice(start: number, end: number): Promise> { return this.storedItems.range(start, end - 1) } /** * Sort the collection, optionally with the given comparison function. * @param {ComparisonFunction} comparisonFunction * @return Promise */ async sort(comparisonFunction?: ComparisonFunction): Promise> { return (await this.collect()).sort(comparisonFunction) } /** * Sort the collection by the given key. * @param {KeyOperator} key * @return Promise */ async sortBy(key?: KeyOperator): Promise> { return (await this.collect()).sortBy(key) } /** * Reverse sort the collection, optionally with the given comparison function. * @param {ComparisonFunction} comparisonFunction * @return Promise */ async sortDesc(comparisonFunction?: ComparisonFunction): Promise> { return (await this.collect()).sortDesc(comparisonFunction) } /** * Reverse sort the collection by the given key. * @param {KeyOperator} key * @return Promise */ async sortByDesc(key?: KeyOperator): Promise> { return (await this.collect()).sortByDesc(key) } /** * Splice the collection at the given index. Optionally, removing the given number of items. * @param {CollectionIndex} start * @param {number} [deleteCount] * @return Promise */ async splice(start: CollectionIndex, deleteCount?: number): Promise> { return (await this.collect()).splice(start, deleteCount) } /** * Sum the items in the collection, or the values of the given key. * @param {KeyOperator} key * @return Promise */ async sum(key?: KeyOperator): Promise { let runningSum = 0 const chunkHandler = (items: number[]) => { for ( const item of items ) { runningSum += item } } if ( key ) { await this.inChunksAllNumbers(key, chunkHandler) } else { await this.inChunks(async chunk => { chunkHandler(chunk.map(x => Number(x)).all()) }) } return runningSum } /** * Take the first n items from the front or back of the collection. * @param {number} limit * @return Promise */ async take(limit: number): Promise> { if ( limit === 0 ) { return new Collection() } else if ( limit > 0 ) { return this.slice(0, limit) } else { const cnt = await this.storedItems.count() return this.storedItems.range(cnt - (-1 * limit), cnt - 1) } } /** * Call the given function, passing in this collection. Allows functional syntax. * @param {AsyncCollectionFunction} func * @return Promise */ async tap(func: AsyncCollectionFunction): Promise> { await func(this) return this } /** * Return all the unique values in the collection, or the unique values of the given key. * @param {KeyOperator} key * @return Promise */ async unique(key?: KeyOperator): Promise> { const has: CollectionItem[] = [] if ( !key ) { await this.inChunks(async items => { for ( const item of items.all() ) { if ( !has.includes(item) ) { has.push(item) } } }) } else { await this.inChunksAll(key, async items => { for ( const item of items.all() ) { if ( !has.includes(item) ) { has.push(item) } } }) } return new Collection(has) } /** * Cast this collection to an array. * @return Promise */ async toArray(): Promise { const returns: any = [] for ( const item of (await this.all()) ) { if ( item instanceof Collection ) { returns.push(item.toArray()) } else if ( item instanceof AsyncCollection ) { returns.push(await item.toArray()) } else { returns.push(item) } } return returns } /** * Cast this collection to a JSON string. * @param [replacer] - the replacer to use * @param {number} [space = 4] number of indentation spaces to use */ async toJSON(replacer = undefined, space = 4): Promise { return JSON.stringify(this.toArray(), replacer, space) } /** * Get a clone of the underlying iterator of this collection. * @return Iterable */ iterator(): Iterable { return this.storedItems.clone() } }