import {
    AssociatedCollectionItem,
    Collection,
    CollectionIndex,
    CollectionItem,
    ComparisonFunction,
    DeterminesEquality,
    KeyFunction,
    KeyOperator,
    KeyReducerFunction,
    MaybeCollectionIndex,
    MaybeCollectionItem,
} from './Collection.ts'
import {Iterable, StopIteration} from './Iterable.ts'
import {applyWhere, WhereOperator} from './Where.ts'

/** Anything an AsyncCollection can be compared against: a plain array, a Collection, or another AsyncCollection. */
type AsyncCollectionComparable<T> = CollectionItem<T>[] | Collection<T> | AsyncCollection<T>

/** Per-item callback that may answer synchronously or with a promise. */
type AsyncKeyFunction<T, T2> = (item: CollectionItem<T>, index: number) => CollectionItem<T2> | Promise<CollectionItem<T2>>

/** Callback that receives the whole AsyncCollection. */
type AsyncCollectionFunction<T, T2> = (items: AsyncCollection<T>) => T2

/**
 * Collection-style helpers over an `Iterable` source that is consumed in chunks of
 * `_chunk_size` items, so that most operations never hold the whole source at once.
 *
 * NOTE(review): the generic type arguments in this file were reconstructed from the
 * surviving uses of `T` / `T2` in the method bodies — confirm them against the
 * declarations in `Collection.ts`, `Iterable.ts` and `Where.ts`.
 */
export class AsyncCollection<T> {
    constructor(
        private _items: Iterable<T>,
        private _chunk_size: number = 1000, // TODO fix this. It's just for testing
    ) {}

    /** Feed the source to `callback` one chunk at a time, then reset the underlying iterable. */
    private async _chunk(callback: (items: Collection<T>) => any): Promise<void> {
        await this._items.chunk(this._chunk_size, async items => {
            await callback(items)
        })
        await this._items.reset()
    }

    /** Like `_chunk`, but each chunk is first plucked by `key`. */
    private async _chunk_all<T2>(key: KeyOperator<T, T2>, callback: (items: Collection<T2>) => any): Promise<void> {
        await this._chunk(items => callback(items.pluck(key)))
    }

    /** Like `_chunk_all`, but every plucked value is coerced with `Number()` and handed over as a plain array. */
    private async _chunk_all_numbers<T2>(key: KeyOperator<T, T2>, callback: (items: number[]) => any): Promise<void> {
        await this._chunk(items => callback(items.pluck(key).map(x => Number(x)).all()))
    }

    /**
     * Like `_chunk`, but each item is paired with its key as `{ key, item }`.
     * A function key is called with `(item, index)`; a string key is read as a property of the item.
     */
    private async _chunk_all_associate<T2>(key: KeyOperator<T, T2>, callback: (items: AssociatedCollectionItem<T2, T>[]) => any): Promise<void> {
        await this._chunk(async items => {
            const assoc_items: AssociatedCollectionItem<T2, T>[] = []

            if ( typeof key === 'function' ) {
                items.map((item, index) => {
                    assoc_items.push({ key: key(item, index), item })
                })
            } else if ( typeof key === 'string' ) {
                items.map(item => {
                    assoc_items.push({ key: (item as any)[key], item })
                })
            }

            await callback(assoc_items)
        })
    }

    /** Fetch every item of the source as a plain array. */
    async all(): Promise<CollectionItem<T>[]> {
        return (await this._items.from_range(0, await this._items.count())).all()
    }

    /** Fetch every item of the source as a (synchronous) Collection. */
    async collect(): Promise<Collection<T>> {
        return this._items.from_range(0, await this._items.count())
    }

    /**
     * Arithmetic mean of the items (or of the values plucked by `key`), coerced with `Number()`.
     * Resolves to `NaN` when there are no items (0 / 0).
     */
    async average<T2>(key?: KeyOperator<T, T2>): Promise<number> {
        let running_total = 0
        let running_items = 0

        const chunk_helper = (items: number[]) => {
            running_items += items.length
            // Seeded with 0 so that an empty chunk does not make reduce() throw.
            running_total += items.reduce((prev, curr) => prev + curr, 0)
        }

        if ( key ) await this._chunk_all_numbers(key, chunk_helper)
        else await this._chunk(items => {
            chunk_helper(items.map(x => Number(x)).all())
        })

        return running_total / running_items
    }

