channelstream.js

/**
 * Main Channelstream connection class
 */
export class ChannelStreamConnection {

    static get version() {
        return '0.0.2';
    }

    constructor() {
        this.debug = false;
        /** List of channels user should be subscribed to. */
        this.channels = [];
        /** Username of connecting user. */
        this.username = 'Anonymous';
        /** Connection identifier. */
        this.connectionId = null;
        /** Websocket instance. */
        this.websocket = null;
        /** Websocket connection url. */
        this.websocketUrl = '';
        /** URL used in `connect()`. */
        this.connectUrl = '';
        /** URL used in `disconnect()`. */
        this.disconnectUrl = '';
        /** URL used in `subscribe()`. */
        this.subscribeUrl = '';
        /** URL used in `unsubscribe()`. */
        this.unsubscribeUrl = '';
        /** URL used in `updateUserState()`. */
        this.userStateUrl = '';
        /** URL used in `message()`. */
        this.messageUrl = '';
        /** URL used in `editMessage()`. */
        this.messageEditUrl = '';
        /** URL used in `deleteMessage()`. */
        this.messageDeleteUrl = '';
        /** Long-polling connection url. */
        this.longPollUrl = '';
        /** Long-polling connection url. */
        this.shouldReconnect = true;
        /** Should send heartbeats. */
        this.heartbeats = true;
        /** How much should every retry interval increase (in milliseconds) */
        this.increaseBounceIv = 2000;
        this._currentBounceIv = 0;
        /** Should use websockets or long-polling by default */
        this.noWebsocket = false;
        this.connected = false;

        /**
         * Mutators hold functions that you can set locally to change the data
         * that the client is sending to all endpoints
         * you can call it like `elem.mutators('connect', yourFunc())`
         * mutators will be executed in order they were pushed onto arrays
         *
         */
        this.mutators = {
            connect: [],
            message: [],
            messageEdit: [],
            messageDelete: [],
            subscribe: [],
            unsubscribe: [],
            disconnect: [],
            userState: []
        }
    }

    /**
     * Sends AJAX call that creates user and fetches connection information
     * from the server.
     *
     */
    connect() {
        let request = new ChannelStreamRequest();
        request.url = this.connectUrl;
        request.body = {
            username: this.username,
            channels: this.channels
        };
        for (let callable of this.mutators.connect) {
            callable(request);
        }
        request.handleError = this._handleConnectError.bind(this);
        request.handleResponse = this._handleConnect.bind(this);
        request.execute();
    }

    /**
     *
     * Add custom function that will manipulate request before its being executed
     *
     * @param type {string} type of mutator function to register
     * @param func {function} a callable to register
     */
    addMutator(type, func) {
        this.mutators[type].push(func);
    }

    /**
     * Sends AJAX request to update user state.
     * @param stateObj {object}
     */
    updateUserState(stateObj) {
        let request = new ChannelStreamRequest();
        request.url = this.userStateUrl;
        request.body = {
            username: this.username,
            conn_id: this.connectionId,
            update_state: stateObj
        };
        for (let callable of this.mutators.userState) {
            callable(request);
        }
        request.handleError = this._handleSetUserStateError.bind(this);
        request.handleResponse = this._handleSetUserState.bind(this);
        request.execute();
    }

    /**
     * Subscribes user to channels.
     * @param channels {string[]} List of channels sent via POST to `subscribeUrl`.
     */
    subscribe(channels) {
        let request = new ChannelStreamRequest();
        request.url = this.subscribeUrl;
        request.body = {
            channels: channels,
            conn_id: this.connectionId
        };
        for (let callable of this.mutators.subscribe) {
            callable(request);
        }
        request.handleError = this._handleSubscribeError.bind(this);
        request.handleResponse = this._handleSubscribe.bind(this);
        if (request.body.channels && request.body.channels.length) {
            request.execute('POST');
        }
    }

    /**
     * Unsubscribes user from channels.
     * @param channels {string[]} List of channels sent via POST to `unsubscribeUrl`.
     */
    unsubscribe(channels) {
        let request = new ChannelStreamRequest();
        request.url = this.unsubscribeUrl;
        request.body = {
            channels: channels,
            conn_id: this.connectionId
        };
        for (let callable of this.mutators.unsubscribe) {
            callable(request);
        }
        request.handleError = this._handleUnsubscribeError.bind(this);
        request.handleResponse = this._handleUnsubscribe.bind(this);
        request.execute('POST');
    }

