From a9e9a1c1fad7c7f9575b7f9873b5351e47fd3454 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5var=20Aamb=C3=B8=20Fosstveit?= Date: Fri, 8 May 2020 22:11:48 +0200 Subject: [PATCH] Cleanup of join logic, and making sure that lobbyPeers are sent to all peers if last peer with PROMOTE_PEER leaves and allowWhenRoleMissing contains PROMOTE_PEER, fixes #303 --- server/lib/Room.js | 255 ++++++++++++++++++++++++++++----------------- 1 file changed, 157 insertions(+), 98 deletions(-) diff --git a/server/lib/Room.js b/server/lib/Room.js index 3ab281b..74accc4 100644 --- a/server/lib/Room.js +++ b/server/lib/Room.js @@ -1,4 +1,5 @@ const EventEmitter = require('events').EventEmitter; +const AwaitQueue = require('awaitqueue'); const axios = require('axios'); const Logger = require('./Logger'); const Lobby = require('./Lobby'); @@ -49,6 +50,8 @@ const roomPermissions = ...config.permissionsFromRoles }; +const roomAllowWhenRoleMissing = config.allowWhenRoleMissing || []; + const ROUTER_SCALE_SIZE = config.routerScaleSize || 40; class Room extends EventEmitter @@ -115,6 +118,9 @@ class Room extends EventEmitter // Closed flag. this._closed = false; + // Joining queue + this._queue = new AwaitQueue(); + // Locked flag. this._locked = false; @@ -166,6 +172,10 @@ class Room extends EventEmitter this._closed = true; + this._queue.close(); + + this._queue = null; + if (this._selfDestructTimeout) clearTimeout(this._selfDestructTimeout); @@ -288,7 +298,7 @@ class Room extends EventEmitter this._peerJoining(promotedPeer); - for (const peer of this._getPeersWithPermission(PROMOTE_PEER)) + for (const peer of this._getAllowedPeers(PROMOTE_PEER)) { this._notification(peer.socket, 'lobby:promotedPeer', { peerId: id }); } @@ -319,7 +329,7 @@ class Room extends EventEmitter { const { id, displayName } = changedPeer; - for (const peer of this._getPeersWithPermission(PROMOTE_PEER)) + for (const peer of this._getAllowedPeers(PROMOTE_PEER)) { this._notification(peer.socket, 'lobby:changeDisplayName', { peerId: id, displayName }); } @@ -329,7 +339,7 @@ class Room extends EventEmitter { const { id, picture } = changedPeer; - for (const peer of this._getPeersWithPermission(PROMOTE_PEER)) + for (const peer of this._getAllowedPeers(PROMOTE_PEER)) { this._notification(peer.socket, 'lobby:changePicture', { peerId: id, picture }); } @@ -341,7 +351,7 @@ class Room extends EventEmitter const { id } = closedPeer; - for (const peer of this._getPeersWithPermission(PROMOTE_PEER)) + for (const peer of this._getAllowedPeers(PROMOTE_PEER)) { this._notification(peer.socket, 'lobby:peerClosed', { peerId: id }); } @@ -447,114 +457,91 @@ class Room extends EventEmitter { this._lobby.parkPeer(parkPeer); - for (const peer of this._getPeersWithPermission(PROMOTE_PEER)) + for (const peer of this._getAllowedPeers(PROMOTE_PEER)) { this._notification(peer.socket, 'parkedPeer', { peerId: parkPeer.id }); } } - async _peerJoining(peer, returning = false) + _peerJoining(peer, returning = false) { - peer.socket.join(this._roomId); - - // If we don't have this peer, add to end - !this._lastN.includes(peer.id) && this._lastN.push(peer.id); - - this._peers[peer.id] = peer; - - // Assign routerId - peer.routerId = await this._getRouterId(); - - this._handlePeer(peer); - - if (returning) + this._queue.push(async () => { - this._notification(peer.socket, 'roomBack'); - } - else - { - const token = jwt.sign({ id: peer.id }, this._uuid, { noTimestamp: true }); + peer.socket.join(this._roomId); - peer.socket.handshake.session.token = token; + // If we don't have this peer, add to end + !this._lastN.includes(peer.id) && this._lastN.push(peer.id); - peer.socket.handshake.session.save(); + this._peers[peer.id] = peer; - let turnServers; - - if ('turnAPIURI' in config) + // Assign routerId + peer.routerId = await this._getRouterId(); + + this._handlePeer(peer); + + if (returning) { - try - { - const { data } = await axios.get( - config.turnAPIURI, - { - params : { - ...config.turnAPIparams, - 'api_key' : config.turnAPIKey, - 'ip' : peer.socket.request.connection.remoteAddress - } - }); - - turnServers = [ { - urls : data.uris, - username : data.username, - credential : data.password - } ]; - } - catch (error) - { - if ('backupTurnServers' in config) - turnServers = config.backupTurnServers; - - logger.error('_peerJoining() | error on REST turn [error:"%o"]', error); - } + this._notification(peer.socket, 'roomBack'); } - else if ('backupTurnServers' in config) + else { - turnServers = config.backupTurnServers; + const token = jwt.sign({ id: peer.id }, this._uuid, { noTimestamp: true }); + + peer.socket.handshake.session.token = token; + + peer.socket.handshake.session.save(); + + let turnServers; + + if ('turnAPIURI' in config) + { + try + { + const { data } = await axios.get( + config.turnAPIURI, + { + params : { + ...config.turnAPIparams, + 'api_key' : config.turnAPIKey, + 'ip' : peer.socket.request.connection.remoteAddress + } + }); + + turnServers = [ { + urls : data.uris, + username : data.username, + credential : data.password + } ]; + } + catch (error) + { + if ('backupTurnServers' in config) + turnServers = config.backupTurnServers; + + logger.error('_peerJoining() | error on REST turn [error:"%o"]', error); + } + } + else if ('backupTurnServers' in config) + { + turnServers = config.backupTurnServers; + } + + this._notification(peer.socket, 'roomReady', { turnServers }); } - - this._notification(peer.socket, 'roomReady', { turnServers }); - } + }) + .catch((error) => + { + logger.error('_peerJoining() [error:"%o"]', error); + }); } _handlePeer(peer) { logger.debug('_handlePeer() [peer:"%s"]', peer.id); - peer.socket.on('request', (request, cb) => - { - logger.debug( - 'Peer "request" event [method:"%s", peerId:"%s"]', - request.method, peer.id); - - this._handleSocketRequest(peer, request, cb) - .catch((error) => - { - logger.error('"request" failed [error:"%o"]', error); - - cb(error); - }); - }); - peer.on('close', () => { - if (this._closed) - return; - - // If the Peer was joined, notify all Peers. - if (peer.joined) - this._notification(peer.socket, 'peerClosed', { peerId: peer.id }, true); - - // Remove from lastN - this._lastN = this._lastN.filter((id) => id !== peer.id); - - delete this._peers[peer.id]; - - // If this is the last Peer in the room and - // lobby is empty, close the room after a while. - if (this.checkEmpty() && this._lobby.checkEmpty()) - this.selfDestructCountdown(); + this._handlePeerClose(peer); }); peer.on('displayNameChanged', ({ oldDisplayName }) => @@ -620,6 +607,69 @@ class Room extends EventEmitter role : oldRole }, true, true); }); + + peer.socket.on('request', (request, cb) => + { + logger.debug( + 'Peer "request" event [method:"%s", peerId:"%s"]', + request.method, peer.id); + + this._handleSocketRequest(peer, request, cb) + .catch((error) => + { + logger.error('"request" failed [error:"%o"]', error); + + cb(error); + }); + }); + + // Peer left before we were done joining + if (peer.closed) + this._handlePeerClose(peer); + } + + _handlePeerClose(peer) + { + logger.debug('_handlePeerClose() [peer:"%s"]', peer.id); + + if (this._closed) + return; + + // If the Peer was joined, notify all Peers. + if (peer.joined) + this._notification(peer.socket, 'peerClosed', { peerId: peer.id }, true); + + // Remove from lastN + this._lastN = this._lastN.filter((id) => id !== peer.id); + + // Need this to know if this peer was the last with PROMOTE_PEER + const hasPromotePeer = peer.roles.some((role) => + roomPermissions[PROMOTE_PEER].includes(role) + ); + + delete this._peers[peer.id]; + + // No peers left with PROMOTE_PEER, might need to give + // lobbyPeers to peers that are left. + if ( + hasPromotePeer && + !this._lobby.checkEmpty() && + roomAllowWhenRoleMissing.includes(PROMOTE_PEER) && + this._getPeersWithPermission(PROMOTE_PEER).length === 0 + ) + { + const lobbyPeers = this._lobby.peerList(); + + for (const allowedPeer of this._getAllowedPeers(PROMOTE_PEER)) + { + this._notification(allowedPeer.socket, 'parkedPeers', { lobbyPeers }); + } + } + + // If this is the last Peer in the room and + // lobby is empty, close the room after a while. + if (this.checkEmpty() && this._lobby.checkEmpty()) + this.selfDestructCountdown(); } async _handleSocketRequest(peer, request, cb) @@ -656,13 +706,9 @@ class Room extends EventEmitter // Tell the new Peer about already joined Peers. // And also create Consumers for existing Producers. - const joinedPeers = - [ - ...this._getJoinedPeers() - ]; + const joinedPeers = this._getJoinedPeers(peer); const peerInfos = joinedPeers - .filter((joinedPeer) => joinedPeer.id !== peer.id) .map((joinedPeer) => (joinedPeer.peerInfo)); let lobbyPeers = []; @@ -678,7 +724,7 @@ class Room extends EventEmitter authenticated : peer.authenticated, roomPermissions : roomPermissions, userRoles : userRoles, - allowWhenRoleMissing : config.allowWhenRoleMissing, + allowWhenRoleMissing : roomAllowWhenRoleMissing, chatHistory : this._chatHistory, fileHistory : this._fileHistory, lastNHistory : this._lastN, @@ -1622,8 +1668,7 @@ class Room extends EventEmitter // Allow if config is set, and no one is present if ( - 'allowWhenRoleMissing' in config && - config.allowWhenRoleMissing.includes(permission) && + roomAllowWhenRoleMissing.includes(permission) && this._getPeersWithPermission(permission).length === 0 ) return true; @@ -1645,6 +1690,20 @@ class Room extends EventEmitter .filter((peer) => peer.joined && peer !== excludePeer); } + _getAllowedPeers(permission = null, excludePeer = undefined, joined = true) + { + const peers = this._getPeersWithPermission(permission, excludePeer, joined); + + if (peers.length > 0) + return peers; + + // Allow if config is set, and no one is present + if (roomAllowWhenRoleMissing.includes(permission)) + return Object.values(this._peers); + + return peers; + } + _getPeersWithPermission(permission = null, excludePeer = undefined, joined = true) { return Object.values(this._peers)