@ -7,9 +7,7 @@ import {buildUrlId, parseUrlId} from 'app/common/gristUrls';
import { KeyedOps } from 'app/common/KeyedOps' ;
import { DocReplacementOptions , NEW_DOCUMENT_CODE } from 'app/common/UserAPI' ;
import { HomeDBManager } from 'app/gen-server/lib/HomeDBManager' ;
import { getUserId } from 'app/server/lib/Authorizer' ;
import { checksumFile } from 'app/server/lib/checksumFile' ;
import { OptDocSession } from 'app/server/lib/DocSession' ;
import { DocSnapshotInventory , DocSnapshotPruner } from 'app/server/lib/DocSnapshots' ;
import { IDocWorkerMap } from 'app/server/lib/DocWorkerMap' ;
import { ChecksummedExternalStorage , DELETED_TOKEN , ExternalStorage } from 'app/server/lib/ExternalStorage' ;
@ -216,8 +214,10 @@ export class HostedStorageManager implements IDocStorageManager {
* Returns whether the document is new ( needs to be created ) .
* Calling this method multiple times in parallel for the same document is treated as a sign
* of a bug .
*
* The optional srcDocName parameter is set when preparing a fork .
* /
public async prepareLocalDoc ( docName : string , docSession : OptDocSession ) : Promise < boolean > {
public async prepareLocalDoc ( docName : string , srcDocName?: string ) : Promise < boolean > {
// We could be reopening a document that is still closing down.
// Wait for that to happen. TODO: we could also try to interrupt the closing-down process.
await this . closeDocument ( docName ) ;
@ -228,7 +228,7 @@ export class HostedStorageManager implements IDocStorageManager {
try {
this . _prepareFiles . add ( docName ) ;
const isNew = ! ( await this . _ ensureDocumentIsPresent( docName , docSession ) ) ;
const isNew = ! ( await this . _ claimDocument( docName , srcDocName ) ) ;
return isNew ;
} finally {
this . _prepareFiles . delete ( docName ) ;
@ -236,17 +236,27 @@ export class HostedStorageManager implements IDocStorageManager {
}
public async prepareToCreateDoc ( docName : string ) : Promise < void > {
await this . prepareLocalDoc ( docName , 'new' ) ;
if ( this . _inventory ) {
await this . _inventory . create ( docName ) ;
this . _onInventoryChange ( docName ) ;
}
this . markAsChanged ( docName ) ;
}
/ * *
* Initialize one document from another , associating the result with the current
* worker .
* /
public async prepareFork ( srcDocName : string , destDocName : string ) : Promise < void > {
await this . prepareLocalDoc ( destDocName , srcDocName ) ;
}
// Gets a copy of the document, eg. for downloading. Returns full file path.
// Copy won't change if edits are made to the document. It is caller's responsibility
// to delete the result.
public async getCopy ( docName : string ) : Promise < string > {
const present = await this . _ensureDocumentIsPresent ( docName , { client : null } ) ;
const present = await this . _ claimDocument( docName ) ;
if ( ! present ) {
throw new Error ( 'cannot copy document that does not exist yet' ) ;
}
@ -395,9 +405,10 @@ export class HostedStorageManager implements IDocStorageManager {
return this . _baseStore ;
}
// return true if document is backed up to s3.
public isSaved ( docName : string ) : boolean {
return ! this . _uploads . hasPendingOperation ( docName ) ;
// return true if document and inventory is backed up to external store (if attached).
public isAllSaved ( docName : string ) : boolean {
return ! this . _uploads . hasPendingOperation ( docName ) &&
( this . _inventory ? this . _inventory . isSaved ( docName ) : true ) ;
}
// pick up the pace of pushing to s3, from leisurely to urgent.
@ -423,10 +434,11 @@ export class HostedStorageManager implements IDocStorageManager {
* Make sure document is backed up to s3 .
* /
public async flushDoc ( docName : string ) : Promise < void > {
while ( ! this . is Saved( docName ) ) {
while ( ! this . is All Saved( docName ) ) {
log . info ( 'HostedStorageManager: waiting for document to finish: %s' , docName ) ;
await this . _uploads . expediteOperationAndWait ( docName ) ;
if ( ! this . isSaved ( docName ) ) {
await this . _inventory ? . flush ( docName ) ;
if ( ! this . isAllSaved ( docName ) ) {
// Throttle slightly in case this operation ends up looping excessively.
await delay ( 1000 ) ;
}
@ -502,25 +514,28 @@ export class HostedStorageManager implements IDocStorageManager {
}
/ * *
* Makes sure a document is present locally , fetching it from S3 if necessary .
* Returns true on success , false if document not found . It is safe to call
* this method multiple times in parallel .
* Makes sure a document is assigned to this worker , adding an
* assignment if it has none . If the document is present in
* external storage , fetch it . Return true if the document was
* fetched .
*
* The document can optionally be copied from an alternative
* source ( srcDocName ) . This is useful for forking .
*
* If srcDocName is 'new' , checks for the document in external storage
* are skipped .
* /
private async _ensureDocumentIsPresent ( docName : string ,
docSession : OptDocSession ) : Promise < boolean > {
private async _ claimDocum ent( docName : string ,
srcDocName? : string ) : Promise < boolean > {
// AsyncCreate.mapGetOrSet ensures we don't start multiple promises to talk to S3/Redis
// and that we clean up the failed key in case of failure.
return mapGetOrSet ( this . _localFiles , docName , async ( ) = > {
if ( this . _closed ) { throw new Error ( "HostedStorageManager._ensureDocumentIsPresent called after closing" ) ; }
checkValidDocId ( docName ) ;
const { trunkId , forkId , forkUserId, snapshotId} = parseUrlId ( docName ) ;
const { trunkId , forkId , snapshotId} = parseUrlId ( docName ) ;
// If forkUserId is set to a valid user id, we can only create a fork if we know the
// requesting user and their id matches the forkUserId.
const userId = ( docSession . client && docSession . client . getCachedUserId ( ) ) ||
( docSession . req && getUserId ( docSession . req ) ) ;
const canCreateFork = forkUserId ? ( forkUserId === userId ) : true ;
const canCreateFork = Boolean ( srcDocName ) ;
const docStatus = await this . _docWorkerMap . getDocWorkerOrAssign ( docName , this . _docWorkerId ) ;
if ( ! docStatus . isActive ) { throw new Error ( ` Doc is not active on a DocWorker: ${ docName } ` ) ; }
@ -528,10 +543,12 @@ export class HostedStorageManager implements IDocStorageManager {
throw new Error ( ` Doc belongs to a different DocWorker ( ${ docStatus . docWorker . id } ): ${ docName } ` ) ;
}
if ( srcDocName === 'new' ) { return false ; }
if ( this . _disableS3 ) {
// skip S3, just use file system
let present : boolean = await fse . pathExists ( this . getPath ( docName ) ) ;
if ( forkId && ! present ) {
if ( ( forkId || snapshotId ) && ! present ) {
if ( ! canCreateFork ) { throw new Error ( ` Cannot create fork ` ) ; }
if ( snapshotId && snapshotId !== 'current' ) {
throw new Error ( ` cannot find snapshot ${ snapshotId } of ${ docName } ` ) ;
@ -569,6 +586,7 @@ export class HostedStorageManager implements IDocStorageManager {
}
}
return this . _fetchFromS3 ( docName , {
sourceDocId : srcDocName ,
trunkId : forkId ? trunkId : undefined , snapshotId , canCreateFork
} ) ;
} ) ;