mirror of
https://github.com/cudr/slate-collaborative.git
synced 2024-10-27 20:34:06 +00:00
feat: automerge collaboration to backend
This commit is contained in:
parent
60f7fb21ba
commit
0894a7a917
@ -45,7 +45,7 @@ class AutomergeBackend {
|
||||
* Start Automerge Connection
|
||||
*/
|
||||
|
||||
openConnection = (id: string) => this.connectionMap[id].open()
|
||||
openConnection = (id: string) => this.connectionMap[id]?.open()
|
||||
|
||||
/**
|
||||
* Close Automerge Connection and remove it from connections
|
||||
|
289
packages/backend/src/AutomergeCollaboration.ts
Normal file
289
packages/backend/src/AutomergeCollaboration.ts
Normal file
@ -0,0 +1,289 @@
|
||||
import io from 'socket.io'
|
||||
import * as Automerge from 'automerge'
|
||||
import { Node } from 'slate'
|
||||
import { Server } from 'http'
|
||||
import throttle from 'lodash/throttle'
|
||||
import { SyncDoc, CollabAction, toJS } from '@hiveteams/collab-bridge'
|
||||
import { getClients } from './utils/index'
|
||||
import { debugCollabBackend } from './utils/debug'
|
||||
import AutomergeBackend from './AutomergeBackend'
|
||||
|
||||
export interface IAutomergeCollaborationOptions {
|
||||
entry: Server
|
||||
connectOpts?: SocketIO.ServerOptions
|
||||
defaultValue: Node[]
|
||||
saveFrequency?: number
|
||||
onAuthRequest?: (query: any, socket?: SocketIO.Socket) => Promise<any>
|
||||
onDocumentLoad?: (pathname: string, query?: any) => Promise<Node[]> | Node[]
|
||||
onDocumentSave?: (
|
||||
pathname: string,
|
||||
doc: Node[],
|
||||
user: any
|
||||
) => Promise<void> | void
|
||||
onDisconnect?: (pathname: string, user: any) => Promise<void> | void
|
||||
}
|
||||
|
||||
export default class AutomergeCollaboration {
|
||||
private io: SocketIO.Server
|
||||
private options: IAutomergeCollaborationOptions
|
||||
private backend: AutomergeBackend
|
||||
private userMap: { [key: string]: any | undefined }
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
|
||||
constructor(options: IAutomergeCollaborationOptions) {
|
||||
this.io = io(options.entry, options.connectOpts)
|
||||
|
||||
this.backend = new AutomergeBackend()
|
||||
|
||||
this.options = options
|
||||
|
||||
this.configure()
|
||||
|
||||
this.userMap = {}
|
||||
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* Initial IO configuration
|
||||
*/
|
||||
|
||||
private configure = () =>
|
||||
this.io
|
||||
.of(this.nspMiddleware)
|
||||
.use(this.authMiddleware)
|
||||
.on('connect', this.onConnect)
|
||||
|
||||
/**
|
||||
* Namespace SocketIO middleware. Load document value and append it to CollaborationBackend.
|
||||
*/
|
||||
|
||||
private nspMiddleware = async (path: string, query: any, next: any) => {
|
||||
return next(null, true)
|
||||
}
|
||||
|
||||
/**
|
||||
* SocketIO auth middleware. Used for user authentification.
|
||||
*/
|
||||
|
||||
private authMiddleware = async (
|
||||
socket: SocketIO.Socket,
|
||||
next: (e?: any) => void
|
||||
) => {
|
||||
const { id } = socket
|
||||
const { query } = socket.handshake
|
||||
const { onAuthRequest } = this.options
|
||||
|
||||
// we connect before any async logic so that we
|
||||
// never miss a socket disconnection event
|
||||
socket.on('disconnect', this.onDisconnect(id, socket))
|
||||
|
||||
if (onAuthRequest) {
|
||||
const user = await onAuthRequest(query, socket)
|
||||
|
||||
if (!user) return next(new Error(`Authentification error: ${socket.id}`))
|
||||
|
||||
this.userMap[socket.id] = user
|
||||
}
|
||||
|
||||
return next()
|
||||
}
|
||||
|
||||
/**
|
||||
* On 'connect' handler.
|
||||
*/
|
||||
|
||||
private onConnect = async (socket: SocketIO.Socket) => {
|
||||
const { id, conn } = socket
|
||||
// do nothing if the socket connection has already been closed
|
||||
if (conn.readyState === 'closed') {
|
||||
return
|
||||
}
|
||||
|
||||
const { name } = socket.nsp
|
||||
const { onDocumentLoad } = this.options
|
||||
|
||||
if (!this.backend.getDocument(name)) {
|
||||
const doc = onDocumentLoad
|
||||
? await onDocumentLoad(name)
|
||||
: this.options.defaultValue
|
||||
|
||||
// Ensure socket is still opened
|
||||
// recheck ready state after async operation
|
||||
if (conn.readyState === 'closed') {
|
||||
return
|
||||
}
|
||||
|
||||
// recheck backend getDocument after async operation
|
||||
if (!this.backend.getDocument(name)) {
|
||||
debugCollabBackend('Append document\t\t%s', id)
|
||||
this.backend.appendDocument(name, doc)
|
||||
}
|
||||
}
|
||||
|
||||
debugCollabBackend('Create connection\t%s', id)
|
||||
this.backend.createConnection(
|
||||
id,
|
||||
name,
|
||||
({ type, payload }: CollabAction) => {
|
||||
socket.emit('msg', { type, payload: { id: conn.id, ...payload } })
|
||||
}
|
||||
)
|
||||
|
||||
socket.on('msg', this.onMessage(id, name))
|
||||
|
||||
socket.join(id, () => {
|
||||
const doc = this.backend.getDocument(name)
|
||||
|
||||
if (!doc) {
|
||||
debugCollabBackend(
|
||||
'onConnect: No document available at the time of socket.io join docId=%s socketId=%s',
|
||||
name,
|
||||
id
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
socket.emit('msg', {
|
||||
type: 'document',
|
||||
payload: Automerge.save<SyncDoc>(doc)
|
||||
})
|
||||
|
||||
debugCollabBackend('Open connection\t\t%s', id)
|
||||
this.backend.openConnection(id)
|
||||
})
|
||||
|
||||
this.garbageCursors(name)
|
||||
}
|
||||
|
||||
/**
|
||||
* On 'message' handler
|
||||
*/
|
||||
|
||||
private onMessage = (id: string, name: string) => (data: any) => {
|
||||
switch (data.type) {
|
||||
case 'operation':
|
||||
try {
|
||||
this.backend.receiveOperation(id, data)
|
||||
|
||||
this.autoSaveDoc(id, name)
|
||||
|
||||
this.garbageCursors(name)
|
||||
} catch (e) {
|
||||
console.log(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Save document with throttle
|
||||
*/
|
||||
|
||||
private autoSaveDoc = throttle(
|
||||
async (id: string, docId: string) =>
|
||||
this.backend.getDocument(docId) && this.saveDocument(id, docId),
|
||||
// @ts-ignore: property used before initialization
|
||||
this.options?.saveFrequency || 2000
|
||||
)
|
||||
|
||||
/**
|
||||
* Save document
|
||||
*/
|
||||
|
||||
private saveDocument = async (id: string, docId: string) => {
|
||||
try {
|
||||
const { onDocumentSave } = this.options
|
||||
|
||||
const doc = this.backend.getDocument(docId)
|
||||
|
||||
// Return early if there is no valid document in our crdt backend
|
||||
// Note: this will happen when user disconnects from the collab server
|
||||
// before document load has completed
|
||||
if (!doc) {
|
||||
return
|
||||
}
|
||||
|
||||
const user = this.userMap[id]
|
||||
|
||||
if (onDocumentSave && user) {
|
||||
await onDocumentSave(docId, toJS(doc.children), user)
|
||||
}
|
||||
} catch (e) {
|
||||
console.error(e, docId)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* On 'disconnect' handler
|
||||
*/
|
||||
|
||||
private onDisconnect = (id: string, socket: SocketIO.Socket) => async () => {
|
||||
debugCollabBackend('Connection closed\t%s', id)
|
||||
this.backend.closeConnection(id)
|
||||
|
||||
await this.saveDocument(id, socket.nsp.name)
|
||||
|
||||
// trigger onDisconnect callback if one was provided
|
||||
// and if a user has been loaded for this socket connection
|
||||
const user = this.userMap[id]
|
||||
if (this.options.onDisconnect && user) {
|
||||
await this.options.onDisconnect(socket.nsp.name, user)
|
||||
}
|
||||
|
||||
// cleanup automerge cursor and socket connection
|
||||
this.garbageCursors(socket.nsp.name)
|
||||
socket.leave(id)
|
||||
this.garbageNsp(id)
|
||||
|
||||
// cleanup usermap
|
||||
delete this.userMap[id]
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up unused SocketIO namespaces.
|
||||
*/
|
||||
|
||||
garbageNsp = (id: string) => {
|
||||
Object.keys(this.io.nsps)
|
||||
.filter(n => n !== '/')
|
||||
.forEach(nsp => {
|
||||
getClients(this.io, nsp).then((clientsList: any) => {
|
||||
if (!clientsList.length) {
|
||||
debugCollabBackend('Removing document\t%s', id)
|
||||
this.backend.removeDocument(nsp)
|
||||
delete this.io.nsps[nsp]
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up unused cursor data.
|
||||
*/
|
||||
|
||||
garbageCursors = (nsp: string) => {
|
||||
const doc = this.backend.getDocument(nsp)
|
||||
// if document has already been cleaned up, it is safe to return early
|
||||
if (!doc || !doc.cursors) return
|
||||
|
||||
const namespace = this.io.of(nsp)
|
||||
|
||||
Object.keys(doc?.cursors)?.forEach(key => {
|
||||
if (!namespace.sockets[key]) {
|
||||
debugCollabBackend('Garbage cursor\t\t%s', key)
|
||||
this.backend.garbageCursor(nsp, key)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Destroy SocketIO connection
|
||||
*/
|
||||
|
||||
destroy = async () => {
|
||||
this.io.close()
|
||||
}
|
||||
}
|
185
packages/client/src/client-copy.spec.ts
Normal file
185
packages/client/src/client-copy.spec.ts
Normal file
@ -0,0 +1,185 @@
|
||||
// import { createEditor, Element, Node, Transforms } from 'slate'
|
||||
// import * as Automerge from 'automerge'
|
||||
// import withAutomerge, { AutomergeOptions } from './withAutomerge'
|
||||
// import { SyncDoc, toJS } from '@hiveteams/collab-bridge'
|
||||
// import AutomergeCollaboration from '@hiveteams/collab-backend/lib/AutomergeCollaboration'
|
||||
// import { insertText } from '../../bridge/src/apply/text'
|
||||
|
||||
// describe('automerge editor client tests', () => {
|
||||
// const docId = 'test'
|
||||
// const automergeOptions: AutomergeOptions = {
|
||||
// docId,
|
||||
// onError: msg => console.log('Encountered test error', msg)
|
||||
// }
|
||||
// const editor = withAutomerge(createEditor(), automergeOptions)
|
||||
// const automergeBackend = new AutomergeBackend()
|
||||
// const backendSend = (msg: any) => {
|
||||
// serverMessages.push(msg)
|
||||
// }
|
||||
// const clientId = 'test-client'
|
||||
// editor.clientId = clientId
|
||||
|
||||
// /**
|
||||
// * Initialize a basic automerge backend
|
||||
// */
|
||||
|
||||
// // Create a new server automerge connection with a basic send function
|
||||
// let serverMessages: any[] = []
|
||||
// automergeBackend.appendDocument(docId, [
|
||||
// { type: 'paragraph', children: [{ text: 'Hi' }] }
|
||||
// ])
|
||||
// automergeBackend.createConnection(clientId, docId, backendSend)
|
||||
|
||||
// // define an editor send function for the clientside automerge editor
|
||||
// let clientMessages: any[] = []
|
||||
// editor.send = (msg: any) => {
|
||||
// clientMessages.push(msg)
|
||||
// }
|
||||
|
||||
// automergeBackend.openConnection(clientId)
|
||||
// // open the editor connection
|
||||
// editor.openConnection()
|
||||
|
||||
// /**
|
||||
// * Helper function to flush client messages and send them to the server
|
||||
// */
|
||||
// const sendClientMessagesToServer = () => {
|
||||
// if (!clientMessages.length) return
|
||||
|
||||
// console.log('clientMessages', JSON.stringify(clientMessages))
|
||||
// clientMessages.forEach(msg => {
|
||||
// automergeBackend.receiveOperation(clientId, msg)
|
||||
// })
|
||||
// clientMessages = []
|
||||
// }
|
||||
|
||||
// /**
|
||||
// * Helper function to flush server messages and send them to the client
|
||||
// */
|
||||
// const receiveMessagesFromServer = () => {
|
||||
// if (!serverMessages.length) return
|
||||
|
||||
// console.log('serverMessages', JSON.stringify(serverMessages))
|
||||
// serverMessages.forEach(msg => {
|
||||
// editor.receiveOperation(msg.payload)
|
||||
// })
|
||||
// serverMessages = []
|
||||
// }
|
||||
|
||||
// afterEach(() => {
|
||||
// sendClientMessagesToServer()
|
||||
// receiveMessagesFromServer()
|
||||
// })
|
||||
|
||||
// it('should properly receiveDocument', () => {
|
||||
// const initialDocData = Automerge.save(automergeBackend.getDocument(docId))
|
||||
// editor.receiveDocument(initialDocData)
|
||||
|
||||
// expect(editor.children.length).toEqual(1)
|
||||
// const paragraphNode = editor.children[0] as Element
|
||||
// expect(paragraphNode.type).toEqual('paragraph')
|
||||
// expect(paragraphNode.children.length).toEqual(1)
|
||||
// expect(Node.string(paragraphNode)).toEqual('Hi')
|
||||
// })
|
||||
|
||||
// it('should sync insert node operation with server', done => {
|
||||
// Transforms.insertNodes(editor, {
|
||||
// type: 'paragraph',
|
||||
// children: [{ text: 'a' }]
|
||||
// })
|
||||
|
||||
// // ensure that we eventually send a message for the insert_node oepration
|
||||
// const handle = setInterval(() => {
|
||||
// sendClientMessagesToServer()
|
||||
// receiveMessagesFromServer()
|
||||
|
||||
// const serverDoc = toJS(automergeBackend.getDocument(docId))
|
||||
// if (serverDoc.children.length === 2) {
|
||||
// const paragraphNode = serverDoc.children[1]
|
||||
// expect(Node.string(paragraphNode)).toEqual('a')
|
||||
// clearInterval(handle)
|
||||
// done()
|
||||
// }
|
||||
// }, 10)
|
||||
// })
|
||||
|
||||
// it('should sync insert text operation with client', done => {
|
||||
// const serverDoc = automergeBackend.getDocument(docId)
|
||||
|
||||
// const updatedServerDoc = Automerge.change(serverDoc, newServerDoc => {
|
||||
// insertText(newServerDoc as any, {
|
||||
// type: 'insert_text',
|
||||
// path: [1, 0],
|
||||
// offset: 1,
|
||||
// text: 'b'
|
||||
// })
|
||||
// })
|
||||
// automergeBackend.documentSetMap[docId].setDoc(docId, updatedServerDoc)
|
||||
|
||||
// // ensure that we eventually send a message for the insert_node oepration
|
||||
// const handle = setInterval(() => {
|
||||
// sendClientMessagesToServer()
|
||||
// receiveMessagesFromServer()
|
||||
// const [, secondParagraph] = editor.children
|
||||
// if (Node.string(secondParagraph) === 'ab') {
|
||||
// clearInterval(handle)
|
||||
// done()
|
||||
// }
|
||||
// }, 10)
|
||||
// })
|
||||
|
||||
// it('should reapply server state client side when server restarts', done => {
|
||||
// automergeBackend.closeConnection(clientId)
|
||||
// automergeBackend.removeDocument(docId)
|
||||
// automergeBackend.appendDocument(docId, [
|
||||
// { type: 'paragraph', children: [{ text: 'Hi' }] }
|
||||
// ])
|
||||
// automergeBackend.createConnection(clientId, docId, backendSend)
|
||||
// automergeBackend.openConnection(clientId)
|
||||
|
||||
// const docData = Automerge.save(automergeBackend.getDocument(docId))
|
||||
// editor.receiveDocument(docData)
|
||||
|
||||
// const handle = setInterval(() => {
|
||||
// sendClientMessagesToServer()
|
||||
// receiveMessagesFromServer()
|
||||
// console.log('server doc', toJS(automergeBackend.getDocument(docId)))
|
||||
// if (editor.children.length === 1) {
|
||||
// done()
|
||||
// clearInterval(handle)
|
||||
// }
|
||||
// }, 1000)
|
||||
// })
|
||||
|
||||
// // it('should ? on client restart', done => {
|
||||
// // editor.closeConnection()
|
||||
|
||||
// // Transforms.insertNodes(
|
||||
// // editor,
|
||||
// // {
|
||||
// // type: 'paragraph',
|
||||
// // children: [{ text: 'a' }]
|
||||
// // },
|
||||
// // { at: [1] }
|
||||
// // )
|
||||
|
||||
// // editor.openConnection()
|
||||
// // const docData = Automerge.save(automergeBackend.getDocument(docId))
|
||||
// // editor.receiveDocument(docData)
|
||||
// // // ensure that we eventually send a message for the insert_node operation
|
||||
// // const handle = setInterval(() => {
|
||||
// // sendClientMessagesToServer()
|
||||
// // receiveMessagesFromServer()
|
||||
|
||||
// // const serverDoc = toJS(automergeBackend.getDocument(docId))
|
||||
// // console.log(JSON.stringify(serverDoc))
|
||||
// // console.log(editor.children)
|
||||
// // if (serverDoc.children.length === 2) {
|
||||
// // const paragraphNode = serverDoc.children[1]
|
||||
// // expect(Node.string(paragraphNode)).toEqual('a')
|
||||
// // clearInterval(handle)
|
||||
// // done()
|
||||
// // }
|
||||
// // }, 1000)
|
||||
// // })
|
||||
// })
|
@ -13,7 +13,11 @@ describe('automerge editor client tests', () => {
|
||||
}
|
||||
const editor = withAutomerge(createEditor(), automergeOptions)
|
||||
const automergeBackend = new AutomergeBackend()
|
||||
const backendSend = (msg: any) => {
|
||||
serverMessages.push(msg)
|
||||
}
|
||||
const clientId = 'test-client'
|
||||
editor.clientId = clientId
|
||||
|
||||
/**
|
||||
* Initialize a basic automerge backend
|
||||
@ -24,10 +28,7 @@ describe('automerge editor client tests', () => {
|
||||
automergeBackend.appendDocument(docId, [
|
||||
{ type: 'paragraph', children: [{ text: 'Hi' }] }
|
||||
])
|
||||
automergeBackend.createConnection(clientId, docId, (msg: any) => {
|
||||
serverMessages.push(msg)
|
||||
})
|
||||
automergeBackend.openConnection(clientId)
|
||||
automergeBackend.createConnection(clientId, docId, backendSend)
|
||||
|
||||
// define an editor send function for the clientside automerge editor
|
||||
let clientMessages: any[] = []
|
||||
@ -35,6 +36,7 @@ describe('automerge editor client tests', () => {
|
||||
clientMessages.push(msg)
|
||||
}
|
||||
|
||||
automergeBackend.openConnection(clientId)
|
||||
// open the editor connection
|
||||
editor.openConnection()
|
||||
|
||||
@ -42,7 +44,9 @@ describe('automerge editor client tests', () => {
|
||||
* Helper function to flush client messages and send them to the server
|
||||
*/
|
||||
const sendClientMessagesToServer = () => {
|
||||
// console.log('clientMessages', JSON.stringify(clientMessages))
|
||||
if (!clientMessages.length) return
|
||||
|
||||
console.log('clientMessages', JSON.stringify(clientMessages))
|
||||
clientMessages.forEach(msg => {
|
||||
automergeBackend.receiveOperation(clientId, msg)
|
||||
})
|
||||
@ -53,9 +57,11 @@ describe('automerge editor client tests', () => {
|
||||
* Helper function to flush server messages and send them to the client
|
||||
*/
|
||||
const receiveMessagesFromServer = () => {
|
||||
if (!serverMessages.length) return
|
||||
|
||||
console.log('serverMessages', JSON.stringify(serverMessages))
|
||||
serverMessages.forEach(msg => {
|
||||
editor.receiveOperation(msg)
|
||||
editor.receiveOperation(msg.payload)
|
||||
})
|
||||
serverMessages = []
|
||||
}
|
||||
@ -115,7 +121,6 @@ describe('automerge editor client tests', () => {
|
||||
sendClientMessagesToServer()
|
||||
receiveMessagesFromServer()
|
||||
const [, secondParagraph] = editor.children
|
||||
console.log(secondParagraph)
|
||||
if (Node.string(secondParagraph) === 'ab') {
|
||||
clearInterval(handle)
|
||||
done()
|
||||
@ -123,14 +128,58 @@ describe('automerge editor client tests', () => {
|
||||
}, 10)
|
||||
})
|
||||
|
||||
// it('replicate old state error', done => {
|
||||
// serverConnection.close()
|
||||
// serverConnection = new Automerge.Connection(serverDocSet, msg => {
|
||||
// serverMessages.push(msg)
|
||||
// })
|
||||
// serverConnection.open()
|
||||
it('should reapply server state client side when server restarts', done => {
|
||||
automergeBackend.closeConnection(clientId)
|
||||
automergeBackend.removeDocument(docId)
|
||||
automergeBackend.appendDocument(docId, [
|
||||
{ type: 'paragraph', children: [{ text: 'Hi' }] }
|
||||
])
|
||||
automergeBackend.createConnection(clientId, docId, backendSend)
|
||||
automergeBackend.openConnection(clientId)
|
||||
|
||||
// sendClientMessagesToServer()
|
||||
// receiveMessagesFromServer()
|
||||
const docData = Automerge.save(automergeBackend.getDocument(docId))
|
||||
editor.receiveDocument(docData)
|
||||
|
||||
const handle = setInterval(() => {
|
||||
sendClientMessagesToServer()
|
||||
receiveMessagesFromServer()
|
||||
console.log('server doc', toJS(automergeBackend.getDocument(docId)))
|
||||
if (editor.children.length === 1) {
|
||||
done()
|
||||
clearInterval(handle)
|
||||
}
|
||||
}, 1000)
|
||||
})
|
||||
|
||||
// it('should ? on client restart', done => {
|
||||
// editor.closeConnection()
|
||||
|
||||
// Transforms.insertNodes(
|
||||
// editor,
|
||||
// {
|
||||
// type: 'paragraph',
|
||||
// children: [{ text: 'a' }]
|
||||
// },
|
||||
// { at: [1] }
|
||||
// )
|
||||
|
||||
// editor.openConnection()
|
||||
// const docData = Automerge.save(automergeBackend.getDocument(docId))
|
||||
// editor.receiveDocument(docData)
|
||||
// // ensure that we eventually send a message for the insert_node operation
|
||||
// const handle = setInterval(() => {
|
||||
// sendClientMessagesToServer()
|
||||
// receiveMessagesFromServer()
|
||||
|
||||
// const serverDoc = toJS(automergeBackend.getDocument(docId))
|
||||
// console.log(JSON.stringify(serverDoc))
|
||||
// console.log(editor.children)
|
||||
// if (serverDoc.children.length === 2) {
|
||||
// const paragraphNode = serverDoc.children[1]
|
||||
// expect(Node.string(paragraphNode)).toEqual('a')
|
||||
// clearInterval(handle)
|
||||
// done()
|
||||
// }
|
||||
// }, 1000)
|
||||
// })
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user