    /**
     * calculates list of channels we should add user to based on difference
     * between channels property and passed channel list
     * @param channels {string[]} List of channels to subscribe
     */
    calculateSubscribe(channels) {
        let toSubscribe = [];
        for (let channel of channels) {
            if (this.channels.indexOf(channel) === -1) {
                toSubscribe.push(channel);
            }
        }
        return toSubscribe;
    }

    /**
     * calculates list of channels we should remove user from based difference
     * between channels property and passed channel list
     * @param channels {string[]} List of channels to un-subscribe
     */
    calculateUnsubscribe(channels) {
        if (!channels) {
            channels = []
        }
        let toUnsubscribe = [];

        for (let channel of channels) {
            if (this.channels.indexOf(channel) !== -1) {
                toUnsubscribe.push(channel);
            }
        }
        return toUnsubscribe;
    }

    /**
     * Marks the connection as expired via /disconnect API.
     *
     */
    disconnect() {
        let request = new ChannelStreamRequest();
        request.url = this.disconnectUrl + '?conn_id=' + this.connectionId;
        request.body = {
            conn_id: this.connectionId
        };
        for (let callable of this.mutators.disconnect) {
            callable(request);
        }
        request.handleResponse = this._handleDisconnect.bind(this);
        request.execute();
        this.closeConnection();
    }

    /**
     * Sends a POST to the web application backend.
     * @param message {object} Message object sent via POST to `messageUrl`.
     */
    message(message) {
        let request = new ChannelStreamRequest();
        request.url = this.messageUrl;
        request.body = message;
        for (let callable of this.mutators.message) {
            callable(request);
        }
        request.handleError = this._handleMessageError.bind(this);
        request.handleResponse = this._handleMessage.bind(this);
        request.execute('POST');
    }

    /**
     * Sends a DELETE request to the web application backend.
     * @param message {object} Message object sent to DELETE to `messageUrl`.
     */
    delete(message) {
        let request = new ChannelStreamRequest();
        request.url = this.messageDeleteUrl;
        for (let callable of this.mutators.messageDelete) {
            callable(request);
        }
        request.body = message;
        request.handleError = this._handleMessageDeleteError.bind(this);
        request.handleResponse = this._handleMessageDelete.bind(this);
        request.execute('DELETE');
    }

    /**
     * Sends a PATCH request to the web application backend.
     * @param message {object} Message object sent via PATCH to `messageUrl`.
     */
    edit(message) {
        let request = new ChannelStreamRequest();
        request.url = this.messageEditUrl;
        request.body = message;
        for (let callable of this.mutators.messageEdit) {
            callable(request);
        }
        request.handleError = this._handleMessageEditError.bind(this);
        request.handleResponse = this._handleMessageEdit.bind(this);
        request.execute('PATCH');
    }

    /**
     * Opens "long lived" (websocket/longpoll) connection to the channelstream server.
     * @param request
     * @param data
     */
    startListening(request, data) {
        this.beforeListeningCallback(request, data);
        if (this.noWebsocket === false) {
            this.noWebsocket = !window.WebSocket;
        }
        if (this.noWebsocket === false) {
            this.openWebsocket();
        } else {
            this.openLongPoll();
        }
    }