    /** Median of the items (or of the values plucked by `key`), coerced with `Number()`. */
    async median<T2>(key?: KeyOperator<T, T2>): Promise<number> {
        let items: number[] = []

        const chunk_helper = (next_items: number[]) => {
            items = items.concat(next_items)
        }

        if ( key ) await this._chunk_all_numbers(key, chunk_helper)
        else await this._chunk(chunk => {
            chunk_helper(chunk.map(x => Number(x)).all())
        })

        items = items.sort((a, b) => a - b)
        const middle = Math.floor((items.length - 1) / 2)
        if ( items.length % 2 ) return items[middle]
        else return (items[middle] + items[middle + 1]) / 2
    }

    /**
     * Most frequent value among the items (or the values plucked by `key`), coerced with `Number()`.
     * Resolves to `NaN` when there are no items.
     */
    async mode<T2>(key?: KeyOperator<T, T2>): Promise<number> {
        const counts: Record<string, number> = {}

        const chunk_helper = (items: number[]) => {
            for ( const item of items ) {
                const label = String(item)
                counts[label] = (counts[label] || 0) + 1
            }
        }

        if ( key ) await this._chunk_all_numbers(key, chunk_helper)
        else await this._chunk(items => {
            chunk_helper(items.map(x => Number(x)).all())
        })

        const candidates = Object.keys(counts)
        if ( candidates.length === 0 ) return NaN

        // Convert the whole winning key; indexing it with [0] would keep only its first character.
        return Number(candidates.reduce((a, b) => counts[a] > counts[b] ? a : b))
    }

    /** Collect every item, then delegate to `Collection.collapse()`. */
    async collapse(): Promise<Collection<any>> {
        const items = await this.collect()
        return items.collapse() as Collection<any>
    }

    /** True if at least one item's key matches the where-clause. Stops at the first matching chunk. */
    async contains<T2>(key: KeyOperator<T, T2>, operator: WhereOperator, operand?: any): Promise<boolean> {
        let contains = false

        await this._chunk_all_associate(key, (items: AssociatedCollectionItem<T2, T>[]) => {
            const matches = applyWhere(items, operator, operand)
            if ( matches.length > 0 ) {
                contains = true
                throw new StopIteration()
            }
        })

        return contains
    }

    /** New AsyncCollection over a clone of the underlying iterable. */
    async clone(): Promise<AsyncCollection<T>> {
        return new AsyncCollection<T>(await this._items.clone())
    }

    /** Items of this collection for which `items.includes(item)` is false. */
    async diff<T2>(items: AsyncCollectionComparable<T|T2>): Promise<Collection<T>> {
        const matches: T[] = []

        await this._chunk(async chunk => {
            for ( const item of chunk.all() ) {
                if ( !(await items.includes(item)) ) {
                    matches.push(item)
                }
            }
        })

        return new Collection<T>(matches)
    }

    /** Items of this collection for which `compare` matches no member of `items`. */
    async diffUsing<T2>(items: AsyncCollectionComparable<T|T2>, compare: DeterminesEquality<T>): Promise<Collection<T>> {
        const matches: T[] = []

        await this._chunk(async chunk => {
            for ( const item of chunk.all() ) {
                if ( !(await items.some(exc => compare(item, exc))) )
                    matches.push(item)
            }
        })

        return new Collection<T>(matches)
    }

    /** True if some chunk includes `item`. Stops at the first chunk that does. */
    async includes(item: CollectionItem<T>): Promise<boolean> {
        let contains = false

        await this._chunk(items => {
            if ( items.includes(item) ) {
                contains = true
                throw new StopIteration()
            }
        })

        return contains
    }

    /** True if `operator` returns a truthy value for at least one item. Stops at the first hit. */
    async some(operator: (item: T) => boolean): Promise<boolean> {
        let contains = false

        await this._chunk(items => {
            for ( const item of items.all() ) {
                if ( operator(item) ) {
                    contains = true
                    throw new StopIteration()
                }
            }
        })

        return contains
    }

    /** Call (and await) `func` for every item, passing a running index across chunks. */
    async each<T2>(func: AsyncKeyFunction<T, T2>): Promise<void> {
        let index = 0

        await this._chunk(async items => {
            for ( const item of items.all() ) {
                await func(item, index)
                index += 1
            }
        })
    }

