reusable-streaming (#5709)

This commit is contained in:
masarakki 2017-11-16 00:04:15 +09:00 committed by Eugen Rochko
parent f6bc6399e2
commit c73a1fb537
2 changed files with 69 additions and 49 deletions

View File

@ -1,4 +1,4 @@
import createStream from '../stream'; import { connectStream } from '../stream';
import { import {
updateTimeline, updateTimeline,
deleteFromTimelines, deleteFromTimelines,
@ -12,42 +12,19 @@ import { getLocale } from '../locales';
const { messages } = getLocale(); const { messages } = getLocale();
export function connectTimelineStream (timelineId, path, pollingRefresh = null) { export function connectTimelineStream (timelineId, path, pollingRefresh = null) {
return (dispatch, getState) => {
const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']); return connectStream (path, pollingRefresh, (dispatch, getState) => {
const accessToken = getState().getIn(['meta', 'access_token']);
const locale = getState().getIn(['meta', 'locale']); const locale = getState().getIn(['meta', 'locale']);
let polling = null; return {
onConnect() {
const setupPolling = () => {
polling = setInterval(() => {
pollingRefresh(dispatch);
}, 20000);
};
const clearPolling = () => {
if (polling) {
clearInterval(polling);
polling = null;
}
};
const subscription = createStream(streamingAPIBaseURL, accessToken, path, {
connected () {
if (pollingRefresh) {
clearPolling();
}
dispatch(connectTimeline(timelineId)); dispatch(connectTimeline(timelineId));
}, },
disconnected () { onDisconnect() {
if (pollingRefresh) {
setupPolling();
}
dispatch(disconnectTimeline(timelineId)); dispatch(disconnectTimeline(timelineId));
}, },
received (data) { onReceive (data) {
switch(data.event) { switch(data.event) {
case 'update': case 'update':
dispatch(updateTimeline(timelineId, JSON.parse(data.payload))); dispatch(updateTimeline(timelineId, JSON.parse(data.payload)));
@ -60,26 +37,8 @@ export function connectTimelineStream (timelineId, path, pollingRefresh = null)
break; break;
} }
}, },
};
reconnected () {
if (pollingRefresh) {
clearPolling();
pollingRefresh(dispatch);
}
dispatch(connectTimeline(timelineId));
},
}); });
const disconnect = () => {
if (subscription) {
subscription.close();
}
clearPolling();
};
return disconnect;
};
} }
function refreshHomeTimelineAndNotification (dispatch) { function refreshHomeTimelineAndNotification (dispatch) {

View File

@ -1,5 +1,66 @@
import WebSocketClient from 'websocket.js'; import WebSocketClient from 'websocket.js';
export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) {
return (dispatch, getState) => {
const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']);
const accessToken = getState().getIn(['meta', 'access_token']);
const { onConnect, onDisconnect, onReceive } = callbacks(dispatch, getState);
let polling = null;
const setupPolling = () => {
polling = setInterval(() => {
pollingRefresh(dispatch);
}, 20000);
};
const clearPolling = () => {
if (polling) {
clearInterval(polling);
polling = null;
}
};
const subscription = getStream(streamingAPIBaseURL, accessToken, path, {
connected () {
if (pollingRefresh) {
clearPolling();
}
onConnect();
},
disconnected () {
if (pollingRefresh) {
setupPolling();
}
onDisconnect();
},
received (data) {
onReceive(data);
},
reconnected () {
if (pollingRefresh) {
clearPolling();
pollingRefresh(dispatch);
}
onConnect();
},
});
const disconnect = () => {
if (subscription) {
subscription.close();
}
clearPolling();
};
return disconnect;
};
}
export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) { export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) {
const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?access_token=${accessToken}&stream=${stream}`); const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?access_token=${accessToken}&stream=${stream}`);