From 063809e44613264341d673a97f5279b57d8e50da Mon Sep 17 00:00:00 2001 From: garrettmills Date: Mon, 30 Dec 2024 21:36:12 -0500 Subject: [PATCH] [WIP] Start tinkering with IMAP --- bun.lockb | Bin 13883 -> 15431 bytes index.ts | 82 +- package.json | 2 + src/bones/Pipe.ts | 264 ++++ src/bones/Reactive.ts | 210 +++ src/bones/collection/ArrayIterable.ts | 33 + src/bones/collection/AsyncCollection.ts | 1151 ++++++++++++++ .../collection/AsyncGeneratorIterable.ts | 139 ++ src/bones/collection/Collection.ts | 1378 +++++++++++++++++ src/bones/collection/Iterable.ts | 135 ++ src/bones/collection/where.ts | 110 ++ src/config.ts | 1 + src/mail/read.ts | 0 src/mail/replies.ts | 133 ++ src/types.ts | 15 + 15 files changed, 3651 insertions(+), 2 deletions(-) create mode 100644 src/bones/Pipe.ts create mode 100644 src/bones/Reactive.ts create mode 100644 src/bones/collection/ArrayIterable.ts create mode 100644 src/bones/collection/AsyncCollection.ts create mode 100644 src/bones/collection/AsyncGeneratorIterable.ts create mode 100644 src/bones/collection/Collection.ts create mode 100644 src/bones/collection/Iterable.ts create mode 100644 src/bones/collection/where.ts create mode 100644 src/mail/read.ts create mode 100644 src/mail/replies.ts diff --git a/bun.lockb b/bun.lockb index 93f63c1cba7ec9eadf5c0450dbb29bef43a885ef..960e3dbb02259c574bf153fab280d8fb96c6a9e9 100755 GIT binary patch delta 3456 zcmbtW3s6+&6~2#M;4Ut(JY->csKEtRxGWEO_*gA@>8>W=BMVMsA_^77K*Tg^qc9<< zkwlB0OzJDf)CAP2#YbpKkT6=usFg;olZhRTiP|xys6^4KnPR_l*{jUdOgiH~bMAM~ zf6o7($Nm3vPkZI(iyPJCJvYC6`}vHqDbrh*ot}HqR(!KdTyfQ|G`(`WG%4Bs8vS{+ zppB$8-uT*pq<*IV86*gGORMH+Og^52z#jqz0y}^Tuwuci>hj8}S8A$e&#%GUEwNv( z8)$-jxzvyIYD!;(Sg5Xev5K)n7KBL1YpN?N7G(%RGx))^m{^OK2wn&b2QC0wfF;0S zU?I>9d=khG<@rzrvSSHAHoVK{Kr@gXt*j`0sTziX3n~_r37dWM@A&lI2oQu?-mo4M zY%m?lnqjQwjkf|>J`D5hh!x0&%JEw-s`o3^Od;ZJC%AkpXU9?4-D?UStJkGKVrpCiGK!Ddm0PU9&sZJhbcKK#m;?J{d zsWn6=AJDw){jaE+J&5f<#rJQ57;k!M~8zUhGY_27wLLs-w zCABEj0O(Q31aO+@D4@zjra+gp*F4HGp4j;i(N9D{f;;>7cT(7p)HgfTgnf=64a5pg5D!M?`5H0ej%#{ZK7J%nACHtlj{gx;sQK}*6tm+$m%-XcPvIkF zknWie)o$4M()0<9H*d84d;7Jwi_g6A+`&V4i`}sat-snmwSISFThFoh3s2$gSaSDNoLSJwnkWYa=46_e^dD*%2=J&%|KEIRe z$bR?ol!J~W{M5=i2hU&?4`rty1*$E72>7E z7Beji@rZ$R1YA<6nNlqt(M(G$Uiu2$S#ZIW9O|Vv!_2fQ)FWExG`QSwGmQ=Nh+(uc z%uC;ay82}1)D~r?4y#9e zj6|zfOduu7D-Newc-rX`JQK+heT}N4$Lz7iNOW8Z-y0Kq+mD>KgQf!CWuKEoaS5(C z{^))8`Uk%!f*O4)DaoV`A4cmE!fR2s1(aazV)G^EO2Fj}|3>~X3;h4X|4|45;r}Uw zg7|_5fVc>7q2Ou~1>%2bENCc*?Z<$IfTBV1ATEbC5SL9f16&TnwAwisOi?l(7%2|+ zt(M*8yUO=?2#EdUx`2#nm5k3dS2W(2ZL%%in{Da7aLMJ^*jM&hFWc+~o8)rMey~3r z7stnOB3^14Zk(a#fOS}RppAic*(S%U$BwwP9B?VI``$SxFF5ePIFLr6b~{HvIre9t z;hGB5b$Y8Hi6xn8mWq<9w_|!!0ZF>*RMT_P^RWi*)#+^rB$!uovE>$fbU_Ei_(9d_ zU5U1EqzX%iAmRQ;?_0EmPBjNT+)u8l7ipi}8l(4CM_OE;&Y3o5h$K4IY&8?hHjC6_ zpCmPlOI=bFU{ABp=kAJcii z1et=8tTB2I_{DFVdruwztQ!iMh!H{L$@D_9ReDmU=b@+fkS@phulK$lV`CMF03FC}dzGDCliz=GAHCeoxF_4@nMugk-HuUt@2rkwv;*fB##1 zaOct@C}cqaPIkx?nw;VDLGpRzOT_<|FdIcWBD1+uJ!1hIRj& zZ+v4d;Ux+kb68{a9#-!uC2{#kH6vTi68aUSauY3gShMwBcI#i)EqQ<5$qMM@syGc! z==Glly*q^@1Lr71&F$a86oORnC~(qMM?R?|GNLcx8nMH#=Z-ukE)MFe^Qp9qrllji zvEvslsV=KYTQIAlvU=9SnzDtJWs4S-E$n}%6C>Pt11Hi_(+0As+96gK{c>sPteUdy ztkiim{o>hGbIKNCwJm-Ce}Azi#af(WvZTe^?k|SM{rA*#j+y0qZ7HYyV;^`3tp`l1 WcK0YKtBray<0vA{wzo6uRq?;DPuE8P delta 2450 zcmcImS!`5g6#nm>rE{mfE!}6MEv1xp+D>PN?$BvLQJ4WOK6qP}inw*48o#%+n;Z4d#R)CnN1y-Q6(Ybux1;um5RJPrH^FqO{%Q-Hm|#BRKF;w1st1XO`*fo!Nw$ECnn@P$A& zJPsp_qd*1N9Nw_G9q+)FaLaRISf3x%_q`c2t#2QABKBtk+1`s5Ar`UXkMP0eE_>elYj<^tfqjxU!ox38;SZ%0V$RyOf~WinXKl3 zX|X|w5;|pED0dpD&m2%DF!2QZlIgUmTyYwOr~`|kM02^k$wBF;fumzM1*}m(4V( z1f(=ltTl373x}p)Vy?~^5pywL=Zw5RPvtw6I|Y_Z_H>P!(p1`>9+FaN0?d}KQjskrrPEfMMqhwk0<%-T zU863WO1tbK$w?Q%^6V;AJ3`WI>UU~%0qh5`OsWKX&7smS&X6>R7P~a6aH{mSDIR%G2aTkCT;*r)s z5ZqW{f$&{JW#wwi1(B<9BojGJIh9eiMKXj7r3+$*pwtT|1f^X#Abb~5?jv^=RW@=L z`M#p^in@7ta6VXa;YapTb}b9S_m#7q6NmM(9@fovSU>Ay{p=$r6Z_5AjQuv#(frIt z_LMzl&!fMBEwVQ=HO^_)|k({>Id8^WBWA{P&gYi=rY=hAxjV~%J~EI$&zYUPQM zY9Nn=x!`w(?Uf1 z<5#c0^YhuSN2jo|z~keUI+>;lZSq=~ZWh_(CYiE|Y-Q1Qd;a{P6WY7+c3ly?jC5qU zb=)v@DAjTIL#+6r2rC0Jwc=>Bh5tHrul3g8*hFLlZ2i^bd-q?-O*po#URSz+{|?Zn zMgFJ{x<`5({#(49H0DWnMZ5SNHG4R<_P|Y9+KJDP%t9Zh_+pzY+Qp*{SeAB_di-Uc z5>chlT5O26^MhaR*}8w@nJ_j~cuGA!uYiW##Ws0bk=4t^7o)v^P8KhrSzfPW1Ye8) vA@o+1o|L-c?;X&$94w{PUX51yob;Wyc<7q%Vhp`el|kQDelT>Vs#E$CL0+%3 diff --git a/index.ts b/index.ts index 9716130..3c65225 100644 --- a/index.ts +++ b/index.ts @@ -1,4 +1,82 @@ - import { config } from "./src/config.ts"; +import {ImapFlow, type MailboxLockObject} from "imapflow"; +import { extract } from 'letterparser' +import {ReplyParser} from "./src/mail/replies.ts"; +import type {Message} from "./src/types.ts"; +import type {Awaitable} from "./src/bones"; +import {AsyncCollection} from "./src/bones/collection/AsyncCollection.ts"; +import {AsyncGeneratorIterable} from "./src/bones/collection/AsyncGeneratorIterable.ts"; -console.log(config) +export async function withMessageClient(cb: (c: ImapFlow) => Awaitable): Promise { + const client = new ImapFlow(config.mail.imap) + await client.connect() + + try { + return cb(client) + } finally { + await client.logout() + } +} + +export async function getFolders(): Promise { + return withMessageClient(async client => { + const list = await client.list() + return list.map(l => l.name) + }) +} + +export const getMessagesForMailbox = (box: string): AsyncCollection => { + return new AsyncCollection( + new AsyncGeneratorIterable( + () => getMessagesForMailboxOld(box))) +} + +export async function* getMessagesForMailboxOld(box: string): AsyncGenerator { + const client = new ImapFlow(config.mail.imap) + await client.connect() + + try { + let lock: MailboxLockObject + try { + lock = await client.getMailboxLock(box) + } catch (e: unknown) { + // This is usually because the mailbox does not exist, so yield nothing + console.warn(`Error when opening mailbox lock for mailbox "${box}":\n`, e) + return + } + + try { + await client.mailboxOpen(box) + + const messages = client.fetch('1:*', { + envelope: true, + source: true, + uid: true, + bodyParts: ['text'], + }) + + for await ( const message of messages ) { + const source = message.source.toString('utf-8') + const content = ReplyParser.parseReply(extract(source).text || '') + + const msg: Message = { + id: message.envelope.messageId, + date: message.envelope.date, + recipients: message.envelope.to.map(x => x.address || '').filter(Boolean), + from: message.envelope.from[0], + subject: message.envelope.subject, + content, + } + + yield msg + } + } finally { + lock.release() + } + } finally { + await client.logout() + } +} + +const c = getMessagesForMailbox('e12a') +console.log(await c.all()) \ No newline at end of file diff --git a/package.json b/package.json index 218b3df..a85dee4 100644 --- a/package.json +++ b/package.json @@ -10,7 +10,9 @@ }, "dependencies": { "@types/imapflow": "^1.0.19", + "@types/mailparser": "^3.4.5", "imapflow": "^1.0.171", + "letterparser": "^0.1.8", "zod": "^3.24.1" } } \ No newline at end of file diff --git a/src/bones/Pipe.ts b/src/bones/Pipe.ts new file mode 100644 index 0000000..a1847c7 --- /dev/null +++ b/src/bones/Pipe.ts @@ -0,0 +1,264 @@ +/** + * A closure that maps a given pipe item to a different type. + */ +import type {Awaitable, Maybe} from './types' + +export type PipeOperator = (subject: T) => T2 + +/** + * A closure that maps a given pipe item to an item of the same type. + */ +export type ReflexivePipeOperator = (subject: T) => Maybe + +/** + * A condition or condition-resolving function for pipe methods. + */ +export type PipeCondition = boolean | ((subject: T) => boolean) + +/** + * A class for writing chained/conditional operations in a data-flow manner. + * + * This is useful when you need to do a series of operations on an object, perhaps conditionally. + */ +export class Pipeline { + static id(): Pipeline { + return new Pipeline(x => x) + } + + constructor( + protected readonly factory: (TIn: TIn) => TOut, + ) {} + + /** + * Apply the given operator to the item in the pipe, and return a new pipe with the result. + * + * @example + * ```typescript + * Pipe.wrap(2) + * .tap(x => x * 4) + * .get() // => 8 + * ``` + * + * @param op + */ + tap(op: PipeOperator): Pipeline { + return new Pipeline((val: TIn) => { + return op(this.factory(val)) + }) + } + + /** + * Like tap, but operates on a tuple with both the first value and the tapped value. + * @param op + */ + first(op: PipeOperator<[TIn, TOut], T2>): Pipeline { + return new Pipeline((val: TIn) => { + return op([val, this.factory(val)]) + }) + } + + /** + * Like tap, but always returns the original pipe type. + * @param op + */ + peek(op: PipeOperator): Pipeline { + return new Pipeline((val: TIn) => { + const nextVal = this.factory(val) + op(nextVal) + return nextVal + }) + } + + /** + * If `check` is truthy, apply the given operator to the item in the pipe and return the result. + * Otherwise, just return the current pipe unchanged. + * + * @param check + * @param op + */ + when(check: PipeCondition, op: ReflexivePipeOperator): Pipeline { + return new Pipeline((val: TIn) => { + const nextVal = this.factory(val) + if ( this.checkCondition(check, nextVal) ) { + const appliedVal = op(nextVal) + if ( typeof appliedVal === 'undefined' ) { + return nextVal + } + + return appliedVal + } + + return nextVal + }) + } + + /** + * If `check` is falsy, apply the given operator to the item in the pipe and return the result. + * Otherwise, just return the current pipe unchanged. + * + * @param check + * @param op + */ + unless(check: PipeCondition, op: ReflexivePipeOperator): Pipeline { + return new Pipeline((val: TIn) => { + const nextVal = this.factory(val) + if ( !this.checkCondition(check, nextVal) ) { + const appliedVal = op(nextVal) + if ( typeof appliedVal === 'undefined' ) { + return nextVal + } + + return appliedVal + } + + return nextVal + }) + } + + /** + * Apply the pipeline to an input. + */ + apply(input: TIn): TOut { + return this.factory(input) + } + + protected checkCondition(check: PipeCondition, val: TOut): boolean { + return (typeof check === 'function' && check(val)) + || (typeof check !== 'function' && check) + } +} + +/** + * A subject function that yields the value in the AsyncPipe. + */ +export type AsyncPipeResolver = () => Awaitable + +/** + * A closure that maps a given pipe item to a different type. + */ +export type AsyncPipeOperator = (subject: T) => Awaitable + +export type PromisePipeOperator = (subject: T, resolve: (val: T2) => unknown, reject: (err: Error) => unknown) => Awaitable + +/** + * A closure that maps a given pipe item to an item of the same type. + */ +export type ReflexiveAsyncPipeOperator = (subject: T) => Awaitable + +/** + * A condition or condition-resolving function for pipe methods. + */ +export type AsyncPipeCondition = boolean | ((subject: T) => Awaitable) + +/** + * An asynchronous version of the Pipe helper. + */ +export class AsyncPipe { + /** + * Get an AsyncPipe with the given value in it. + * @param subject + */ + static wrap(subject: subjectType): AsyncPipe { + return new AsyncPipe(() => subject) + } + + constructor( + /** The current value resolver of the pipe. */ + private subject: AsyncPipeResolver, + ) {} + + /** + * Apply a transformative operator to the pipe. + * @param op + */ + tap(op: AsyncPipeOperator): AsyncPipe { + return new AsyncPipe(async () => op(await this.subject())) + } + + /** + * Apply a transformative operator to the pipe, wrapping it + * in a Promise and passing the resolve/reject callbacks to the + * closure. + * @param op + */ + promise(op: PromisePipeOperator): AsyncPipe { + return new AsyncPipe(() => { + return new Promise((res, rej) => { + (async () => this.subject())() + .then(subject => { + op(subject, res, rej) + }) + }) + }) + } + + /** + * Apply an operator to the pipe, but return the reference + * to the current pipe. The operator is resolved when the + * overall pipe is resolved. + * @param op + */ + peek(op: AsyncPipeOperator): AsyncPipe { + return new AsyncPipe(async () => { + const subject = await this.subject() + await op(subject) + return subject + }) + } + + /** + * Apply an operator to the pipe, if the check condition passes. + * @param check + * @param op + */ + when(check: AsyncPipeCondition, op: ReflexiveAsyncPipeOperator): AsyncPipe { + return new AsyncPipe(async () => { + let subject + + if ( typeof check === 'function' ) { + check = await check(subject = await this.subject()) + } + + subject = subject ?? await this.subject() + if ( check ) { + return ((await op(subject)) ?? subject) as T + } + + return subject as T + }) + } + + /** + * Apply an operator to the pipe, if the check condition fails. + * @param check + * @param op + */ + unless(check: AsyncPipeCondition, op: ReflexiveAsyncPipeOperator): AsyncPipe { + if ( typeof check === 'function' ) { + return this.when(async (subject: T) => !(await check(subject)), op) + } + + return this.when(!check, op) + } + + /** + * Alias of `unless()`. + * @param check + * @param op + */ + whenNot(check: AsyncPipeCondition, op: ReflexiveAsyncPipeOperator): AsyncPipe { + return this.unless(check, op) + } + + /** + * Get the transformed value from the pipe. + */ + async resolve(): Promise { + return this.subject() + } + + /** Get the transformed value from the pipe. Allows awaiting the pipe directly. */ + then(): Promise { + return this.resolve() + } +} diff --git a/src/bones/Reactive.ts b/src/bones/Reactive.ts new file mode 100644 index 0000000..332cd1c --- /dev/null +++ b/src/bones/Reactive.ts @@ -0,0 +1,210 @@ +/** + * Base error used to trigger an unsubscribe action from a subscriber. + * @extends Error + */ +export class UnsubscribeError extends Error {} + +/** + * Thrown when a closed observable is pushed to. + * @extends Error + */ +export class CompletedObservableError extends Error { + constructor() { + super('This observable can no longer be pushed to, as it has been completed.') + } +} + +/** + * Type of a basic subscriber function. + */ +export type SubscriberFunction = (val: T) => any + +/** + * Type of a basic subscriber function that handles errors. + */ +export type SubscriberErrorFunction = (error: Error) => any + +/** + * Type of a basic subscriber function that handles completed events. + */ +export type SubscriberCompleteFunction = (val?: T) => any + +/** + * Subscribers that define multiple handler methods. + */ +export type ComplexSubscriber = { + next?: SubscriberFunction, + error?: SubscriberErrorFunction, + complete?: SubscriberCompleteFunction, +} + +/** + * Subscription to a behavior subject. + */ +export type Subscription = SubscriberFunction | ComplexSubscriber + +/** + * Object providing helpers for unsubscribing from a subscription. + */ +export type Unsubscribe = { unsubscribe: () => void } + +/** + * A stream-based state class. + */ +export class Reactive { + /** + * Subscribers to this subject. + * @type Array + */ + protected subscribers: ComplexSubscriber[] = [] + + /** + * True if this subject has been marked complete. + * @type boolean + */ + protected subjectIsComplete = false + + /** + * The current value of this subject. + */ + protected currentValue?: T + + /** + * True if any value has been pushed to this subject. + * @type boolean + */ + protected hasPush = false + + /** + * Register a new subscription to this subject. + * @param {Subscription} subscriber + * @return Unsubscribe + */ + public subscribe(subscriber: Subscription): Unsubscribe { + if ( typeof subscriber === 'function' ) { + this.subscribers.push({ next: subscriber }) + } else { + this.subscribers.push(subscriber) + } + + return { + unsubscribe: () => { + this.subscribers = this.subscribers.filter(x => x !== subscriber) + }, + } + } + + /** + * Cast this subject to a promise, which resolves on the output of the next value. + * @return Promise + */ + public toPromise(): Promise { + return new Promise((resolve, reject) => { + const { unsubscribe } = this.subscribe({ + next: (val: T) => { + resolve(val) + unsubscribe() + }, + error: (error: Error) => { + reject(error) + unsubscribe() + }, + complete: (val?: T) => { + if ( typeof val !== 'undefined' ) { + resolve(val) + } + unsubscribe() + }, + }) + }) + } + + /** + * Push a new value to this subject. The promise resolves when all subscribers have been pushed to. + * @param val + * @return Promise + */ + public async next(val: T): Promise { + if ( this.subjectIsComplete ) { + throw new CompletedObservableError() + } + this.currentValue = val + this.hasPush = true + for ( const subscriber of this.subscribers ) { + if ( subscriber.next ) { + try { + await subscriber.next(val) + } catch (e) { + if ( e instanceof UnsubscribeError ) { + this.subscribers = this.subscribers.filter(x => x !== subscriber) + } else if (subscriber.error && e instanceof Error) { + await subscriber.error(e) + } else { + throw e + } + } + } + } + } + + /** + * Push the given array of values to this subject in order. + * The promise resolves when all subscribers have been pushed to for all values. + * @param {Array} vals + * @return Promise + */ + public async push(vals: T[]): Promise { + if ( this.subjectIsComplete ) { + throw new CompletedObservableError() + } + await Promise.all(vals.map(val => this.next(val))) + } + + /** + * Mark this subject as complete. + * The promise resolves when all subscribers have been pushed to. + * @param [finalValue] - optionally, a final value to set + * @return Promise + */ + public async complete(finalValue?: T): Promise { + if ( this.subjectIsComplete ) { + throw new CompletedObservableError() + } + if ( typeof finalValue === 'undefined' ) { + finalValue = this.value() + } else { + this.currentValue = finalValue + } + + for ( const subscriber of this.subscribers ) { + if ( subscriber.complete ) { + try { + await subscriber.complete(finalValue) + } catch (e) { + if ( subscriber.error && e instanceof Error ) { + await subscriber.error(e) + } else { + throw e + } + } + } + } + + this.subjectIsComplete = true + } + + /** + * Get the current value of this subject. + */ + public value(): T | undefined { + return this.currentValue + } + + /** + * True if this subject is marked as complete. + * @return boolean + */ + public isComplete(): boolean { + return this.subjectIsComplete + } +} diff --git a/src/bones/collection/ArrayIterable.ts b/src/bones/collection/ArrayIterable.ts new file mode 100644 index 0000000..a715d3b --- /dev/null +++ b/src/bones/collection/ArrayIterable.ts @@ -0,0 +1,33 @@ +import { Iterable } from './Iterable' +import {collect, Collection} from './Collection' + +/** + * A basic Iterable implementation that uses an array as a backend. + * @extends Iterable + */ +export class ArrayIterable extends Iterable { + constructor( + /** + * Items to use for this iterable. + */ + protected items: T[], + ) { + super() + } + + async at(i: number): Promise { + return this.items[i] + } + + async range(start: number, end: number): Promise> { + return collect(this.items.slice(start, end + 1)) + } + + async count(): Promise { + return this.items.length + } + + clone(): ArrayIterable { + return new ArrayIterable([...this.items]) + } +} diff --git a/src/bones/collection/AsyncCollection.ts b/src/bones/collection/AsyncCollection.ts new file mode 100644 index 0000000..0266cf7 --- /dev/null +++ b/src/bones/collection/AsyncCollection.ts @@ -0,0 +1,1151 @@ +import type { + AssociatedCollectionItem, + CollectionIndex, CollectionItem, ComparisonFunction, + DeterminesEquality, KeyFunction, + KeyOperator, KeyReducerFunction, MaybeCollectionIndex, MaybeCollectionItem, +} from './Collection' +import {Collection} from './Collection' +import {Iterable, StopIteration} from './Iterable' +import {applyWhere, type WhereOperator} from './where' +import {AsyncPipe, Pipeline} from '../Pipe' +import {type Awaitable} from '../types' +type AsyncCollectionComparable = CollectionItem[] | Collection | AsyncCollection +type AsyncKeyFunction = (item: CollectionItem, index: number) => CollectionItem | Promise> +type AsyncCollectionFunction = (items: AsyncCollection) => T2 + +/** + * Like a collection, but asynchronous. + */ +export class AsyncCollection { + constructor( + /** + * Iterable of items to base this collection on. + * @type Iterable + */ + private storedItems: Iterable, + + /** + * Size to use when chunking results for memory-optimization. + * @type number + */ + private iteratorChunkSize: number = 1000, // TODO fix this. It's just for testing + ) {} + + private async inChunks(callback: (items: Collection) => any): Promise { + await this.storedItems.chunk(this.iteratorChunkSize, async items => { + await callback(items) + }) + await this.storedItems.reset() + } + + private async inChunksAll(key: KeyOperator, callback: (items: Collection) => any): Promise { + await this.storedItems.chunk(this.iteratorChunkSize, async items => { + if ( typeof key !== 'function' ) { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + await callback(items.map(x => x[key])) + return + } + + await callback(items.map(key)) + }) + await this.storedItems.reset() + } + + private async inChunksAllNumbers(key: KeyOperator, callback: (items: number[]) => any): Promise { + await this.storedItems.chunk(this.iteratorChunkSize, async items => { + if ( typeof key !== 'function' ) { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + await callback(items.map(x => x[key]).map(x => Number(x)) + .all()) + return + } + + await callback(items.map(key).map(x => Number(x)) + .all()) + }) + await this.storedItems.reset() + } + + private async inChunksAllAssociated(key: KeyOperator, callback: (items: AssociatedCollectionItem[]) => any): Promise { + await this.storedItems.chunk(this.iteratorChunkSize, async items => { + const assocItems: AssociatedCollectionItem[] = [] + if ( typeof key === 'function' ) { + items.map((item, index) => { + const keyItem = key(item, index) + assocItems.push({ key: keyItem, + item }) + }) + } else { + items.map(item => { + assocItems.push({key: (item)[key], + item}) + }) + } + + await callback(assocItems) + }) + await this.storedItems.reset() + } + + /** + * Get all items in this collection as an array. + * @return Promise + */ + async all(): Promise[]> { + return (await this.storedItems.all()).toArray() + } + + /** + * Get all items in this collection as a synchronous Collection + * @return Promise + */ + async collect(): Promise> { + return this.storedItems.all() + } + + /** + * Get the average value of the collection or one of its keys. + * @param {KeyOperator} key + * @return Promise + */ + async average(key?: KeyOperator): Promise { + let runningTotal = 0 + let runningItems = 0 + + const chunkHelper = (items: number[]) => { + runningItems += items.length + runningTotal += items.reduce((prev, curr) => prev + curr) + } + + if ( key ) { + await this.inChunksAllNumbers(key, chunkHelper) + } else { + await this.inChunks((items) => { + chunkHelper(items.map(x => Number(x)).all()) + }) + } + + return runningTotal / runningItems + } + + /** + * Get the median value of the collection or one of its keys. + * @param {KeyOperator} key + * @return Promise + */ + async median(key?: KeyOperator): Promise { + let items: number[] = [] + + const chunkHelper = (nextItems: number[]) => { + items = items.concat(nextItems) + } + + if ( key ) { + await this.inChunksAllNumbers(key, chunkHelper) + } else { + await this.inChunks(chunkItems => { + chunkHelper(chunkItems.map(x => Number(x)).all()) + }) + } + + items = items.sort((a, b) => a - b) + const middle = Math.floor((items.length - 1) / 2) + if ( items.length % 2 ) { + return items[middle] + } else { + return (items[middle] + items[middle + 1]) / 2 + } + } + + /** + * Get the mode value of the collection or one of its keys. + * @param {KeyOperator} key + * @return Promise + */ + async mode(key?: KeyOperator): Promise { + const counts: any = {} + + const chunkHelper = (items: number[]) => { + for ( const item of items ) { + if ( !counts[item] ) { + counts[item] = 1 + } else { + counts[item] += 1 + } + } + } + + if ( key ) { + await this.inChunksAllNumbers(key, chunkHelper) + } else { + await this.inChunks(items => { + chunkHelper(items.map(x => Number(x)).all()) + }) + } + + return Number(Object.keys(counts).reduce((a, b) => counts[a] > counts[b] ? a : b)[0]) + } + + /** + * If this collection contains nested collections, collapse them to a single level. + * @return Promise + */ + async collapse(): Promise> { + const items = await this.collect() + return items.collapse() as Collection + } + + /** + * Returns true if the collection contains an item satisfying the given collection. + * @example + * collection.contains('id', '>', 4) + * @param {KeyOperator} key + * @param {WhereOperator} operator + * @param [operand] + * @return Promise + */ + async contains(key: KeyOperator, operator: WhereOperator, operand?: unknown): Promise { + let contains = false + + await this.inChunksAllAssociated(key, (items: AssociatedCollectionItem[]) => { + const matches = applyWhere(items, operator, operand) + if ( matches.length > 0 ) { + contains = true + throw new StopIteration() + } + }) + + return contains + } + + /** + * Returns a clean instance of this collection pointing to the same result set of the iterable. + * @return Promise + */ + async clone(): Promise> { + return new AsyncCollection(await this.storedItems.clone()) + } + + /** + * Returns the elements that are different between the two collections. + * @param {AsyncCollectionComparable} items + * @return Promise + */ + async diff(items: AsyncCollectionComparable): Promise> { + const matches: T[] = [] + + await this.inChunks(async chunk => { + for ( const item of chunk.all() ) { + if ( !(await items.includes(item)) ) { + matches.push(item) + } + } + }) + + return new Collection(matches) + } + + /** + * Returns the elements that are different between the two collections, using the given function + * as a comparator for the elements. + * @param {AsyncCollectionComparable} items + * @param {DeterminesEquality} compare + * @return Promise + */ + async diffUsing(items: AsyncCollectionComparable, compare: DeterminesEquality): Promise> { + const matches: T[] = [] + + await this.inChunks(async chunk => { + for ( const item of chunk.all() ) { + if ( !(await items.some(exc => compare(item, exc))) ) { + matches.push(item) + } + } + }) + + return new Collection(matches) + } + + /** + * Returns true if the given item is present in the collection. + * @param item + * @return Promise + */ + async includes(item: CollectionItem): Promise { + let contains = false + + await this.inChunks(items => { + if ( items.includes(item) ) { + contains = true + throw new StopIteration() + } + }) + + return contains + } + + /** + * Returns true if there is an item in the collection for which the given operator returns true. + * @param {function} operator - item => boolean + * @return Promise + */ + async some(operator: (item: T) => Awaitable): Promise { + let contains = false + + await this.inChunks(async items => { + for ( const item of items.all() ) { + if ( await operator(item) ) { + contains = true + throw new StopIteration() + } + } + }) + + return contains + } + + /** + * Applies a callback to each item in the collection. + * @param {AsyncKeyFunction} func + * @return Promise + */ + async each(func: AsyncKeyFunction): Promise { + let index = 0 + + await this.inChunks(async items => { + for ( const item of items.all() ) { + await func(item, index) + index += 1 + } + }) + } + + /** + * Applies a callback to each item in the collection and returns the results as a collection. + * @param {AsyncKeyFunction} func + * @return Promise + */ + async map(func: AsyncKeyFunction): Promise> { + const newItems: CollectionItem[] = [] + await this.each(async (item, index) => { + newItems.push(await func(item, index)) + }) + return new Collection(newItems) + } + + /** + * Create a new collection by mapping the items in this collection using the given function, + * excluding any for which the function resolves undefined. + * @param func + */ + async partialMap(func: AsyncKeyFunction): Promise>> { + const newItems: CollectionItem>[] = [] + + await this.each(async (item, index) => { + const result = await func(item, index) + if ( typeof result !== 'undefined' ) { + newItems.push(result as unknown as NonNullable) + } + }) + + return new Collection>(newItems) + } + + /** + * Returns true if the given operator returns true for every item in the collection. + * @param {AsyncKeyFunction} func + * @return Promise + */ + async every(func: AsyncKeyFunction): Promise { + let pass = true + let index = 0 + + await this.inChunks(async items => { + for ( const item of items.all() ) { + if ( !(await func(item, index)) ) { + pass = false + throw new StopIteration() + } + + index += 1 + } + }) + + return pass + } + + /** + * Returns true if every item in the collection satisfies the given where clause. + * @param {KeyOperator} key + * @param {WhereOperator} operator + * @param [operand] + */ + async everyWhere(key: KeyOperator, operator: WhereOperator, operand?: unknown): Promise { + let pass = true + + await this.inChunks(async items => { + pass = pass && items.everyWhere(key, operator, operand) + if ( !pass ) { + throw new StopIteration() + } + }) + + return pass + } + + /** + * Applies a filter to every item in the collection and returns the results that pass the filter. + * @param {KeyFunction} func + * @return Promise + */ + async filter(func: KeyFunction): Promise> { + let newItems: CollectionItem[] = [] + + await this.inChunks(async items => { + const filterItems: CollectionItem[] = [] + + for ( let i = 0; i < items.length; i += 1 ) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const item = items.get(i)! + if ( await func(item, i) ) { + filterItems.push(item) + } + } + + newItems = newItems.concat(filterItems) + }) + + return new Collection(newItems) + } + + /** + * Like filter, but inverted. That is, filters out items that DO match the criterion. + * @param func + */ + async filterOut(func: KeyFunction): Promise> { + return this.filter(async (...args) => !(await func(...args))) + } + + /** + * Calls the passed in function if the boolean condition is true. Allows for functional syntax. + * @param {boolean} bool + * @param {AsyncCollectionFunction} then + * @return AsyncCollection + */ + when(bool: boolean, then: AsyncCollectionFunction): AsyncCollection { + if ( bool ) { + then(this) + } + return this + } + + /** + * Calls the passed in function if the boolean condition is false. Allows for functional syntax. + * @param {boolean} bool + * @param {AsyncCollectionFunction} then + * @return AsyncCollection + */ + unless(bool: boolean, then: AsyncCollectionFunction): AsyncCollection { + if ( !bool ) { + then(this) + } + return this + } + + /** + * Applies the given where condition to the collection and returns a new collection of the results. + * @param {KeyOperator} key + * @param {WhereOperator} operator + * @param [operand] + * @return Promise + */ + async where(key: KeyOperator, operator: WhereOperator, operand?: unknown): Promise> { + let newItems: CollectionItem[] = [] + await this.inChunks(async items => { + newItems = newItems.concat(items.where(key, operator, operand).all()) + }) + return new Collection(newItems) + } + + /** + * Applies the given where condition to the collection and returns a new collection of the items + * that did not satisfy the condition. + * @param {KeyOperator} key + * @param {WhereOperator} operator + * @param [operand] + * @return Promise + */ + async whereNot(key: KeyOperator, operator: WhereOperator, operand?: unknown): Promise> { + let newItems: CollectionItem[] = [] + await this.inChunks(async items => { + newItems = newItems.concat(items.whereNot(key, operator, operand).all()) + }) + return new Collection(newItems) + } + + /** + * Applies a WHERE ... IN ... condition to the collection an returns a new collection of the results. + * @param {KeyOperator} key + * @param {AsyncCollectionComparable} items + * @return Promise + */ + async whereIn(key: KeyOperator, items: AsyncCollectionComparable): Promise> { + const newItems: CollectionItem[] = [] + await this.inChunksAllAssociated(key, async chunk => { + for ( const item of chunk ) { + if ( await items.includes(item.key) ) { + newItems.push(item.item) + } + } + }) + return new Collection(newItems) + } + + /** + * Applies a WHERE ... IN ... condition to the collection and returns a new collection of the items + * that did not satisfy the condition. + * @param {KeyOperator} key + * @param {AsyncCollectionComparable} items + * @return Promise + */ + async whereNotIn(key: KeyOperator, items: AsyncCollectionComparable): Promise> { + const newItems: CollectionItem[] = [] + await this.inChunksAllAssociated(key, async chunk => { + for ( const item of chunk ) { + if ( !(await items.includes(item.key)) ) { + newItems.push(item.item) + } + } + }) + return new Collection(newItems) + } + + /** + * Returns the first item in the collection, if one exists. + * @return Promise + */ + async first(): Promise> { + return this.storedItems.at(0) + } + + /** + * Return the first item in the collection that satisfies the given where condition, if one exists. + * @param {KeyOperator} key + * @param {WhereOperator} [operator = '='] + * @param [operand = true] + * @return Promise + */ + async firstWhere(key: KeyOperator, operator: WhereOperator = '=', operand: any = true): Promise> { + let item = undefined + await this.inChunksAllAssociated(key, async items => { + const matches = applyWhere(items, operator, operand) + if ( matches.length > 0 ) { + item = matches[0] + throw new StopIteration() + } + }) + return item + } + + /** + * Return the first item in the collection that does not satisfy the given where condition, if one exists. + * @param {KeyOperator} key + * @param {WhereOperator} [operator = '='] + * @param [operand = true] + */ + async firstWhereNot(key: KeyOperator, operator: WhereOperator = '=', operand: any = true): Promise> { + let item: MaybeCollectionItem = undefined + await this.inChunks(async items => { + const matches = items.whereNot(key, operator, operand) + if ( matches.length > 0 ) { + item = matches.first() + throw new StopIteration() + } + }) + return item + } + + /** + * Returns the number of elements in this collection. + * @return Promise + */ + async count(): Promise { + return this.storedItems.count() + } + + /** + * Returns the number of elements in this collection. + * @return Promise + */ + async length(): Promise { + return this.storedItems.count() + } + + /** + * Get the item at the given index of this collection, if one exists. + * If none exists and a fallback value is provided, that value will be returned. + * @param {number} index + * @param [fallback] + * @return Promise + */ + async get(index: number, fallback?: T): Promise { + if ( (await this.count()) > index ) { + return this.storedItems.at(index) + } else { + return fallback + } + } + + /** + * Get the item at the given index of this collection, if one exists. + * @param {number} index + */ + async at(index: number): Promise> { + return this.get(index) + } + + /** + * Return an object which maps key values to arrays of items in the collection that satisfy that value. + * @param {KeyOperator} key + * @return Promise + */ + async groupBy(key: KeyOperator): Promise { + return (await this.collect()).groupBy(key) + } + + /** + * Return an object mapping the given key value to items in this collection. + * @param {KeyOperator} key + * @return Promise + */ + async associate(key: KeyOperator): Promise { + return (await this.collect()).associate(key) + } + + /** + * Join the items in this collection with the given delimiter. + * @example + * await collection.join(',') // => '1,2,3,4' + * @param {string} delimiter + * @return Promise + */ + async join(delimiter: string): Promise { + const runningStrings: string[] = [] + + await this.inChunks(async items => { + runningStrings.push(items.join(delimiter)) + }) + + return runningStrings.join(delimiter) + } + + /** + * Join the items in this collection with the given delimiter. + * @example + * await collection.implode(',') // => '1,2,3,4' + * @param {string} delimiter + * @return Promise + */ + async implode(delimiter: string): Promise { + return this.join(delimiter) + } + + // TODO intersect + + /** + * Returns true if there are no items in this collection. + * @return Promise + */ + async isEmpty(): Promise { + return (await this.storedItems.count()) < 1 + } + + /** + * Returns true if there is at least one item in this collection. + * @return Promise + */ + async isNotEmpty(): Promise { + return (await this.storedItems.count()) > 0 + } + + /** + * Return the last item in this collection, if one exists. + * @return Promise + */ + async last(): Promise> { + const length = await this.storedItems.count() + if ( length > 0 ) { + return this.storedItems.at(length - 1) + } + } + + /** + * Return the last item in this collection which satisfies the given where condition, if one exists. + * @param {KeyOperator} key + * @param {WhereOperator} operator + * @param [operand] + * @return Promise + */ + async lastWhere(key: KeyOperator, operator: WhereOperator, operand?: unknown): Promise> { + return (await this.where(key, operator, operand)).last() + } + + /** + * Return the last item in this collection which does not satisfy the given condition, if one exists. + * @param {KeyOperator} key + * @param {WhereOperator} operator + * @param [operand] + * @return Promise + */ + async lastWhereNot(key: KeyOperator, operator: WhereOperator, operand?: unknown): Promise> { + return (await this.whereNot(key, operator, operand)).last() + } + + /** + * Builds a collection of the values of a given key for each item in this collection. + * @example + * // collection has { a: 1 }, { a: 2 }, { a: 3 } + * await collection.pluck('a') // => [1, 2, 3] + * @param {KeyOperator} key + * @return Promise + */ + async pluck(key: T2): Promise> { + let newItems: CollectionItem[] = [] + + await this.inChunksAll(key, async items => { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + newItems = newItems.concat(items.all()) + }) + + return new Collection(newItems) + } + + /** + * Return the max value of the given key. + * @param {KeyOperator} key + * @return Promise + */ + async max(key: KeyOperator): Promise { + let runningMax: number | undefined = undefined + + await this.inChunksAllNumbers(key, async items => { + const localMax = Math.max(...items) + if ( typeof runningMax === 'undefined' ) { + runningMax = localMax + } else { + runningMax = Math.max(runningMax, localMax) + } + }) + + return runningMax + } + + /** + * Return a collection of items that have the max value of the given key. + * @param {KeyOperator} key + * @return Promise + */ + async whereMax(key: KeyOperator): Promise> { + return this.where(key, '=', await this.max(key)) + } + + /** + * Return the min value of the given key. + * @param {KeyOperator} key + * @return Promise + */ + async min(key: KeyOperator): Promise { + let runningMin: number | undefined = undefined + + await this.inChunksAllNumbers(key, async items => { + const localMin = Math.min(...items) + if ( typeof runningMin === 'undefined' ) { + runningMin = localMin + } else { + runningMin = Math.min(runningMin, localMin) + } + }) + + return runningMin + } + + /** + * Return a collection of items that have the min value of the given key. + * @param {KeyOperator} key + * @return Promise + */ + async whereMin(key: KeyOperator): Promise> { + return this.where(key, '=', await this.min(key)) + } + + /** + * Merge the two collections. + * @param {AsyncCollectionComparable} mergeWith + * @return Promise + */ + async merge(mergeWith: AsyncCollectionComparable): Promise> { + let items: T2[] = [] + if ( mergeWith instanceof Collection ) { + items = await mergeWith.all() + } else if ( mergeWith instanceof AsyncCollection ) { + items = await mergeWith.all() + } else if ( Array.isArray(mergeWith) ) { + items = mergeWith + } + + return new Collection([...items, ...await this.all()]) + } + + /** + * Return a collection of every nth item in this collection. + * @param {number} n + * @return Promise + */ + async nth(n: number): Promise> { + const matches: CollectionItem[] = [] + let current = 1 + + await this.inChunks(async chunk => { + for ( const item of chunk.all() ) { + if ( current === 1 ) { + matches.push(item) + } + current += 1 + if ( current > n ) { + current = 1 + } + } + }) + + return new Collection(matches) + } + + /** + * Return a collection containing the items that would be on the given page, with the given number of items per page. + * @param {number} page + * @param {number} perPage + */ + async forPage(page: number, perPage: number): Promise> { + const start = page * perPage - perPage + const end = page * perPage - 1 + return this.storedItems.range(start, end) + } + + /** + * Return a new Pipe of this collection. + */ + pipeTo(pipeline: Pipeline): TOut { + return pipeline.apply(this) + } + + /** Build and apply a pipeline. */ + pipe(builder: (pipeline: Pipeline) => Pipeline): TOut { + return builder(Pipeline.id()).apply(this) + } + + /** + * Return a new AsyncPipe of this collection. + */ + asyncPipe(): AsyncPipe> { + return AsyncPipe.wrap(this) + } + + /* async pop(): Promise> { + const nextItem = await this.storedItems.next() + if ( !nextItem.done ) { + return nextItem.value + } + }*/ // TODO Fix this + + /** + * Get n random items from this collection. + * @todo add safety check for it loop exceeds max number of items + * @param {number} n + * @return Promise + */ + async random(n: number): Promise> { + const randomItems: CollectionItem[] = [] + const fetchedIndices: number[] = [] + + const maxN = await this.storedItems.count() + if ( n > maxN ) { + n = maxN + } + + while ( randomItems.length < n ) { + const index = Math.floor(Math.random() * maxN) + + if ( !fetchedIndices.includes(index) ) { + fetchedIndices.push(index) + const item = await this.storedItems.at(index) + if ( typeof item !== 'undefined' ) { + randomItems.push(item) + } + } + } + + return new Collection(randomItems) + } + + /** + * Collapse the collection into a single value using a reducer function. + * @param {KeyReducerFunction} reducer + * @param [initialValue] + * @return Promise + */ + async reduce(reducer: KeyReducerFunction, initialValue?: T2): Promise { + let currentValue = initialValue + let index = 0 + + await this.inChunks(async items => { + for ( const item of items.all() ) { + currentValue = reducer(currentValue, item, index) + index += 1 + } + }) + + return currentValue + } + + /** + * Returns a collection of items that fail the truth test. + * @param {AsyncKeyFunction} truthTestFunction + * @return Promise + */ + async reject(truthTestFunction: AsyncKeyFunction): Promise> { + let rejected: CollectionItem[] = [] + + await this.inChunks(async items => { + rejected = rejected.concat(items.all().filter((item, index) => { + return !truthTestFunction(item, index) + })) + }) + + return new Collection(rejected) + } + + /** + * Get a reversed collection of this collection's items. + * @return Promise + */ + async reverse(): Promise> { + return (await this.collect()).reverse() + } + + /** + * Search the collection and return the index of that item, if one exists. + * @param {CollectionItem} item + * @return Promise + */ + async search(item: CollectionItem): Promise { + let foundIndex + let index = 0 + + await this.inChunks(async items => { + items.some(possibleItem => { + if ( possibleItem === item ) { + foundIndex = index + throw new StopIteration() + } + + index += 1 + return false + }) + }) + + return foundIndex + } + + /** + * Get the next item in the collection and remove it. + * @return Promise + */ + async shift(): Promise> { + const nextItem = await this.storedItems.next() + if ( !nextItem.done ) { + return nextItem.value + } + } + + /** + * Shuffle the items in the collection to a random order. + * @return Promise + */ + async shuffle(): Promise> { + return (await this.collect()).shuffle() + } + + /** + * Return a slice of this collection. + * @param {number} start - the starting index + * @param {number} end - the ending index + * @return Promise + */ + async slice(start: number, end: number): Promise> { + return this.storedItems.range(start, end - 1) + } + + /** + * Sort the collection, optionally with the given comparison function. + * @param {ComparisonFunction} comparisonFunction + * @return Promise + */ + async sort(comparisonFunction?: ComparisonFunction): Promise> { + return (await this.collect()).sort(comparisonFunction) + } + + /** + * Sort the collection by the given key. + * @param {KeyOperator} key + * @return Promise + */ + async sortBy(key?: KeyOperator): Promise> { + return (await this.collect()).sortBy(key) + } + + /** + * Reverse sort the collection, optionally with the given comparison function. + * @param {ComparisonFunction} comparisonFunction + * @return Promise + */ + async sortDesc(comparisonFunction?: ComparisonFunction): Promise> { + return (await this.collect()).sortDesc(comparisonFunction) + } + + /** + * Reverse sort the collection by the given key. + * @param {KeyOperator} key + * @return Promise + */ + async sortByDesc(key?: KeyOperator): Promise> { + return (await this.collect()).sortByDesc(key) + } + + /** + * Splice the collection at the given index. Optionally, removing the given number of items. + * @param {CollectionIndex} start + * @param {number} [deleteCount] + * @return Promise + */ + async splice(start: CollectionIndex, deleteCount?: number): Promise> { + return (await this.collect()).splice(start, deleteCount) + } + + /** + * Sum the items in the collection, or the values of the given key. + * @param {KeyOperator} key + * @return Promise + */ + async sum(key?: KeyOperator): Promise { + let runningSum = 0 + + const chunkHandler = (items: number[]) => { + for ( const item of items ) { + runningSum += item + } + } + + if ( key ) { + await this.inChunksAllNumbers(key, chunkHandler) + } else { + await this.inChunks(async chunk => { + chunkHandler(chunk.map(x => Number(x)).all()) + }) + } + + return runningSum + } + + /** + * Take the first n items from the front or back of the collection. + * @param {number} limit + * @return Promise + */ + async take(limit: number): Promise> { + if ( limit === 0 ) { + return new Collection() + } else if ( limit > 0 ) { + return this.slice(0, limit) + } else { + const cnt = await this.storedItems.count() + return this.storedItems.range(cnt - (-1 * limit), cnt - 1) + } + } + + /** + * Call the given function, passing in this collection. Allows functional syntax. + * @param {AsyncCollectionFunction} func + * @return Promise + */ + async tap(func: AsyncCollectionFunction): Promise> { + await func(this) + return this + } + + /** + * Return all the unique values in the collection, or the unique values of the given key. + * @param {KeyOperator} key + * @return Promise + */ + async unique(key?: KeyOperator): Promise> { + const has: CollectionItem[] = [] + + if ( !key ) { + await this.inChunks(async items => { + for ( const item of items.all() ) { + if ( !has.includes(item) ) { + has.push(item) + } + } + }) + } else { + await this.inChunksAll(key, async items => { + for ( const item of items.all() ) { + if ( !has.includes(item) ) { + has.push(item) + } + } + }) + } + + return new Collection(has) + } + + /** + * Cast this collection to an array. + * @return Promise + */ + async toArray(): Promise { + const returns: any = [] + for ( const item of (await this.all()) ) { + if ( item instanceof Collection ) { + returns.push(item.toArray()) + } else if ( item instanceof AsyncCollection ) { + returns.push(await item.toArray()) + } else { + returns.push(item) + } + } + return returns + } + + /** + * Cast this collection to a JSON string. + * @param [replacer] - the replacer to use + * @param {number} [space = 4] number of indentation spaces to use + */ + async toJSON(replacer = undefined, space = 4): Promise { + return JSON.stringify(this.toArray(), replacer, space) + } + + /** + * Get a clone of the underlying iterator of this collection. + * @return Iterable + */ + iterator(): Iterable { + return this.storedItems.clone() + } +} diff --git a/src/bones/collection/AsyncGeneratorIterable.ts b/src/bones/collection/AsyncGeneratorIterable.ts new file mode 100644 index 0000000..45b79b1 --- /dev/null +++ b/src/bones/collection/AsyncGeneratorIterable.ts @@ -0,0 +1,139 @@ +import {type ChunkCallback, Iterable, type MaybeIterationItem} from "./Iterable.ts"; +import {collect, type Collection} from "./Collection.ts"; +import {type Either, isLeft, isRight, left, type Maybe, right, unright} from "../types.ts"; + +export class AsyncGeneratorIterable extends Iterable { + private sourceIndex: number = -1 + private cacheStartIndex: number = 0 + private cache: T[] = [] + private source?: AsyncGenerator + + constructor( + private sourceFactory: () => AsyncGenerator, + private maxCacheSize: number = 100, + ) { + super() + } + + private computeCacheIndex(realIndex: number): Either { + let i = realIndex - this.cacheStartIndex +console.log('agi cci', { i, realIndex, cSI: this.cacheStartIndex, cl: this.cache.length}) + if ( i >= this.cache.length ) { + return left(null) + } + return right(i) + } + + private async advanceIndexInCache(realIndex: number): Promise { + if ( isRight(this.computeCacheIndex(realIndex)) ) { + return + } +console.log('aIIC needs advance') + if ( realIndex < this.cacheStartIndex ) { + this.source = undefined + } + + if ( !this.source ) { + this.source = this.sourceFactory() + this.sourceIndex = -1 + } + + for await ( const item of this.source ) { +console.log('aIIC source item', item, this.sourceIndex, realIndex) + this.sourceIndex += 1 + this.cache.push(item) + + if ( this.cache.length >= this.maxCacheSize ) { + this.cache.shift() + this.cacheStartIndex += 1 + } + + if ( this.sourceIndex >= realIndex ) { +console.log('aIIC break') + break + } + } + } + + async at(i: number): Promise> { + await this.advanceIndexInCache(i) + + const cacheIndex = this.computeCacheIndex(i) + console.log('agi at', { i, cacheIndex }) + if ( isRight(cacheIndex) ) { + return this.cache[unright(cacheIndex)] + } + + return undefined + } + + clone(): AsyncGeneratorIterable { + return new AsyncGeneratorIterable(this.sourceFactory, this.maxCacheSize) + } + + count(): Promise { + throw new Error('cannot count!') + } + + async range(start: number, end: number): Promise> { + const c: Collection = collect() + + for ( let i = start; i <= end; i += 1 ) { + const item = await this.at(i) + if ( !item ) { + break + } + + c.push(item) + } + + return c + } + + async all(): Promise> { + const c: Collection = collect() + + let i = -1 + while ( true ) { + i += 1 + const item = await this.at(i) + if ( !item ) { + break + } + + c.push(item) + } + + return c + } + + async next(): Promise> { + const value = await this.at(this.index) + + if ( !value ) { + return { done: true } + } + + this.index += 1 + return { done: false, value } + } + + async seek(index: number): Promise { + if ( index < 0 ) { + throw new TypeError('Cannot seek to negative index.') + } + + await this.advanceIndexInCache(index) + + const cacheIndex = this.computeCacheIndex(index) + if ( isLeft(cacheIndex) ) { + throw new TypeError('Cannot seek past last item.') + } + + this.index = index + } + + async peek(): Promise> { + return this.at(this.index + 1) + } +} diff --git a/src/bones/collection/Collection.ts b/src/bones/collection/Collection.ts new file mode 100644 index 0000000..8c24e2e --- /dev/null +++ b/src/bones/collection/Collection.ts @@ -0,0 +1,1378 @@ +import {AsyncPipe, Pipeline} from '../Pipe' +import type {Unsubscribe, Subscription} from '../Reactive' + +export type CollectionItem = T +export type MaybeCollectionItem = CollectionItem | undefined +export type KeyFunction = (item: CollectionItem, index: number) => CollectionItem +export type KeyReducerFunction = (current: any, item: CollectionItem, index: number) => T2 +export type CollectionFunction = (items: Collection) => T2 +export type KeyOperator = keyof T | KeyFunction +export type AssociatedCollectionItem = { key: T2, item: CollectionItem } +export type CollectionComparable = CollectionItem[] | Collection +export type DeterminesEquality = (item: CollectionItem, other: any) => boolean +export type CollectionIndex = number +export type MaybeCollectionIndex = CollectionIndex | undefined +export type ComparisonFunction = (item: CollectionItem, otherItem: CollectionItem) => number +export type Collectable = CollectionItem[] | Collection + +import {type WhereOperator, applyWhere, whereMatch } from './where' +import type {Awaitable, Awaited, Either, Maybe, MethodsOf, MethodType} from '../types' +import {isLeft, right, unright} from '../types' +import {AsyncCollection} from './AsyncCollection' +import {ArrayIterable} from './ArrayIterable' + +export const collect = (items: CollectionItem[] = []): Collection => Collection.collect(items) +const toString = (item: unknown): string => String(item) + +/** + * A helper class for working with arrays of items in a more robust fashion. + * Provides helpers for accessing sub-keys, filtering, piping, and aggregate functions. + */ +export class Collection { + private storedItems: CollectionItem[] = [] + + private pushSubscribers: Subscription[] = [] + + /** + * Create a new collection from an array of items. + * @param items + */ + public static collect(items: CollectionItem[]): Collection { + return new Collection(items) + } + + /** + * Create a new collection from an item or array of items. + * Filters out undefined items. + * @param itemOrItems + */ + public static normalize(itemOrItems: Collection | (CollectionItem)[] | CollectionItem): Collection { + if ( itemOrItems instanceof Collection ) { + return itemOrItems + } + + if ( !Array.isArray(itemOrItems) ) { + itemOrItems = [itemOrItems] + } + + return new Collection(itemOrItems) + } + + /** + * Create a collection of "undefined" elements of a given size. + * @param size + */ + public static size(size: number): Collection { + const arr = Array(size).fill(undefined) + return new Collection(arr) + } + + /** + * Fill a new collection of the given size with the given item. + * @param size + * @param item + */ + public static fill(size: number, item: T2): Collection { + const arr = Array(size).fill(item) + return new Collection(arr) + } + + constructor( + /** + * The items to base the collection on. + */ + items?: CollectionItem[], + ) { + if ( items ) { + this.storedItems = items + } + } + + private allOperator(key: KeyOperator): CollectionItem[] { + let items: CollectionItem[] = [] + if ( typeof key === 'function' ) { + items = this.storedItems.map(key) + } else { + items = this.storedItems.map((item: CollectionItem) => (item)[key]) + } + return items + } + + private allAsNumbers(key: KeyOperator): number[] { + return this.allOperator(key).map(value => Number(value)) + } + + private allAssociated(key: KeyOperator): AssociatedCollectionItem[] { + const associatedItems: AssociatedCollectionItem[] = [] + const items = [...this.storedItems] + if ( typeof key === 'function' ) { + items.map((item, index) => { + const keyItem = key(item, index) + associatedItems.push({ + key: keyItem, + item, + }) + }) + } else { + items.map(item => { + associatedItems.push({ + key: (item)[key], + item, + }) + }) + } + return associatedItems + } + + /** + * Cast the collection to an array. + */ + all(): CollectionItem[] { + return [...this.storedItems] + } + + /** + * Get the average value of the items or one of their keys. + * @param key + */ + average(key?: KeyOperator): number { + let items + if ( key ) { + items = this.allAsNumbers(key) + } else { + items = this.storedItems.map(x => Number(x)) + } + if ( items.length === 0 ) { + return 0 + } + + const sum = items.reduce((prev, curr) => prev + curr) + return sum / items.length + } + + /** + * Get the median value of the items or one of their keys. + * @param key + */ + median(key?: KeyOperator): number { + let items + if ( key ) { + items = this.allAsNumbers(key).sort((a, b) => a - b) + } else { + items = this.storedItems.map(x => Number(x)).sort((a, b) => a - b) + } + + const middle = Math.floor((items.length - 1) / 2) + if ( items.length % 2 ) { + return items[middle] + } else { + return (items[middle] + items[middle + 1]) / 2 + } + } + + /** + * Get the mode of the items or one of their keys. + * @param key + */ + mode(key?: KeyOperator): number { + let items + if ( key ) { + items = this.allAsNumbers(key).sort((a, b) => a - b) + } else { + items = this.storedItems.map(x => Number(x)).sort((a, b) => a - b) + } + + const counts: any = {} + for ( const item of items ) { + counts[item] = (counts[item] ?? -1) + 1 + } + + return Number(Object.keys(counts).reduce((a, b) => counts[a] > counts[b] ? a : b)[0]) + } + + /** + * Collapse a (potentially nested) collection of items down to a single dimension. + */ + collapse(): Collection { + const newItems: CollectionItem[] = [] + const items = [...this.storedItems] + const getLayer = (current: CollectionItem|CollectionItem[]) => { + if ( typeof (current)[Symbol.iterator] === 'function' ) { + for (const item of (current as any)) { + if (Array.isArray(item)) { + getLayer(item) + } else { + newItems.push(item) + } + } + } + } + + getLayer(items) + return new Collection(newItems) + } + + /** + * Returns true if the given key matches the given condition for any item in the collection. + * + * @example + * ```typescript + * const userExists = users.contains('username', '=', 'jdoe') + * ``` + * + * @param key + * @param operator + * @param operand + */ + contains(key: KeyOperator, operator: WhereOperator, operand?: unknown): boolean { + const associate = this.allAssociated(key) + const matches = applyWhere(associate, operator, operand) + return matches.length > 0 + } + + // TODO crossJoin + + /** + * Create a copy of this collection. + * Does NOT deep copy the underlying items. + */ + clone(): Collection { + return new Collection(this.storedItems) + } + + /** + * Return a collection of items that ARE in this collection, but NOT in the `items` collection. + * @param items + */ + diff(items: CollectionComparable): Collection { + const exclude = items instanceof Collection ? items.all() : items + const matches = [] + for ( const item of [...this.storedItems] ) { + if ( !exclude.includes(item) ) { + matches.push(item) + } + } + return new Collection(matches) + } + + /** + * Like diff, but mutates the current collection. + * @param items + */ + diffInPlace(items: CollectionComparable): this { + const exclude = items instanceof Collection ? items.all() : items + this.storedItems = this.storedItems.filter(item => !exclude.includes(item)) + return this + } + + /** + * Return a collection of items that ARE in this collection, but NOT In the `items` collection + * using a helper function to determine whether two items are equal. + * + * @example + * ```typescript + * potentialUsers.diffUsing(storedUsers, (u1, u2) => u1.username.toLowerCase() === u2.username.toLowerCase()) + * ``` + * + * @param items + * @param compare + */ + diffUsing(items: CollectionComparable, compare: DeterminesEquality): Collection { + const exclude = items instanceof Collection ? items.all() : items + const matches = [] + for ( const item of [...this.storedItems] ) { + if ( !exclude.some(exc => compare(item, exc)) ) { + matches.push(item) + } + } + return new Collection(matches) + } + + /** + * Returns true if the given function returns truthy for any item in the collection. + * Stops executing if a single truth case is found. + * @param func + */ + some(func: (item: T) => Maybe): boolean { + return this.storedItems.some(func) + } + + /** + * Execute the function for every item in the collection. + * @param func + */ + each(func: KeyFunction): void { + this.storedItems.map(func) + } + + /** + * Create a new collection by mapping the items in this collection using the given function. + * @param func + */ + map(func: KeyFunction): Collection { + const newItems: CollectionItem[] = [] + this.each(((item, index) => { + newItems.push(func(item, index)) + })) + return new Collection(newItems) + } + + /** + * Create a new collection by mapping the items in this collection using the given function + * where the function returns an Either. The collection is all Right instances. If a Left + * is encountered, that value is returned. + * @param func + */ + mapRight(func: KeyFunction>): Either> { + const newItems: CollectionItem[] = [] + for ( let i = 0; i < this.length; i += 1 ) { + const result = func(this.storedItems[i], i) + if ( isLeft(result) ) { + return result + } + + newItems.push(unright(result)) + } + return right(new Collection(newItems)) + } + + /** + * Create a new collection by mapping the items in this collection using the given function + * where the function returns an Either. The collection is all Right instances. If a Left + * is encountered, that value is returned. + * @param func + */ + async asyncMapRight(func: KeyFunction>>): Promise>> { + const newItems: CollectionItem[] = [] + for ( let i = 0; i < this.length; i += 1 ) { + const result = await func(this.storedItems[i], i) + if ( isLeft(result) ) { + return result + } + + newItems.push(unright(result)) + } + return right(new Collection(newItems)) + } + + /** + * Get the collection as an AsyncCollection. + */ + toAsync(): AsyncCollection { + const iter = new ArrayIterable([...this.storedItems]) + return new AsyncCollection(iter) + } + + /** + * Map a method on the underlying type, passing it any required parameters. + * This is delightfully type-safe. + * @param method + * @param params + */ + mapCall>(method: T2, ...params: Parameters>): Collection>> { + // This is dumb, but I'm not sure how else to resolve it. The types check out, but TypeScript loses track of the fact that + // typeof x[method] === MethodType, so it assumes we're indexing an object incorrectly. + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + return this.map((x: T) => x[method](...params)) + } + + /** + * Shortcut for .mapCall(...).awaitAll(). + * @param method + * @param params + */ + async awaitMapCall>(method: T2, ...params: Parameters>): Promise>>>> { + return this.mapCall(method, ...params).awaitAll() + } + + /** + * Await all values in the collection. + */ + async awaitAll(): Promise>> { + return this.promiseMap(async x => x as Awaited) + } + + /** + * Map each element in the collection to a string. + */ + strings(): Collection { + return this.map(x => String(x)) + } + + /** + * Create a new collection by mapping the items in this collection using the given function, + * excluding any for which the function returns undefined. + * @param func + */ + partialMap(func: KeyFunction): Collection> { + const newItems: CollectionItem>[] = [] + + this.each(((item, index) => { + const result = func(item, index) + if ( typeof result !== 'undefined' ) { + newItems.push(result as NonNullable) + } + })) + + return new Collection>(newItems) + } + + /** + * Convert this collection to an object keyed by the given field. + * + * @example + * ```typescript + * const users = collect([{uid: 1, name: 'John'}, {uid: 2, name: 'Jane'}]) + * users.keyBy('name') // => {John: {uid: 1, name: 'John'}, Jane: {uid: 2, name: 'Jane'}} + * ``` + * + * @param key + */ + keyBy(key: KeyOperator): {[key: string]: T} { + const obj: {[key: string]: T} = {} + + this.allAssociated(key).forEach(assoc => { + obj[assoc.key] = assoc.item + }) + + return obj + } + + /** + * Convert this collection to an object keyed by the given field, whose values are + * the output of the `value` operator. + * + * @example + * ```typescript + * const users = collect([{uid: 1, name: 'John'}, {uid: 2, name: 'Jane'}]) + * users.keyMap('name', 'uid') // => {John: 1, Jane: 2} + * ``` + * + * @param key + * @param value + */ + keyMap(key: KeyOperator, value: KeyOperator): {[key: string]: T2} { + const obj: {[key: string]: T2} = {} + + let i = -1 + this.allAssociated(key).forEach(assoc => { + i += 1 + + if ( typeof value === 'function' ) { + obj[assoc.key] = value(assoc.item, i) + } else { + obj[assoc.key] = (assoc.item[value] as any) as T2 + } + }) + + return obj + } + + /** + * Returns true if the given function returns true for every item in the collection. + * @param func + */ + every(func: KeyFunction): boolean { + return this.storedItems.every(func) + } + + everyWhere(key: KeyOperator, operator: WhereOperator, operand?: unknown): boolean { + const items = this.allAssociated(key) + return items.every(item => whereMatch(item, operator, operand)) + } + + /** + * Return a new collection filtered by the given function. + * @param func + */ + filter(func?: KeyFunction): Collection { + return new Collection(this.storedItems.filter(func ?? Boolean)) + } + + /** + * Like filter, but inverted. That is, removes items that DO match the criterion. + * @param func + */ + filterOut(func?: KeyFunction): Collection { + return this.filter((...args) => !(func ?? Boolean)(...args)) + } + + whereDefined(): Collection> { + return this.filter() as unknown as Collection> + } + + /** + * Returns the index of the record for which the given function returns true, if such an index exists. + * @param func + */ + find(func: KeyFunction): number | undefined { + let foundIndex: number | undefined = undefined + this.storedItems.some((item, index) => { + if ( func(item, index) ) { + foundIndex = index + return true + } + }) + return foundIndex + } + + /** + * When `bool` is truthy, execute the callback, passing in the collection. + * Can be used for functional-style chained calls. + * @param bool + * @param then + */ + when(bool: boolean, then: CollectionFunction): Collection { + if ( bool ) { + then(this) + } + return this + } + + /** + * When `bool` is falsy, execute the callback, passing in the collection. + * Can be used for functional-style chained calls. + * @param bool + * @param then + */ + unless(bool: boolean, then: CollectionFunction): Collection { + if ( !bool ) { + then(this) + } + return this + } + + /** + * Filter the collection by the given where-condition. + * + * @example + * ```typescript + * const users = collect([ + * {uid: 1, name: 'John'}, + * {uid: 2, name: 'Jane'}, + * {uid: 3, name: 'James'}, + * ]) + * + * users.where('uid', '<', 3) // => Collection[{uid: 1, name: 'John'}, {uid: 2, name: 'Jane'}] + * ``` + * + * @param key + * @param operator + * @param operand + */ + where(key: KeyOperator, operator: WhereOperator, operand?: unknown): Collection { + const items = this.allAssociated(key) + return new Collection(applyWhere(items, operator, operand)) + } + + /** + * Filter the collection by the inverse of the given where-condition. + * @param key + * @param operator + * @param operand + */ + whereNot(key: KeyOperator, operator: WhereOperator, operand?: unknown): Collection { + return this.diff(this.where(key, operator, operand)) + } + + /** + * Filter the collection for all records where the given key is in a set of items. + * @param key + * @param items + */ + whereIn(key: KeyOperator, items: CollectionComparable): Collection { + const allowed = items instanceof Collection ? items.all() : items + const matches = [] + for ( const { key: search, item } of this.allAssociated(key) ) { + if ( allowed.includes(search) ) { + matches.push(item) + } + } + return new Collection(matches) + } + + /** + * Filter the collection for all records where the given key is NOT in a set of items. + * @param key + * @param items + */ + whereNotIn(key: KeyOperator, items: CollectionComparable): Collection { + return this.diff(this.whereIn(key, items)) + } + + /** + * Return the first item in the collection, if it exists. + */ + first(): MaybeCollectionItem { + if ( this.length > 0 ) { + return this.storedItems[0] + } + } + + /** + * Return the first item in the collection that matches the given where-condition. + * @param key + * @param operator + * @param operand + */ + firstWhere(key: KeyOperator, operator: WhereOperator = '=', operand: any = true): MaybeCollectionItem { + const items = this.where(key, operator, operand).all() + if ( items.length > 0 ) { + return items[0] + } + } + + /** + * Return the first item in the collection that does NOT match the given where-condition. + * @param key + * @param operator + * @param operand + */ + firstWhereNot(key: KeyOperator, operator: WhereOperator, operand?: unknown): MaybeCollectionItem { + const items = this.whereNot(key, operator, operand).all() + if ( items.length > 0 ) { + return items[0] + } + } + + /** + * Get the number of items in the collection. + */ + get length(): number { + return this.storedItems.length + } + + /** + * Get the number of items in the collection. + */ + count(): number { + return this.storedItems.length + } + + // TODO flatten - depth + + /** + * Get the item at the given index in the collection. If none exists, + * return the fallback. + * @param index + * @param fallback + */ + get(index: number, fallback?: T): MaybeCollectionItem { + if ( this.length > index ) { + return this.storedItems[index] + } else { + return fallback + } + } + + /** + * Return the item at the given index in the collection, if it exists. + * @param index + */ + at(index: number): MaybeCollectionItem { + return this.get(index) + } + + /** + * Return an object mapping key values to arrays of records with that key. + * + * @example + * ```typescript + * const users = collect([ + * {uid: 1, name: 'John', type: 'admin'}, + * {uid: 2, name: 'Jane', type: 'user'}, + * {uid: 3, name: 'James', type: 'user'}, + * ]) + * + * users.groupBy('type') // => {admin: [{uid: 1, ...}], user: [{uid: 2, ...}, {uid: 3, ...}]} + * ``` + * + * @param key + */ + groupBy(key: KeyOperator): any { + const items = this.allAssociated(key) + const groups: any = {} + for ( const item of items ) { + const itemKey = String(item.key) + if ( !groups[itemKey] ) { + groups[itemKey] = [] + } + + groups[itemKey].push(item.item) + } + + return groups + } + + /** + * Return an object mapping the given key to the record with that key. + * See `keyBy()`. + * @param key + */ + associate(key: KeyOperator): any { + const items = this.allAssociated(key) + const values: any = {} + for ( const item of items ) { + values[String(item.key)] = item.item + } + return values + } + + /** + * Join the items in the collection to a string delimited by the given delimiter. + * @param delimiter + */ + join(delimiter: string): string { + return this.storedItems.join(delimiter) + } + + /** + * Join the items in the collection to a string delimited by the given delimiter. + * @param delimiter + */ + implode(delimiter: string): string { + return this.join(delimiter) + } + + /** + * Return a collection of the items that exist in both this collection and this collection, + * (optionally) using the given key to compare. + * @param items + * @param key + */ + intersect(items: CollectionComparable, key?: KeyOperator): Collection { + const compare = items instanceof Collection ? items.all() : items + const intersect = [] + let allItems + if ( key ) { + allItems = this.allAssociated(key) + } else { + allItems = this.storedItems.map(item => { + return { + key: item, + item, + } + }) + } + + for ( const item of allItems ) { + if ( compare.includes(item.key) ) { + intersect.push(item.item) + } + } + return new Collection(intersect) + } + + /** + * True if the collection has no items. + */ + isEmpty(): boolean { + return this.length < 1 + } + + /** + * True if the collection has at least one item. + */ + isNotEmpty(): boolean { + return this.length > 0 + } + + /** + * Return the last item in the collection. + */ + last(): MaybeCollectionItem { + if ( this.length > 0 ) { + return this.storedItems.reverse()[0] + } + } + + /** + * Return the last item in the collection that matches the given where-condition. + * @param key + * @param operator + * @param operand + */ + lastWhere(key: KeyOperator, operator: WhereOperator, operand?: unknown): MaybeCollectionItem { + const items = this.where(key, operator, operand).all() + if ( items.length > 0 ) { + return items.reverse()[0] + } + } + + /** + * Return the last item in the collection that does NOT match the given where-condition. + * @param key + * @param operator + * @param operand + */ + lastWhereNot(key: KeyOperator, operator: WhereOperator, operand?: unknown): MaybeCollectionItem { + const items = this.whereNot(key, operator, operand).all() + if ( items.length > 0 ) { + return items.reverse()[0] + } + } + + /** + * Map the collection to a collection of the values of the key. + * + * @example + * ```typescript + * const users = collect([ + * {uid: 1, name: 'John', type: 'admin'}, + * {uid: 2, name: 'Jane', type: 'user'}, + * {uid: 3, name: 'James', type: 'user'}, + * ]) + * + * users.pluck('name') // => Collection['John', 'Jane', 'James'] + * ``` + * + * @param key + */ + pluck(key: T2): Collection { + return new Collection(this.allOperator(key)) + } + + /** + * Return the max value of the given key. + * @param key + */ + max(key: KeyOperator): number { + const values = this.allAsNumbers(key) + return Math.max(...values) + } + + /** + * Return the item with the max value of the given key. + * @param key + */ + whereMax(key: KeyOperator): Collection { + return this.where(key, '=', this.max(key)) + } + + /** + * Return the min value of the given key. + * @param key + */ + min(key: KeyOperator): number { + const values = this.allAsNumbers(key) + return Math.min(...values) + } + + /** + * Return the item with the min value of the given key. + * @param key + */ + whereMin(key: KeyOperator): Collection { + return this.where(key, '=', this.min(key)) + } + + /** + * Get a new collection containing both the items in this collection, and the `items` collection. + * @param items + */ + merge(items: CollectionComparable): Collection { + const merge = items instanceof Collection ? items.all() : items + return new Collection([...this.storedItems, ...merge]) + } + + /** + * Return every nth item in the collection. + * + * @example + * ``` + * const items = collect(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']) + * + * items.nth(3) // => Collection['a', 'd', 'g'] + * ``` + * + * @param n + */ + nth(n: number): Collection { + const matches: CollectionItem[] = [] + let current = 1 + this.storedItems.forEach(item => { + if ( current === 1 ) { + matches.push(item) + } + + current += 1 + if ( current > n ) { + current = 1 + } + }) + return new Collection(matches) + } + + /** + * Return the items that should exist on the given page, assuming there are `perPage` many items on a single page. + * @param page + * @param perPage + */ + forPage(page: number, perPage: number): Collection { + const start = page * perPage - perPage + const end = page * perPage + return new Collection(this.storedItems.slice(start, end)) + } + + /** + * Return a new Pipe of this collection. + */ + pipeTo(pipeline: Pipeline): TOut { + return pipeline.apply(this) + } + + /** Build and apply a pipeline. */ + pipe(builder: (pipeline: Pipeline) => Pipeline): TOut { + return builder(Pipeline.id()).apply(this) + } + + /** + * Return a new AsyncPipe of this collection. + */ + asyncPipe(): AsyncPipe> { + return AsyncPipe.wrap(this) + } + + /** + * Remove the last item from this collection. + */ + pop(): MaybeCollectionItem { + if ( this.length > 0 ) { + return this.storedItems.pop() + } + } + + /** + * Add the given item to the beginning of this collection. + * @param item + */ + prepend(item: CollectionItem): Collection { + this.storedItems = [item, ...this.storedItems] + this.callPushSubscribers(item) + return this + } + + /** + * Add the given item to the end of this collection. + * @param item + */ + push(item: CollectionItem): Collection { + this.storedItems.push(item) + this.callPushSubscribers(item) + return this + } + + /** + * Subscribe to listen for items being added to the collection. + * @param sub + */ + push$(sub: Subscription): Unsubscribe { + this.pushSubscribers.push(sub) + return { + unsubscribe: () => this.pushSubscribers = this.pushSubscribers.filter(x => x !== sub), + } + } + + /** Helper to notify subscribers that an item has been pushed to the collection. */ + private callPushSubscribers(item: T): void { + this.pushSubscribers + .forEach(sub => { + if ( typeof sub === 'object' ) { + sub?.next?.(item) + } else { + sub(item) + } + }) + } + + /** + * Push the given items to the end of this collection. + * Unlike `merge()`, this mutates the current collection's items. + * @param items + */ + concat(items: CollectionComparable): Collection { + const concats = items instanceof Collection ? items.all() : items + for ( const item of concats ) { + this.storedItems.push(item) + this.callPushSubscribers(item) + } + return this + } + + /** + * Insert the given item into this collection at the specified index. + * @param index + * @param item + */ + put(index: number, item: CollectionItem): Collection { + const newItems = [] + let inserted = false + this.storedItems.forEach((existing, existingIndex) => { + if ( existingIndex === index ) { + newItems.push(item) + inserted = true + } + + newItems.push(existing) + }) + + if ( !inserted ) { + newItems.push(item) + } + return new Collection(newItems) + } + + /** + * Return `n` many randomly-selected items from this collection. + * @param n + */ + random(n: number): Collection { + const randomItems: CollectionItem[] = [] + const all = this.storedItems + if ( n > this.length ) { + n = this.length + } + while ( randomItems.length < n ) { + const item = all[Math.floor(Math.random() * all.length)] + if ( !randomItems.includes(item) ) { + randomItems.push(item) + } + } + return new Collection(randomItems) + } + + /** + * Reduce this collection to a single value using the given reducer function. + * + * @example + * ```typescript + * const items = collect([1, 3, 5, 7]) + * + * items.reduce((sum, item) => sum + item, 0) // => 16 + * ``` + * + * @param reducer + * @param initialValue + */ + reduce(reducer: KeyReducerFunction, initialValue: T2): T2 + + reduce(reducer: KeyReducerFunction, initialValue?: T2): T2 | undefined { + let currentValue = initialValue + this.storedItems.forEach((item, index) => { + currentValue = reducer(currentValue, item, index) + }) + + return currentValue + } + + /** + * Return a new collection of items that fail the given truth-test function. + * @param truthTestFunction + */ + reject(truthTestFunction: KeyFunction): Collection { + const rejected = this.storedItems.filter((item, index) => { + return !truthTestFunction(item, index) + }) + return new Collection(rejected) + } + + /** + * Return a new collection whose items are in the reverse order of the current one. + */ + reverse(): Collection { + return new Collection([...this.storedItems.reverse()]) + } + + /** + * Try to find the given item in the collection. If it exists, return the index. + * @param item + */ + search(item: CollectionItem): MaybeCollectionIndex { + let foundIndex + this.storedItems.some((possibleItem, index) => { + if ( possibleItem === item ) { + foundIndex = index + return true + } + }) + return foundIndex + } + + /** + * Remove and return the first item in the collection, if it exists. + */ + shift(): MaybeCollectionItem { + if ( this.length > 0 ) { + return this.storedItems.shift() + } + } + + /** + * Shuffle the items in this collection to a random order. + */ + shuffle(): Collection { + const items = [...this.storedItems] + for ( let i = items.length - 1; i > 0; i-- ) { + const j = Math.floor(Math.random() * (i + 1)) + ;[items[i], items[j]] = [items[j], items[i]] + } + return new Collection(items) + } + + /** + * Return a sub-set of this collection between the given index ranges. + * @param start + * @param end + */ + slice(start: number, end: number): Collection { + return new Collection(this.storedItems.slice(start, end)) + } + + // TODO split + // TODO chunk + + /** + * Sort the collection (optionally) using the given comparison function. + * @param comparisonFunction + */ + sort(comparisonFunction?: ComparisonFunction): Collection { + const items = this.storedItems + if ( comparisonFunction ) { + items.sort(comparisonFunction) + } else { + items.sort() + } + + return new Collection(items) + } + + /** + * Sort the collection (optionally) using the given key operator. + * @param key + */ + sortBy(key?: KeyOperator): Collection { + let items: any[] + if ( key ) { + items = this.allAssociated(key) + } else { + items = this.storedItems.map(item => { + return { key: item, + item } + }) + } + + items.sort((a: any, b: any) => { + if ( a.key > b.key ) { + return 1 + } else if ( a.key < b.key ) { + return -1 + } else { + return 0 + } + }) + return new Collection(items.map((item: AssociatedCollectionItem) => item.item)) + } + + /** + * Identical to `sort()`, but in reverse order. + * @param comparisonFunction + */ + sortDesc(comparisonFunction?: ComparisonFunction): Collection { + return this.sort(comparisonFunction).reverse() + } + + /** + * Identical to `sortBy()`, but in reverse order. + * @param key + */ + sortByDesc(key?: KeyOperator): Collection { + return this.sortBy(key).reverse() + } + + /** + * Remove `deleteCount` many items from the collection, starting at the `start` index. + * @param start + * @param deleteCount + */ + splice(start: CollectionIndex, deleteCount?: number): Collection { + return new Collection([...this.storedItems].splice(start, deleteCount)) + } + + /** + * Return the sum of the items in the collection, optionally by key. + * + * @example + * ```typescript + * const items = collect([{k1: 1}, {k1: 3}, {k1: 5}, {k1: 7}]) + * + * items.sum('k1') // => 16 + * ``` + * + * @param key + */ + sum(key?: KeyOperator): number { + let items + if ( key ) { + items = this.allAsNumbers(key) + } else { + items = this.storedItems.map(x => Number(x)) + } + return items.reduce((prev, curr) => prev + curr) + } + + /** + * Return a collection of the first `limit` many items in this collection. + * If `limit` is negative, returns the last `limit` many items. + * @param limit + */ + take(limit: number): Collection { + if ( limit === 0 ) { + return new Collection() + } else if ( limit > 0 ) { + return new Collection(this.storedItems.slice(0, limit)) + } else { + return new Collection(this.storedItems.reverse().slice(0, -1 * limit) + .reverse()) + } + } + + /** + * Apply the given function to this collection then return the collection. + * This is intended to help with chaining. + * + * @example + * ```typescript + * collection.tap(coll => { + * coll.push(item) + * }) + * .where('someKey', '>', 4) + * // ... &c. + * ``` + * + * @param func + */ + tap(func: CollectionFunction): this { + func(this) + return this + } + + /** + * Return all distinct items in this collection. If a key is specified, returns + * all unique values of that key. + * + * @example + * ```typescript + * const users = collect([ + * {uid: 1, name: 'John', type: 'admin'}, + * {uid: 2, name: 'Jane', type: 'user'}, + * {uid: 3, name: 'James', type: 'user'}, + * ]) + * + * users.unique('type') // => Collection['admin', 'user'] + * ``` + * + * @param key + */ + unique(key?: KeyOperator): Collection { + const has: CollectionItem[] = [] + let items + if ( key ) { + items = this.allOperator(key) + } else { + items = [...this.storedItems] + } + for ( const item of items ) { + if ( !has.includes(item) ) { + has.push(item) + } + } + return new Collection(has) + } + + /** + * Returns true if the given item is in this collection. + * @param item + */ + includes(item: CollectionItem): boolean { + return this.storedItems.includes(item) + } + + /** + * Add on to the end of this collection as many `value` items as necessary until the collection is `length` long. + * @param length + * @param value + */ + pad(length: number, value: CollectionItem): Collection { + const items = [...this.storedItems] + while ( items.length < length ) { + items.push(value) + } + return new Collection(items) + } + + /** + * Cast the collection to an array. + */ + toArray(recursive = true): any[] { + const returns: any = [] + for ( const item of this.storedItems ) { + if ( recursive && item instanceof Collection ) { + returns.push(item.toArray()) + } else { + returns.push(item) + } + } + return returns + } + + /** + * Cast the collection to a JSON string, optionally specifying the replacer and indentation. + * @param replacer + * @param space + */ + toJSON(replacer = undefined, space = 4): string { + return JSON.stringify(this.toArray(), replacer, space) + } + + // TODO getIterator + // TODO getCachingIterator + + [Symbol.iterator](): Iterator { + const items = this.storedItems + let currentIndex = 0 + return { + next() { + const item = items[currentIndex] + currentIndex += 1 + + return { + done: currentIndex > items.length, + value: item, + } + }, + } + } + + /** + * Like `map()`, but the callback can be async. + * + * @example + * A trivial example, but demonstrative: + * + * ```typescript + * const collection = collect([1, 2, 3]) + * + * collection.map(async item => item + 1) // => Collection[Promise<1>, Promise<2>, Promise<3>] + * + * collection.promiseMap(async item => item + 1) // => Promise + * ``` + * + * @param func + */ + async promiseMap(func: KeyFunction>): Promise> { + return new Collection(await Promise.all( + this.map(func).toArray(), + )) + } +} diff --git a/src/bones/collection/Iterable.ts b/src/bones/collection/Iterable.ts new file mode 100644 index 0000000..c496947 --- /dev/null +++ b/src/bones/collection/Iterable.ts @@ -0,0 +1,135 @@ +import {Collection} from './Collection' + +export type MaybeIterationItem = { done: boolean, value?: T } +export type ChunkCallback = (items: Collection) => any + +export class StopIteration extends Error {} + +/** + * Abstract class representing an iterable, lazy-loaded dataset. + * @abstract + */ +export abstract class Iterable { + /** + * The current index of the iterable. + * @type number + */ + protected index = 0 + + /** + * Get the item of this iterable at the given index, if one exists. + * @param {number} i + * @return Promise + */ + abstract at(i: number): Promise + + /** + * Get the collection of items in the given range of this iterable. + * @param {number} start + * @param {number} end + * @return Promise + */ + abstract range(start: number, end: number): Promise> + + /** + * Count the number of items in this collection. + * @return Promise + */ + abstract count(): Promise + + /** + * Get a copy of this iterable. + * @return Iterable + */ + abstract clone(): Iterable + + /** + * Return a collection of all items in this iterable. + * @return Promise + */ + public async all(): Promise> { + return this.range(0, (await this.count()) + 1) + } + + /** + * Advance to the next value of this iterable. + * @return Promise + */ + public async next(): Promise> { + const i = this.index + + if ( i >= await this.count() ) { + return { done: true } + } + + this.index = i + 1 + return { done: false, + value: await this.at(i) } + } + + /** + * Chunk the iterable into the given size and call the callback passing the chunk along. + * @param {number} size + * @param {ChunkCallback} callback + * @return Promise + */ + public async chunk(size: number, callback: ChunkCallback): Promise { + if ( size < 1 ) { + throw new Error('Chunk size must be at least 1') + } + + while ( true ) { + const items = await this.range(this.index, this.index + size - 1) + this.index += items.count() + + try { + await callback(items) + } catch ( error ) { + if ( error instanceof StopIteration ) { + break + } else { + throw error + } + } + + if ( items.count() < size ) { + // We hit the last chunk, so bail out + break + } + } + } + + /** + * Advance the iterable to the given index. + * @param {number} index + * @return Promise + */ + public async seek(index: number): Promise { + if ( index < 0 ) { + throw new TypeError('Cannot seek to negative index.') + } else if ( index >= await this.count() ) { + throw new TypeError('Cannot seek past last item.') + } + this.index = index + } + + /** + * Peek at the next value of the iterable, without advancing. + * @return Promise + */ + public async peek(): Promise { + if ( this.index + 1 >= await this.count() ) { + return undefined + } else { + return this.at(this.index + 1) + } + } + + /** + * Reset the iterable to the first index. + * @return Promise + */ + public async reset(): Promise { + this.index = 0 + } +} diff --git a/src/bones/collection/where.ts b/src/bones/collection/where.ts new file mode 100644 index 0000000..d04536e --- /dev/null +++ b/src/bones/collection/where.ts @@ -0,0 +1,110 @@ +/** + * Type representing a valid where operator. + */ +export type WhereOperator = '&' | '>' | '>=' | '<' | '<=' | '!=' | '<=>' | '%' | '|' | '!' | '~' | '=' | '^' + +/** + * Type associating search items with a key. + */ +export type AssociatedSearchItem = { key: any, item: any } + +/** + * Type representing the result of a where. + */ +export type WhereResult = any[] + +/** + * Returns true if the given item satisfies the given where clause. + * @param {AssociatedSearchItem} item + * @param {WhereOperator} operator + * @param [operand] + * @return boolean + */ +export const whereMatch = (item: AssociatedSearchItem, operator: WhereOperator, operand?: unknown): boolean => { + switch ( operator ) { + case '&': + if ( item.key & Number(operand) ) { + return true + } + break + case '>': + if ( item.key > (operand as any) ) { + return true + } + break + case '>=': + if ( item.key >= (operand as any) ) { + return true + } + break + case '<': + if ( item.key < (operand as any) ) { + return true + } + break + case '<=': + if ( item.key <= (operand as any) ) { + return true + } + break + case '!=': + if ( item.key !== (operand as any) ) { + return true + } + break + case '<=>': + if ( item.key === operand && typeof item.key !== 'undefined' && item.key !== null ) { + return true + } + break + case '%': + if ( item.key % Number(operand) ) { + return true + } + break + case '|': + if ( item.key | Number(operand) ) { + return true + } + break + case '!': + if ( !item.key ) { + return true + } + break + case '~': + if ( ~item.key ) { + return true + } + break + case '=': + if ( item.key === operand ) { + return true + } + break + case '^': + if ( item.key ^ Number(operand) ) { + return true + } + break + } + + return false +} + +/** + * Apply the given where clause to the items and return those that match. + * @param {Array} items + * @param {WhereOperator} operator + * @param [operand] + */ +export const applyWhere = (items: AssociatedSearchItem[], operator: WhereOperator, operand?: unknown): WhereResult => { + const matches: WhereResult = [] + for ( const item of items ) { + if ( whereMatch(item, operator, operand) ) { + matches.push(item.item) + } + } + + return matches +} diff --git a/src/config.ts b/src/config.ts index 54afd59..225c494 100644 --- a/src/config.ts +++ b/src/config.ts @@ -13,6 +13,7 @@ const maybeConfig: any = { threads: { type: 'alias', template: process.env.CHORUS_THREAD_TEMPLATE, + idPrefix: 't.', }, }, } diff --git a/src/mail/read.ts b/src/mail/read.ts new file mode 100644 index 0000000..e69de29 diff --git a/src/mail/replies.ts b/src/mail/replies.ts new file mode 100644 index 0000000..c40b648 --- /dev/null +++ b/src/mail/replies.ts @@ -0,0 +1,133 @@ +// EmailReplyParser is a library to parse plain text email content. +// The goal is to identify quoted text, signatures, or original content. + +export class ReplyParser { + static VERSION = "0.5.11"; + + // Splits an email body into a list of Fragments. + static read(text: string): Email { + return new Email().read(text); + } + + // Get the text of the visible portions of the given email body. + static parseReply(text: string): string { + return this.read(text).visibleText(); + } +} + +export class Email { + private fragments: Fragment[] = []; + private foundVisible: boolean = false; + + // Gets the combined text of the visible fragments of the email body. + visibleText(): string { + return this.fragments + .filter((fragment) => !fragment.hidden) + .map((fragment) => fragment.toString()) + .join("\n") + .trimEnd(); + } + + // Splits the given text into a list of Fragments. + read(text: string): this { + let modifiedText = text.slice(); + + // Normalize line endings. + modifiedText = modifiedText.replace(/\r\n/g, "\n"); + + // Handle multi-line reply headers. + const multiLineHeaderRegex = /^(?!On.*On\s.+?wrote:)(On\s(.+?)wrote:)$/m; + modifiedText = modifiedText.replace(multiLineHeaderRegex, (match) => + match.replace(/\n/g, " ") + ); + + // Ensure proper splitting for lines of underscores. + modifiedText = modifiedText.replace(/([^\n])(?=\n_{7}_+)$/m, "$1\n"); + + // Reverse the text for parsing. + modifiedText = modifiedText.split("").reverse().join(""); + + this.foundVisible = false; + let fragment: Fragment | null = null; + + const lines = modifiedText.split("\n"); + for (const line of lines) { + const processedLine = line.trimEnd(); + const isQuoted = processedLine.endsWith(">"); + + if (fragment && processedLine === "") { + if (Fragment.isSignature(fragment.lines[fragment.lines.length - 1])) { + fragment.signature = true; + this.finishFragment(fragment); + fragment = null; + } + } + + if ( + fragment && + (fragment.quoted === isQuoted || + (fragment.quoted && + (Fragment.isQuoteHeader(processedLine) || processedLine === ""))) + ) { + fragment.lines.push(processedLine); + } else { + if (fragment) { + this.finishFragment(fragment); + } + fragment = new Fragment(isQuoted, processedLine); + } + } + + if (fragment) { + this.finishFragment(fragment); + } + + this.fragments.reverse(); + return this; + } + + private finishFragment(fragment: Fragment): void { + fragment.finish(); + if (!this.foundVisible) { + if (fragment.quoted || fragment.signature || fragment.toString().trim() === "") { + fragment.hidden = true; + } else { + this.foundVisible = true; + } + } + this.fragments.push(fragment); + } +} + +class Fragment { + static SIGNATURE_REGEX = /(--\s*$|__\s*$|\w-$)|(^(\w+\s+){1,3}ym morf tneS$)/m; + static QUOTE_HEADER_REGEX = /^:etorw.*nO$|^.*:(morF|tneS|oT|tcejbuS)$/; + + quoted: boolean; + signature: boolean = false; + hidden: boolean = false; + lines: string[]; + private content: string | null = null; + + constructor(quoted: boolean, firstLine: string) { + this.quoted = quoted; + this.lines = [firstLine]; + } + + static isSignature(line: string): boolean { + return this.SIGNATURE_REGEX.test(line); + } + + static isQuoteHeader(line: string): boolean { + return this.QUOTE_HEADER_REGEX.test(line); + } + + finish(): void { + this.content = this.lines.join("\n").split("").reverse().join(""); + this.lines = []; + } + + toString(): string { + return this.content || ""; + } +} diff --git a/src/types.ts b/src/types.ts index 426d2f3..33057e1 100644 --- a/src/types.ts +++ b/src/types.ts @@ -13,6 +13,7 @@ const commentsConfigSchema = z.object({ threads: z.object({ type: z.string(), // fixme : in validation template: z.string(), + idPrefix: z.string(), }), }), }) @@ -21,3 +22,17 @@ export type CommentsConfig = z.infer export const castCommentsConfig = (what: unknown): CommentsConfig => { return commentsConfigSchema.parse(what) } + + + +export type Message = { + id: string, + date: Date, + recipients: string[], + from: { + name?: string, + address?: string, + }, + subject: string, + content: string, +}