    /** Collection of the awaited results of `func` for every item. */
    async map<T2>(func: AsyncKeyFunction<T, T2>): Promise<Collection<T2>> {
        const new_items: CollectionItem<T2>[] = []

        await this.each(async (item, index) => {
            new_items.push(await func(item, index))
        })

        return new Collection<T2>(new_items)
    }

    /** True if the awaited result of `func` is truthy for every item. Stops at the first failure. */
    async every<T2>(func: AsyncKeyFunction<T, T2>): Promise<boolean> {
        let pass = true
        let index = 0

        await this._chunk(async items => {
            for ( const item of items.all() ) {
                if ( !(await func(item, index)) ) {
                    pass = false
                    throw new StopIteration()
                }

                index += 1
            }
        })

        return pass
    }

    /** True if `Collection.everyWhere` holds for every chunk. Stops at the first failing chunk. */
    async everyWhere<T2>(key: KeyOperator<T, T2>, operator: WhereOperator, operand?: any): Promise<boolean> {
        let pass = true

        await this._chunk(async items => {
            pass = pass && items.everyWhere(key, operator, operand)
            if ( !pass ) {
                throw new StopIteration()
            }
        })

        return pass
    }

    /**
     * Concatenation of `Collection.filter(func)` applied to each chunk.
     * NOTE(review): filtering is delegated per chunk, so any index `func` receives is
     * presumably relative to the chunk rather than to the whole source — verify.
     */
    async filter<T2>(func: KeyFunction<T, T2>): Promise<Collection<T>> {
        let new_items: CollectionItem<T>[] = []

        await this._chunk(async items => {
            new_items = new_items.concat(items.filter(func).all())
        })

        return new Collection<T>(new_items)
    }

    /** Call `then(this)` when `bool` is truthy; always returns this collection. */
    when<T2>(bool: boolean, then: AsyncCollectionFunction<T, T2>): AsyncCollection<T> {
        if ( bool ) then(this)
        return this
    }

    /** Call `then(this)` when `bool` is falsy; always returns this collection. */
    unless<T2>(bool: boolean, then: AsyncCollectionFunction<T, T2>): AsyncCollection<T> {
        if ( !bool ) then(this)
        return this
    }

    /** Concatenation of `Collection.where(...)` applied to each chunk. */
    async where<T2>(key: KeyOperator<T, T2>, operator: WhereOperator, operand?: any): Promise<Collection<T>> {
        let new_items: CollectionItem<T>[] = []

        await this._chunk(async items => {
            new_items = new_items.concat(items.where(key, operator, operand).all())
        })

        return new Collection<T>(new_items)
    }

    /** Concatenation of `Collection.whereNot(...)` applied to each chunk. */
    async whereNot<T2>(key: KeyOperator<T, T2>, operator: WhereOperator, operand?: any): Promise<Collection<T>> {
        let new_items: CollectionItem<T>[] = []

        await this._chunk(async items => {
            new_items = new_items.concat(items.whereNot(key, operator, operand).all())
        })

        return new Collection<T>(new_items)
    }

    /** Items whose key is included in `items`. */
    async whereIn<T2>(key: KeyOperator<T, T2>, items: AsyncCollectionComparable<T2>): Promise<Collection<T>> {
        const new_items: CollectionItem<T>[] = []

        await this._chunk_all_associate(key, async chunk => {
            for ( const item of chunk ) {
                if ( await items.includes(item.key) ) {
                    new_items.push(item.item)
                }
            }
        })

        return new Collection<T>(new_items)
    }

    /** Items whose key is not included in `items`. */
    async whereNotIn<T2>(key: KeyOperator<T, T2>, items: AsyncCollectionComparable<T2>): Promise<Collection<T>> {
        const new_items: CollectionItem<T>[] = []

        await this._chunk_all_associate(key, async chunk => {
            for ( const item of chunk ) {
                if ( !(await items.includes(item.key)) ) {
                    new_items.push(item.item)
                }
            }
        })

        return new Collection<T>(new_items)
    }

    /** Item at index 0, or undefined when the source is empty. */
    async first(): Promise<MaybeCollectionItem<T>> {
        if ( await this._items.count() > 0 ) {
            return this._items.at_index(0)
        }
    }

    /** First match that `applyWhere` reports for the where-clause, or undefined if there is none. */
    async firstWhere<T2>(key: KeyOperator<T, T2>, operator: WhereOperator = '=', operand: any = true): Promise<MaybeCollectionItem<T>> {
        let item: MaybeCollectionItem<T> = undefined

        await this._chunk_all_associate(key, async items => {
            const matches = applyWhere(items, operator, operand)
            if ( matches.length > 0 ) {
                item = matches[0]
                throw new StopIteration()
            }
        })

        return item
    }

