From 0894a7a917949e3ef08a5cd9533258a59b7a3486 Mon Sep 17 00:00:00 2001 From: Eric Maciel Date: Wed, 6 Jan 2021 15:09:27 -0500 Subject: [PATCH] feat: automerge collaboration to backend --- packages/backend/src/AutomergeBackend.ts | 2 +- .../backend/src/AutomergeCollaboration.ts | 289 ++++++++++++++++++ packages/client/src/client-copy.spec.ts | 185 +++++++++++ packages/client/src/client.spec.ts | 79 ++++- 4 files changed, 539 insertions(+), 16 deletions(-) create mode 100644 packages/backend/src/AutomergeCollaboration.ts create mode 100644 packages/client/src/client-copy.spec.ts diff --git a/packages/backend/src/AutomergeBackend.ts b/packages/backend/src/AutomergeBackend.ts index 6a83b4e..3583d4c 100644 --- a/packages/backend/src/AutomergeBackend.ts +++ b/packages/backend/src/AutomergeBackend.ts @@ -45,7 +45,7 @@ class AutomergeBackend { * Start Automerge Connection */ - openConnection = (id: string) => this.connectionMap[id].open() + openConnection = (id: string) => this.connectionMap[id]?.open() /** * Close Automerge Connection and remove it from connections diff --git a/packages/backend/src/AutomergeCollaboration.ts b/packages/backend/src/AutomergeCollaboration.ts new file mode 100644 index 0000000..5c84e2f --- /dev/null +++ b/packages/backend/src/AutomergeCollaboration.ts @@ -0,0 +1,289 @@ +import io from 'socket.io' +import * as Automerge from 'automerge' +import { Node } from 'slate' +import { Server } from 'http' +import throttle from 'lodash/throttle' +import { SyncDoc, CollabAction, toJS } from '@hiveteams/collab-bridge' +import { getClients } from './utils/index' +import { debugCollabBackend } from './utils/debug' +import AutomergeBackend from './AutomergeBackend' + +export interface IAutomergeCollaborationOptions { + entry: Server + connectOpts?: SocketIO.ServerOptions + defaultValue: Node[] + saveFrequency?: number + onAuthRequest?: (query: any, socket?: SocketIO.Socket) => Promise + onDocumentLoad?: (pathname: string, query?: any) => Promise | Node[] + onDocumentSave?: ( + pathname: string, + doc: Node[], + user: any + ) => Promise | void + onDisconnect?: (pathname: string, user: any) => Promise | void +} + +export default class AutomergeCollaboration { + private io: SocketIO.Server + private options: IAutomergeCollaborationOptions + private backend: AutomergeBackend + private userMap: { [key: string]: any | undefined } + + /** + * Constructor + */ + + constructor(options: IAutomergeCollaborationOptions) { + this.io = io(options.entry, options.connectOpts) + + this.backend = new AutomergeBackend() + + this.options = options + + this.configure() + + this.userMap = {} + + return this + } + + /** + * Initial IO configuration + */ + + private configure = () => + this.io + .of(this.nspMiddleware) + .use(this.authMiddleware) + .on('connect', this.onConnect) + + /** + * Namespace SocketIO middleware. Load document value and append it to CollaborationBackend. + */ + + private nspMiddleware = async (path: string, query: any, next: any) => { + return next(null, true) + } + + /** + * SocketIO auth middleware. Used for user authentification. + */ + + private authMiddleware = async ( + socket: SocketIO.Socket, + next: (e?: any) => void + ) => { + const { id } = socket + const { query } = socket.handshake + const { onAuthRequest } = this.options + + // we connect before any async logic so that we + // never miss a socket disconnection event + socket.on('disconnect', this.onDisconnect(id, socket)) + + if (onAuthRequest) { + const user = await onAuthRequest(query, socket) + + if (!user) return next(new Error(`Authentification error: ${socket.id}`)) + + this.userMap[socket.id] = user + } + + return next() + } + + /** + * On 'connect' handler. + */ + + private onConnect = async (socket: SocketIO.Socket) => { + const { id, conn } = socket + // do nothing if the socket connection has already been closed + if (conn.readyState === 'closed') { + return + } + + const { name } = socket.nsp + const { onDocumentLoad } = this.options + + if (!this.backend.getDocument(name)) { + const doc = onDocumentLoad + ? await onDocumentLoad(name) + : this.options.defaultValue + + // Ensure socket is still opened + // recheck ready state after async operation + if (conn.readyState === 'closed') { + return + } + + // recheck backend getDocument after async operation + if (!this.backend.getDocument(name)) { + debugCollabBackend('Append document\t\t%s', id) + this.backend.appendDocument(name, doc) + } + } + + debugCollabBackend('Create connection\t%s', id) + this.backend.createConnection( + id, + name, + ({ type, payload }: CollabAction) => { + socket.emit('msg', { type, payload: { id: conn.id, ...payload } }) + } + ) + + socket.on('msg', this.onMessage(id, name)) + + socket.join(id, () => { + const doc = this.backend.getDocument(name) + + if (!doc) { + debugCollabBackend( + 'onConnect: No document available at the time of socket.io join docId=%s socketId=%s', + name, + id + ) + return + } + + socket.emit('msg', { + type: 'document', + payload: Automerge.save(doc) + }) + + debugCollabBackend('Open connection\t\t%s', id) + this.backend.openConnection(id) + }) + + this.garbageCursors(name) + } + + /** + * On 'message' handler + */ + + private onMessage = (id: string, name: string) => (data: any) => { + switch (data.type) { + case 'operation': + try { + this.backend.receiveOperation(id, data) + + this.autoSaveDoc(id, name) + + this.garbageCursors(name) + } catch (e) { + console.log(e) + } + } + } + + /** + * Save document with throttle + */ + + private autoSaveDoc = throttle( + async (id: string, docId: string) => + this.backend.getDocument(docId) && this.saveDocument(id, docId), + // @ts-ignore: property used before initialization + this.options?.saveFrequency || 2000 + ) + + /** + * Save document + */ + + private saveDocument = async (id: string, docId: string) => { + try { + const { onDocumentSave } = this.options + + const doc = this.backend.getDocument(docId) + + // Return early if there is no valid document in our crdt backend + // Note: this will happen when user disconnects from the collab server + // before document load has completed + if (!doc) { + return + } + + const user = this.userMap[id] + + if (onDocumentSave && user) { + await onDocumentSave(docId, toJS(doc.children), user) + } + } catch (e) { + console.error(e, docId) + } + } + + /** + * On 'disconnect' handler + */ + + private onDisconnect = (id: string, socket: SocketIO.Socket) => async () => { + debugCollabBackend('Connection closed\t%s', id) + this.backend.closeConnection(id) + + await this.saveDocument(id, socket.nsp.name) + + // trigger onDisconnect callback if one was provided + // and if a user has been loaded for this socket connection + const user = this.userMap[id] + if (this.options.onDisconnect && user) { + await this.options.onDisconnect(socket.nsp.name, user) + } + + // cleanup automerge cursor and socket connection + this.garbageCursors(socket.nsp.name) + socket.leave(id) + this.garbageNsp(id) + + // cleanup usermap + delete this.userMap[id] + } + + /** + * Clean up unused SocketIO namespaces. + */ + + garbageNsp = (id: string) => { + Object.keys(this.io.nsps) + .filter(n => n !== '/') + .forEach(nsp => { + getClients(this.io, nsp).then((clientsList: any) => { + if (!clientsList.length) { + debugCollabBackend('Removing document\t%s', id) + this.backend.removeDocument(nsp) + delete this.io.nsps[nsp] + } + }) + }) + } + + /** + * Clean up unused cursor data. + */ + + garbageCursors = (nsp: string) => { + const doc = this.backend.getDocument(nsp) + // if document has already been cleaned up, it is safe to return early + if (!doc || !doc.cursors) return + + const namespace = this.io.of(nsp) + + Object.keys(doc?.cursors)?.forEach(key => { + if (!namespace.sockets[key]) { + debugCollabBackend('Garbage cursor\t\t%s', key) + this.backend.garbageCursor(nsp, key) + } + }) + } + + /** + * Destroy SocketIO connection + */ + + destroy = async () => { + this.io.close() + } +} diff --git a/packages/client/src/client-copy.spec.ts b/packages/client/src/client-copy.spec.ts new file mode 100644 index 0000000..5320745 --- /dev/null +++ b/packages/client/src/client-copy.spec.ts @@ -0,0 +1,185 @@ +// import { createEditor, Element, Node, Transforms } from 'slate' +// import * as Automerge from 'automerge' +// import withAutomerge, { AutomergeOptions } from './withAutomerge' +// import { SyncDoc, toJS } from '@hiveteams/collab-bridge' +// import AutomergeCollaboration from '@hiveteams/collab-backend/lib/AutomergeCollaboration' +// import { insertText } from '../../bridge/src/apply/text' + +// describe('automerge editor client tests', () => { +// const docId = 'test' +// const automergeOptions: AutomergeOptions = { +// docId, +// onError: msg => console.log('Encountered test error', msg) +// } +// const editor = withAutomerge(createEditor(), automergeOptions) +// const automergeBackend = new AutomergeBackend() +// const backendSend = (msg: any) => { +// serverMessages.push(msg) +// } +// const clientId = 'test-client' +// editor.clientId = clientId + +// /** +// * Initialize a basic automerge backend +// */ + +// // Create a new server automerge connection with a basic send function +// let serverMessages: any[] = [] +// automergeBackend.appendDocument(docId, [ +// { type: 'paragraph', children: [{ text: 'Hi' }] } +// ]) +// automergeBackend.createConnection(clientId, docId, backendSend) + +// // define an editor send function for the clientside automerge editor +// let clientMessages: any[] = [] +// editor.send = (msg: any) => { +// clientMessages.push(msg) +// } + +// automergeBackend.openConnection(clientId) +// // open the editor connection +// editor.openConnection() + +// /** +// * Helper function to flush client messages and send them to the server +// */ +// const sendClientMessagesToServer = () => { +// if (!clientMessages.length) return + +// console.log('clientMessages', JSON.stringify(clientMessages)) +// clientMessages.forEach(msg => { +// automergeBackend.receiveOperation(clientId, msg) +// }) +// clientMessages = [] +// } + +// /** +// * Helper function to flush server messages and send them to the client +// */ +// const receiveMessagesFromServer = () => { +// if (!serverMessages.length) return + +// console.log('serverMessages', JSON.stringify(serverMessages)) +// serverMessages.forEach(msg => { +// editor.receiveOperation(msg.payload) +// }) +// serverMessages = [] +// } + +// afterEach(() => { +// sendClientMessagesToServer() +// receiveMessagesFromServer() +// }) + +// it('should properly receiveDocument', () => { +// const initialDocData = Automerge.save(automergeBackend.getDocument(docId)) +// editor.receiveDocument(initialDocData) + +// expect(editor.children.length).toEqual(1) +// const paragraphNode = editor.children[0] as Element +// expect(paragraphNode.type).toEqual('paragraph') +// expect(paragraphNode.children.length).toEqual(1) +// expect(Node.string(paragraphNode)).toEqual('Hi') +// }) + +// it('should sync insert node operation with server', done => { +// Transforms.insertNodes(editor, { +// type: 'paragraph', +// children: [{ text: 'a' }] +// }) + +// // ensure that we eventually send a message for the insert_node oepration +// const handle = setInterval(() => { +// sendClientMessagesToServer() +// receiveMessagesFromServer() + +// const serverDoc = toJS(automergeBackend.getDocument(docId)) +// if (serverDoc.children.length === 2) { +// const paragraphNode = serverDoc.children[1] +// expect(Node.string(paragraphNode)).toEqual('a') +// clearInterval(handle) +// done() +// } +// }, 10) +// }) + +// it('should sync insert text operation with client', done => { +// const serverDoc = automergeBackend.getDocument(docId) + +// const updatedServerDoc = Automerge.change(serverDoc, newServerDoc => { +// insertText(newServerDoc as any, { +// type: 'insert_text', +// path: [1, 0], +// offset: 1, +// text: 'b' +// }) +// }) +// automergeBackend.documentSetMap[docId].setDoc(docId, updatedServerDoc) + +// // ensure that we eventually send a message for the insert_node oepration +// const handle = setInterval(() => { +// sendClientMessagesToServer() +// receiveMessagesFromServer() +// const [, secondParagraph] = editor.children +// if (Node.string(secondParagraph) === 'ab') { +// clearInterval(handle) +// done() +// } +// }, 10) +// }) + +// it('should reapply server state client side when server restarts', done => { +// automergeBackend.closeConnection(clientId) +// automergeBackend.removeDocument(docId) +// automergeBackend.appendDocument(docId, [ +// { type: 'paragraph', children: [{ text: 'Hi' }] } +// ]) +// automergeBackend.createConnection(clientId, docId, backendSend) +// automergeBackend.openConnection(clientId) + +// const docData = Automerge.save(automergeBackend.getDocument(docId)) +// editor.receiveDocument(docData) + +// const handle = setInterval(() => { +// sendClientMessagesToServer() +// receiveMessagesFromServer() +// console.log('server doc', toJS(automergeBackend.getDocument(docId))) +// if (editor.children.length === 1) { +// done() +// clearInterval(handle) +// } +// }, 1000) +// }) + +// // it('should ? on client restart', done => { +// // editor.closeConnection() + +// // Transforms.insertNodes( +// // editor, +// // { +// // type: 'paragraph', +// // children: [{ text: 'a' }] +// // }, +// // { at: [1] } +// // ) + +// // editor.openConnection() +// // const docData = Automerge.save(automergeBackend.getDocument(docId)) +// // editor.receiveDocument(docData) +// // // ensure that we eventually send a message for the insert_node operation +// // const handle = setInterval(() => { +// // sendClientMessagesToServer() +// // receiveMessagesFromServer() + +// // const serverDoc = toJS(automergeBackend.getDocument(docId)) +// // console.log(JSON.stringify(serverDoc)) +// // console.log(editor.children) +// // if (serverDoc.children.length === 2) { +// // const paragraphNode = serverDoc.children[1] +// // expect(Node.string(paragraphNode)).toEqual('a') +// // clearInterval(handle) +// // done() +// // } +// // }, 1000) +// // }) +// }) diff --git a/packages/client/src/client.spec.ts b/packages/client/src/client.spec.ts index 87546d8..0074703 100644 --- a/packages/client/src/client.spec.ts +++ b/packages/client/src/client.spec.ts @@ -13,7 +13,11 @@ describe('automerge editor client tests', () => { } const editor = withAutomerge(createEditor(), automergeOptions) const automergeBackend = new AutomergeBackend() + const backendSend = (msg: any) => { + serverMessages.push(msg) + } const clientId = 'test-client' + editor.clientId = clientId /** * Initialize a basic automerge backend @@ -24,10 +28,7 @@ describe('automerge editor client tests', () => { automergeBackend.appendDocument(docId, [ { type: 'paragraph', children: [{ text: 'Hi' }] } ]) - automergeBackend.createConnection(clientId, docId, (msg: any) => { - serverMessages.push(msg) - }) - automergeBackend.openConnection(clientId) + automergeBackend.createConnection(clientId, docId, backendSend) // define an editor send function for the clientside automerge editor let clientMessages: any[] = [] @@ -35,6 +36,7 @@ describe('automerge editor client tests', () => { clientMessages.push(msg) } + automergeBackend.openConnection(clientId) // open the editor connection editor.openConnection() @@ -42,7 +44,9 @@ describe('automerge editor client tests', () => { * Helper function to flush client messages and send them to the server */ const sendClientMessagesToServer = () => { - // console.log('clientMessages', JSON.stringify(clientMessages)) + if (!clientMessages.length) return + + console.log('clientMessages', JSON.stringify(clientMessages)) clientMessages.forEach(msg => { automergeBackend.receiveOperation(clientId, msg) }) @@ -53,9 +57,11 @@ describe('automerge editor client tests', () => { * Helper function to flush server messages and send them to the client */ const receiveMessagesFromServer = () => { + if (!serverMessages.length) return + console.log('serverMessages', JSON.stringify(serverMessages)) serverMessages.forEach(msg => { - editor.receiveOperation(msg) + editor.receiveOperation(msg.payload) }) serverMessages = [] } @@ -115,7 +121,6 @@ describe('automerge editor client tests', () => { sendClientMessagesToServer() receiveMessagesFromServer() const [, secondParagraph] = editor.children - console.log(secondParagraph) if (Node.string(secondParagraph) === 'ab') { clearInterval(handle) done() @@ -123,14 +128,58 @@ describe('automerge editor client tests', () => { }, 10) }) - // it('replicate old state error', done => { - // serverConnection.close() - // serverConnection = new Automerge.Connection(serverDocSet, msg => { - // serverMessages.push(msg) - // }) - // serverConnection.open() + it('should reapply server state client side when server restarts', done => { + automergeBackend.closeConnection(clientId) + automergeBackend.removeDocument(docId) + automergeBackend.appendDocument(docId, [ + { type: 'paragraph', children: [{ text: 'Hi' }] } + ]) + automergeBackend.createConnection(clientId, docId, backendSend) + automergeBackend.openConnection(clientId) - // sendClientMessagesToServer() - // receiveMessagesFromServer() + const docData = Automerge.save(automergeBackend.getDocument(docId)) + editor.receiveDocument(docData) + + const handle = setInterval(() => { + sendClientMessagesToServer() + receiveMessagesFromServer() + console.log('server doc', toJS(automergeBackend.getDocument(docId))) + if (editor.children.length === 1) { + done() + clearInterval(handle) + } + }, 1000) + }) + + // it('should ? on client restart', done => { + // editor.closeConnection() + + // Transforms.insertNodes( + // editor, + // { + // type: 'paragraph', + // children: [{ text: 'a' }] + // }, + // { at: [1] } + // ) + + // editor.openConnection() + // const docData = Automerge.save(automergeBackend.getDocument(docId)) + // editor.receiveDocument(docData) + // // ensure that we eventually send a message for the insert_node operation + // const handle = setInterval(() => { + // sendClientMessagesToServer() + // receiveMessagesFromServer() + + // const serverDoc = toJS(automergeBackend.getDocument(docId)) + // console.log(JSON.stringify(serverDoc)) + // console.log(editor.children) + // if (serverDoc.children.length === 2) { + // const paragraphNode = serverDoc.children[1] + // expect(Node.string(paragraphNode)).toEqual('a') + // clearInterval(handle) + // done() + // } + // }, 1000) // }) })