    /**
     * Fired before connection start listening for messages
     * @param request
     * @param data
     */
    beforeListeningCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('beforeListeningCallback', request, data);
    }

    /**
     * Opens websocket connection.
     *
     */
    openWebsocket() {
        let url = this.websocketUrl + '?conn_id=' + this.connectionId;
        this.websocket = new WebSocket(url);
        this.websocket.onopen = this._handleListenOpen.bind(this);
        this.websocket.onclose = this._handleWebsocketCloseEvent.bind(this);
        this.websocket.onerror = this._handleListenErrorEvent.bind(this);
        this.websocket.onmessage = this._handleListenWSMessageEvent.bind(this);
    }

    /**
     * Opens long-poll connection.
     *
     */
    openLongPoll() {
        let request = new ChannelStreamRequest();
        request.url = this.longPollUrl + '?conn_id=' + this.connectionId;
        request.handleError = this._handleListenErrorEvent.bind(this);
        request.handleRequest = function () {
            this.connected = true;
            this.listenOpenedCallback(request);
        }.bind(this);
        request.handleResponse = function (request, data) {
            this._handleListenMessageEvent(data);
        }.bind(this);
        request.execute();
        this._ajaxListen = request;
    }

    /**
     * Retries `connect()` call while incrementing interval between tries up to 1 minute.
     *
     */
    retryConnection() {
        if (!this.shouldReconnect) {
            return;
        }
        if (this._currentBounceIv < 60000) {
            this._currentBounceIv = this._currentBounceIv + this.increaseBounceIv;
        } else {
            this._currentBounceIv = 60000;
        }
        setTimeout(this.connect.bind(this), this._currentBounceIv);
    }

    /**
     * Closes currently listening connection.
     *
     */
    closeConnection() {
        if (this.websocket && this.websocket.readyState === WebSocket.OPEN) {
            this.websocket.onclose = null;
            this.websocket.onerror = null;
            this.websocket.close();
        }
        if (this._ajaxListen) {
            let request = this._ajaxListen.request;
            request.abort();
        }
        this.connected = false;
        this.connectionClosedCallback();
    }

    /**
     * Fired when listening connection is closed
     */
    connectionClosedCallback() {
        if (!this.debug) {
            return;
        }
        console.log('connectionClosedCallback');
    }

    /**
     * Fired when channels property get mutated
     */
    channelsChangedCallback(data) {
        if (!this.debug) {
            return;
        }
        console.log('channelsChangedCallback', data);
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleListenOpen(request, data) {
        this.connected = true;
        this.listenOpenedCallback(request, data);
        this.createHeartBeats();
    }

    /**
     * Fired when client starts listening for messages
     * @param request
     * @param data
     */
    listenOpenedCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('listenOpenedCallback', request, data);
    }

    /**
     * Starts sending heartbeats to maintain connection and notify server
     */
    createHeartBeats() {
        if (typeof this._heartbeat === 'undefined' && this.websocket !== null && this.heartbeats) {
            this._heartbeat = setInterval(this._sendHeartBeat.bind(this), 10000);
        }
    }

    _sendHeartBeat() {
        if (this.websocket.readyState === WebSocket.OPEN && this.heartbeats) {
            this.websocket.send(JSON.stringify({type: 'heartbeat'}));
        }
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleListenError(request, data) {
        this.connected = false;
        this.retryConnection(request, data);
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleConnectError(request, data) {
        this.connected = false;
        this.retryConnection(request, data);
        this.connectErrorCallback(request, data);
    }

    /**
     * Fired when client fails connect() call
     * @param request
     * @param data
     */
    connectErrorCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('connectErrorCallback', request, data);
    }

    /**
     * Handles long-polling payloads
     * @param data
     * @private
     */
    _handleListenMessageEvent(data) {
        setTimeout(this.openLongPoll.bind(this), 0);
        this.listenMessageCallback(data);
    }

    /**
     * Handles ws payloads
     * @param data
     * @private
     */
    _handleListenWSMessageEvent(data) {
        let parsedData = JSON.parse(data.data);
        this.listenMessageCallback(parsedData);
    }

    /**
     * Fired when messages are received
     * @param data
     */
    listenMessageCallback(data) {
        if (!this.debug) {
            return;
        }
        console.log('listenMessageCallback', data)
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleWebsocketCloseEvent(request, data) {
        this.connected = false;
        this.listenCloseCallback(request, data);
        this.retryConnection();
    }

    /**
     * Fired on websocket connection close event
     * @param request
     * @param data
     */
    listenCloseCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('listenCloseCallback', request, data);
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleListenErrorEvent(request, data) {
        this.connected = false;
        this.listenErrorCallback(request, data);
    }

    /**
     * Fired on long-pool/websocket connection error event
     * @param request
     * @param data
     */
    listenErrorCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('listenErrorCallback', request, data);
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleConnect(request, data) {
        this.currentBounceIv = 0;
        this.connectionId = data.conn_id;
        this.channels = data.channels;
        this.channelsChangedCallback(this.channels);
        this.connectCallback(request, data);
        this.startListening(request, data);
    }

    /**
     * Fired on successful connect() call
     * @param request
     * @param data
     */
    connectCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('connectCallback', request, data);
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleDisconnect(request, data) {
        this.connected = false;
        this.disconnectCallback(request, data);
    }

    /**
     * Fired after successful disconnect() call
     * @param request
     * @param data
     */
    disconnectCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('disconnectCallback', request, data);
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleMessage(request, data) {
        this.messageCallback(request, data);
    }

    /**
     * Fired on successful message() call
     * @param request
     * @param data
     */
    messageCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('messageCallback', request, data);
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleMessageError(request, data) {
        this.messageErrorCallback(request, data);
    }

    /**
     * Fired on message() call error
     * @param request
     * @param data
     */
    messageErrorCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('messageErrorCallback', request, data)
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleMessageEdit(request, data) {
        this.messageEditCallback(request, data);
    }

    /**
     * Fired on successful edit() call
     * @param request
     * @param data
     */
    messageEditCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('messageCallback', request, data);
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleMessageEditError(request, data) {
        this.messageEditErrorCallback(request, data);
    }

    /**
     * Fired on edit() call error
     * @param request
     * @param data
     */
    messageEditErrorCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('messageEditErrorCallback', request, data)
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleMessageDelete(request, data) {
        this.messageDeleteCallback(request, data);
    }

    /**
     * Fired on successful delete() call
     * @param request
     * @param data
     */
    messageDeleteCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('messageCallback', request, data);
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleMessageDeleteError(request, data) {
        this.messageDeleteErrorCallback(request, data);
    }

    /**
     * Fired on delete() call error
     * @param request
     * @param data
     */
    messageDeleteErrorCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('messageDeleteErrorCallback', request, data)
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleSubscribe(request, data) {
        this.channels = data.channels;
        this.channelsChangedCallback(this.channels);
        this.subscribeCallback(request, data);
    }

    /**
     * Fired on successful subscribe() call
     * @param request
     * @param data
     */
    subscribeCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('subscribeCallback', request, data)
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleSubscribeError(request, data) {
        this.subscribeErrorCallback(request, data);
    }

    /**
     * Fired on subscribe() call error
     * @param request
     * @param data
     */
    subscribeErrorCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('subscribeErrorCallback', request, data);
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleUnsubscribe(request, data) {
        this.channels = data.channels;
        this.channelsChangedCallback(this.channels);
        this.unsubscribeCallback(request, data);
    }

    /**
     * Fired on successful unsubscribe() call
     * @param request
     * @param data
     */
    unsubscribeCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('unsubscribeCallback', request, data);
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleUnsubscribeError(request, data) {
        this.unsubscribeErrorCallback(request, data);
    }

    /**
     * Fired on unsubscribe() call error
     * @param request
     * @param data
     */
    unsubscribeErrorCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('unsubscribeErrorCallback', request, data)
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleSetUserState(request, data) {
        this.setUserStateCallback(request, data);
    }

    /**
     * Fired on successful updateUserState() call
     * @param request
     * @param data
     */
    setUserStateCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('setUserStateCallback', request, data)
    }

    /**
     *
     * @param request
     * @param data
     * @private
     */
    _handleSetUserStateError(request, data) {
        this.setUserStateErrorCallback(request, data);
    }

    /**
     * Fired on updateUserState() error
     * @param request
     * @param data
     */
    setUserStateErrorCallback(request, data) {
        if (!this.debug) {
            return;
        }
        console.log('setUserStateErrorCallback', request, data);
    }
};

