Clean up email reading logic
This commit is contained in:
parent
063809e446
commit
9f750bc2eb
85
index.ts
85
index.ts
@ -1,82 +1,5 @@
|
|||||||
import { config } from "./src/config.ts";
|
import {withMailbox} from "./src/mail/read.ts";
|
||||||
import {ImapFlow, type MailboxLockObject} from "imapflow";
|
|
||||||
import { extract } from 'letterparser'
|
|
||||||
import {ReplyParser} from "./src/mail/replies.ts";
|
|
||||||
import type {Message} from "./src/types.ts";
|
|
||||||
import type {Awaitable} from "./src/bones";
|
|
||||||
import {AsyncCollection} from "./src/bones/collection/AsyncCollection.ts";
|
|
||||||
import {AsyncGeneratorIterable} from "./src/bones/collection/AsyncGeneratorIterable.ts";
|
|
||||||
|
|
||||||
export async function withMessageClient<TReturn>(cb: (c: ImapFlow) => Awaitable<TReturn>): Promise<TReturn> {
|
await withMailbox('e12a', async c => {
|
||||||
const client = new ImapFlow(config.mail.imap)
|
console.log((await c.collect()).all())
|
||||||
await client.connect()
|
})
|
||||||
|
|
||||||
try {
|
|
||||||
return cb(client)
|
|
||||||
} finally {
|
|
||||||
await client.logout()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function getFolders(): Promise<string[]> {
|
|
||||||
return withMessageClient(async client => {
|
|
||||||
const list = await client.list()
|
|
||||||
return list.map(l => l.name)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
export const getMessagesForMailbox = (box: string): AsyncCollection<Message> => {
|
|
||||||
return new AsyncCollection(
|
|
||||||
new AsyncGeneratorIterable(
|
|
||||||
() => getMessagesForMailboxOld(box)))
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function* getMessagesForMailboxOld(box: string): AsyncGenerator<Message, void, unknown> {
|
|
||||||
const client = new ImapFlow(config.mail.imap)
|
|
||||||
await client.connect()
|
|
||||||
|
|
||||||
try {
|
|
||||||
let lock: MailboxLockObject
|
|
||||||
try {
|
|
||||||
lock = await client.getMailboxLock(box)
|
|
||||||
} catch (e: unknown) {
|
|
||||||
// This is usually because the mailbox does not exist, so yield nothing
|
|
||||||
console.warn(`Error when opening mailbox lock for mailbox "${box}":\n`, e)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
await client.mailboxOpen(box)
|
|
||||||
|
|
||||||
const messages = client.fetch('1:*', {
|
|
||||||
envelope: true,
|
|
||||||
source: true,
|
|
||||||
uid: true,
|
|
||||||
bodyParts: ['text'],
|
|
||||||
})
|
|
||||||
|
|
||||||
for await ( const message of messages ) {
|
|
||||||
const source = message.source.toString('utf-8')
|
|
||||||
const content = ReplyParser.parseReply(extract(source).text || '')
|
|
||||||
|
|
||||||
const msg: Message = {
|
|
||||||
id: message.envelope.messageId,
|
|
||||||
date: message.envelope.date,
|
|
||||||
recipients: message.envelope.to.map(x => x.address || '').filter(Boolean),
|
|
||||||
from: message.envelope.from[0],
|
|
||||||
subject: message.envelope.subject,
|
|
||||||
content,
|
|
||||||
}
|
|
||||||
|
|
||||||
yield msg
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
lock.release()
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
await client.logout()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const c = getMessagesForMailbox('e12a')
|
|
||||||
console.log(await c.all())
|
|
||||||
|
@ -1,139 +0,0 @@
|
|||||||
import {type ChunkCallback, Iterable, type MaybeIterationItem} from "./Iterable.ts";
|
|
||||||
import {collect, type Collection} from "./Collection.ts";
|
|
||||||
import {type Either, isLeft, isRight, left, type Maybe, right, unright} from "../types.ts";
|
|
||||||
|
|
||||||
export class AsyncGeneratorIterable<T> extends Iterable<T> {
|
|
||||||
private sourceIndex: number = -1
|
|
||||||
private cacheStartIndex: number = 0
|
|
||||||
private cache: T[] = []
|
|
||||||
private source?: AsyncGenerator<T, unknown, unknown>
|
|
||||||
|
|
||||||
constructor(
|
|
||||||
private sourceFactory: () => AsyncGenerator<T, unknown, unknown>,
|
|
||||||
private maxCacheSize: number = 100,
|
|
||||||
) {
|
|
||||||
super()
|
|
||||||
}
|
|
||||||
|
|
||||||
private computeCacheIndex(realIndex: number): Either<null, number> {
|
|
||||||
let i = realIndex - this.cacheStartIndex
|
|
||||||
console.log('agi cci', { i, realIndex, cSI: this.cacheStartIndex, cl: this.cache.length})
|
|
||||||
if ( i >= this.cache.length ) {
|
|
||||||
return left(null)
|
|
||||||
}
|
|
||||||
return right(i)
|
|
||||||
}
|
|
||||||
|
|
||||||
private async advanceIndexInCache(realIndex: number): Promise<void> {
|
|
||||||
if ( isRight(this.computeCacheIndex(realIndex)) ) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
console.log('aIIC needs advance')
|
|
||||||
if ( realIndex < this.cacheStartIndex ) {
|
|
||||||
this.source = undefined
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( !this.source ) {
|
|
||||||
this.source = this.sourceFactory()
|
|
||||||
this.sourceIndex = -1
|
|
||||||
}
|
|
||||||
|
|
||||||
for await ( const item of this.source ) {
|
|
||||||
console.log('aIIC source item', item, this.sourceIndex, realIndex)
|
|
||||||
this.sourceIndex += 1
|
|
||||||
this.cache.push(item)
|
|
||||||
|
|
||||||
if ( this.cache.length >= this.maxCacheSize ) {
|
|
||||||
this.cache.shift()
|
|
||||||
this.cacheStartIndex += 1
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( this.sourceIndex >= realIndex ) {
|
|
||||||
console.log('aIIC break')
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async at(i: number): Promise<Maybe<T>> {
|
|
||||||
await this.advanceIndexInCache(i)
|
|
||||||
|
|
||||||
const cacheIndex = this.computeCacheIndex(i)
|
|
||||||
console.log('agi at', { i, cacheIndex })
|
|
||||||
if ( isRight(cacheIndex) ) {
|
|
||||||
return this.cache[unright(cacheIndex)]
|
|
||||||
}
|
|
||||||
|
|
||||||
return undefined
|
|
||||||
}
|
|
||||||
|
|
||||||
clone(): AsyncGeneratorIterable<T> {
|
|
||||||
return new AsyncGeneratorIterable(this.sourceFactory, this.maxCacheSize)
|
|
||||||
}
|
|
||||||
|
|
||||||
count(): Promise<number> {
|
|
||||||
throw new Error('cannot count!')
|
|
||||||
}
|
|
||||||
|
|
||||||
async range(start: number, end: number): Promise<Collection<T>> {
|
|
||||||
const c: Collection<T> = collect()
|
|
||||||
|
|
||||||
for ( let i = start; i <= end; i += 1 ) {
|
|
||||||
const item = await this.at(i)
|
|
||||||
if ( !item ) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
c.push(item)
|
|
||||||
}
|
|
||||||
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
async all(): Promise<Collection<T>> {
|
|
||||||
const c: Collection<T> = collect()
|
|
||||||
|
|
||||||
let i = -1
|
|
||||||
while ( true ) {
|
|
||||||
i += 1
|
|
||||||
const item = await this.at(i)
|
|
||||||
if ( !item ) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
c.push(item)
|
|
||||||
}
|
|
||||||
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
async next(): Promise<MaybeIterationItem<T>> {
|
|
||||||
const value = await this.at(this.index)
|
|
||||||
|
|
||||||
if ( !value ) {
|
|
||||||
return { done: true }
|
|
||||||
}
|
|
||||||
|
|
||||||
this.index += 1
|
|
||||||
return { done: false, value }
|
|
||||||
}
|
|
||||||
|
|
||||||
async seek(index: number): Promise<void> {
|
|
||||||
if ( index < 0 ) {
|
|
||||||
throw new TypeError('Cannot seek to negative index.')
|
|
||||||
}
|
|
||||||
|
|
||||||
await this.advanceIndexInCache(index)
|
|
||||||
|
|
||||||
const cacheIndex = this.computeCacheIndex(index)
|
|
||||||
if ( isLeft(cacheIndex) ) {
|
|
||||||
throw new TypeError('Cannot seek past last item.')
|
|
||||||
}
|
|
||||||
|
|
||||||
this.index = index
|
|
||||||
}
|
|
||||||
|
|
||||||
async peek(): Promise<Maybe<T>> {
|
|
||||||
return this.at(this.index + 1)
|
|
||||||
}
|
|
||||||
}
|
|
@ -48,7 +48,7 @@ export abstract class Iterable<T> {
|
|||||||
* @return Promise<Collection>
|
* @return Promise<Collection>
|
||||||
*/
|
*/
|
||||||
public async all(): Promise<Collection<T>> {
|
public async all(): Promise<Collection<T>> {
|
||||||
return this.range(0, (await this.count()) + 1)
|
return this.range(0, (await this.count()) - 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
120
src/mail/read.ts
120
src/mail/read.ts
@ -0,0 +1,120 @@
|
|||||||
|
import {Iterable} from "../bones/collection/Iterable.ts";
|
||||||
|
import type {Message} from "../types.ts";
|
||||||
|
import {type FetchMessageObject, type FetchQueryObject, ImapFlow} from "imapflow";
|
||||||
|
import {config} from "../config.ts";
|
||||||
|
import type {Awaitable, Maybe} from "../bones";
|
||||||
|
import {ReplyParser} from "./replies.ts";
|
||||||
|
import {extract} from "letterparser";
|
||||||
|
import {Collection} from "../bones/collection/Collection.ts";
|
||||||
|
import {AsyncCollection} from "../bones/collection/AsyncCollection.ts";
|
||||||
|
|
||||||
|
export async function withMailbox<T>(mailbox: string, cb: (c: AsyncCollection<Message>) => Awaitable<T>): Promise<T> {
|
||||||
|
return MailboxIterable.with(mailbox, i => cb(new AsyncCollection(i, 100)))
|
||||||
|
}
|
||||||
|
|
||||||
|
export class MailboxIterable extends Iterable<Message> {
|
||||||
|
private client?: Maybe<ImapFlow>
|
||||||
|
private query: FetchQueryObject = {
|
||||||
|
envelope: true,
|
||||||
|
source: true,
|
||||||
|
uid: true,
|
||||||
|
bodyParts: ['text'],
|
||||||
|
}
|
||||||
|
|
||||||
|
public static async with<T>(mailbox: string, cb: (i: MailboxIterable) => Awaitable<T>): Promise<T> {
|
||||||
|
const inst = new MailboxIterable(mailbox)
|
||||||
|
let value: T
|
||||||
|
|
||||||
|
try {
|
||||||
|
value = await cb(inst)
|
||||||
|
} finally {
|
||||||
|
await inst.release()
|
||||||
|
}
|
||||||
|
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
public readonly mailbox: string,
|
||||||
|
) {
|
||||||
|
super()
|
||||||
|
}
|
||||||
|
|
||||||
|
async at(i: number): Promise<Maybe<Message>> {
|
||||||
|
return this.withMailbox(async client => {
|
||||||
|
const m = await client.fetchOne(`${i+1}`, this.query)
|
||||||
|
return this.format(m)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async range(start: number, end: number): Promise<Collection<Message>> {
|
||||||
|
return this.withMailbox(async client => {
|
||||||
|
const m = await client.fetchAll(`${start+1}:${end+1}`, this.query)
|
||||||
|
return Collection.normalize(m)
|
||||||
|
.map(i => this.format(i))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async count(): Promise<number> {
|
||||||
|
return this.withMailbox(async client => {
|
||||||
|
const m = await client.status(this.mailbox, {
|
||||||
|
messages: true,
|
||||||
|
})
|
||||||
|
|
||||||
|
return m.messages || 0
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
clone(): MailboxIterable {
|
||||||
|
return new MailboxIterable(this.mailbox)
|
||||||
|
}
|
||||||
|
|
||||||
|
protected format(message: FetchMessageObject): Message {
|
||||||
|
const source = message.source.toString('utf-8')
|
||||||
|
const content = ReplyParser.parseReply(extract(source).text || '')
|
||||||
|
|
||||||
|
return {
|
||||||
|
id: `${this.mailbox}.${message.uid}`,
|
||||||
|
date: message.envelope.date,
|
||||||
|
recipients: message.envelope.to.map(x => x.address || '').filter(Boolean),
|
||||||
|
from: message.envelope.from[0],
|
||||||
|
subject: message.envelope.subject,
|
||||||
|
content,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async release(): Promise<void> {
|
||||||
|
if ( this.client ) {
|
||||||
|
await this.client.logout()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected async withMailbox<T>(cb: (client: ImapFlow) => Awaitable<T>): Promise<T> {
|
||||||
|
const client = await this.getClient()
|
||||||
|
const lock = await client.getMailboxLock(this.mailbox)
|
||||||
|
|
||||||
|
let value: T
|
||||||
|
try {
|
||||||
|
await client.mailboxOpen(this.mailbox)
|
||||||
|
value = await cb(client)
|
||||||
|
} finally {
|
||||||
|
lock.release()
|
||||||
|
}
|
||||||
|
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
|
||||||
|
protected async getClient(): Promise<ImapFlow> {
|
||||||
|
if ( this.client ) {
|
||||||
|
return this.client
|
||||||
|
}
|
||||||
|
|
||||||
|
const client = new ImapFlow({
|
||||||
|
...config.mail.imap,
|
||||||
|
logger: false,
|
||||||
|
})
|
||||||
|
|
||||||
|
await client.connect()
|
||||||
|
return this.client = client
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user