    /** First item that `Collection.whereNot` reports for the where-clause, or undefined if there is none. */
    async firstWhereNot<T2>(key: KeyOperator<T, T2>, operator: WhereOperator = '=', operand: any = true): Promise<MaybeCollectionItem<T>> {
        let item: MaybeCollectionItem<T> = undefined

        await this._chunk(async items => {
            const matches = items.whereNot(key, operator, operand)
            if ( matches.length > 0 ) {
                item = matches.first()
                throw new StopIteration()
            }
        })

        return item
    }

    /** Number of items, as reported by the underlying iterable. */
    async count() {
        return this._items.count()
    }

    /** Alias of `count()`. */
    async length() {
        return this._items.count()
    }

    /** Item at `index` when `index` is below the count, otherwise `fallback`. */
    async get(index: number, fallback?: any) {
        if ( (await this.count()) > index ) return this._items.at_index(index)
        else return fallback
    }

    /** Same as `get(index)` with no fallback. */
    async at(index: number): Promise<MaybeCollectionItem<T>> {
        return this.get(index)
    }

    /** Collect every item, then delegate to `Collection.groupBy(key)`. */
    async groupBy<T2>(key: KeyOperator<T, T2>): Promise<any> {
        return (await this.collect()).groupBy(key)
    }

    /** Collect every item, then delegate to `Collection.associate(key)`. */
    async associate<T2>(key: KeyOperator<T, T2>): Promise<any> {
        return (await this.collect()).associate(key)
    }

    /** Join each chunk with `delimiter`, then join the per-chunk strings with the same delimiter. */
    async join(delimiter: string): Promise<string> {
        const running_strings: string[] = []

        await this._chunk(async items => {
            running_strings.push(items.join(delimiter))
        })

        return running_strings.join(delimiter)
    }

    /** Alias of `join()`. */
    async implode(delimiter: string): Promise<string> {
        return this.join(delimiter)
    }

    // TODO intersect

    /** True when the underlying iterable reports fewer than one item. */
    async isEmpty(): Promise<boolean> {
        return (await this._items.count()) < 1
    }

    /** True when the underlying iterable reports at least one item. */
    async isNotEmpty(): Promise<boolean> {
        return (await this._items.count()) > 0
    }

    /** Item at the last index, or undefined when the source is empty. */
    async last(): Promise<MaybeCollectionItem<T>> {
        const length = await this._items.count()
        if ( length > 0 ) return this._items.at_index(length - 1)
    }

    /** Last item of `where(key, operator, operand)`. */
    async lastWhere<T2>(key: KeyOperator<T, T2>, operator: WhereOperator, operand?: any): Promise<MaybeCollectionItem<T>> {
        return (await this.where(key, operator, operand)).last()
    }

    /** Last item of `whereNot(key, operator, operand)`. */
    async lastWhereNot<T2>(key: KeyOperator<T, T2>, operator: WhereOperator, operand?: any): Promise<MaybeCollectionItem<T>> {
        return (await this.whereNot(key, operator, operand)).last()
    }

    /** Collection of the values plucked by `key` from every item. */
    async pluck<T2>(key: KeyOperator<T, T2>): Promise<Collection<T2>> {
        let new_items: CollectionItem<T2>[] = []

        await this._chunk_all(key, async items => {
            new_items = new_items.concat(items.all())
        })

        return new Collection<T2>(new_items)
    }

    /** Largest value plucked by `key`, coerced with `Number()`. Undefined at runtime if no chunk is delivered. */
    async max<T2>(key: KeyOperator<T, T2>): Promise<number> {
        let running_max: number

        await this._chunk_all_numbers(key, async items => {
            const local_max = Math.max(...items)
            if ( typeof running_max === 'undefined' ) running_max = local_max
            else running_max = Math.max(running_max, local_max)
        })

        // @ts-ignore
        return running_max
    }

    /** Items whose key equals `max(key)`. */
    async whereMax<T2>(key: KeyOperator<T, T2>): Promise<Collection<T>> {
        return this.where(key, '=', await this.max(key))
    }

