v1 of streaming
This commit is contained in:
		
							parent
							
								
									321a90251a
								
							
						
					
					
						commit
						64350ad912
					
				
					 12 changed files with 213 additions and 29 deletions
				
			
		|  | @ -5,6 +5,7 @@ import { toast } from '../_utils/toast' | |||
| import { database } from '../_database/database' | ||||
| import { goto } from 'sapper/runtime.js' | ||||
| import { cacheFirstUpdateAfter } from '../_utils/sync' | ||||
| import { getInstanceInfo } from '../_api/instance' | ||||
| 
 | ||||
| export function changeTheme (instanceName, newTheme) { | ||||
|   let instanceThemes = store.get('instanceThemes') | ||||
|  | @ -65,3 +66,17 @@ export async function updateVerifyCredentialsForInstance (instanceName) { | |||
|     verifyCredentials => setStoreVerifyCredentials(instanceName, verifyCredentials) | ||||
|   ) | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| export async function updateInstanceInfo(instanceName) { | ||||
|   await cacheFirstUpdateAfter( | ||||
|     () => getInstanceInfo(instanceName), | ||||
|     () => database.getInstanceInfo(instanceName), | ||||
|     info => database.setInstanceInfo(instanceName, info), | ||||
|     info => { | ||||
|       let instanceInfos = store.get('instanceInfos') | ||||
|       instanceInfos[instanceName] = info | ||||
|       store.set({instanceInfos: instanceInfos}) | ||||
|     } | ||||
|   ) | ||||
| } | ||||
							
								
								
									
										74
									
								
								routes/_actions/streaming.js
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										74
									
								
								routes/_actions/streaming.js
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,74 @@ | |||
| import { TimelineStream } from '../_api/TimelineStream' | ||||
| import identity from 'lodash/identity' | ||||
| import { database } from '../_database/database' | ||||
| import { store } from '../_store/store' | ||||
| import { scheduleIdleTask } from '../_utils/scheduleIdleTask' | ||||
| 
 | ||||
| async function removeDuplicates (instanceName, timelineName, updates) { | ||||
|   // remove duplicates, including duplicates due to reblogs
 | ||||
|   let timelineItemIds = store.getForTimeline(instanceName, timelineName, 'timelineItemIds') | ||||
|   let reblogIds = (await Promise.all(timelineItemIds.map(async timelineItemId => { | ||||
|     let status = await database.getStatus(instanceName, timelineItemId) | ||||
|     return status.reblog && status.reblog.id | ||||
|   }))).filter(identity) | ||||
|   let existingItemIds = new Set([].concat(timelineItemIds).concat(reblogIds)) | ||||
|   return updates.filter(update => !existingItemIds.has(update.id)) | ||||
| } | ||||
| 
 | ||||
| async function handleFreshChanges (instanceName, timelineName) { | ||||
|   console.log('handleFreshChanges') | ||||
|   let freshChanges = store.getForTimeline(instanceName, timelineName, 'freshChanges') | ||||
|   console.log('freshChanges', freshChanges) | ||||
|   if (freshChanges.updates && freshChanges.updates.length) { | ||||
|     let updates = freshChanges.updates.slice() | ||||
|     freshChanges.updates = [] | ||||
|     store.setForTimeline(instanceName, timelineName, {freshChanges: freshChanges}) | ||||
| 
 | ||||
|     console.log('before removing duplicates, updates are ', updates.length) | ||||
|     updates = await removeDuplicates(instanceName, timelineName, updates) | ||||
|     console.log('after removing duplicates, updates are ', updates.length) | ||||
| 
 | ||||
|     await database.insertTimelineItems(instanceName, timelineName, updates) | ||||
| 
 | ||||
|     let itemIdsToAdd = store.getForTimeline(instanceName, timelineName, 'itemIdsToAdd') || [] | ||||
|     itemIdsToAdd = itemIdsToAdd.concat(updates.map(_ => _.id)) | ||||
|     store.setForTimeline(instanceName, timelineName, {itemIdsToAdd: itemIdsToAdd}) | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| function handleStreamMessage (instanceName, timelineName, message) { | ||||
|   console.log('handleStreamMessage') | ||||
|   let { event, payload } = message | ||||
|   let key = event === 'update' ? 'updates' : 'deletes' | ||||
|   let freshChanges = store.getForTimeline(instanceName, timelineName, 'freshChanges') || {} | ||||
|   freshChanges[key] = freshChanges[key] || [] | ||||
|   freshChanges[key].push(JSON.parse(payload)) | ||||
|   store.setForTimeline(instanceName, timelineName, {freshChanges: freshChanges}) | ||||
|   scheduleIdleTask(() => { | ||||
|     handleFreshChanges(instanceName, timelineName) | ||||
|   }) | ||||
| } | ||||
| 
 | ||||
| export function createStream (streamingApi, instanceName, accessToken, timelineName) { | ||||
|   new TimelineStream(streamingApi, accessToken, timelineName, { | ||||
|     onMessage(msg) { | ||||
|       console.log('message', msg) | ||||
|       if (msg.event !== 'update' && msg.event !== 'delete') { | ||||
|         console.error("don't know how to handle event", msg) | ||||
|         return | ||||
|       } | ||||
|       scheduleIdleTask(() => { | ||||
|         handleStreamMessage(instanceName, timelineName, msg) | ||||
|       }) | ||||
|     }, | ||||
|     onOpen() { | ||||
|       console.log('opened stream for timeline', timelineName) | ||||
|     }, | ||||
|     onClose() { | ||||
|       console.log('closed stream for timeline', timelineName) | ||||
|     }, | ||||
|     onReconnect() { | ||||
|       console.log('reconnected stream for timeline', timelineName) | ||||
|     } | ||||
|   }) | ||||
| } | ||||
|  | @ -30,10 +30,14 @@ async function addTimelineItems (instanceName, timelineName, newItems) { | |||
|   console.log('addTimelineItems, length:', newItems.length) | ||||
|   mark('addTimelineItems') | ||||
|   let newIds = newItems.map(item => item.id) | ||||
|   addTimelineItemIds(instanceName, timelineName, newIds) | ||||
|   stop('addTimelineItems') | ||||
| } | ||||
| 
 | ||||
| export async function addTimelineItemIds (instanceName, timelineName, newIds) { | ||||
|   let oldIds = store.getForTimeline(instanceName, timelineName, 'timelineItemIds') || [] | ||||
|   let merged = mergeArrays(oldIds, newIds) | ||||
|   store.setForTimeline(instanceName, timelineName, { timelineItemIds: merged }) | ||||
|   stop('addTimelineItems') | ||||
| } | ||||
| 
 | ||||
| async function fetchTimelineItemsAndPossiblyFallBack () { | ||||
|  |  | |||
|  | @ -1,6 +1,6 @@ | |||
| import { paramsString } from '../_utils/ajax' | ||||
| import noop from 'lodash/noop' | ||||
| import WebSocketClient from '@gamestdio/websocket' | ||||
| import { importWebSocketClient } from '../_utils/asyncModules' | ||||
| 
 | ||||
| function getStreamName (timeline) { | ||||
|   switch (timeline) { | ||||
|  | @ -16,6 +16,9 @@ function getStreamName (timeline) { | |||
|   if (timeline.startsWith('tag/')) { | ||||
|     return 'hashtag' | ||||
|   } | ||||
|   if (timeline.startsWith('list/')) { | ||||
|     return 'list' | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| function getUrl (streamingApi, accessToken, timeline) { | ||||
|  | @ -28,6 +31,8 @@ function getUrl (streamingApi, accessToken, timeline) { | |||
| 
 | ||||
|   if (timeline.startsWith('tag/')) { | ||||
|     params.tag = timeline.split('/').slice(-1)[0] | ||||
|   } else if (timeline.startsWith('list/')) { | ||||
|     params.list = timeline.split('/').slice(-1)[0] | ||||
|   } | ||||
| 
 | ||||
|   if (accessToken) { | ||||
|  | @ -37,22 +42,29 @@ function getUrl (streamingApi, accessToken, timeline) { | |||
|   return url + '?' + paramsString(params) | ||||
| } | ||||
| 
 | ||||
| export class StatusStream { | ||||
| export class TimelineStream { | ||||
|   constructor (streamingApi, accessToken, timeline, opts) { | ||||
|     let url = getUrl(streamingApi, accessToken, timeline) | ||||
|     importWebSocketClient().then(WebSocketClient => { | ||||
|       if (this.__closed) { | ||||
|         return | ||||
|       } | ||||
|       const ws = new WebSocketClient(url, null, { backoff: 'exponential' }) | ||||
|       const onMessage = opts.onMessage || noop | ||||
| 
 | ||||
|     const ws = new WebSocketClient(url, null, { backoff: 'exponential' }) | ||||
|     const onMessage = opts.onMessage || noop | ||||
|       ws.onopen = opts.onOpen || noop | ||||
|       ws.onmessage = e => onMessage(JSON.parse(e.data)) | ||||
|       ws.onclose = opts.onClose || noop | ||||
|       ws.onreconnect = opts.onReconnect || noop | ||||
| 
 | ||||
|     ws.onopen = opts.onOpen || noop | ||||
|     ws.onmessage = e => onMessage(JSON.parse(e.data)) | ||||
|     ws.onclose = opts.onClose || noop | ||||
|     ws.onreconnect = opts.onReconnect || noop | ||||
| 
 | ||||
|     this._ws = ws | ||||
|       this._ws = ws | ||||
|     }) | ||||
|   } | ||||
| 
 | ||||
|   close () { | ||||
|     this._ws.close() | ||||
|     this.__closed = true | ||||
|     if (this._ws) { | ||||
|       this._ws.close() | ||||
|     } | ||||
|   } | ||||
| } | ||||
|  | @ -61,21 +61,27 @@ | |||
|   import { initializeTimeline, fetchTimelineItemsOnScrollToBottom, setupTimeline } from '../../_actions/timeline' | ||||
|   import LoadingPage from '../LoadingPage.html' | ||||
|   import { focusWithCapture, blurWithCapture } from '../../_utils/events' | ||||
|   import { addTimelineItemIds } from '../../_actions/timeline' | ||||
| 
 | ||||
|   export default { | ||||
|     oncreate() { | ||||
|       console.log('timeline oncreate()') | ||||
|       this.onPushState = this.onPushState.bind(this) | ||||
|       this.store.setForCurrentTimeline({ignoreBlurEvents: false}) | ||||
|       window.addEventListener('pushState', this.onPushState) | ||||
|       this.setupFocus() | ||||
|       setupTimeline() | ||||
|       if (this.store.get('initialized')) { | ||||
|         this.restoreFocus() | ||||
|       } | ||||
|       let instanceName = this.store.get('currentInstance') | ||||
|       let timelineName = this.get('timeline') | ||||
|       this.observe('itemIdsToAdd', itemIdsToAdd => { | ||||
|         console.log('itemIdsToAdd', itemIdsToAdd) | ||||
|         addTimelineItemIds(instanceName, timelineName, itemIdsToAdd) | ||||
|         this.store.setForTimeline(instanceName, timelineName, { itemIdsToAdd: [] }) | ||||
|       }) | ||||
|     }, | ||||
|     ondestroy() { | ||||
|       console.log('ondestroy') | ||||
|       window.removeEventListener('pushState', this.onPushState) | ||||
|       this.teardownFocus() | ||||
|     }, | ||||
|     data: () => ({ | ||||
|       StatusVirtualListItem, | ||||
|  | @ -125,6 +131,12 @@ | |||
|           && $firstTimelineItemId | ||||
|           && timelineValue !== $firstTimelineItemId | ||||
|           && timelineValue | ||||
|       }, | ||||
|       itemIdsToAdd: (timeline, $currentInstance, $timelines) => { | ||||
|         return ($timelines && | ||||
|           $timelines[$currentInstance] && | ||||
|           $timelines[$currentInstance][timeline] && | ||||
|           $timelines[$currentInstance][timeline].itemIdsToAdd) || [] | ||||
|       } | ||||
|     }, | ||||
|     store: () => store, | ||||
|  | @ -145,9 +157,6 @@ | |||
|         console.log('timeline initialize()') | ||||
|         initializeTimeline() | ||||
|       }, | ||||
|       onPushState() { | ||||
|         this.store.setForCurrentTimeline({ ignoreBlurEvents: true }) | ||||
|       }, | ||||
|       onScrollToBottom() { | ||||
|         if (!this.store.get('initialized') || | ||||
|             this.store.get('runningUpdate') || | ||||
|  | @ -156,6 +165,17 @@ | |||
|         } | ||||
|         fetchTimelineItemsOnScrollToBottom() | ||||
|       }, | ||||
|       setupFocus() { | ||||
|         this.onPushState = this.onPushState.bind(this) | ||||
|         this.store.setForCurrentTimeline({ignoreBlurEvents: false}) | ||||
|         window.addEventListener('pushState', this.onPushState) | ||||
|       }, | ||||
|       teardownFocus() { | ||||
|         window.removeEventListener('pushState', this.onPushState) | ||||
|       }, | ||||
|       onPushState() { | ||||
|         this.store.setForCurrentTimeline({ ignoreBlurEvents: true }) | ||||
|       }, | ||||
|       saveFocus(e) { | ||||
|         let instanceName = this.store.get('currentInstance') | ||||
|         let timelineName = this.get('timeline') | ||||
|  |  | |||
|  | @ -47,6 +47,12 @@ export function instanceComputations (store) { | |||
|     (currentInstance, verifyCredentials) => verifyCredentials && verifyCredentials[currentInstance] | ||||
|   ) | ||||
| 
 | ||||
|   store.compute( | ||||
|     'currentInstanceInfo', | ||||
|     ['currentInstance', 'instanceInfos'], | ||||
|     (currentInstance, instanceInfos) => instanceInfos && instanceInfos[currentInstance] | ||||
|   ) | ||||
| 
 | ||||
|   store.compute( | ||||
|     'pinnedPage', | ||||
|     ['pinnedPages', 'currentInstance'], | ||||
|  |  | |||
							
								
								
									
										13
									
								
								routes/_store/instanceObservers.js
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										13
									
								
								routes/_store/instanceObservers.js
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,13 @@ | |||
| import { updateInstanceInfo, updateVerifyCredentialsForInstance } from '../_actions/instances' | ||||
| import { updateLists } from '../_actions/lists' | ||||
| 
 | ||||
| export function instanceObservers (store) { | ||||
|   store.observe('currentInstance', (currentInstance) => { | ||||
|     if (!currentInstance) { | ||||
|       return | ||||
|     } | ||||
|     updateVerifyCredentialsForInstance(currentInstance) | ||||
|     updateInstanceInfo(currentInstance) | ||||
|     updateLists() | ||||
|   }) | ||||
| } | ||||
|  | @ -1,11 +1,7 @@ | |||
| import { updateVerifyCredentialsForInstance } from '../_actions/instances' | ||||
| import { updateLists } from '../_actions/lists' | ||||
| import { instanceObservers } from './instanceObservers' | ||||
| import { timelineObservers } from './timelineObservers' | ||||
| 
 | ||||
| export function observers (store) { | ||||
|   store.observe('currentInstance', (currentInstance) => { | ||||
|     if (currentInstance) { | ||||
|       updateVerifyCredentialsForInstance(currentInstance) | ||||
|       updateLists() | ||||
|     } | ||||
|   }) | ||||
|   instanceObservers(store) | ||||
|   timelineObservers(store) | ||||
| } | ||||
|  |  | |||
|  | @ -35,7 +35,8 @@ const store = new PinaforeStore({ | |||
|   markMediaAsSensitive: false, | ||||
|   pinnedPages: {}, | ||||
|   instanceLists: {}, | ||||
|   pinnedStatuses: {} | ||||
|   pinnedStatuses: {}, | ||||
|   instanceInfos: {} | ||||
| }) | ||||
| 
 | ||||
| mixins(PinaforeStore) | ||||
|  |  | |||
|  | @ -14,6 +14,7 @@ export function timelineComputations (store) { | |||
|   computeForTimeline(store, 'initialized') | ||||
|   computeForTimeline(store, 'lastFocusedElementSelector') | ||||
|   computeForTimeline(store, 'ignoreBlurEvents') | ||||
|   computeForTimeline(store, 'itemIdsToAdd') | ||||
| 
 | ||||
|   store.compute('firstTimelineItemId', ['timelineItemIds'], (timelineItemIds) => timelineItemIds && timelineItemIds.length && timelineItemIds[0]) | ||||
|   store.compute('lastTimelineItemId', ['timelineItemIds'], (timelineItemIds) => timelineItemIds && timelineItemIds.length && timelineItemIds[timelineItemIds.length - 1]) | ||||
|  |  | |||
							
								
								
									
										38
									
								
								routes/_store/timelineObservers.js
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								routes/_store/timelineObservers.js
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,38 @@ | |||
| import { updateInstanceInfo } from '../_actions/instances' | ||||
| import { createStream } from '../_actions/streaming' | ||||
| 
 | ||||
| export function timelineObservers (store) { | ||||
| 
 | ||||
|   let currentTimelineStream | ||||
| 
 | ||||
|   store.observe('currentTimeline', async (currentTimeline) => { | ||||
|     if (!process.browser) { | ||||
|       return | ||||
|     } | ||||
|     if (currentTimelineStream) { | ||||
|       currentTimelineStream.close() | ||||
|       currentTimelineStream = null | ||||
|     } | ||||
|     if (!currentTimeline) { | ||||
|       return | ||||
|     } | ||||
|     if (!(['home', 'local', 'federated'].includes(currentTimeline) || | ||||
|         currentTimeline.startsWith('list/') || | ||||
|         currentTimeline.startsWith('tag/'))) { | ||||
|       return | ||||
|     } | ||||
| 
 | ||||
|     let currentInstance = store.get('currentInstance') | ||||
|     let accessToken = store.get('accessToken') | ||||
|     await updateInstanceInfo(currentInstance) | ||||
|     let instanceInfo = store.get('currentInstanceInfo') | ||||
|     if (!(instanceInfo && | ||||
|         store.get('currentInstance') === currentInstance && | ||||
|         store.get('currentTimeline') === currentTimeline)) { | ||||
|       return | ||||
|     } | ||||
| 
 | ||||
|     currentTimelineStream = createStream(instanceInfo.urls.streaming_api, | ||||
|       currentInstance, accessToken, currentTimeline) | ||||
|   }) | ||||
| } | ||||
|  | @ -23,4 +23,8 @@ export const importRequestIdleCallback = () => import( | |||
| 
 | ||||
| export const importIndexedDBGetAllShim = () => import( | ||||
|   /* webpackChunkName: 'indexeddb-getall-shim' */ 'indexeddb-getall-shim' | ||||
|   ) | ||||
|   ) | ||||
| 
 | ||||
| export const importWebSocketClient = () => import( | ||||
|   /* webpackChunkName: '@gamestdio/websocket' */ '@gamestdio/websocket' | ||||
|   ).then(mod => mod.default) | ||||
		Loading…
	
	Add table
		
		Reference in a new issue