[WIP] Start tinkering with IMAP
This commit is contained in:
parent
7791baff20
commit
063809e446
82
index.ts
82
index.ts
@ -1,4 +1,82 @@
|
||||
|
||||
import { config } from "./src/config.ts";
|
||||
import {ImapFlow, type MailboxLockObject} from "imapflow";
|
||||
import { extract } from 'letterparser'
|
||||
import {ReplyParser} from "./src/mail/replies.ts";
|
||||
import type {Message} from "./src/types.ts";
|
||||
import type {Awaitable} from "./src/bones";
|
||||
import {AsyncCollection} from "./src/bones/collection/AsyncCollection.ts";
|
||||
import {AsyncGeneratorIterable} from "./src/bones/collection/AsyncGeneratorIterable.ts";
|
||||
|
||||
console.log(config)
|
||||
export async function withMessageClient<TReturn>(cb: (c: ImapFlow) => Awaitable<TReturn>): Promise<TReturn> {
|
||||
const client = new ImapFlow(config.mail.imap)
|
||||
await client.connect()
|
||||
|
||||
try {
|
||||
return cb(client)
|
||||
} finally {
|
||||
await client.logout()
|
||||
}
|
||||
}
|
||||
|
||||
export async function getFolders(): Promise<string[]> {
|
||||
return withMessageClient(async client => {
|
||||
const list = await client.list()
|
||||
return list.map(l => l.name)
|
||||
})
|
||||
}
|
||||
|
||||
export const getMessagesForMailbox = (box: string): AsyncCollection<Message> => {
|
||||
return new AsyncCollection(
|
||||
new AsyncGeneratorIterable(
|
||||
() => getMessagesForMailboxOld(box)))
|
||||
}
|
||||
|
||||
export async function* getMessagesForMailboxOld(box: string): AsyncGenerator<Message, void, unknown> {
|
||||
const client = new ImapFlow(config.mail.imap)
|
||||
await client.connect()
|
||||
|
||||
try {
|
||||
let lock: MailboxLockObject
|
||||
try {
|
||||
lock = await client.getMailboxLock(box)
|
||||
} catch (e: unknown) {
|
||||
// This is usually because the mailbox does not exist, so yield nothing
|
||||
console.warn(`Error when opening mailbox lock for mailbox "${box}":\n`, e)
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
await client.mailboxOpen(box)
|
||||
|
||||
const messages = client.fetch('1:*', {
|
||||
envelope: true,
|
||||
source: true,
|
||||
uid: true,
|
||||
bodyParts: ['text'],
|
||||
})
|
||||
|
||||
for await ( const message of messages ) {
|
||||
const source = message.source.toString('utf-8')
|
||||
const content = ReplyParser.parseReply(extract(source).text || '')
|
||||
|
||||
const msg: Message = {
|
||||
id: message.envelope.messageId,
|
||||
date: message.envelope.date,
|
||||
recipients: message.envelope.to.map(x => x.address || '').filter(Boolean),
|
||||
from: message.envelope.from[0],
|
||||
subject: message.envelope.subject,
|
||||
content,
|
||||
}
|
||||
|
||||
yield msg
|
||||
}
|
||||
} finally {
|
||||
lock.release()
|
||||
}
|
||||
} finally {
|
||||
await client.logout()
|
||||
}
|
||||
}
|
||||
|
||||
const c = getMessagesForMailbox('e12a')
|
||||
console.log(await c.all())
|
@ -10,7 +10,9 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"@types/imapflow": "^1.0.19",
|
||||
"@types/mailparser": "^3.4.5",
|
||||
"imapflow": "^1.0.171",
|
||||
"letterparser": "^0.1.8",
|
||||
"zod": "^3.24.1"
|
||||
}
|
||||
}
|
264
src/bones/Pipe.ts
Normal file
264
src/bones/Pipe.ts
Normal file
@ -0,0 +1,264 @@
|
||||
/**
|
||||
* A closure that maps a given pipe item to a different type.
|
||||
*/
|
||||
import type {Awaitable, Maybe} from './types'
|
||||
|
||||
export type PipeOperator<T, T2> = (subject: T) => T2
|
||||
|
||||
/**
|
||||
* A closure that maps a given pipe item to an item of the same type.
|
||||
*/
|
||||
export type ReflexivePipeOperator<T> = (subject: T) => Maybe<T>
|
||||
|
||||
/**
|
||||
* A condition or condition-resolving function for pipe methods.
|
||||
*/
|
||||
export type PipeCondition<T> = boolean | ((subject: T) => boolean)
|
||||
|
||||
/**
|
||||
* A class for writing chained/conditional operations in a data-flow manner.
|
||||
*
|
||||
* This is useful when you need to do a series of operations on an object, perhaps conditionally.
|
||||
*/
|
||||
export class Pipeline<TIn, TOut> {
|
||||
static id<T>(): Pipeline<T, T> {
|
||||
return new Pipeline(x => x)
|
||||
}
|
||||
|
||||
constructor(
|
||||
protected readonly factory: (TIn: TIn) => TOut,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Apply the given operator to the item in the pipe, and return a new pipe with the result.
|
||||
*
|
||||
* @example
|
||||
* ```typescript
|
||||
* Pipe.wrap(2)
|
||||
* .tap(x => x * 4)
|
||||
* .get() // => 8
|
||||
* ```
|
||||
*
|
||||
* @param op
|
||||
*/
|
||||
tap<T2>(op: PipeOperator<TOut, T2>): Pipeline<TIn, T2> {
|
||||
return new Pipeline((val: TIn) => {
|
||||
return op(this.factory(val))
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Like tap, but operates on a tuple with both the first value and the tapped value.
|
||||
* @param op
|
||||
*/
|
||||
first<T2>(op: PipeOperator<[TIn, TOut], T2>): Pipeline<TIn, T2> {
|
||||
return new Pipeline((val: TIn) => {
|
||||
return op([val, this.factory(val)])
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Like tap, but always returns the original pipe type.
|
||||
* @param op
|
||||
*/
|
||||
peek<T2>(op: PipeOperator<TOut, T2>): Pipeline<TIn, TOut> {
|
||||
return new Pipeline((val: TIn) => {
|
||||
const nextVal = this.factory(val)
|
||||
op(nextVal)
|
||||
return nextVal
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* If `check` is truthy, apply the given operator to the item in the pipe and return the result.
|
||||
* Otherwise, just return the current pipe unchanged.
|
||||
*
|
||||
* @param check
|
||||
* @param op
|
||||
*/
|
||||
when(check: PipeCondition<TOut>, op: ReflexivePipeOperator<TOut>): Pipeline<TIn, TOut> {
|
||||
return new Pipeline((val: TIn) => {
|
||||
const nextVal = this.factory(val)
|
||||
if ( this.checkCondition(check, nextVal) ) {
|
||||
const appliedVal = op(nextVal)
|
||||
if ( typeof appliedVal === 'undefined' ) {
|
||||
return nextVal
|
||||
}
|
||||
|
||||
return appliedVal
|
||||
}
|
||||
|
||||
return nextVal
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* If `check` is falsy, apply the given operator to the item in the pipe and return the result.
|
||||
* Otherwise, just return the current pipe unchanged.
|
||||
*
|
||||
* @param check
|
||||
* @param op
|
||||
*/
|
||||
unless(check: PipeCondition<TOut>, op: ReflexivePipeOperator<TOut>): Pipeline<TIn, TOut> {
|
||||
return new Pipeline((val: TIn) => {
|
||||
const nextVal = this.factory(val)
|
||||
if ( !this.checkCondition(check, nextVal) ) {
|
||||
const appliedVal = op(nextVal)
|
||||
if ( typeof appliedVal === 'undefined' ) {
|
||||
return nextVal
|
||||
}
|
||||
|
||||
return appliedVal
|
||||
}
|
||||
|
||||
return nextVal
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply the pipeline to an input.
|
||||
*/
|
||||
apply(input: TIn): TOut {
|
||||
return this.factory(input)
|
||||
}
|
||||
|
||||
protected checkCondition(check: PipeCondition<TOut>, val: TOut): boolean {
|
||||
return (typeof check === 'function' && check(val))
|
||||
|| (typeof check !== 'function' && check)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A subject function that yields the value in the AsyncPipe.
|
||||
*/
|
||||
export type AsyncPipeResolver<T> = () => Awaitable<T>
|
||||
|
||||
/**
|
||||
* A closure that maps a given pipe item to a different type.
|
||||
*/
|
||||
export type AsyncPipeOperator<T, T2> = (subject: T) => Awaitable<T2>
|
||||
|
||||
export type PromisePipeOperator<T, T2> = (subject: T, resolve: (val: T2) => unknown, reject: (err: Error) => unknown) => Awaitable<unknown>
|
||||
|
||||
/**
|
||||
* A closure that maps a given pipe item to an item of the same type.
|
||||
*/
|
||||
export type ReflexiveAsyncPipeOperator<T> = (subject: T) => Awaitable<T|void>
|
||||
|
||||
/**
|
||||
* A condition or condition-resolving function for pipe methods.
|
||||
*/
|
||||
export type AsyncPipeCondition<T> = boolean | ((subject: T) => Awaitable<boolean>)
|
||||
|
||||
/**
|
||||
* An asynchronous version of the Pipe helper.
|
||||
*/
|
||||
export class AsyncPipe<T> {
|
||||
/**
|
||||
* Get an AsyncPipe with the given value in it.
|
||||
* @param subject
|
||||
*/
|
||||
static wrap<subjectType>(subject: subjectType): AsyncPipe<subjectType> {
|
||||
return new AsyncPipe<subjectType>(() => subject)
|
||||
}
|
||||
|
||||
constructor(
|
||||
/** The current value resolver of the pipe. */
|
||||
private subject: AsyncPipeResolver<T>,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Apply a transformative operator to the pipe.
|
||||
* @param op
|
||||
*/
|
||||
tap<T2>(op: AsyncPipeOperator<T, T2>): AsyncPipe<T2> {
|
||||
return new AsyncPipe<T2>(async () => op(await this.subject()))
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply a transformative operator to the pipe, wrapping it
|
||||
* in a Promise and passing the resolve/reject callbacks to the
|
||||
* closure.
|
||||
* @param op
|
||||
*/
|
||||
promise<T2>(op: PromisePipeOperator<T, T2>): AsyncPipe<T2> {
|
||||
return new AsyncPipe<T2>(() => {
|
||||
return new Promise<T2>((res, rej) => {
|
||||
(async () => this.subject())()
|
||||
.then(subject => {
|
||||
op(subject, res, rej)
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply an operator to the pipe, but return the reference
|
||||
* to the current pipe. The operator is resolved when the
|
||||
* overall pipe is resolved.
|
||||
* @param op
|
||||
*/
|
||||
peek<T2>(op: AsyncPipeOperator<T, T2>): AsyncPipe<T> {
|
||||
return new AsyncPipe<T>(async () => {
|
||||
const subject = await this.subject()
|
||||
await op(subject)
|
||||
return subject
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply an operator to the pipe, if the check condition passes.
|
||||
* @param check
|
||||
* @param op
|
||||
*/
|
||||
when(check: AsyncPipeCondition<T>, op: ReflexiveAsyncPipeOperator<T>): AsyncPipe<T> {
|
||||
return new AsyncPipe<T>(async () => {
|
||||
let subject
|
||||
|
||||
if ( typeof check === 'function' ) {
|
||||
check = await check(subject = await this.subject())
|
||||
}
|
||||
|
||||
subject = subject ?? await this.subject()
|
||||
if ( check ) {
|
||||
return ((await op(subject)) ?? subject) as T
|
||||
}
|
||||
|
||||
return subject as T
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply an operator to the pipe, if the check condition fails.
|
||||
* @param check
|
||||
* @param op
|
||||
*/
|
||||
unless(check: AsyncPipeCondition<T>, op: ReflexiveAsyncPipeOperator<T>): AsyncPipe<T> {
|
||||
if ( typeof check === 'function' ) {
|
||||
return this.when(async (subject: T) => !(await check(subject)), op)
|
||||
}
|
||||
|
||||
return this.when(!check, op)
|
||||
}
|
||||
|
||||
/**
|
||||
* Alias of `unless()`.
|
||||
* @param check
|
||||
* @param op
|
||||
*/
|
||||
whenNot(check: AsyncPipeCondition<T>, op: ReflexiveAsyncPipeOperator<T>): AsyncPipe<T> {
|
||||
return this.unless(check, op)
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the transformed value from the pipe.
|
||||
*/
|
||||
async resolve(): Promise<T> {
|
||||
return this.subject()
|
||||
}
|
||||
|
||||
/** Get the transformed value from the pipe. Allows awaiting the pipe directly. */
|
||||
then(): Promise<T> {
|
||||
return this.resolve()
|
||||
}
|
||||
}
|
210
src/bones/Reactive.ts
Normal file
210
src/bones/Reactive.ts
Normal file
@ -0,0 +1,210 @@
|
||||
/**
|
||||
* Base error used to trigger an unsubscribe action from a subscriber.
|
||||
* @extends Error
|
||||
*/
|
||||
export class UnsubscribeError extends Error {}
|
||||
|
||||
/**
|
||||
* Thrown when a closed observable is pushed to.
|
||||
* @extends Error
|
||||
*/
|
||||
export class CompletedObservableError extends Error {
|
||||
constructor() {
|
||||
super('This observable can no longer be pushed to, as it has been completed.')
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Type of a basic subscriber function.
|
||||
*/
|
||||
export type SubscriberFunction<T> = (val: T) => any
|
||||
|
||||
/**
|
||||
* Type of a basic subscriber function that handles errors.
|
||||
*/
|
||||
export type SubscriberErrorFunction = (error: Error) => any
|
||||
|
||||
/**
|
||||
* Type of a basic subscriber function that handles completed events.
|
||||
*/
|
||||
export type SubscriberCompleteFunction<T> = (val?: T) => any
|
||||
|
||||
/**
|
||||
* Subscribers that define multiple handler methods.
|
||||
*/
|
||||
export type ComplexSubscriber<T> = {
|
||||
next?: SubscriberFunction<T>,
|
||||
error?: SubscriberErrorFunction,
|
||||
complete?: SubscriberCompleteFunction<T>,
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscription to a behavior subject.
|
||||
*/
|
||||
export type Subscription<T> = SubscriberFunction<T> | ComplexSubscriber<T>
|
||||
|
||||
/**
|
||||
* Object providing helpers for unsubscribing from a subscription.
|
||||
*/
|
||||
export type Unsubscribe = { unsubscribe: () => void }
|
||||
|
||||
/**
|
||||
* A stream-based state class.
|
||||
*/
|
||||
export class Reactive<T> {
|
||||
/**
|
||||
* Subscribers to this subject.
|
||||
* @type Array<ComplexSubscriber>
|
||||
*/
|
||||
protected subscribers: ComplexSubscriber<T>[] = []
|
||||
|
||||
/**
|
||||
* True if this subject has been marked complete.
|
||||
* @type boolean
|
||||
*/
|
||||
protected subjectIsComplete = false
|
||||
|
||||
/**
|
||||
* The current value of this subject.
|
||||
*/
|
||||
protected currentValue?: T
|
||||
|
||||
/**
|
||||
* True if any value has been pushed to this subject.
|
||||
* @type boolean
|
||||
*/
|
||||
protected hasPush = false
|
||||
|
||||
/**
|
||||
* Register a new subscription to this subject.
|
||||
* @param {Subscription} subscriber
|
||||
* @return Unsubscribe
|
||||
*/
|
||||
public subscribe(subscriber: Subscription<T>): Unsubscribe {
|
||||
if ( typeof subscriber === 'function' ) {
|
||||
this.subscribers.push({ next: subscriber })
|
||||
} else {
|
||||
this.subscribers.push(subscriber)
|
||||
}
|
||||
|
||||
return {
|
||||
unsubscribe: () => {
|
||||
this.subscribers = this.subscribers.filter(x => x !== subscriber)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cast this subject to a promise, which resolves on the output of the next value.
|
||||
* @return Promise
|
||||
*/
|
||||
public toPromise(): Promise<T> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const { unsubscribe } = this.subscribe({
|
||||
next: (val: T) => {
|
||||
resolve(val)
|
||||
unsubscribe()
|
||||
},
|
||||
error: (error: Error) => {
|
||||
reject(error)
|
||||
unsubscribe()
|
||||
},
|
||||
complete: (val?: T) => {
|
||||
if ( typeof val !== 'undefined' ) {
|
||||
resolve(val)
|
||||
}
|
||||
unsubscribe()
|
||||
},
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Push a new value to this subject. The promise resolves when all subscribers have been pushed to.
|
||||
* @param val
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public async next(val: T): Promise<void> {
|
||||
if ( this.subjectIsComplete ) {
|
||||
throw new CompletedObservableError()
|
||||
}
|
||||
this.currentValue = val
|
||||
this.hasPush = true
|
||||
for ( const subscriber of this.subscribers ) {
|
||||
if ( subscriber.next ) {
|
||||
try {
|
||||
await subscriber.next(val)
|
||||
} catch (e) {
|
||||
if ( e instanceof UnsubscribeError ) {
|
||||
this.subscribers = this.subscribers.filter(x => x !== subscriber)
|
||||
} else if (subscriber.error && e instanceof Error) {
|
||||
await subscriber.error(e)
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Push the given array of values to this subject in order.
|
||||
* The promise resolves when all subscribers have been pushed to for all values.
|
||||
* @param {Array} vals
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public async push(vals: T[]): Promise<void> {
|
||||
if ( this.subjectIsComplete ) {
|
||||
throw new CompletedObservableError()
|
||||
}
|
||||
await Promise.all(vals.map(val => this.next(val)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark this subject as complete.
|
||||
* The promise resolves when all subscribers have been pushed to.
|
||||
* @param [finalValue] - optionally, a final value to set
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public async complete(finalValue?: T): Promise<void> {
|
||||
if ( this.subjectIsComplete ) {
|
||||
throw new CompletedObservableError()
|
||||
}
|
||||
if ( typeof finalValue === 'undefined' ) {
|
||||
finalValue = this.value()
|
||||
} else {
|
||||
this.currentValue = finalValue
|
||||
}
|
||||
|
||||
for ( const subscriber of this.subscribers ) {
|
||||
if ( subscriber.complete ) {
|
||||
try {
|
||||
await subscriber.complete(finalValue)
|
||||
} catch (e) {
|
||||
if ( subscriber.error && e instanceof Error ) {
|
||||
await subscriber.error(e)
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.subjectIsComplete = true
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current value of this subject.
|
||||
*/
|
||||
public value(): T | undefined {
|
||||
return this.currentValue
|
||||
}
|
||||
|
||||
/**
|
||||
* True if this subject is marked as complete.
|
||||
* @return boolean
|
||||
*/
|
||||
public isComplete(): boolean {
|
||||
return this.subjectIsComplete
|
||||
}
|
||||
}
|
33
src/bones/collection/ArrayIterable.ts
Normal file
33
src/bones/collection/ArrayIterable.ts
Normal file
@ -0,0 +1,33 @@
|
||||
import { Iterable } from './Iterable'
|
||||
import {collect, Collection} from './Collection'
|
||||
|
||||
/**
|
||||
* A basic Iterable implementation that uses an array as a backend.
|
||||
* @extends Iterable
|
||||
*/
|
||||
export class ArrayIterable<T> extends Iterable<T> {
|
||||
constructor(
|
||||
/**
|
||||
* Items to use for this iterable.
|
||||
*/
|
||||
protected items: T[],
|
||||
) {
|
||||
super()
|
||||
}
|
||||
|
||||
async at(i: number): Promise<T | undefined> {
|
||||
return this.items[i]
|
||||
}
|
||||
|
||||
async range(start: number, end: number): Promise<Collection<T>> {
|
||||
return collect(this.items.slice(start, end + 1))
|
||||
}
|
||||
|
||||
async count(): Promise<number> {
|
||||
return this.items.length
|
||||
}
|
||||
|
||||
clone(): ArrayIterable<T> {
|
||||
return new ArrayIterable([...this.items])
|
||||
}
|
||||
}
|
1151
src/bones/collection/AsyncCollection.ts
Normal file
1151
src/bones/collection/AsyncCollection.ts
Normal file
File diff suppressed because it is too large
Load Diff
139
src/bones/collection/AsyncGeneratorIterable.ts
Normal file
139
src/bones/collection/AsyncGeneratorIterable.ts
Normal file
@ -0,0 +1,139 @@
|
||||
import {type ChunkCallback, Iterable, type MaybeIterationItem} from "./Iterable.ts";
|
||||
import {collect, type Collection} from "./Collection.ts";
|
||||
import {type Either, isLeft, isRight, left, type Maybe, right, unright} from "../types.ts";
|
||||
|
||||
export class AsyncGeneratorIterable<T> extends Iterable<T> {
|
||||
private sourceIndex: number = -1
|
||||
private cacheStartIndex: number = 0
|
||||
private cache: T[] = []
|
||||
private source?: AsyncGenerator<T, unknown, unknown>
|
||||
|
||||
constructor(
|
||||
private sourceFactory: () => AsyncGenerator<T, unknown, unknown>,
|
||||
private maxCacheSize: number = 100,
|
||||
) {
|
||||
super()
|
||||
}
|
||||
|
||||
private computeCacheIndex(realIndex: number): Either<null, number> {
|
||||
let i = realIndex - this.cacheStartIndex
|
||||
console.log('agi cci', { i, realIndex, cSI: this.cacheStartIndex, cl: this.cache.length})
|
||||
if ( i >= this.cache.length ) {
|
||||
return left(null)
|
||||
}
|
||||
return right(i)
|
||||
}
|
||||
|
||||
private async advanceIndexInCache(realIndex: number): Promise<void> {
|
||||
if ( isRight(this.computeCacheIndex(realIndex)) ) {
|
||||
return
|
||||
}
|
||||
console.log('aIIC needs advance')
|
||||
if ( realIndex < this.cacheStartIndex ) {
|
||||
this.source = undefined
|
||||
}
|
||||
|
||||
if ( !this.source ) {
|
||||
this.source = this.sourceFactory()
|
||||
this.sourceIndex = -1
|
||||
}
|
||||
|
||||
for await ( const item of this.source ) {
|
||||
console.log('aIIC source item', item, this.sourceIndex, realIndex)
|
||||
this.sourceIndex += 1
|
||||
this.cache.push(item)
|
||||
|
||||
if ( this.cache.length >= this.maxCacheSize ) {
|
||||
this.cache.shift()
|
||||
this.cacheStartIndex += 1
|
||||
}
|
||||
|
||||
if ( this.sourceIndex >= realIndex ) {
|
||||
console.log('aIIC break')
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async at(i: number): Promise<Maybe<T>> {
|
||||
await this.advanceIndexInCache(i)
|
||||
|
||||
const cacheIndex = this.computeCacheIndex(i)
|
||||
console.log('agi at', { i, cacheIndex })
|
||||
if ( isRight(cacheIndex) ) {
|
||||
return this.cache[unright(cacheIndex)]
|
||||
}
|
||||
|
||||
return undefined
|
||||
}
|
||||
|
||||
clone(): AsyncGeneratorIterable<T> {
|
||||
return new AsyncGeneratorIterable(this.sourceFactory, this.maxCacheSize)
|
||||
}
|
||||
|
||||
count(): Promise<number> {
|
||||
throw new Error('cannot count!')
|
||||
}
|
||||
|
||||
async range(start: number, end: number): Promise<Collection<T>> {
|
||||
const c: Collection<T> = collect()
|
||||
|
||||
for ( let i = start; i <= end; i += 1 ) {
|
||||
const item = await this.at(i)
|
||||
if ( !item ) {
|
||||
break
|
||||
}
|
||||
|
||||
c.push(item)
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
async all(): Promise<Collection<T>> {
|
||||
const c: Collection<T> = collect()
|
||||
|
||||
let i = -1
|
||||
while ( true ) {
|
||||
i += 1
|
||||
const item = await this.at(i)
|
||||
if ( !item ) {
|
||||
break
|
||||
}
|
||||
|
||||
c.push(item)
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
async next(): Promise<MaybeIterationItem<T>> {
|
||||
const value = await this.at(this.index)
|
||||
|
||||
if ( !value ) {
|
||||
return { done: true }
|
||||
}
|
||||
|
||||
this.index += 1
|
||||
return { done: false, value }
|
||||
}
|
||||
|
||||
async seek(index: number): Promise<void> {
|
||||
if ( index < 0 ) {
|
||||
throw new TypeError('Cannot seek to negative index.')
|
||||
}
|
||||
|
||||
await this.advanceIndexInCache(index)
|
||||
|
||||
const cacheIndex = this.computeCacheIndex(index)
|
||||
if ( isLeft(cacheIndex) ) {
|
||||
throw new TypeError('Cannot seek past last item.')
|
||||
}
|
||||
|
||||
this.index = index
|
||||
}
|
||||
|
||||
async peek(): Promise<Maybe<T>> {
|
||||
return this.at(this.index + 1)
|
||||
}
|
||||
}
|
1378
src/bones/collection/Collection.ts
Normal file
1378
src/bones/collection/Collection.ts
Normal file
File diff suppressed because it is too large
Load Diff
135
src/bones/collection/Iterable.ts
Normal file
135
src/bones/collection/Iterable.ts
Normal file
@ -0,0 +1,135 @@
|
||||
import {Collection} from './Collection'
|
||||
|
||||
export type MaybeIterationItem<T> = { done: boolean, value?: T }
|
||||
export type ChunkCallback<T> = (items: Collection<T>) => any
|
||||
|
||||
export class StopIteration extends Error {}
|
||||
|
||||
/**
|
||||
* Abstract class representing an iterable, lazy-loaded dataset.
|
||||
* @abstract
|
||||
*/
|
||||
export abstract class Iterable<T> {
|
||||
/**
|
||||
* The current index of the iterable.
|
||||
* @type number
|
||||
*/
|
||||
protected index = 0
|
||||
|
||||
/**
|
||||
* Get the item of this iterable at the given index, if one exists.
|
||||
* @param {number} i
|
||||
* @return Promise<any|undefined>
|
||||
*/
|
||||
abstract at(i: number): Promise<T | undefined>
|
||||
|
||||
/**
|
||||
* Get the collection of items in the given range of this iterable.
|
||||
* @param {number} start
|
||||
* @param {number} end
|
||||
* @return Promise<Collection>
|
||||
*/
|
||||
abstract range(start: number, end: number): Promise<Collection<T>>
|
||||
|
||||
/**
|
||||
* Count the number of items in this collection.
|
||||
* @return Promise<number>
|
||||
*/
|
||||
abstract count(): Promise<number>
|
||||
|
||||
/**
|
||||
* Get a copy of this iterable.
|
||||
* @return Iterable
|
||||
*/
|
||||
abstract clone(): Iterable<T>
|
||||
|
||||
/**
|
||||
* Return a collection of all items in this iterable.
|
||||
* @return Promise<Collection>
|
||||
*/
|
||||
public async all(): Promise<Collection<T>> {
|
||||
return this.range(0, (await this.count()) + 1)
|
||||
}
|
||||
|
||||
/**
|
||||
* Advance to the next value of this iterable.
|
||||
* @return Promise<MaybeIterationItem>
|
||||
*/
|
||||
public async next(): Promise<MaybeIterationItem<T>> {
|
||||
const i = this.index
|
||||
|
||||
if ( i >= await this.count() ) {
|
||||
return { done: true }
|
||||
}
|
||||
|
||||
this.index = i + 1
|
||||
return { done: false,
|
||||
value: await this.at(i) }
|
||||
}
|
||||
|
||||
/**
|
||||
* Chunk the iterable into the given size and call the callback passing the chunk along.
|
||||
* @param {number} size
|
||||
* @param {ChunkCallback} callback
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public async chunk(size: number, callback: ChunkCallback<T>): Promise<void> {
|
||||
if ( size < 1 ) {
|
||||
throw new Error('Chunk size must be at least 1')
|
||||
}
|
||||
|
||||
while ( true ) {
|
||||
const items = await this.range(this.index, this.index + size - 1)
|
||||
this.index += items.count()
|
||||
|
||||
try {
|
||||
await callback(items)
|
||||
} catch ( error ) {
|
||||
if ( error instanceof StopIteration ) {
|
||||
break
|
||||
} else {
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
if ( items.count() < size ) {
|
||||
// We hit the last chunk, so bail out
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Advance the iterable to the given index.
|
||||
* @param {number} index
|
||||
* @return Promise<void>
|
||||
*/
|
||||
public async seek(index: number): Promise<void> {
|
||||
if ( index < 0 ) {
|
||||
throw new TypeError('Cannot seek to negative index.')
|
||||
} else if ( index >= await this.count() ) {
|
||||
throw new TypeError('Cannot seek past last item.')
|
||||
}
|
||||
this.index = index
|
||||
}
|
||||
|
||||
/**
|
||||
* Peek at the next value of the iterable, without advancing.
|
||||
* @return Promise<any|undefined>
|
||||
*/
|
||||
public async peek(): Promise<T | undefined> {
|
||||
if ( this.index + 1 >= await this.count() ) {
|
||||
return undefined
|
||||
} else {
|
||||
return this.at(this.index + 1)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reset the iterable to the first index.
|
||||
* @return Promise<any>
|
||||
*/
|
||||
public async reset(): Promise<void> {
|
||||
this.index = 0
|
||||
}
|
||||
}
|
110
src/bones/collection/where.ts
Normal file
110
src/bones/collection/where.ts
Normal file
@ -0,0 +1,110 @@
|
||||
/**
|
||||
* Type representing a valid where operator.
|
||||
*/
|
||||
export type WhereOperator = '&' | '>' | '>=' | '<' | '<=' | '!=' | '<=>' | '%' | '|' | '!' | '~' | '=' | '^'
|
||||
|
||||
/**
|
||||
* Type associating search items with a key.
|
||||
*/
|
||||
export type AssociatedSearchItem = { key: any, item: any }
|
||||
|
||||
/**
|
||||
* Type representing the result of a where.
|
||||
*/
|
||||
export type WhereResult = any[]
|
||||
|
||||
/**
|
||||
* Returns true if the given item satisfies the given where clause.
|
||||
* @param {AssociatedSearchItem} item
|
||||
* @param {WhereOperator} operator
|
||||
* @param [operand]
|
||||
* @return boolean
|
||||
*/
|
||||
export const whereMatch = (item: AssociatedSearchItem, operator: WhereOperator, operand?: unknown): boolean => {
|
||||
switch ( operator ) {
|
||||
case '&':
|
||||
if ( item.key & Number(operand) ) {
|
||||
return true
|
||||
}
|
||||
break
|
||||
case '>':
|
||||
if ( item.key > (operand as any) ) {
|
||||
return true
|
||||
}
|
||||
break
|
||||
case '>=':
|
||||
if ( item.key >= (operand as any) ) {
|
||||
return true
|
||||
}
|
||||
break
|
||||
case '<':
|
||||
if ( item.key < (operand as any) ) {
|
||||
return true
|
||||
}
|
||||
break
|
||||
case '<=':
|
||||
if ( item.key <= (operand as any) ) {
|
||||
return true
|
||||
}
|
||||
break
|
||||
case '!=':
|
||||
if ( item.key !== (operand as any) ) {
|
||||
return true
|
||||
}
|
||||
break
|
||||
case '<=>':
|
||||
if ( item.key === operand && typeof item.key !== 'undefined' && item.key !== null ) {
|
||||
return true
|
||||
}
|
||||
break
|
||||
case '%':
|
||||
if ( item.key % Number(operand) ) {
|
||||
return true
|
||||
}
|
||||
break
|
||||
case '|':
|
||||
if ( item.key | Number(operand) ) {
|
||||
return true
|
||||
}
|
||||
break
|
||||
case '!':
|
||||
if ( !item.key ) {
|
||||
return true
|
||||
}
|
||||
break
|
||||
case '~':
|
||||
if ( ~item.key ) {
|
||||
return true
|
||||
}
|
||||
break
|
||||
case '=':
|
||||
if ( item.key === operand ) {
|
||||
return true
|
||||
}
|
||||
break
|
||||
case '^':
|
||||
if ( item.key ^ Number(operand) ) {
|
||||
return true
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply the given where clause to the items and return those that match.
|
||||
* @param {Array<AssociatedSearchItem>} items
|
||||
* @param {WhereOperator} operator
|
||||
* @param [operand]
|
||||
*/
|
||||
export const applyWhere = (items: AssociatedSearchItem[], operator: WhereOperator, operand?: unknown): WhereResult => {
|
||||
const matches: WhereResult = []
|
||||
for ( const item of items ) {
|
||||
if ( whereMatch(item, operator, operand) ) {
|
||||
matches.push(item.item)
|
||||
}
|
||||
}
|
||||
|
||||
return matches
|
||||
}
|
@ -13,6 +13,7 @@ const maybeConfig: any = {
|
||||
threads: {
|
||||
type: 'alias',
|
||||
template: process.env.CHORUS_THREAD_TEMPLATE,
|
||||
idPrefix: 't.',
|
||||
},
|
||||
},
|
||||
}
|
||||
|
0
src/mail/read.ts
Normal file
0
src/mail/read.ts
Normal file
133
src/mail/replies.ts
Normal file
133
src/mail/replies.ts
Normal file
@ -0,0 +1,133 @@
|
||||
// EmailReplyParser is a library to parse plain text email content.
|
||||
// The goal is to identify quoted text, signatures, or original content.
|
||||
|
||||
export class ReplyParser {
|
||||
static VERSION = "0.5.11";
|
||||
|
||||
// Splits an email body into a list of Fragments.
|
||||
static read(text: string): Email {
|
||||
return new Email().read(text);
|
||||
}
|
||||
|
||||
// Get the text of the visible portions of the given email body.
|
||||
static parseReply(text: string): string {
|
||||
return this.read(text).visibleText();
|
||||
}
|
||||
}
|
||||
|
||||
export class Email {
|
||||
private fragments: Fragment[] = [];
|
||||
private foundVisible: boolean = false;
|
||||
|
||||
// Gets the combined text of the visible fragments of the email body.
|
||||
visibleText(): string {
|
||||
return this.fragments
|
||||
.filter((fragment) => !fragment.hidden)
|
||||
.map((fragment) => fragment.toString())
|
||||
.join("\n")
|
||||
.trimEnd();
|
||||
}
|
||||
|
||||
// Splits the given text into a list of Fragments.
|
||||
read(text: string): this {
|
||||
let modifiedText = text.slice();
|
||||
|
||||
// Normalize line endings.
|
||||
modifiedText = modifiedText.replace(/\r\n/g, "\n");
|
||||
|
||||
// Handle multi-line reply headers.
|
||||
const multiLineHeaderRegex = /^(?!On.*On\s.+?wrote:)(On\s(.+?)wrote:)$/m;
|
||||
modifiedText = modifiedText.replace(multiLineHeaderRegex, (match) =>
|
||||
match.replace(/\n/g, " ")
|
||||
);
|
||||
|
||||
// Ensure proper splitting for lines of underscores.
|
||||
modifiedText = modifiedText.replace(/([^\n])(?=\n_{7}_+)$/m, "$1\n");
|
||||
|
||||
// Reverse the text for parsing.
|
||||
modifiedText = modifiedText.split("").reverse().join("");
|
||||
|
||||
this.foundVisible = false;
|
||||
let fragment: Fragment | null = null;
|
||||
|
||||
const lines = modifiedText.split("\n");
|
||||
for (const line of lines) {
|
||||
const processedLine = line.trimEnd();
|
||||
const isQuoted = processedLine.endsWith(">");
|
||||
|
||||
if (fragment && processedLine === "") {
|
||||
if (Fragment.isSignature(fragment.lines[fragment.lines.length - 1])) {
|
||||
fragment.signature = true;
|
||||
this.finishFragment(fragment);
|
||||
fragment = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (
|
||||
fragment &&
|
||||
(fragment.quoted === isQuoted ||
|
||||
(fragment.quoted &&
|
||||
(Fragment.isQuoteHeader(processedLine) || processedLine === "")))
|
||||
) {
|
||||
fragment.lines.push(processedLine);
|
||||
} else {
|
||||
if (fragment) {
|
||||
this.finishFragment(fragment);
|
||||
}
|
||||
fragment = new Fragment(isQuoted, processedLine);
|
||||
}
|
||||
}
|
||||
|
||||
if (fragment) {
|
||||
this.finishFragment(fragment);
|
||||
}
|
||||
|
||||
this.fragments.reverse();
|
||||
return this;
|
||||
}
|
||||
|
||||
private finishFragment(fragment: Fragment): void {
|
||||
fragment.finish();
|
||||
if (!this.foundVisible) {
|
||||
if (fragment.quoted || fragment.signature || fragment.toString().trim() === "") {
|
||||
fragment.hidden = true;
|
||||
} else {
|
||||
this.foundVisible = true;
|
||||
}
|
||||
}
|
||||
this.fragments.push(fragment);
|
||||
}
|
||||
}
|
||||
|
||||
class Fragment {
|
||||
static SIGNATURE_REGEX = /(--\s*$|__\s*$|\w-$)|(^(\w+\s+){1,3}ym morf tneS$)/m;
|
||||
static QUOTE_HEADER_REGEX = /^:etorw.*nO$|^.*:(morF|tneS|oT|tcejbuS)$/;
|
||||
|
||||
quoted: boolean;
|
||||
signature: boolean = false;
|
||||
hidden: boolean = false;
|
||||
lines: string[];
|
||||
private content: string | null = null;
|
||||
|
||||
constructor(quoted: boolean, firstLine: string) {
|
||||
this.quoted = quoted;
|
||||
this.lines = [firstLine];
|
||||
}
|
||||
|
||||
static isSignature(line: string): boolean {
|
||||
return this.SIGNATURE_REGEX.test(line);
|
||||
}
|
||||
|
||||
static isQuoteHeader(line: string): boolean {
|
||||
return this.QUOTE_HEADER_REGEX.test(line);
|
||||
}
|
||||
|
||||
finish(): void {
|
||||
this.content = this.lines.join("\n").split("").reverse().join("");
|
||||
this.lines = [];
|
||||
}
|
||||
|
||||
toString(): string {
|
||||
return this.content || "";
|
||||
}
|
||||
}
|
15
src/types.ts
15
src/types.ts
@ -13,6 +13,7 @@ const commentsConfigSchema = z.object({
|
||||
threads: z.object({
|
||||
type: z.string(), // fixme : in validation
|
||||
template: z.string(),
|
||||
idPrefix: z.string(),
|
||||
}),
|
||||
}),
|
||||
})
|
||||
@ -21,3 +22,17 @@ export type CommentsConfig = z.infer<typeof commentsConfigSchema>
|
||||
export const castCommentsConfig = (what: unknown): CommentsConfig => {
|
||||
return commentsConfigSchema.parse(what)
|
||||
}
|
||||
|
||||
|
||||
|
||||
export type Message = {
|
||||
id: string,
|
||||
date: Date,
|
||||
recipients: string[],
|
||||
from: {
|
||||
name?: string,
|
||||
address?: string,
|
||||
},
|
||||
subject: string,
|
||||
content: string,
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user