    /** Smallest value plucked by `key`, coerced with `Number()`. Undefined at runtime if no chunk is delivered. */
    async min<T2>(key: KeyOperator<T, T2>): Promise<number> {
        let running_min: number

        await this._chunk_all_numbers(key, async items => {
            const local_min = Math.min(...items)
            if ( typeof running_min === 'undefined' ) running_min = local_min
            else running_min = Math.min(running_min, local_min)
        })

        // @ts-ignore
        return running_min
    }

    /** Items whose key equals `min(key)`. */
    async whereMin<T2>(key: KeyOperator<T, T2>): Promise<Collection<T>> {
        return this.where(key, '=', await this.min(key))
    }

    /** Collection holding the items of `merge_with` followed by every item of this collection. */
    async merge<T2>(merge_with: AsyncCollectionComparable<T2>): Promise<Collection<T|T2>> {
        let items: T2[]
        if ( merge_with instanceof Collection ) items = await merge_with.all()
        else if ( merge_with instanceof AsyncCollection ) items = await merge_with.all()
        else if ( Array.isArray(merge_with) ) items = merge_with

        // @ts-ignore
        return new Collection<T|T2>([...items, ...await this.all()])
    }

    /** Every `n`-th item, starting with the first one. */
    async nth(n: number): Promise<Collection<T>> {
        const matches: CollectionItem<T>[] = []
        let current = 1

        await this._chunk(async chunk => {
            for ( const item of chunk.all() ) {
                if ( current === 1 ) matches.push(item)
                current += 1
                if ( current > n ) current = 1
            }
        })

        return new Collection<T>(matches)
    }

    /** Items of 1-based `page` at `per_page` items per page, fetched with `from_range(start, end)`. */
    async forPage(page: number, per_page: number): Promise<Collection<T>> {
        const start = page * per_page - per_page
        const end = page * per_page - 1

        return this._items.from_range(start, end)
    }

    /** Return the result of calling `func` with this collection. */
    pipe<T2>(func: AsyncCollectionFunction<T, T2>): any {
        return func(this)
    }

    /*async pop(): Promise<MaybeCollectionItem<T>> {
        const next_item = await this._items.next()
        if ( !next_item.done ) {
            return next_item.value
        }
    }*/ // TODO Fix this

    /** Up to `n` items fetched from distinct, randomly chosen indices. */
    async random(n: number): Promise<Collection<T>> {
        const random_items: CollectionItem<T>[] = []
        const fetched_indices = new Set<number>()

        const max_n = await this._items.count()
        if ( n > max_n ) n = max_n

        while ( random_items.length < n ) {
            const index = Math.floor(Math.random() * max_n)

            if ( !fetched_indices.has(index) ) {
                fetched_indices.add(index)
                random_items.push(await this._items.at_index(index))
            }
        }

        return new Collection<T>(random_items)
    }

    /** Fold every item into a single value, starting from `initial_value`. */
    async reduce<T2>(reducer: KeyReducerFunction<T, T2>, initial_value?: T2): Promise<T2 | undefined> {
        let current_value = initial_value
        let index = 0

        await this._chunk(async items => {
            for ( const item of items.all() ) {
                current_value = reducer(current_value, item, index)
                index += 1
            }
        })

        return current_value
    }

    /**
     * Items for which the awaited result of `truth_test` is falsy.
     * The index passed to `truth_test` runs across chunks, as in `each()` and `every()`.
     */
    async reject<T2>(truth_test: AsyncKeyFunction<T, T2>): Promise<Collection<T>> {
        const rejected: CollectionItem<T>[] = []
        let index = 0

        await this._chunk(async items => {
            for ( const item of items.all() ) {
                // The test may return a promise; negating it without awaiting would always be false.
                if ( !(await truth_test(item, index)) ) rejected.push(item)
                index += 1
            }
        })

        return new Collection<T>(rejected)
    }

    /** Collect every item, then delegate to `Collection.reverse()`. */
    async reverse(): Promise<Collection<T>> {
        return (await this.collect()).reverse()
    }

    /** Index of the first item strictly equal to `item`, or undefined if there is none. */
    async search(item: CollectionItem<T>): Promise<MaybeCollectionIndex> {
        let found_index
        let index = 0

        await this._chunk(async items => {
            items.some(possible_item => {
                if ( possible_item === item ) {
                    found_index = index
                    throw new StopIteration()
                }

                index += 1
                return false
            })
        })

        return found_index
    }

    /** Value of the underlying iterable's `next()`, or undefined once it reports `done`. */
    async shift(): Promise<MaybeCollectionItem<T>> {
        const next_item = await this._items.next()
        if ( !next_item.done ) {
            return next_item.value
        }
    }

