From 3f264e476c296a5d8585909ca6f07db189f84a8f Mon Sep 17 00:00:00 2001 From: Nolan Lawson Date: Thu, 15 Feb 2018 09:02:46 -0800 Subject: [PATCH] add home/notification streaming --- routes/_actions/streaming.js | 82 ++++++++++++++++++++---------- routes/_database/timelines.js | 3 ++ routes/_store/instanceObservers.js | 32 +++++++++++- routes/_store/timelineObservers.js | 19 +++++-- 4 files changed, 104 insertions(+), 32 deletions(-) diff --git a/routes/_actions/streaming.js b/routes/_actions/streaming.js index a39ab1f..ad4474b 100644 --- a/routes/_actions/streaming.js +++ b/routes/_actions/streaming.js @@ -6,24 +6,35 @@ import { scheduleIdleTask } from '../_utils/scheduleIdleTask' import throttle from 'lodash/throttle' import { mark, stop } from '../_utils/marks' -async function removeDuplicates (instanceName, timelineName, updates) { - // remove duplicates, including duplicates due to reblogs - let timelineItemIds = store.getForTimeline(instanceName, timelineName, 'timelineItemIds') || [] - let reblogIds = (await Promise.all(timelineItemIds.map(async timelineItemId => { +async function getReblogIds(instanceName, statusIds) { + let reblogIds = await Promise.all(statusIds.map(async timelineItemId => { let status = await database.getStatus(instanceName, timelineItemId) return status.reblog && status.reblog.id - }))).filter(identity) - let existingItemIds = new Set([].concat(timelineItemIds).concat(reblogIds)) + })) + return reblogIds.filter(identity) +} + +async function getExistingItemIdsSet(instanceName, timelineName) { + let timelineItemIds = store.getForTimeline(instanceName, timelineName, 'timelineItemIds') || [] + if (timelineName === 'notifications') { + return new Set(timelineItemIds) + } + let reblogIds = await getReblogIds(instanceName, timelineItemIds) + return new Set([].concat(timelineItemIds).concat(reblogIds)) +} + +async function removeDuplicates (instanceName, timelineName, updates) { + // remove duplicates, including duplicates due to reblogs + let existingItemIds = await getExistingItemIdsSet(instanceName, timelineName) return updates.filter(update => !existingItemIds.has(update.id)) } -async function processFreshChanges (instanceName, timelineName) { - mark('processFreshChanges') - let freshChanges = store.getForTimeline(instanceName, timelineName, 'freshChanges') - if (freshChanges.updates && freshChanges.updates.length) { - let updates = freshChanges.updates.slice() - freshChanges.updates = [] - store.setForTimeline(instanceName, timelineName, {freshChanges: freshChanges}) +async function processFreshUpdates (instanceName, timelineName) { + mark('processFreshUpdates') + let freshUpdates = store.getForTimeline(instanceName, timelineName, 'freshUpdates') + if (freshUpdates && freshUpdates.length) { + let updates = freshUpdates.slice() + store.setForTimeline(instanceName, timelineName, {freshUpdates: []}) updates = await removeDuplicates(instanceName, timelineName, updates) @@ -35,37 +46,54 @@ async function processFreshChanges (instanceName, timelineName) { console.log('adding ', itemIdsToAdd.length, 'items to itemIdsToAdd') store.setForTimeline(instanceName, timelineName, {itemIdsToAdd: itemIdsToAdd}) } - stop('processFreshChanges') + stop('processFreshUpdates') } } -const lazilyProcessFreshChanges = throttle((instanceName, timelineName) => { +const lazilyProcessFreshUpdates = throttle((instanceName, timelineName) => { scheduleIdleTask(() => { - processFreshChanges(instanceName, timelineName) + /* no await */ processFreshUpdates(instanceName, timelineName) }) }, 5000) -function handleStreamMessage (instanceName, timelineName, message) { - mark('handleStreamMessage') +function processUpdate(instanceName, timelineName, update) { + let freshUpdates = store.getForTimeline(instanceName, timelineName, 'freshUpdates') || [] + freshUpdates.push(update) + store.setForTimeline(instanceName, timelineName, {freshUpdates: freshUpdates}) + lazilyProcessFreshUpdates(instanceName, timelineName) +} + +function processDelete(instanceName, deletion) { + // TODO +} + +function processMessage (instanceName, timelineName, message) { + mark('processMessage') let { event, payload } = message - let key = event === 'update' ? 'updates' : 'deletes' - let freshChanges = store.getForTimeline(instanceName, timelineName, 'freshChanges') || {} - freshChanges[key] = freshChanges[key] || [] - freshChanges[key].push(JSON.parse(payload)) - store.setForTimeline(instanceName, timelineName, {freshChanges: freshChanges}) - lazilyProcessFreshChanges(instanceName, timelineName) - stop('handleStreamMessage') + let parsedPayload = JSON.parse(payload) + switch (event) { + case 'delete': + processDelete(instanceName, parsedPayload) + break + case 'update': + processUpdate(instanceName, timelineName, parsedPayload) + break + case 'notification': + processUpdate(instanceName, 'notifications', parsedPayload) + break + } + stop('processMessage') } export function createStream (streamingApi, instanceName, accessToken, timelineName) { return new TimelineStream(streamingApi, accessToken, timelineName, { onMessage (msg) { - if (msg.event !== 'update' && msg.event !== 'delete') { + if (msg.event !== 'update' && msg.event !== 'delete' && msg.event !== 'notification') { console.error("don't know how to handle event", msg) return } scheduleIdleTask(() => { - handleStreamMessage(instanceName, timelineName, msg) + processMessage(instanceName, timelineName, msg) }) }, onOpen () { diff --git a/routes/_database/timelines.js b/routes/_database/timelines.js index 3ac4147..22d777b 100644 --- a/routes/_database/timelines.js +++ b/routes/_database/timelines.js @@ -178,6 +178,9 @@ async function insertTimelineNotifications (instanceName, timeline, notification for (let notification of notifications) { setInCache(notificationsCache, instanceName, notification.id, notification) setInCache(accountsCache, instanceName, notification.account.id, notification.account) + if (notification.status) { + setInCache(statusesCache, instanceName, notification.status.id, notification.status) + } } const db = await getDatabase(instanceName) let storeNames = [NOTIFICATION_TIMELINES_STORE, NOTIFICATIONS_STORE, ACCOUNTS_STORE, STATUSES_STORE] diff --git a/routes/_store/instanceObservers.js b/routes/_store/instanceObservers.js index 715b411..6573ebd 100644 --- a/routes/_store/instanceObservers.js +++ b/routes/_store/instanceObservers.js @@ -1,13 +1,43 @@ import { updateInstanceInfo, updateVerifyCredentialsForInstance } from '../_actions/instances' import { updateLists } from '../_actions/lists' +import { createStream } from '../_actions/streaming' export function instanceObservers (store) { - store.observe('currentInstance', (currentInstance) => { + + // stream to watch for home timeline updates and notifications + let currentInstanceStream + + store.observe('currentInstance', async (currentInstance) => { + if (!process.browser) { + return + } + if (currentInstanceStream) { + currentInstanceStream.close() + currentInstanceStream = null + if (process.env.NODE_ENV !== 'production') { + window.currentInstanceStream = null + } + } if (!currentInstance) { return } updateVerifyCredentialsForInstance(currentInstance) updateInstanceInfo(currentInstance) updateLists() + + await updateInstanceInfo(currentInstance) + let instanceInfo = store.get('currentInstanceInfo') + if (!(instanceInfo && store.get('currentInstance') === currentInstance)) { + return + } + + let accessToken = store.get('accessToken') + currentInstanceStream = createStream(instanceInfo.urls.streaming_api, + currentInstance, accessToken, 'home') + + if (process.env.NODE_ENV !== 'production') { + window.currentInstanceStream = currentInstanceStream + } + }) } diff --git a/routes/_store/timelineObservers.js b/routes/_store/timelineObservers.js index db6207f..f7a4502 100644 --- a/routes/_store/timelineObservers.js +++ b/routes/_store/timelineObservers.js @@ -2,6 +2,9 @@ import { updateInstanceInfo } from '../_actions/instances' import { createStream } from '../_actions/streaming' export function timelineObservers (store) { + + // stream to watch for local/federated/etc. updates. home and notification + // updates are handled in timelineObservers.js let currentTimelineStream store.observe('currentTimeline', async (currentTimeline) => { @@ -11,18 +14,21 @@ export function timelineObservers (store) { if (currentTimelineStream) { currentTimelineStream.close() currentTimelineStream = null + if (process.env.NODE_ENV !== 'production') { + window.currentTimelineStream = null + } } if (!currentTimeline) { return } - if (!(['home', 'local', 'federated'].includes(currentTimeline) || - currentTimeline.startsWith('list/') || - currentTimeline.startsWith('tag/'))) { + if (currentTimeline !== 'local' && + currentTimeline !== 'federated' && + !currentTimeline.startsWith('list/') && + !currentTimeline.startsWith('tag/')) { return } let currentInstance = store.get('currentInstance') - let accessToken = store.get('accessToken') await updateInstanceInfo(currentInstance) let instanceInfo = store.get('currentInstanceInfo') if (!(instanceInfo && @@ -31,7 +37,12 @@ export function timelineObservers (store) { return } + let accessToken = store.get('accessToken') currentTimelineStream = createStream(instanceInfo.urls.streaming_api, currentInstance, accessToken, currentTimeline) + + if (process.env.NODE_ENV !== 'production') { + window.currentTimelineStream = currentTimelineStream + } }) }