/**
 * Base class for making ajax requests
 */
class ChannelStreamRequest {

    constructor() {
        this.headers = [];
        this.body = null;
        this.url = '';
        this.request = null;
    }

    /**
     * Placeholder for error handling function
     * @param request
     * @param respText
     */
    handleError(request, respText) {
        console.error('request', request);
        console.error('respText', respText);
    };

    /**
     * Placeholder for sucessful response handler
     * @param request
     * @param respText
     */
    handleResponse(request, respText) {
        console.info('request', request);
        console.info('respText', respText);
    };

    /**
     * Placeholder for in-progress requests
     * @param request
     */
    handleRequest(request) {
    };

    handleStateChange() {
        let result = this.request.responseText;
        try {
            result = JSON.parse(result);
        } catch (exc) {

        }
        if (this.request.readyState === XMLHttpRequest.DONE) {
            if (this.request.status && this.request.status <= 400) {
                this.handleResponse(this.request, result);
            } else {
                this.handleError(this.request, result);
            }
        } else {
            this.handleRequest(this.request);
        }
    };

    /**
     * Execute AJAX request using specific verb, can send JSON payloads
     * @param verb {string} HTTP verb
     */
    execute(verb) {
        this.request = new XMLHttpRequest();
        this.request.onreadystatechange = this.handleStateChange.bind(this);
        if (this.headers) {
            for (let i = 0; i < this.headers.length; i++) {
                this.request.setRequestHeader(
                    this.headers[i].name, this.headers[i].value);
            }
        }
        if (this.body) {
            this.request.open(verb || 'POST', this.url);
            this.request.setRequestHeader('Content-Type', 'application/json');
            this.request.send(JSON.stringify(this.body));
        } else {
            this.request.open(verb || 'GET', this.url);
            this.request.send();
        }
    };
}