fix: add backend changes to slate-collab

This commit is contained in:
Eric Maciel 2020-10-29 11:40:12 -04:00
parent 47dc072d14
commit 2917c98010
4 changed files with 120 additions and 60 deletions

View File

@ -27,9 +27,11 @@
"@babel/plugin-proposal-optional-chaining": "^7.9.0", "@babel/plugin-proposal-optional-chaining": "^7.9.0",
"@babel/runtime": "^7.6.3", "@babel/runtime": "^7.6.3",
"@hiveteams/collab-bridge": "^0.7.7", "@hiveteams/collab-bridge": "^0.7.7",
"@types/debug": "^4.1.5",
"@types/lodash": "^4.14.150", "@types/lodash": "^4.14.150",
"@types/socket.io": "^2.1.4", "@types/socket.io": "^2.1.4",
"automerge": "0.14.0", "automerge": "0.14.0",
"debug": "^4.2.0",
"lodash": "^4.17.15", "lodash": "^4.17.15",
"slate": "0.58.3", "slate": "0.58.3",
"socket.io": "^2.3.0", "socket.io": "^2.3.0",

View File

@ -8,26 +8,22 @@ import {
SyncDoc, SyncDoc,
CollabAction CollabAction
} from '@hiveteams/collab-bridge' } from '@hiveteams/collab-bridge'
import { debugCollabBackend } from 'utils/debug'
export interface Connections {
[key: string]: Automerge.Connection<SyncDoc>
}
/** /**
* AutomergeBackend contains collaboration with Automerge * AutomergeBackend contains collaboration with Automerge
*/ */
class AutomergeBackend { class AutomergeBackend {
connections: Connections = {} connectionMap: { [key: string]: Automerge.Connection<SyncDoc> } = {}
documentSetMap: { [key: string]: Automerge.DocSet<SyncDoc> } = {}
docSet: Automerge.DocSet<SyncDoc> = new Automerge.DocSet()
/** /**
* Create Autmorge Connection * Create Autmorge Connection
*/ */
createConnection = (id: string, send: any) => { createConnection = (id: string, docId: string, send: any) => {
if (this.connections[id]) { if (this.connectionMap[id]) {
console.warn( console.warn(
`Already has connection with id: ${id}. It will be terminated before creating new connection` `Already has connection with id: ${id}. It will be terminated before creating new connection`
) )
@ -35,8 +31,12 @@ class AutomergeBackend {
this.closeConnection(id) this.closeConnection(id)
} }
this.connections[id] = new Automerge.Connection( if (!this.documentSetMap[docId]) {
this.docSet, throw new Error('Cannot create connection for missing docSet')
}
this.connectionMap[id] = new Automerge.Connection(
this.documentSetMap[docId],
toCollabAction('operation', send) toCollabAction('operation', send)
) )
} }
@ -45,16 +45,16 @@ class AutomergeBackend {
* Start Automerge Connection * Start Automerge Connection
*/ */
openConnection = (id: string) => this.connections[id].open() openConnection = (id: string) => this.connectionMap[id].open()
/** /**
* Close Automerge Connection and remove it from connections * Close Automerge Connection and remove it from connections
*/ */
closeConnection(id: string) { closeConnection(id: string) {
this.connections[id]?.close() this.connectionMap[id]?.close()
delete this.connections[id] delete this.connectionMap[id]
} }
/** /**
@ -63,7 +63,12 @@ class AutomergeBackend {
receiveOperation = (id: string, data: CollabAction) => { receiveOperation = (id: string, data: CollabAction) => {
try { try {
this.connections[id].receiveMsg(data.payload) if (!this.connectionMap[id]) {
debugCollabBackend('Could not receive op for closed connection %s', id)
return
}
this.connectionMap[id].receiveMsg(data.payload)
} catch (e) { } catch (e) {
console.error('Unexpected error in receiveOperation', e) console.error('Unexpected error in receiveOperation', e)
} }
@ -73,7 +78,7 @@ class AutomergeBackend {
* Get document from Automerge DocSet * Get document from Automerge DocSet
*/ */
getDocument = (docId: string) => this.docSet.getDoc(docId) getDocument = (docId: string) => this.documentSetMap[docId]?.getDoc(docId)
/** /**
* Append document to Automerge DocSet * Append document to Automerge DocSet
@ -89,7 +94,10 @@ class AutomergeBackend {
const doc = Automerge.from<SyncDoc>(sync) const doc = Automerge.from<SyncDoc>(sync)
this.docSet.setDoc(docId, doc) if (!this.documentSetMap[docId]) {
this.documentSetMap[docId] = new Automerge.DocSet<SyncDoc>()
}
this.documentSetMap[docId].setDoc(docId, doc)
} catch (e) { } catch (e) {
console.error(e, docId) console.error(e, docId)
} }
@ -99,7 +107,12 @@ class AutomergeBackend {
* Remove document from Automerge DocSet * Remove document from Automerge DocSet
*/ */
removeDocument = (docId: string) => this.docSet.removeDoc(docId) removeDocument = (docId: string) => {
if (this.documentSetMap[docId]) {
this.documentSetMap[docId].removeDoc(docId)
delete this.documentSetMap[docId]
}
}
/** /**
* Remove client cursor data * Remove client cursor data
@ -109,13 +122,13 @@ class AutomergeBackend {
try { try {
const doc = this.getDocument(docId) const doc = this.getDocument(docId)
if (!doc.cursors) return if (!doc || !doc.cursors) return
const change = Automerge.change(doc, (d: any) => { const change = Automerge.change(doc, (d: any) => {
delete d.cursors[id] delete d.cursors[id]
}) })
this.docSet.setDoc(docId, change) this.documentSetMap[docId].setDoc(docId, change)
} catch (e) { } catch (e) {
console.error('Unexpected error in garbageCursor', e) console.error('Unexpected error in garbageCursor', e)
} }

View File

@ -10,11 +10,12 @@ import { SyncDoc, CollabAction, toJS } from '@hiveteams/collab-bridge'
import { getClients } from './utils' import { getClients } from './utils'
import AutomergeBackend from './AutomergeBackend' import AutomergeBackend from './AutomergeBackend'
import { debugCollabBackend } from 'utils/debug'
export interface SocketIOCollaborationOptions { export interface SocketIOCollaborationOptions {
entry: Server entry: Server
connectOpts?: SocketIO.ServerOptions connectOpts?: SocketIO.ServerOptions
defaultValue?: Node[] defaultValue: Node[]
saveFrequency?: number saveFrequency?: number
onAuthRequest?: ( onAuthRequest?: (
query: Object, query: Object,
@ -31,6 +32,7 @@ export default class SocketIOCollaboration {
private io: SocketIO.Server private io: SocketIO.Server
private options: SocketIOCollaborationOptions private options: SocketIOCollaborationOptions
private backend: AutomergeBackend private backend: AutomergeBackend
private autoSaveDoc: (id: string, docId: string) => void
/** /**
* Constructor * Constructor
@ -43,6 +45,15 @@ export default class SocketIOCollaboration {
this.options = options this.options = options
/**
* Save document with throttle
*/
this.autoSaveDoc = throttle(
async (id: string, docId: string) =>
this.backend.getDocument(docId) && this.saveDocument(id, docId),
this.options?.saveFrequency || 2000
)
this.configure() this.configure()
return this return this
@ -63,18 +74,6 @@ export default class SocketIOCollaboration {
*/ */
private nspMiddleware = async (path: string, query: any, next: any) => { private nspMiddleware = async (path: string, query: any, next: any) => {
const { onDocumentLoad } = this.options
if (!this.backend.getDocument(path)) {
const doc = onDocumentLoad
? await onDocumentLoad(path, query)
: this.options.defaultValue
if (!doc) return next(null, false)
this.backend.appendDocument(path, doc)
}
return next(null, true) return next(null, true)
} }
@ -86,9 +85,14 @@ export default class SocketIOCollaboration {
socket: SocketIO.Socket, socket: SocketIO.Socket,
next: (e?: any) => void next: (e?: any) => void
) => { ) => {
const { id } = socket
const { query } = socket.handshake const { query } = socket.handshake
const { onAuthRequest } = this.options const { onAuthRequest } = this.options
// we connect before any async logic so that we
// never miss a socket disconnection event
socket.on('disconnect', this.onDisconnect(id, socket))
if (onAuthRequest) { if (onAuthRequest) {
const permit = await onAuthRequest(query, socket) const permit = await onAuthRequest(query, socket)
@ -103,26 +107,63 @@ export default class SocketIOCollaboration {
* On 'connect' handler. * On 'connect' handler.
*/ */
private onConnect = (socket: SocketIO.Socket) => { private onConnect = async (socket: SocketIO.Socket) => {
const { id, conn } = socket const { id, conn } = socket
const { name } = socket.nsp // do nothing if the socket connection has already been closed
if (conn.readyState === 'closed') {
return
}
this.backend.createConnection(id, ({ type, payload }: CollabAction) => { const { name } = socket.nsp
const { onDocumentLoad } = this.options
if (!this.backend.getDocument(name)) {
const doc = onDocumentLoad
? await onDocumentLoad(name)
: this.options.defaultValue
// Ensure socket is still opened
// recheck ready state after async operation
if (conn.readyState === 'closed') {
return
}
// recheck backend getDocument after async operation
if (!this.backend.getDocument(name)) {
debugCollabBackend('Append document\t\t%s', id)
this.backend.appendDocument(name, doc)
}
}
debugCollabBackend('Create connection\t%s', id)
this.backend.createConnection(
id,
name,
({ type, payload }: CollabAction) => {
socket.emit('msg', { type, payload: { id: conn.id, ...payload } }) socket.emit('msg', { type, payload: { id: conn.id, ...payload } })
}) }
)
socket.on('msg', this.onMessage(id, name)) socket.on('msg', this.onMessage(id, name))
socket.on('disconnect', this.onDisconnect(id, socket))
socket.join(id, () => { socket.join(id, () => {
const doc = this.backend.getDocument(name) const doc = this.backend.getDocument(name)
if (!doc) {
debugCollabBackend(
'onConnect: No document available at the time of socket.io join docId=%s socketId=%s',
name,
id
)
return
}
socket.emit('msg', { socket.emit('msg', {
type: 'document', type: 'document',
payload: Automerge.save<SyncDoc>(doc) payload: Automerge.save<SyncDoc>(doc)
}) })
debugCollabBackend('Open connection\t\t%s', id)
this.backend.openConnection(id) this.backend.openConnection(id)
}) })
@ -139,7 +180,7 @@ export default class SocketIOCollaboration {
try { try {
this.backend.receiveOperation(id, data) this.backend.receiveOperation(id, data)
this.autoSaveDoc(name) this.autoSaveDoc(id, name)
this.garbageCursors(name) this.garbageCursors(name)
} catch (e) { } catch (e) {
@ -148,28 +189,21 @@ export default class SocketIOCollaboration {
} }
} }
/**
* Save document with throttle
*/
private autoSaveDoc = throttle(
async (docId: string) =>
this.backend.getDocument(docId) && this.saveDocument(docId),
this.options?.saveFrequency || 2000
)
/** /**
* Save document * Save document
*/ */
private saveDocument = async (docId: string) => { private saveDocument = async (id: string, docId: string) => {
try { try {
const { onDocumentSave } = this.options const { onDocumentSave } = this.options
const doc = this.backend.getDocument(docId) const doc = this.backend.getDocument(docId)
// Return early if there is no valid document in our crdt backend
// Note: this will happen when user disconnects from the collab server
// before document load has completed
if (!doc) { if (!doc) {
throw new Error(`Can't receive document by id: ${docId}`) return
} }
onDocumentSave && (await onDocumentSave(docId, toJS(doc.children))) onDocumentSave && (await onDocumentSave(docId, toJS(doc.children)))
@ -183,29 +217,36 @@ export default class SocketIOCollaboration {
*/ */
private onDisconnect = (id: string, socket: SocketIO.Socket) => async () => { private onDisconnect = (id: string, socket: SocketIO.Socket) => async () => {
debugCollabBackend('Connection closed\t%s', id)
this.backend.closeConnection(id) this.backend.closeConnection(id)
await this.saveDocument(socket.nsp.name) await this.saveDocument(id, socket.nsp.name)
// cleanup automerge cursor and socket connection
this.garbageCursors(socket.nsp.name) this.garbageCursors(socket.nsp.name)
socket.leave(id) socket.leave(id)
this.garbageNsp(id)
this.garbageNsp()
} }
/** /**
* Clean up unused SocketIO namespaces. * Clean up unused SocketIO namespaces.
*/ */
garbageNsp = () => { garbageNsp = (id: string) => {
Object.keys(this.io.nsps) Object.keys(this.io.nsps)
.filter(n => n !== '/') .filter(n => n !== '/')
.forEach(nsp => { .forEach(nsp => {
getClients(this.io, nsp).then((clientsList: any) => { getClients(this.io, nsp).then((clientsList: any) => {
debugCollabBackend(
'Garbage namespace\t%s clientsList=%o %s',
id,
clientsList,
nsp
)
if (!clientsList.length) { if (!clientsList.length) {
debugCollabBackend('Removing document\t%s', id)
this.backend.removeDocument(nsp) this.backend.removeDocument(nsp)
delete this.io.nsps[nsp] delete this.io.nsps[nsp]
} }
}) })
@ -218,13 +259,14 @@ export default class SocketIOCollaboration {
garbageCursors = (nsp: string) => { garbageCursors = (nsp: string) => {
const doc = this.backend.getDocument(nsp) const doc = this.backend.getDocument(nsp)
// if document has already been cleaned up, it is safe to return early
if (!doc.cursors) return if (!doc || !doc.cursors) return
const namespace = this.io.of(nsp) const namespace = this.io.of(nsp)
Object.keys(doc?.cursors)?.forEach(key => { Object.keys(doc?.cursors)?.forEach(key => {
if (!namespace.sockets[key]) { if (!namespace.sockets[key]) {
debugCollabBackend('Garbage cursor\t\t%s', key)
this.backend.garbageCursor(nsp, key) this.backend.garbageCursor(nsp, key)
} }
}) })

View File

@ -0,0 +1,3 @@
import debug from 'debug'
export const debugCollabBackend = debug('collab-backend')