2020-08-05 18:38:55 +02:00
/ *
Copyright 2020 Bruno Windels < bruno @ windels . cloud >
2020-08-17 14:13:23 +02:00
Copyright 2020 The Matrix . org Foundation C . I . C .
2020-08-05 18:38:55 +02:00
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
2020-04-04 17:34:46 +02:00
import { AbortError } from "./error.js" ;
2020-04-20 21:26:39 +02:00
import { ObservableValue } from "../observable/ObservableValue.js" ;
import { createEnum } from "../utils/enum.js" ;
2018-12-21 14:35:24 +01:00
2019-02-10 21:25:29 +01:00
const INCREMENTAL _TIMEOUT = 30000 ;
const SYNC _EVENT _LIMIT = 10 ;
2018-12-21 14:35:24 +01:00
2020-04-19 19:52:26 +02:00
export const SyncStatus = createEnum (
"InitialSync" ,
"CatchupSync" ,
"Syncing" ,
"Stopped"
) ;
2020-08-17 14:13:23 +02:00
function timelineIsEmpty ( roomResponse ) {
try {
2020-08-19 11:36:43 +02:00
const events = roomResponse ? . timeline ? . events ;
return Array . isArray ( events ) && events . length === 0 ;
2020-08-17 14:13:23 +02:00
} catch ( err ) {
return true ;
}
}
2020-09-10 12:11:43 +02:00
/ * *
* Sync steps in js - pseudocode :
* ` ` ` js
2020-09-23 14:26:14 +02:00
* // can only read some stores
* const preparation = await room . prepareSync ( roomResponse , membership , prepareTxn ) ;
* // can do async work that is not related to storage (such as decryption)
* await room . afterPrepareSync ( preparation ) ;
2020-09-10 12:11:43 +02:00
* // writes and calculates changes
2020-09-23 14:26:14 +02:00
* const changes = await room . writeSync ( roomResponse , isInitialSync , preparation , syncTxn ) ;
2020-09-10 12:11:43 +02:00
* // applies and emits changes once syncTxn is committed
2020-09-23 18:06:16 +02:00
* room . afterSync ( changes ) ;
2020-09-10 12:11:43 +02:00
* if ( room . needsAfterSyncCompleted ( changes ) ) {
* // can do network requests
* await room . afterSyncCompleted ( changes ) ;
* }
* ` ` `
* /
2020-04-20 19:47:45 +02:00
export class Sync {
2019-09-08 10:19:16 +02:00
constructor ( { hsApi , session , storage } ) {
2019-05-12 20:26:46 +02:00
this . _hsApi = hsApi ;
this . _session = session ;
this . _storage = storage ;
this . _currentRequest = null ;
2020-04-19 19:52:26 +02:00
this . _status = new ObservableValue ( SyncStatus . Stopped ) ;
this . _error = null ;
}
get status ( ) {
return this . _status ;
2019-05-12 20:26:46 +02:00
}
2019-06-16 10:54:16 +02:00
2020-04-19 19:52:26 +02:00
/** the error that made the sync stop */
get error ( ) {
return this . _error ;
2019-06-16 10:54:16 +02:00
}
2020-04-19 19:52:26 +02:00
start ( ) {
// not already syncing?
if ( this . _status . get ( ) !== SyncStatus . Stopped ) {
2019-05-12 20:26:46 +02:00
return ;
}
2020-09-21 13:55:35 +02:00
this . _error = null ;
2019-05-12 20:26:46 +02:00
let syncToken = this . _session . syncToken ;
2020-04-19 19:52:26 +02:00
if ( syncToken ) {
this . _status . set ( SyncStatus . CatchupSync ) ;
} else {
this . _status . set ( SyncStatus . InitialSync ) ;
2019-05-12 20:26:46 +02:00
}
this . _syncLoop ( syncToken ) ;
}
2018-12-21 14:35:24 +01:00
2019-05-12 20:26:46 +02:00
async _syncLoop ( syncToken ) {
// if syncToken is falsy, it will first do an initial sync ...
2020-04-19 19:52:26 +02:00
while ( this . _status . get ( ) !== SyncStatus . Stopped ) {
2020-09-10 12:11:43 +02:00
let roomStates ;
2020-09-24 10:52:56 +02:00
let sessionChanges ;
2019-05-12 20:26:46 +02:00
try {
console . log ( ` starting sync request with since ${ syncToken } ... ` ) ;
2020-09-21 17:57:01 +02:00
// unless we are happily syncing already, we want the server to return
// as quickly as possible, even if there are no events queued. This
// serves two purposes:
//
// * When the connection dies, we want to know asap when it comes back,
// so that we can hide the error from the user. (We don't want to
// have to wait for an event or a timeout).
//
// * We want to know if the server has any to_device messages queued up
// for us. We do that by calling it with a zero timeout until it
// doesn't give us any more to_device messages.
const timeout = this . _status . get ( ) === SyncStatus . Syncing ? INCREMENTAL _TIMEOUT : 0 ;
2020-09-21 17:53:29 +02:00
const syncResult = await this . _syncRequest ( syncToken , timeout ) ;
2020-09-08 14:37:24 +02:00
syncToken = syncResult . syncToken ;
2020-09-10 12:11:43 +02:00
roomStates = syncResult . roomStates ;
2020-09-24 10:52:56 +02:00
sessionChanges = syncResult . sessionChanges ;
2020-09-21 17:57:01 +02:00
// initial sync or catchup sync
if ( this . _status . get ( ) !== SyncStatus . Syncing && syncResult . hadToDeviceMessages ) {
this . _status . set ( SyncStatus . CatchupSync ) ;
} else {
this . _status . set ( SyncStatus . Syncing ) ;
}
2019-05-12 20:26:46 +02:00
} catch ( err ) {
2020-09-25 10:44:29 +02:00
// retry same request on timeout
if ( err . name === "ConnectionError" && err . isTimeout ) {
// don't run afterSyncCompleted
continue ;
}
2020-09-25 10:44:19 +02:00
if ( err . name !== AbortError ) {
2020-09-18 13:11:18 +02:00
console . warn ( "stopping sync because of error" ) ;
console . error ( err ) ;
2020-04-19 19:52:26 +02:00
this . _error = err ;
2019-05-12 20:26:46 +02:00
}
2020-09-28 16:06:19 +02:00
this . _status . set ( SyncStatus . Stopped ) ;
2019-05-12 20:26:46 +02:00
}
2020-09-21 13:55:35 +02:00
if ( this . _status . get ( ) !== SyncStatus . Stopped ) {
2020-09-24 10:52:56 +02:00
// TODO: if we're not going to run this phase in parallel with the next
// sync request (because this causes OTKs to be uploaded twice)
// should we move this inside _syncRequest?
// Alternatively, we can try to fix the OTK upload issue while still
// running in parallel.
await this . _runAfterSyncCompleted ( sessionChanges , roomStates ) ;
2020-09-08 14:37:24 +02:00
}
}
}
2020-09-24 10:52:56 +02:00
async _runAfterSyncCompleted ( sessionChanges , roomStates ) {
2020-09-21 17:57:01 +02:00
const isCatchupSync = this . _status . get ( ) === SyncStatus . CatchupSync ;
2020-09-08 14:37:24 +02:00
const sessionPromise = ( async ( ) => {
try {
2020-09-24 10:52:56 +02:00
await this . _session . afterSyncCompleted ( sessionChanges , isCatchupSync ) ;
2020-09-08 14:37:24 +02:00
} catch ( err ) {
console . error ( "error during session afterSyncCompleted, continuing" , err . stack ) ;
}
} ) ( ) ;
2020-09-10 12:11:43 +02:00
const roomsNeedingAfterSyncCompleted = roomStates . filter ( rs => {
return rs . room . needsAfterSyncCompleted ( rs . changes ) ;
} ) ;
const roomsPromises = roomsNeedingAfterSyncCompleted . map ( async rs => {
try {
await rs . room . afterSyncCompleted ( rs . changes ) ;
} catch ( err ) {
console . error ( ` error during room ${ rs . room . id } afterSyncCompleted, continuing ` , err . stack ) ;
}
2020-09-08 14:37:24 +02:00
} ) ;
// run everything in parallel,
// we don't want to delay the next sync too much
2020-09-10 12:11:43 +02:00
// Also, since all promises won't reject (as they have a try/catch)
// it's fine to use Promise.all
await Promise . all ( roomsPromises . concat ( sessionPromise ) ) ;
2019-05-12 20:26:46 +02:00
}
2019-02-03 21:17:24 +00:00
2020-09-21 17:53:29 +02:00
async _syncRequest ( syncToken , timeout ) {
2019-10-12 20:24:09 +02:00
let { syncFilterId } = this . _session ;
if ( typeof syncFilterId !== "string" ) {
2020-05-07 00:04:41 +02:00
this . _currentRequest = this . _hsApi . createFilter ( this . _session . user . id , { room : { state : { lazy _load _members : true } } } ) ;
syncFilterId = ( await this . _currentRequest . response ( ) ) . filter _id ;
2019-10-12 20:24:09 +02:00
}
2020-05-06 23:50:12 +02:00
const totalRequestTimeout = timeout + ( 80 * 1000 ) ; // same as riot-web, don't get stuck on wedged long requests
this . _currentRequest = this . _hsApi . sync ( syncToken , syncFilterId , timeout , { timeout : totalRequestTimeout } ) ;
2019-05-12 20:26:46 +02:00
const response = await this . _currentRequest . response ( ) ;
2020-09-08 14:37:24 +02:00
2020-08-17 14:13:23 +02:00
const isInitialSync = ! syncToken ;
2019-05-12 20:26:46 +02:00
syncToken = response . next _batch ;
2020-09-10 12:11:43 +02:00
const roomStates = this . _parseRoomsResponse ( response . rooms , isInitialSync ) ;
2020-09-23 18:06:16 +02:00
await this . _prepareRooms ( roomStates ) ;
2020-03-14 20:45:36 +01:00
let sessionChanges ;
2020-09-25 16:42:41 +02:00
const syncTxn = this . _openSyncTxn ( ) ;
2019-02-27 19:27:45 +01:00
try {
2020-10-01 14:36:22 +02:00
sessionChanges = await this . _session . writeSync ( response , syncFilterId , syncTxn ) ;
2020-09-10 12:11:43 +02:00
await Promise . all ( roomStates . map ( async rs => {
console . log ( ` * applying sync response to room ${ rs . room . id } ... ` ) ;
rs . changes = await rs . room . writeSync (
2020-09-23 14:26:14 +02:00
rs . roomResponse , isInitialSync , rs . preparation , syncTxn ) ;
2020-09-10 12:11:43 +02:00
} ) ) ;
2019-05-12 20:26:46 +02:00
} catch ( err ) {
// avoid corrupting state by only
// storing the sync up till the point
// the exception occurred
2020-09-29 10:52:52 +02:00
try {
syncTxn . abort ( ) ;
2020-10-01 16:23:15 +02:00
} catch ( abortErr ) {
console . error ( "Could not abort sync transaction, the sync response was probably only partially written and may have put storage in a inconsistent state." , abortErr ) ;
}
2019-05-12 20:26:46 +02:00
throw err ;
}
try {
await syncTxn . complete ( ) ;
console . info ( "syncTxn committed!!" ) ;
} catch ( err ) {
2019-10-12 22:18:19 +02:00
console . error ( "unable to commit sync tranaction" ) ;
2019-06-26 22:00:50 +02:00
throw err ;
2019-05-12 20:26:46 +02:00
}
2020-03-14 20:45:36 +01:00
this . _session . afterSync ( sessionChanges ) ;
2019-02-27 19:27:45 +01:00
// emit room related events after txn has been closed
2020-09-10 12:11:43 +02:00
for ( let rs of roomStates ) {
rs . room . afterSync ( rs . changes ) ;
2019-02-27 19:27:45 +01:00
}
2020-09-21 17:57:01 +02:00
const toDeviceEvents = response . to _device ? . events ;
return {
syncToken ,
roomStates ,
2020-09-24 10:52:56 +02:00
sessionChanges ,
2020-09-21 17:57:01 +02:00
hadToDeviceMessages : Array . isArray ( toDeviceEvents ) && toDeviceEvents . length > 0 ,
} ;
2020-09-08 14:37:24 +02:00
}
2020-09-25 16:42:41 +02:00
_openPrepareSyncTxn ( ) {
2020-09-10 12:11:43 +02:00
const storeNames = this . _storage . storeNames ;
2020-09-25 16:42:41 +02:00
return this . _storage . readTxn ( [
2020-09-10 12:11:43 +02:00
storeNames . inboundGroupSessions ,
] ) ;
}
async _prepareRooms ( roomStates ) {
2020-09-25 16:42:41 +02:00
const prepareTxn = this . _openPrepareSyncTxn ( ) ;
2020-09-23 14:26:14 +02:00
await Promise . all ( roomStates . map ( async rs => {
rs . preparation = await rs . room . prepareSync ( rs . roomResponse , rs . membership , prepareTxn ) ;
} ) ) ;
2020-10-01 16:14:06 +02:00
// This is needed for safari to not throw TransactionInactiveErrors on the syncTxn. See docs/INDEXEDDB.md
2020-10-01 14:31:38 +02:00
await prepareTxn . complete ( ) ;
2020-09-23 14:26:14 +02:00
await Promise . all ( roomStates . map ( rs => rs . room . afterPrepareSync ( rs . preparation ) ) ) ;
2020-09-08 14:37:24 +02:00
}
2020-09-25 16:42:41 +02:00
_openSyncTxn ( ) {
2020-09-08 14:37:24 +02:00
const storeNames = this . _storage . storeNames ;
2020-09-25 16:42:41 +02:00
return this . _storage . readWriteTxn ( [
2020-09-08 14:37:24 +02:00
storeNames . session ,
storeNames . roomSummary ,
storeNames . roomState ,
storeNames . roomMembers ,
storeNames . timelineEvents ,
storeNames . timelineFragments ,
storeNames . pendingEvents ,
storeNames . userIdentities ,
storeNames . groupSessionDecryptions ,
storeNames . deviceIdentities ,
2020-09-08 15:00:00 +02:00
// to discard outbound session when somebody leaves a room
2020-10-01 14:39:23 +02:00
// and to create room key messages when somebody joins
2020-09-11 14:41:12 +02:00
storeNames . outboundGroupSessions ,
2020-09-17 10:39:51 +02:00
storeNames . operations ,
storeNames . accountData ,
2020-09-08 14:37:24 +02:00
] ) ;
2019-05-12 20:26:46 +02:00
}
2020-09-10 12:11:43 +02:00
_parseRoomsResponse ( roomsSection , isInitialSync ) {
const roomStates = [ ] ;
if ( roomsSection ) {
// don't do "invite", "leave" for now
const allMemberships = [ "join" ] ;
for ( const membership of allMemberships ) {
const membershipSection = roomsSection [ membership ] ;
if ( membershipSection ) {
for ( const [ roomId , roomResponse ] of Object . entries ( membershipSection ) ) {
// ignore rooms with empty timelines during initial sync,
// see https://github.com/vector-im/hydrogen-web/issues/15
if ( isInitialSync && timelineIsEmpty ( roomResponse ) ) {
2020-09-21 14:11:28 +02:00
continue ;
2020-09-10 12:11:43 +02:00
}
let room = this . _session . rooms . get ( roomId ) ;
if ( ! room ) {
room = this . _session . createRoom ( roomId ) ;
}
roomStates . push ( new RoomSyncProcessState ( room , roomResponse , membership ) ) ;
}
}
}
}
return roomStates ;
}
2018-12-21 14:35:24 +01:00
2019-05-12 20:26:46 +02:00
stop ( ) {
2020-04-19 19:52:26 +02:00
if ( this . _status . get ( ) === SyncStatus . Stopped ) {
2019-05-12 20:26:46 +02:00
return ;
}
2020-04-19 19:52:26 +02:00
this . _status . set ( SyncStatus . Stopped ) ;
2019-05-12 20:26:46 +02:00
if ( this . _currentRequest ) {
this . _currentRequest . abort ( ) ;
this . _currentRequest = null ;
}
}
2019-02-20 23:48:16 +01:00
}
2020-09-10 12:11:43 +02:00
class RoomSyncProcessState {
constructor ( room , roomResponse , membership ) {
this . room = room ;
this . roomResponse = roomResponse ;
this . membership = membership ;
this . preparation = null ;
this . changes = null ;
}
}