Merge pull request #27 from bwindels/bwindels/tryfixconstrainterror

Attempt to fix ConstraintError on sync
This commit is contained in:
Bruno Windels 2020-01-04 19:29:10 +00:00 committed by GitHub
commit 063efea159
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 19 deletions

View File

@ -13,7 +13,7 @@ export default class Room extends EventEmitter {
this._hsApi = hsApi; this._hsApi = hsApi;
this._summary = new RoomSummary(roomId); this._summary = new RoomSummary(roomId);
this._fragmentIdComparer = new FragmentIdComparer([]); this._fragmentIdComparer = new FragmentIdComparer([]);
this._syncWriter = new SyncWriter({roomId, storage, fragmentIdComparer: this._fragmentIdComparer}); this._syncWriter = new SyncWriter({roomId, fragmentIdComparer: this._fragmentIdComparer});
this._emitCollectionChange = emitCollectionChange; this._emitCollectionChange = emitCollectionChange;
this._sendQueue = new SendQueue({roomId, storage, sendScheduler, pendingEvents}); this._sendQueue = new SendQueue({roomId, storage, sendScheduler, pendingEvents});
this._timeline = null; this._timeline = null;
@ -22,19 +22,20 @@ export default class Room extends EventEmitter {
async persistSync(roomResponse, membership, txn) { async persistSync(roomResponse, membership, txn) {
const summaryChanged = this._summary.applySync(roomResponse, membership, txn); const summaryChanged = this._summary.applySync(roomResponse, membership, txn);
const newTimelineEntries = await this._syncWriter.writeSync(roomResponse, txn); const {entries, newLiveKey} = await this._syncWriter.writeSync(roomResponse, txn);
let removedPendingEvents; let removedPendingEvents;
if (roomResponse.timeline && roomResponse.timeline.events) { if (roomResponse.timeline && roomResponse.timeline.events) {
removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn); removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn);
} }
return {summaryChanged, newTimelineEntries, removedPendingEvents}; return {summaryChanged, newTimelineEntries: entries, newLiveKey, removedPendingEvents};
} }
emitSync({summaryChanged, newTimelineEntries, removedPendingEvents}) { emitSync({summaryChanged, newTimelineEntries, newLiveKey, removedPendingEvents}) {
if (summaryChanged) { if (summaryChanged) {
this.emit("change"); this.emit("change");
this._emitCollectionChange(this); this._emitCollectionChange(this);
} }
this._syncWriter.setKeyOnCompleted(newLiveKey);
if (this._timeline) { if (this._timeline) {
this._timeline.appendLiveEntries(newTimelineEntries); this._timeline.appendLiveEntries(newTimelineEntries);
} }

View File

@ -18,9 +18,8 @@ function deduplicateEvents(events) {
} }
export default class SyncWriter { export default class SyncWriter {
constructor({roomId, storage, fragmentIdComparer}) { constructor({roomId, fragmentIdComparer}) {
this._roomId = roomId; this._roomId = roomId;
this._storage = storage;
this._fragmentIdComparer = fragmentIdComparer; this._fragmentIdComparer = fragmentIdComparer;
this._lastLiveKey = null; this._lastLiveKey = null;
} }
@ -85,23 +84,23 @@ export default class SyncWriter {
async writeSync(roomResponse, txn) { async writeSync(roomResponse, txn) {
const entries = []; const entries = [];
const timeline = roomResponse.timeline; const timeline = roomResponse.timeline;
if (!this._lastLiveKey) { let currentKey = this._lastLiveKey;
if (!currentKey) {
// means we haven't synced this room yet (just joined or did initial sync) // means we haven't synced this room yet (just joined or did initial sync)
// as this is probably a limited sync, prev_batch should be there // as this is probably a limited sync, prev_batch should be there
// (but don't fail if it isn't, we won't be able to back-paginate though) // (but don't fail if it isn't, we won't be able to back-paginate though)
let liveFragment = await this._createLiveFragment(txn, timeline.prev_batch); let liveFragment = await this._createLiveFragment(txn, timeline.prev_batch);
this._lastLiveKey = new EventKey(liveFragment.id, EventKey.defaultLiveKey.eventIndex); currentKey = new EventKey(liveFragment.id, EventKey.defaultLiveKey.eventIndex);
entries.push(FragmentBoundaryEntry.start(liveFragment, this._fragmentIdComparer)); entries.push(FragmentBoundaryEntry.start(liveFragment, this._fragmentIdComparer));
} else if (timeline.limited) { } else if (timeline.limited) {
// replace live fragment for limited sync, *only* if we had a live fragment already // replace live fragment for limited sync, *only* if we had a live fragment already
const oldFragmentId = this._lastLiveKey.fragmentId; const oldFragmentId = currentKey.fragmentId;
this._lastLiveKey = this._lastLiveKey.nextFragmentKey(); currentKey = currentKey.nextFragmentKey();
const {oldFragment, newFragment} = await this._replaceLiveFragment(oldFragmentId, this._lastLiveKey.fragmentId, timeline.prev_batch, txn); const {oldFragment, newFragment} = await this._replaceLiveFragment(oldFragmentId, currentKey.fragmentId, timeline.prev_batch, txn);
entries.push(FragmentBoundaryEntry.end(oldFragment, this._fragmentIdComparer)); entries.push(FragmentBoundaryEntry.end(oldFragment, this._fragmentIdComparer));
entries.push(FragmentBoundaryEntry.start(newFragment, this._fragmentIdComparer)); entries.push(FragmentBoundaryEntry.start(newFragment, this._fragmentIdComparer));
} }
let currentKey = this._lastLiveKey;
if (timeline.events) { if (timeline.events) {
const events = deduplicateEvents(timeline.events); const events = deduplicateEvents(timeline.events);
for(const event of events) { for(const event of events) {
@ -111,12 +110,6 @@ export default class SyncWriter {
entries.push(new EventEntry(entry, this._fragmentIdComparer)); entries.push(new EventEntry(entry, this._fragmentIdComparer));
} }
} }
// right thing to do? if the txn fails, not sure we'll continue anyways ...
// only advance the key once the transaction has succeeded
txn.complete().then(() => {
this._lastLiveKey = currentKey;
})
// persist state // persist state
const state = roomResponse.state; const state = roomResponse.state;
if (state.events) { if (state.events) {
@ -133,7 +126,11 @@ export default class SyncWriter {
} }
} }
return entries; return {entries, newLiveKey: currentKey};
}
setKeyOnCompleted(newLiveKey) {
this._lastLiveKey = newLiveKey;
} }
} }