add home/notification streaming
This commit is contained in:
		
							parent
							
								
									1ef3238974
								
							
						
					
					
						commit
						3f264e476c
					
				
					 4 changed files with 104 additions and 32 deletions
				
			
		| 
						 | 
				
			
			@ -6,24 +6,35 @@ import { scheduleIdleTask } from '../_utils/scheduleIdleTask'
 | 
			
		|||
import throttle from 'lodash/throttle'
 | 
			
		||||
import { mark, stop } from '../_utils/marks'
 | 
			
		||||
 | 
			
		||||
async function removeDuplicates (instanceName, timelineName, updates) {
 | 
			
		||||
  // remove duplicates, including duplicates due to reblogs
 | 
			
		||||
  let timelineItemIds = store.getForTimeline(instanceName, timelineName, 'timelineItemIds') || []
 | 
			
		||||
  let reblogIds = (await Promise.all(timelineItemIds.map(async timelineItemId => {
 | 
			
		||||
async function getReblogIds(instanceName, statusIds) {
 | 
			
		||||
  let reblogIds = await Promise.all(statusIds.map(async timelineItemId => {
 | 
			
		||||
    let status = await database.getStatus(instanceName, timelineItemId)
 | 
			
		||||
    return status.reblog && status.reblog.id
 | 
			
		||||
  }))).filter(identity)
 | 
			
		||||
  let existingItemIds = new Set([].concat(timelineItemIds).concat(reblogIds))
 | 
			
		||||
  }))
 | 
			
		||||
  return reblogIds.filter(identity)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function getExistingItemIdsSet(instanceName, timelineName) {
 | 
			
		||||
  let timelineItemIds = store.getForTimeline(instanceName, timelineName, 'timelineItemIds') || []
 | 
			
		||||
  if (timelineName === 'notifications') {
 | 
			
		||||
    return new Set(timelineItemIds)
 | 
			
		||||
  }
 | 
			
		||||
  let reblogIds = await getReblogIds(instanceName, timelineItemIds)
 | 
			
		||||
  return new Set([].concat(timelineItemIds).concat(reblogIds))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function removeDuplicates (instanceName, timelineName, updates) {
 | 
			
		||||
  // remove duplicates, including duplicates due to reblogs
 | 
			
		||||
  let existingItemIds = await getExistingItemIdsSet(instanceName, timelineName)
 | 
			
		||||
  return updates.filter(update => !existingItemIds.has(update.id))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async function processFreshChanges (instanceName, timelineName) {
 | 
			
		||||
  mark('processFreshChanges')
 | 
			
		||||
  let freshChanges = store.getForTimeline(instanceName, timelineName, 'freshChanges')
 | 
			
		||||
  if (freshChanges.updates && freshChanges.updates.length) {
 | 
			
		||||
    let updates = freshChanges.updates.slice()
 | 
			
		||||
    freshChanges.updates = []
 | 
			
		||||
    store.setForTimeline(instanceName, timelineName, {freshChanges: freshChanges})
 | 
			
		||||
async function processFreshUpdates (instanceName, timelineName) {
 | 
			
		||||
  mark('processFreshUpdates')
 | 
			
		||||
  let freshUpdates = store.getForTimeline(instanceName, timelineName, 'freshUpdates')
 | 
			
		||||
  if (freshUpdates && freshUpdates.length) {
 | 
			
		||||
    let updates = freshUpdates.slice()
 | 
			
		||||
    store.setForTimeline(instanceName, timelineName, {freshUpdates: []})
 | 
			
		||||
 | 
			
		||||
    updates = await removeDuplicates(instanceName, timelineName, updates)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -35,37 +46,54 @@ async function processFreshChanges (instanceName, timelineName) {
 | 
			
		|||
      console.log('adding ', itemIdsToAdd.length, 'items to itemIdsToAdd')
 | 
			
		||||
      store.setForTimeline(instanceName, timelineName, {itemIdsToAdd: itemIdsToAdd})
 | 
			
		||||
    }
 | 
			
		||||
    stop('processFreshChanges')
 | 
			
		||||
    stop('processFreshUpdates')
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const lazilyProcessFreshChanges = throttle((instanceName, timelineName) => {
 | 
			
		||||
const lazilyProcessFreshUpdates = throttle((instanceName, timelineName) => {
 | 
			
		||||
  scheduleIdleTask(() => {
 | 
			
		||||
    processFreshChanges(instanceName, timelineName)
 | 
			
		||||
    /* no await */ processFreshUpdates(instanceName, timelineName)
 | 
			
		||||
  })
 | 
			
		||||
}, 5000)
 | 
			
		||||
 | 
			
		||||
function handleStreamMessage (instanceName, timelineName, message) {
 | 
			
		||||
  mark('handleStreamMessage')
 | 
			
		||||
function processUpdate(instanceName, timelineName, update) {
 | 
			
		||||
  let freshUpdates = store.getForTimeline(instanceName, timelineName, 'freshUpdates') || []
 | 
			
		||||
  freshUpdates.push(update)
 | 
			
		||||
  store.setForTimeline(instanceName, timelineName, {freshUpdates: freshUpdates})
 | 
			
		||||
  lazilyProcessFreshUpdates(instanceName, timelineName)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
function processDelete(instanceName, deletion) {
 | 
			
		||||
  // TODO
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
function processMessage (instanceName, timelineName, message) {
 | 
			
		||||
  mark('processMessage')
 | 
			
		||||
  let { event, payload } = message
 | 
			
		||||
  let key = event === 'update' ? 'updates' : 'deletes'
 | 
			
		||||
  let freshChanges = store.getForTimeline(instanceName, timelineName, 'freshChanges') || {}
 | 
			
		||||
  freshChanges[key] = freshChanges[key] || []
 | 
			
		||||
  freshChanges[key].push(JSON.parse(payload))
 | 
			
		||||
  store.setForTimeline(instanceName, timelineName, {freshChanges: freshChanges})
 | 
			
		||||
  lazilyProcessFreshChanges(instanceName, timelineName)
 | 
			
		||||
  stop('handleStreamMessage')
 | 
			
		||||
  let parsedPayload = JSON.parse(payload)
 | 
			
		||||
  switch (event) {
 | 
			
		||||
    case 'delete':
 | 
			
		||||
      processDelete(instanceName, parsedPayload)
 | 
			
		||||
      break
 | 
			
		||||
    case 'update':
 | 
			
		||||
      processUpdate(instanceName, timelineName, parsedPayload)
 | 
			
		||||
      break
 | 
			
		||||
    case 'notification':
 | 
			
		||||
      processUpdate(instanceName, 'notifications', parsedPayload)
 | 
			
		||||
      break
 | 
			
		||||
  }
 | 
			
		||||
  stop('processMessage')
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export function createStream (streamingApi, instanceName, accessToken, timelineName) {
 | 
			
		||||
  return new TimelineStream(streamingApi, accessToken, timelineName, {
 | 
			
		||||
    onMessage (msg) {
 | 
			
		||||
      if (msg.event !== 'update' && msg.event !== 'delete') {
 | 
			
		||||
      if (msg.event !== 'update' && msg.event !== 'delete' && msg.event !== 'notification') {
 | 
			
		||||
        console.error("don't know how to handle event", msg)
 | 
			
		||||
        return
 | 
			
		||||
      }
 | 
			
		||||
      scheduleIdleTask(() => {
 | 
			
		||||
        handleStreamMessage(instanceName, timelineName, msg)
 | 
			
		||||
        processMessage(instanceName, timelineName, msg)
 | 
			
		||||
      })
 | 
			
		||||
    },
 | 
			
		||||
    onOpen () {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -178,6 +178,9 @@ async function insertTimelineNotifications (instanceName, timeline, notification
 | 
			
		|||
  for (let notification of notifications) {
 | 
			
		||||
    setInCache(notificationsCache, instanceName, notification.id, notification)
 | 
			
		||||
    setInCache(accountsCache, instanceName, notification.account.id, notification.account)
 | 
			
		||||
    if (notification.status) {
 | 
			
		||||
      setInCache(statusesCache, instanceName, notification.status.id, notification.status)
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  const db = await getDatabase(instanceName)
 | 
			
		||||
  let storeNames = [NOTIFICATION_TIMELINES_STORE, NOTIFICATIONS_STORE, ACCOUNTS_STORE, STATUSES_STORE]
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,13 +1,43 @@
 | 
			
		|||
import { updateInstanceInfo, updateVerifyCredentialsForInstance } from '../_actions/instances'
 | 
			
		||||
import { updateLists } from '../_actions/lists'
 | 
			
		||||
import { createStream } from '../_actions/streaming'
 | 
			
		||||
 | 
			
		||||
export function instanceObservers (store) {
 | 
			
		||||
  store.observe('currentInstance', (currentInstance) => {
 | 
			
		||||
 | 
			
		||||
  // stream to watch for home timeline updates and notifications
 | 
			
		||||
  let currentInstanceStream
 | 
			
		||||
 | 
			
		||||
  store.observe('currentInstance', async (currentInstance) => {
 | 
			
		||||
    if (!process.browser) {
 | 
			
		||||
      return
 | 
			
		||||
    }
 | 
			
		||||
    if (currentInstanceStream) {
 | 
			
		||||
      currentInstanceStream.close()
 | 
			
		||||
      currentInstanceStream = null
 | 
			
		||||
      if (process.env.NODE_ENV !== 'production') {
 | 
			
		||||
        window.currentInstanceStream = null
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    if (!currentInstance) {
 | 
			
		||||
      return
 | 
			
		||||
    }
 | 
			
		||||
    updateVerifyCredentialsForInstance(currentInstance)
 | 
			
		||||
    updateInstanceInfo(currentInstance)
 | 
			
		||||
    updateLists()
 | 
			
		||||
 | 
			
		||||
    await updateInstanceInfo(currentInstance)
 | 
			
		||||
    let instanceInfo = store.get('currentInstanceInfo')
 | 
			
		||||
    if (!(instanceInfo && store.get('currentInstance') === currentInstance)) {
 | 
			
		||||
      return
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    let accessToken = store.get('accessToken')
 | 
			
		||||
    currentInstanceStream = createStream(instanceInfo.urls.streaming_api,
 | 
			
		||||
      currentInstance, accessToken, 'home')
 | 
			
		||||
 | 
			
		||||
    if (process.env.NODE_ENV !== 'production') {
 | 
			
		||||
      window.currentInstanceStream = currentInstanceStream
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
  })
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,6 +2,9 @@ import { updateInstanceInfo } from '../_actions/instances'
 | 
			
		|||
import { createStream } from '../_actions/streaming'
 | 
			
		||||
 | 
			
		||||
export function timelineObservers (store) {
 | 
			
		||||
 | 
			
		||||
  // stream to watch for local/federated/etc. updates. home and notification
 | 
			
		||||
  // updates are handled in timelineObservers.js
 | 
			
		||||
  let currentTimelineStream
 | 
			
		||||
 | 
			
		||||
  store.observe('currentTimeline', async (currentTimeline) => {
 | 
			
		||||
| 
						 | 
				
			
			@ -11,18 +14,21 @@ export function timelineObservers (store) {
 | 
			
		|||
    if (currentTimelineStream) {
 | 
			
		||||
      currentTimelineStream.close()
 | 
			
		||||
      currentTimelineStream = null
 | 
			
		||||
      if (process.env.NODE_ENV !== 'production') {
 | 
			
		||||
        window.currentTimelineStream = null
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
    if (!currentTimeline) {
 | 
			
		||||
      return
 | 
			
		||||
    }
 | 
			
		||||
    if (!(['home', 'local', 'federated'].includes(currentTimeline) ||
 | 
			
		||||
        currentTimeline.startsWith('list/') ||
 | 
			
		||||
        currentTimeline.startsWith('tag/'))) {
 | 
			
		||||
    if (currentTimeline !== 'local' &&
 | 
			
		||||
        currentTimeline !== 'federated' &&
 | 
			
		||||
        !currentTimeline.startsWith('list/') &&
 | 
			
		||||
        !currentTimeline.startsWith('tag/')) {
 | 
			
		||||
      return
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    let currentInstance = store.get('currentInstance')
 | 
			
		||||
    let accessToken = store.get('accessToken')
 | 
			
		||||
    await updateInstanceInfo(currentInstance)
 | 
			
		||||
    let instanceInfo = store.get('currentInstanceInfo')
 | 
			
		||||
    if (!(instanceInfo &&
 | 
			
		||||
| 
						 | 
				
			
			@ -31,7 +37,12 @@ export function timelineObservers (store) {
 | 
			
		|||
      return
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    let accessToken = store.get('accessToken')
 | 
			
		||||
    currentTimelineStream = createStream(instanceInfo.urls.streaming_api,
 | 
			
		||||
      currentInstance, accessToken, currentTimeline)
 | 
			
		||||
 | 
			
		||||
    if (process.env.NODE_ENV !== 'production') {
 | 
			
		||||
      window.currentTimelineStream = currentTimelineStream
 | 
			
		||||
    }
 | 
			
		||||
  })
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		
		Reference in a new issue