    /** Collect every item, then delegate to `Collection.shuffle()`. */
    async shuffle(): Promise<Collection<T>> {
        return (await this.collect()).shuffle()
    }

    /** Items fetched with `from_range(start, end - 1)`. */
    async slice(start: number, end: number): Promise<Collection<T>> {
        return this._items.from_range(start, end - 1)
    }

    /** Collect every item, then delegate to `Collection.sort(compare_func)`. */
    async sort(compare_func?: ComparisonFunction<T>): Promise<Collection<T>> {
        return (await this.collect()).sort(compare_func)
    }

    /** Collect every item, then delegate to `Collection.sortBy(key)`. */
    async sortBy<T2>(key?: KeyOperator<T, T2>): Promise<Collection<T>> {
        return (await this.collect()).sortBy(key)
    }

    /** Collect every item, then delegate to `Collection.sortDesc(compare_func)`. */
    async sortDesc(compare_func?: ComparisonFunction<T>): Promise<Collection<T>> {
        return (await this.collect()).sortDesc(compare_func)
    }

    /** Collect every item, then delegate to `Collection.sortByDesc(key)`. */
    async sortByDesc<T2>(key?: KeyOperator<T, T2>): Promise<Collection<T>> {
        return (await this.collect()).sortByDesc(key)
    }

    /** Collect every item, then delegate to `Collection.splice(start, deleteCount)`. */
    async splice(start: CollectionIndex, deleteCount?: number): Promise<Collection<T>> {
        return (await this.collect()).splice(start, deleteCount)
    }

    /** Sum of the items (or of the values plucked by `key`), coerced with `Number()`. */
    async sum<T2>(key?: KeyOperator<T, T2>): Promise<number> {
        let running_sum = 0

        const chunk_handler = (items: number[]) => {
            for ( const item of items ) {
                running_sum += item
            }
        }

        if ( key ) await this._chunk_all_numbers(key, chunk_handler)
        else await this._chunk(async chunk => {
            chunk_handler(chunk.map(x => Number(x)).all())
        })

        return running_sum
    }

    /**
     * First `limit` items when `limit` is positive, last `-limit` items when it is
     * negative, and an empty Collection when it is 0.
     */
    async take(limit: number): Promise<Collection<T>> {
        if ( limit === 0 ) return new Collection<T>()
        else if ( limit > 0 ) {
            return this.slice(0, limit)
        } else {
            const cnt = await this._items.count()
            return this._items.from_range(cnt - (-1 * limit), cnt - 1)
        }
    }

    /** Await `func(this)`, then resolve to this collection. */
    async tap<T2>(func: AsyncCollectionFunction<T, T2>): Promise<AsyncCollection<T>> {
        await func(this)
        return this
    }

    /**
     * Distinct items in first-seen order, or — when `key` is given — the distinct values plucked by `key`.
     */
    async unique<T2>(key?: KeyOperator<T, T2>): Promise<Collection<T|T2>> {
        // A Set uses the same SameValueZero comparison as Array.prototype.includes
        // and iterates in insertion order.
        const seen = new Set<CollectionItem<T|T2>>()

        if ( !key ) {
            await this._chunk(async items => {
                for ( const item of items.all() ) {
                    seen.add(item)
                }
            })
        } else {
            await this._chunk_all(key, async items => {
                for ( const item of items.all() ) {
                    seen.add(item)
                }
            })
        }

        return new Collection<T|T2>([...seen])
    }

    /** Plain array of every item, with nested Collection / AsyncCollection items converted via their own `toArray()`. */
    async toArray(): Promise<any[]> {
        const returns: any = []
        for ( const item of (await this.all()) ) {
            if ( item instanceof Collection ) returns.push(item.toArray())
            else if ( item instanceof AsyncCollection ) returns.push(await item.toArray())
            else returns.push(item)
        }
        return returns
    }

    /** JSON string of `toArray()`. */
    async toJSON(replacer = undefined, space = 4): Promise<string> {
        // toArray() is async; without the await the pending promise itself would be serialised.
        return JSON.stringify(await this.toArray(), replacer, space)
    }

    /** Hands out a clone of the underlying iterable for `for await` loops. */
    [Symbol.asyncIterator]() {
        return this._items.clone()
    }

    /** Clone of the underlying iterable. */
    iterator() {
        return this._items.clone()
    }
}