From 2917c980100c983014035aa4a4ed47ea07c70a01 Mon Sep 17 00:00:00 2001 From: Eric Maciel Date: Thu, 29 Oct 2020 11:40:12 -0400 Subject: [PATCH] fix: add backend changes to slate-collab --- packages/backend/package.json | 2 + packages/backend/src/AutomergeBackend.ts | 53 +++++---- packages/backend/src/SocketIOConnection.ts | 122 ++++++++++++++------- packages/backend/src/utils/debug.ts | 3 + 4 files changed, 120 insertions(+), 60 deletions(-) create mode 100644 packages/backend/src/utils/debug.ts diff --git a/packages/backend/package.json b/packages/backend/package.json index 9ed744d..bb021a6 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -27,9 +27,11 @@ "@babel/plugin-proposal-optional-chaining": "^7.9.0", "@babel/runtime": "^7.6.3", "@hiveteams/collab-bridge": "^0.7.7", + "@types/debug": "^4.1.5", "@types/lodash": "^4.14.150", "@types/socket.io": "^2.1.4", "automerge": "0.14.0", + "debug": "^4.2.0", "lodash": "^4.17.15", "slate": "0.58.3", "socket.io": "^2.3.0", diff --git a/packages/backend/src/AutomergeBackend.ts b/packages/backend/src/AutomergeBackend.ts index 4482429..adba9ac 100644 --- a/packages/backend/src/AutomergeBackend.ts +++ b/packages/backend/src/AutomergeBackend.ts @@ -8,26 +8,22 @@ import { SyncDoc, CollabAction } from '@hiveteams/collab-bridge' - -export interface Connections { - [key: string]: Automerge.Connection -} +import { debugCollabBackend } from 'utils/debug' /** * AutomergeBackend contains collaboration with Automerge */ class AutomergeBackend { - connections: Connections = {} - - docSet: Automerge.DocSet = new Automerge.DocSet() + connectionMap: { [key: string]: Automerge.Connection } = {} + documentSetMap: { [key: string]: Automerge.DocSet } = {} /** * Create Autmorge Connection */ - createConnection = (id: string, send: any) => { - if (this.connections[id]) { + createConnection = (id: string, docId: string, send: any) => { + if (this.connectionMap[id]) { console.warn( `Already has connection with id: ${id}. It will be terminated before creating new connection` ) @@ -35,8 +31,12 @@ class AutomergeBackend { this.closeConnection(id) } - this.connections[id] = new Automerge.Connection( - this.docSet, + if (!this.documentSetMap[docId]) { + throw new Error('Cannot create connection for missing docSet') + } + + this.connectionMap[id] = new Automerge.Connection( + this.documentSetMap[docId], toCollabAction('operation', send) ) } @@ -45,16 +45,16 @@ class AutomergeBackend { * Start Automerge Connection */ - openConnection = (id: string) => this.connections[id].open() + openConnection = (id: string) => this.connectionMap[id].open() /** * Close Automerge Connection and remove it from connections */ closeConnection(id: string) { - this.connections[id]?.close() + this.connectionMap[id]?.close() - delete this.connections[id] + delete this.connectionMap[id] } /** @@ -63,7 +63,12 @@ class AutomergeBackend { receiveOperation = (id: string, data: CollabAction) => { try { - this.connections[id].receiveMsg(data.payload) + if (!this.connectionMap[id]) { + debugCollabBackend('Could not receive op for closed connection %s', id) + return + } + + this.connectionMap[id].receiveMsg(data.payload) } catch (e) { console.error('Unexpected error in receiveOperation', e) } @@ -73,7 +78,7 @@ class AutomergeBackend { * Get document from Automerge DocSet */ - getDocument = (docId: string) => this.docSet.getDoc(docId) + getDocument = (docId: string) => this.documentSetMap[docId]?.getDoc(docId) /** * Append document to Automerge DocSet @@ -89,7 +94,10 @@ class AutomergeBackend { const doc = Automerge.from(sync) - this.docSet.setDoc(docId, doc) + if (!this.documentSetMap[docId]) { + this.documentSetMap[docId] = new Automerge.DocSet() + } + this.documentSetMap[docId].setDoc(docId, doc) } catch (e) { console.error(e, docId) } @@ -99,7 +107,12 @@ class AutomergeBackend { * Remove document from Automerge DocSet */ - removeDocument = (docId: string) => this.docSet.removeDoc(docId) + removeDocument = (docId: string) => { + if (this.documentSetMap[docId]) { + this.documentSetMap[docId].removeDoc(docId) + delete this.documentSetMap[docId] + } + } /** * Remove client cursor data @@ -109,13 +122,13 @@ class AutomergeBackend { try { const doc = this.getDocument(docId) - if (!doc.cursors) return + if (!doc || !doc.cursors) return const change = Automerge.change(doc, (d: any) => { delete d.cursors[id] }) - this.docSet.setDoc(docId, change) + this.documentSetMap[docId].setDoc(docId, change) } catch (e) { console.error('Unexpected error in garbageCursor', e) } diff --git a/packages/backend/src/SocketIOConnection.ts b/packages/backend/src/SocketIOConnection.ts index 3b5d677..e32c9a2 100644 --- a/packages/backend/src/SocketIOConnection.ts +++ b/packages/backend/src/SocketIOConnection.ts @@ -10,11 +10,12 @@ import { SyncDoc, CollabAction, toJS } from '@hiveteams/collab-bridge' import { getClients } from './utils' import AutomergeBackend from './AutomergeBackend' +import { debugCollabBackend } from 'utils/debug' export interface SocketIOCollaborationOptions { entry: Server connectOpts?: SocketIO.ServerOptions - defaultValue?: Node[] + defaultValue: Node[] saveFrequency?: number onAuthRequest?: ( query: Object, @@ -31,6 +32,7 @@ export default class SocketIOCollaboration { private io: SocketIO.Server private options: SocketIOCollaborationOptions private backend: AutomergeBackend + private autoSaveDoc: (id: string, docId: string) => void /** * Constructor @@ -43,6 +45,15 @@ export default class SocketIOCollaboration { this.options = options + /** + * Save document with throttle + */ + this.autoSaveDoc = throttle( + async (id: string, docId: string) => + this.backend.getDocument(docId) && this.saveDocument(id, docId), + this.options?.saveFrequency || 2000 + ) + this.configure() return this @@ -63,18 +74,6 @@ export default class SocketIOCollaboration { */ private nspMiddleware = async (path: string, query: any, next: any) => { - const { onDocumentLoad } = this.options - - if (!this.backend.getDocument(path)) { - const doc = onDocumentLoad - ? await onDocumentLoad(path, query) - : this.options.defaultValue - - if (!doc) return next(null, false) - - this.backend.appendDocument(path, doc) - } - return next(null, true) } @@ -86,9 +85,14 @@ export default class SocketIOCollaboration { socket: SocketIO.Socket, next: (e?: any) => void ) => { + const { id } = socket const { query } = socket.handshake const { onAuthRequest } = this.options + // we connect before any async logic so that we + // never miss a socket disconnection event + socket.on('disconnect', this.onDisconnect(id, socket)) + if (onAuthRequest) { const permit = await onAuthRequest(query, socket) @@ -103,26 +107,63 @@ export default class SocketIOCollaboration { * On 'connect' handler. */ - private onConnect = (socket: SocketIO.Socket) => { + private onConnect = async (socket: SocketIO.Socket) => { const { id, conn } = socket - const { name } = socket.nsp + // do nothing if the socket connection has already been closed + if (conn.readyState === 'closed') { + return + } - this.backend.createConnection(id, ({ type, payload }: CollabAction) => { - socket.emit('msg', { type, payload: { id: conn.id, ...payload } }) - }) + const { name } = socket.nsp + const { onDocumentLoad } = this.options + + if (!this.backend.getDocument(name)) { + const doc = onDocumentLoad + ? await onDocumentLoad(name) + : this.options.defaultValue + + // Ensure socket is still opened + // recheck ready state after async operation + if (conn.readyState === 'closed') { + return + } + + // recheck backend getDocument after async operation + if (!this.backend.getDocument(name)) { + debugCollabBackend('Append document\t\t%s', id) + this.backend.appendDocument(name, doc) + } + } + + debugCollabBackend('Create connection\t%s', id) + this.backend.createConnection( + id, + name, + ({ type, payload }: CollabAction) => { + socket.emit('msg', { type, payload: { id: conn.id, ...payload } }) + } + ) socket.on('msg', this.onMessage(id, name)) - socket.on('disconnect', this.onDisconnect(id, socket)) - socket.join(id, () => { const doc = this.backend.getDocument(name) + if (!doc) { + debugCollabBackend( + 'onConnect: No document available at the time of socket.io join docId=%s socketId=%s', + name, + id + ) + return + } + socket.emit('msg', { type: 'document', payload: Automerge.save(doc) }) + debugCollabBackend('Open connection\t\t%s', id) this.backend.openConnection(id) }) @@ -139,7 +180,7 @@ export default class SocketIOCollaboration { try { this.backend.receiveOperation(id, data) - this.autoSaveDoc(name) + this.autoSaveDoc(id, name) this.garbageCursors(name) } catch (e) { @@ -148,28 +189,21 @@ export default class SocketIOCollaboration { } } - /** - * Save document with throttle - */ - - private autoSaveDoc = throttle( - async (docId: string) => - this.backend.getDocument(docId) && this.saveDocument(docId), - this.options?.saveFrequency || 2000 - ) - /** * Save document */ - private saveDocument = async (docId: string) => { + private saveDocument = async (id: string, docId: string) => { try { const { onDocumentSave } = this.options const doc = this.backend.getDocument(docId) + // Return early if there is no valid document in our crdt backend + // Note: this will happen when user disconnects from the collab server + // before document load has completed if (!doc) { - throw new Error(`Can't receive document by id: ${docId}`) + return } onDocumentSave && (await onDocumentSave(docId, toJS(doc.children))) @@ -183,29 +217,36 @@ export default class SocketIOCollaboration { */ private onDisconnect = (id: string, socket: SocketIO.Socket) => async () => { + debugCollabBackend('Connection closed\t%s', id) this.backend.closeConnection(id) - await this.saveDocument(socket.nsp.name) + await this.saveDocument(id, socket.nsp.name) + // cleanup automerge cursor and socket connection this.garbageCursors(socket.nsp.name) socket.leave(id) - - this.garbageNsp() + this.garbageNsp(id) } /** * Clean up unused SocketIO namespaces. */ - garbageNsp = () => { + garbageNsp = (id: string) => { Object.keys(this.io.nsps) .filter(n => n !== '/') .forEach(nsp => { getClients(this.io, nsp).then((clientsList: any) => { + debugCollabBackend( + 'Garbage namespace\t%s clientsList=%o %s', + id, + clientsList, + nsp + ) if (!clientsList.length) { + debugCollabBackend('Removing document\t%s', id) this.backend.removeDocument(nsp) - delete this.io.nsps[nsp] } }) @@ -218,13 +259,14 @@ export default class SocketIOCollaboration { garbageCursors = (nsp: string) => { const doc = this.backend.getDocument(nsp) - - if (!doc.cursors) return + // if document has already been cleaned up, it is safe to return early + if (!doc || !doc.cursors) return const namespace = this.io.of(nsp) Object.keys(doc?.cursors)?.forEach(key => { if (!namespace.sockets[key]) { + debugCollabBackend('Garbage cursor\t\t%s', key) this.backend.garbageCursor(nsp, key) } }) diff --git a/packages/backend/src/utils/debug.ts b/packages/backend/src/utils/debug.ts new file mode 100644 index 0000000..455d4aa --- /dev/null +++ b/packages/backend/src/utils/debug.ts @@ -0,0 +1,3 @@ +import debug from 'debug' + +export const debugCollabBackend = debug('collab-backend')