Cleanup of join logic, and making sure that lobbyPeers are sent to all peers if last peer with PROMOTE_PEER leaves and allowWhenRoleMissing contains PROMOTE_PEER, fixes #303

auto_join_3.3
Håvar Aambø Fosstveit 2020-05-08 22:11:48 +02:00
parent c73ee5c26b
commit a9e9a1c1fa
1 changed files with 157 additions and 98 deletions

View File

@ -1,4 +1,5 @@
const EventEmitter = require('events').EventEmitter;
const AwaitQueue = require('awaitqueue');
const axios = require('axios');
const Logger = require('./Logger');
const Lobby = require('./Lobby');
@ -49,6 +50,8 @@ const roomPermissions =
...config.permissionsFromRoles
};
const roomAllowWhenRoleMissing = config.allowWhenRoleMissing || [];
const ROUTER_SCALE_SIZE = config.routerScaleSize || 40;
class Room extends EventEmitter
@ -115,6 +118,9 @@ class Room extends EventEmitter
// Closed flag.
this._closed = false;
// Joining queue
this._queue = new AwaitQueue();
// Locked flag.
this._locked = false;
@ -166,6 +172,10 @@ class Room extends EventEmitter
this._closed = true;
this._queue.close();
this._queue = null;
if (this._selfDestructTimeout)
clearTimeout(this._selfDestructTimeout);
@ -288,7 +298,7 @@ class Room extends EventEmitter
this._peerJoining(promotedPeer);
for (const peer of this._getPeersWithPermission(PROMOTE_PEER))
for (const peer of this._getAllowedPeers(PROMOTE_PEER))
{
this._notification(peer.socket, 'lobby:promotedPeer', { peerId: id });
}
@ -319,7 +329,7 @@ class Room extends EventEmitter
{
const { id, displayName } = changedPeer;
for (const peer of this._getPeersWithPermission(PROMOTE_PEER))
for (const peer of this._getAllowedPeers(PROMOTE_PEER))
{
this._notification(peer.socket, 'lobby:changeDisplayName', { peerId: id, displayName });
}
@ -329,7 +339,7 @@ class Room extends EventEmitter
{
const { id, picture } = changedPeer;
for (const peer of this._getPeersWithPermission(PROMOTE_PEER))
for (const peer of this._getAllowedPeers(PROMOTE_PEER))
{
this._notification(peer.socket, 'lobby:changePicture', { peerId: id, picture });
}
@ -341,7 +351,7 @@ class Room extends EventEmitter
const { id } = closedPeer;
for (const peer of this._getPeersWithPermission(PROMOTE_PEER))
for (const peer of this._getAllowedPeers(PROMOTE_PEER))
{
this._notification(peer.socket, 'lobby:peerClosed', { peerId: id });
}
@ -447,114 +457,91 @@ class Room extends EventEmitter
{
this._lobby.parkPeer(parkPeer);
for (const peer of this._getPeersWithPermission(PROMOTE_PEER))
for (const peer of this._getAllowedPeers(PROMOTE_PEER))
{
this._notification(peer.socket, 'parkedPeer', { peerId: parkPeer.id });
}
}
async _peerJoining(peer, returning = false)
_peerJoining(peer, returning = false)
{
peer.socket.join(this._roomId);
// If we don't have this peer, add to end
!this._lastN.includes(peer.id) && this._lastN.push(peer.id);
this._peers[peer.id] = peer;
// Assign routerId
peer.routerId = await this._getRouterId();
this._handlePeer(peer);
if (returning)
this._queue.push(async () =>
{
this._notification(peer.socket, 'roomBack');
}
else
{
const token = jwt.sign({ id: peer.id }, this._uuid, { noTimestamp: true });
peer.socket.join(this._roomId);
peer.socket.handshake.session.token = token;
// If we don't have this peer, add to end
!this._lastN.includes(peer.id) && this._lastN.push(peer.id);
peer.socket.handshake.session.save();
this._peers[peer.id] = peer;
let turnServers;
if ('turnAPIURI' in config)
// Assign routerId
peer.routerId = await this._getRouterId();
this._handlePeer(peer);
if (returning)
{
try
{
const { data } = await axios.get(
config.turnAPIURI,
{
params : {
...config.turnAPIparams,
'api_key' : config.turnAPIKey,
'ip' : peer.socket.request.connection.remoteAddress
}
});
turnServers = [ {
urls : data.uris,
username : data.username,
credential : data.password
} ];
}
catch (error)
{
if ('backupTurnServers' in config)
turnServers = config.backupTurnServers;
logger.error('_peerJoining() | error on REST turn [error:"%o"]', error);
}
this._notification(peer.socket, 'roomBack');
}
else if ('backupTurnServers' in config)
else
{
turnServers = config.backupTurnServers;
const token = jwt.sign({ id: peer.id }, this._uuid, { noTimestamp: true });
peer.socket.handshake.session.token = token;
peer.socket.handshake.session.save();
let turnServers;
if ('turnAPIURI' in config)
{
try
{
const { data } = await axios.get(
config.turnAPIURI,
{
params : {
...config.turnAPIparams,
'api_key' : config.turnAPIKey,
'ip' : peer.socket.request.connection.remoteAddress
}
});
turnServers = [ {
urls : data.uris,
username : data.username,
credential : data.password
} ];
}
catch (error)
{
if ('backupTurnServers' in config)
turnServers = config.backupTurnServers;
logger.error('_peerJoining() | error on REST turn [error:"%o"]', error);
}
}
else if ('backupTurnServers' in config)
{
turnServers = config.backupTurnServers;
}
this._notification(peer.socket, 'roomReady', { turnServers });
}
this._notification(peer.socket, 'roomReady', { turnServers });
}
})
.catch((error) =>
{
logger.error('_peerJoining() [error:"%o"]', error);
});
}
_handlePeer(peer)
{
logger.debug('_handlePeer() [peer:"%s"]', peer.id);
peer.socket.on('request', (request, cb) =>
{
logger.debug(
'Peer "request" event [method:"%s", peerId:"%s"]',
request.method, peer.id);
this._handleSocketRequest(peer, request, cb)
.catch((error) =>
{
logger.error('"request" failed [error:"%o"]', error);
cb(error);
});
});
peer.on('close', () =>
{
if (this._closed)
return;
// If the Peer was joined, notify all Peers.
if (peer.joined)
this._notification(peer.socket, 'peerClosed', { peerId: peer.id }, true);
// Remove from lastN
this._lastN = this._lastN.filter((id) => id !== peer.id);
delete this._peers[peer.id];
// If this is the last Peer in the room and
// lobby is empty, close the room after a while.
if (this.checkEmpty() && this._lobby.checkEmpty())
this.selfDestructCountdown();
this._handlePeerClose(peer);
});
peer.on('displayNameChanged', ({ oldDisplayName }) =>
@ -620,6 +607,69 @@ class Room extends EventEmitter
role : oldRole
}, true, true);
});
peer.socket.on('request', (request, cb) =>
{
logger.debug(
'Peer "request" event [method:"%s", peerId:"%s"]',
request.method, peer.id);
this._handleSocketRequest(peer, request, cb)
.catch((error) =>
{
logger.error('"request" failed [error:"%o"]', error);
cb(error);
});
});
// Peer left before we were done joining
if (peer.closed)
this._handlePeerClose(peer);
}
_handlePeerClose(peer)
{
logger.debug('_handlePeerClose() [peer:"%s"]', peer.id);
if (this._closed)
return;
// If the Peer was joined, notify all Peers.
if (peer.joined)
this._notification(peer.socket, 'peerClosed', { peerId: peer.id }, true);
// Remove from lastN
this._lastN = this._lastN.filter((id) => id !== peer.id);
// Need this to know if this peer was the last with PROMOTE_PEER
const hasPromotePeer = peer.roles.some((role) =>
roomPermissions[PROMOTE_PEER].includes(role)
);
delete this._peers[peer.id];
// No peers left with PROMOTE_PEER, might need to give
// lobbyPeers to peers that are left.
if (
hasPromotePeer &&
!this._lobby.checkEmpty() &&
roomAllowWhenRoleMissing.includes(PROMOTE_PEER) &&
this._getPeersWithPermission(PROMOTE_PEER).length === 0
)
{
const lobbyPeers = this._lobby.peerList();
for (const allowedPeer of this._getAllowedPeers(PROMOTE_PEER))
{
this._notification(allowedPeer.socket, 'parkedPeers', { lobbyPeers });
}
}
// If this is the last Peer in the room and
// lobby is empty, close the room after a while.
if (this.checkEmpty() && this._lobby.checkEmpty())
this.selfDestructCountdown();
}
async _handleSocketRequest(peer, request, cb)
@ -656,13 +706,9 @@ class Room extends EventEmitter
// Tell the new Peer about already joined Peers.
// And also create Consumers for existing Producers.
const joinedPeers =
[
...this._getJoinedPeers()
];
const joinedPeers = this._getJoinedPeers(peer);
const peerInfos = joinedPeers
.filter((joinedPeer) => joinedPeer.id !== peer.id)
.map((joinedPeer) => (joinedPeer.peerInfo));
let lobbyPeers = [];
@ -678,7 +724,7 @@ class Room extends EventEmitter
authenticated : peer.authenticated,
roomPermissions : roomPermissions,
userRoles : userRoles,
allowWhenRoleMissing : config.allowWhenRoleMissing,
allowWhenRoleMissing : roomAllowWhenRoleMissing,
chatHistory : this._chatHistory,
fileHistory : this._fileHistory,
lastNHistory : this._lastN,
@ -1622,8 +1668,7 @@ class Room extends EventEmitter
// Allow if config is set, and no one is present
if (
'allowWhenRoleMissing' in config &&
config.allowWhenRoleMissing.includes(permission) &&
roomAllowWhenRoleMissing.includes(permission) &&
this._getPeersWithPermission(permission).length === 0
)
return true;
@ -1645,6 +1690,20 @@ class Room extends EventEmitter
.filter((peer) => peer.joined && peer !== excludePeer);
}
_getAllowedPeers(permission = null, excludePeer = undefined, joined = true)
{
const peers = this._getPeersWithPermission(permission, excludePeer, joined);
if (peers.length > 0)
return peers;
// Allow if config is set, and no one is present
if (roomAllowWhenRoleMissing.includes(permission))
return Object.values(this._peers);
return peers;
}
_getPeersWithPermission(permission = null, excludePeer = undefined, joined = true)
{
return Object.values(this._peers)