Refactored a bit of code and added a Peer class to the server.

master
Håvar Aambø Fosstveit 2019-10-25 13:34:28 +02:00
parent 76c6b5121e
commit 39c3f7c36d
7 changed files with 571 additions and 336 deletions

View File

@ -110,3 +110,8 @@ This started as a fork of the [work](https://github.com/versatica/mediasoup-demo
## License ## License
MIT MIT
Contributions to this work were made on behalf of the GÉANT project, a project that has received funding from the European Unions Horizon 2020 research and innovation programme under Grant Agreement No. 731122 (GN4-2). On behalf of GÉANT project, GÉANT Association is the sole owner of the copyright in all material which was developed by a member of the GÉANT project.
GÉANT Vereniging (Association) is registered with the Chamber of Commerce in Amsterdam with registration number 40535155 and operates in the UK as a branch of GÉANT Vereniging. Registered office: Hoekenrode 3, 1102BR Amsterdam, The Netherlands. UK branch address: City House, 126-130 Hills Road, Cambridge CB2 1PQ, UK.

View File

@ -77,11 +77,11 @@ export default class RoomClient
} }
constructor( constructor(
{ roomId, peerId, accessCode, device, useSimulcast, produce, consume, forceTcp }) { roomId, peerId, accessCode, device, useSimulcast, produce, forceTcp })
{ {
logger.debug( logger.debug(
'constructor() [roomId: "%s", peerId: "%s", device: "%s", useSimulcast: "%s", produce: "%s", consume: "%s", forceTcp: "%s"]', 'constructor() [roomId: "%s", peerId: "%s", device: "%s", useSimulcast: "%s", produce: "%s", forceTcp: "%s"]',
roomId, peerId, device.flag, useSimulcast, produce, consume, forceTcp); roomId, peerId, device.flag, useSimulcast, produce, forceTcp);
this._signalingUrl = getSignalingUrl(peerId, roomId); this._signalingUrl = getSignalingUrl(peerId, roomId);
@ -94,9 +94,6 @@ export default class RoomClient
// Whether we should produce. // Whether we should produce.
this._produce = produce; this._produce = produce;
// Whether we should consume.
this._consume = consume;
// Wheter we force TCP // Wheter we force TCP
this._forceTcp = forceTcp; this._forceTcp = forceTcp;
@ -308,7 +305,7 @@ export default class RoomClient
login() login()
{ {
const url = `/auth/login?roomId=${this._roomId}&peerId=${this._peerId}`; const url = `/auth/login?id=${this._peerId}`;
this._loginWindow = window.open(url, 'loginWindow'); this._loginWindow = window.open(url, 'loginWindow');
} }
@ -1705,44 +1702,41 @@ export default class RoomClient
}); });
} }
if (this._consume) const transportInfo = await this.sendRequest(
{ 'createWebRtcTransport',
const transportInfo = await this.sendRequest( {
'createWebRtcTransport', forceTcp : this._forceTcp,
{ producing : false,
forceTcp : this._forceTcp, consuming : true
producing : false, });
consuming : true
});
const { const {
id,
iceParameters,
iceCandidates,
dtlsParameters
} = transportInfo;
this._recvTransport = this._mediasoupDevice.createRecvTransport(
{
id, id,
iceParameters, iceParameters,
iceCandidates, iceCandidates,
dtlsParameters dtlsParameters
} = transportInfo; });
this._recvTransport = this._mediasoupDevice.createRecvTransport( this._recvTransport.on(
{ 'connect', ({ dtlsParameters }, callback, errback) => // eslint-disable-line no-shadow
id, {
iceParameters, this.sendRequest(
iceCandidates, 'connectWebRtcTransport',
dtlsParameters {
}); transportId : this._recvTransport.id,
dtlsParameters
this._recvTransport.on( })
'connect', ({ dtlsParameters }, callback, errback) => // eslint-disable-line no-shadow .then(callback)
{ .catch(errback);
this.sendRequest( });
'connectWebRtcTransport',
{
transportId : this._recvTransport.id,
dtlsParameters
})
.then(callback)
.catch(errback);
});
}
// Set our media capabilities. // Set our media capabilities.
store.dispatch(stateActions.setMediaCapabilities( store.dispatch(stateActions.setMediaCapabilities(
@ -1760,11 +1754,11 @@ export default class RoomClient
displayName : displayName, displayName : displayName,
picture : picture, picture : picture,
device : this._device, device : this._device,
rtpCapabilities : this._consume rtpCapabilities : this._mediasoupDevice.rtpCapabilities
? this._mediasoupDevice.rtpCapabilities
: undefined
}); });
logger.debug('_joinRoom() joined, got peers [peers:"%o"]', peers);
for (const peer of peers) for (const peer of peers)
{ {
store.dispatch( store.dispatch(

View File

@ -19,7 +19,7 @@ import * as serviceWorker from './serviceWorker';
import './index.css'; import './index.css';
if (process.env.REACT_APP_DEBUG === '*') if (process.env.REACT_APP_DEBUG === '*' || process.env.NODE_ENV !== 'production')
{ {
debug.enable('* -engine* -socket* -RIE* *WARN* *ERROR*'); debug.enable('* -engine* -socket* -RIE* *WARN* *ERROR*');
} }
@ -64,7 +64,6 @@ function run()
const accessCode = parameters.get('code'); const accessCode = parameters.get('code');
const produce = parameters.get('produce') !== 'false'; const produce = parameters.get('produce') !== 'false';
const consume = parameters.get('consume') !== 'false';
const useSimulcast = parameters.get('simulcast') === 'true'; const useSimulcast = parameters.get('simulcast') === 'true';
const forceTcp = parameters.get('forceTcp') === 'true'; const forceTcp = parameters.get('forceTcp') === 'true';
@ -85,7 +84,7 @@ function run()
); );
roomClient = new RoomClient( roomClient = new RoomClient(
{ roomId, peerId, accessCode, device, useSimulcast, produce, consume, forceTcp }); { roomId, peerId, accessCode, device, useSimulcast, produce, forceTcp });
global.CLIENT = roomClient; global.CLIENT = roomClient;

View File

@ -14,7 +14,7 @@ class Lobby extends EventEmitter
// Closed flag. // Closed flag.
this._closed = false; this._closed = false;
this._peers = {}; this._peers = new Map();
} }
close() close()
@ -23,110 +23,126 @@ class Lobby extends EventEmitter
this._closed = true; this._closed = true;
Object.values(this._peers).forEach((peer) => this._peers.forEach((peer) =>
{ {
if (peer.socket) if (!peer.closed)
peer.socket.disconnect(); peer.close();
}); });
this._peers = {}; this._peers.clear();
} }
checkEmpty() checkEmpty()
{ {
logger.info('checkEmpty()'); logger.info('checkEmpty()');
if (Object.keys(this._peers).length == 0)
return true return this._peers.size === 0;
else return false;
} }
peerList() peerList()
{ {
logger.info('peerList()'); logger.info('peerList()');
return Object.values(this._peers).map((peer) => return Array.from(this._peers.values()).map((peer) =>
({ ({
peerId : peer.peerId, peerId : peer.id,
displayName : peer.displayName displayName : peer.displayName
})); }));
} }
hasPeer(peerId) hasPeer(peerId)
{ {
return Boolean(this._peers[peerId]); return this._peers.has(peerId);
} }
promoteAllPeers() promoteAllPeers()
{ {
logger.info('promoteAllPeers()'); logger.info('promoteAllPeers()');
Object.values(this._peers).forEach((peer) => this._peers.forEach((peer) =>
{ {
if (peer.socket) if (peer.socket)
this.promotePeer(peer.peerId); this.promotePeer(peer.id);
}); });
} }
promotePeer(peerId) promotePeer(peerId)
{ {
logger.info('promotePeer() [peerId: %s]', peerId); logger.info('promotePeer() [peer:"%s"]', peerId);
const peer = this._peers[peerId]; const peer = this._peers.get(peerId);
if (peer) if (peer)
{ {
this.emit('promotePeer', peer); this.emit('promotePeer', peer);
delete this._peers[peerId]; this._peers.delete(peerId);
} }
} }
parkPeer({ peerId, consume, socket }) parkPeer(peer)
{ {
logger.info('parkPeer()'); logger.info('parkPeer() [peer:"%s"]', peer.id);
if ( this._closed ) return; if (this._closed)
return;
const peer = { peerId, socket, consume }; peer.socket.emit('notification', { method: 'enteredLobby', data: {} });
socket.emit('notification', { method: 'enteredLobby', data: {} }); this._peers.set(peer.id, peer);
this._peers[peerId] = peer; peer.on('authenticationChange', () =>
{
logger.info('parkPeer() | authenticationChange [peer:"%s"]', peer.id);
socket.on('request', (request, cb) => peer.authenticated && this.emit('peerAuthenticated', peer);
});
peer.socket.on('request', (request, cb) =>
{ {
logger.debug( logger.debug(
'Peer "request" event [method:%s, peerId:%s]', 'Peer "request" event [method:"%s", peer:"%s"]',
request.method, peer.peerId); request.method, peer.id);
if (this._closed) return; if (this._closed)
return;
this._handleSocketRequest(peer, request, cb) this._handleSocketRequest(peer, request, cb)
.catch((error) => .catch((error) =>
{ {
logger.error('request failed:%o', error); logger.error('request failed [error:"%o"]', error);
cb(error); cb(error);
}); });
}); });
socket.on('disconnect', () => peer.socket.on('disconnect', () =>
{ {
logger.debug('Peer "close" event [peerId:%s]', peer.peerId); logger.debug('Peer "close" event [peer:"%s"]', peer.id);
if (this._closed) return; if (this._closed)
return;
this.emit('peerClosed', peer); this.emit('peerClosed', peer);
delete this._peers[peer.peerId]; this._peers.delete(peer.id);
if ( this.checkEmpty() ) this.emit('lobbyEmpty'); if (this.checkEmpty())
this.emit('lobbyEmpty');
}); });
} }
async _handleSocketRequest(peer, request, cb) async _handleSocketRequest(peer, request, cb)
{ {
logger.debug('_handleSocketRequest [peer:%o], [request:%o]', peer, request); logger.debug(
if (this._closed) return; '_handleSocketRequest [peer:"%s"], [request:"%s"]',
peer.id,
request.method
);
if (this._closed)
return;
switch (request.method) switch (request.method)
{ {
case 'changeDisplayName': case 'changeDisplayName':

268
server/lib/Peer.js 100644
View File

@ -0,0 +1,268 @@
const EventEmitter = require('events').EventEmitter;
const Logger = require('./Logger');
const logger = new Logger('Peer');
class Peer extends EventEmitter
{
constructor({ id, socket })
{
logger.info('constructor() [id:"%s", socket:"%s"]', id, socket.id);
super();
this._id = id;
this._socket = socket;
this._closed = false;
this._joined = false;
this._authenticated = false;
this._displayName = false;
this._picture = null;
this._device = null;
this._rtpCapabilities = null;
this._raisedHand = false;
this._transports = new Map();
this._producers = new Map();
this._consumers = new Map();
this._handlePeer();
}
close()
{
logger.info('close()');
this._closed = true;
// Iterate and close all mediasoup Transport associated to this Peer, so all
// its Producers and Consumers will also be closed.
this.transports.forEach((transport) =>
{
transport.close();
});
if (this._socket)
this._socket.disconnect(true);
this.emit('close');
}
_handlePeer()
{
this.authenticated =
this.socket.handshake.session.passport &&
this.socket.handshake.session.passport.user;
this.socket.use((packet, next) =>
{
this.authenticated =
this.socket.handshake.session.passport &&
this.socket.handshake.session.passport.user;
return next();
});
this.socket.on('disconnect', () =>
{
if (this.closed)
return;
logger.debug('"disconnect" event [id:%s]', this.id);
this.close();
});
}
get id()
{
return this._id;
}
set id(id)
{
this._id = id;
}
get socket()
{
return this._socket;
}
set socket(socket)
{
this._socket = socket;
}
get closed()
{
return this._closed;
}
get joined()
{
return this._joined;
}
set joined(joined)
{
this._joined = joined;
}
get authenticated()
{
return this._authenticated;
}
set authenticated(authenticated)
{
if (authenticated !== this._authenticated)
{
logger.info('authenticated() | authenticationChange [peer:"%s", authenticated:"%s"]', this.id, authenticated);
this._authenticated = authenticated;
this.emit('authenticationChange');
}
}
get displayName()
{
return this._displayName;
}
set displayName(displayName)
{
this._displayName = displayName;
}
get picture()
{
return this._picture;
}
set picture(picture)
{
this._picture = picture;
}
get device()
{
return this._device;
}
set device(device)
{
this._device = device;
}
get rtpCapabilities()
{
return this._rtpCapabilities;
}
set rtpCapabilities(rtpCapabilities)
{
this._rtpCapabilities = rtpCapabilities;
}
get raisedHand()
{
return this._raisedHand;
}
set raisedHand(raisedHand)
{
this._raisedHand = raisedHand;
}
get transports()
{
return this._transports;
}
get producers()
{
return this._producers;
}
get consumers()
{
return this._consumers;
}
addTransport(id, transport)
{
this.transports.set(id, transport);
}
getTransport(id)
{
return this.transports.get(id);
}
getConsumerTransport()
{
return Array.from(this.transports.values())
.find((t) => t.appData.consuming);
}
removeTransport(id)
{
this.transports.delete(id);
}
addProducer(id, producer)
{
this.producers.set(id, producer);
}
getProducer(id)
{
return this.producers.get(id);
}
removeProducer(id)
{
this.producers.delete(id);
}
addConsumer(id, consumer)
{
this.consumers.set(id, consumer);
}
getConsumer(id)
{
return this.consumers.get(id);
}
removeConsumer(id)
{
this.consumers.delete(id);
}
get peerInfo()
{
const peerInfo =
{
id : this.id,
displayName : this.displayName,
picture : this.picture,
device : this.device
};
return peerInfo;
}
}
module.exports = Peer;

View File

@ -18,7 +18,7 @@ class Room extends EventEmitter
*/ */
static async create({ mediasoupWorker, roomId }) static async create({ mediasoupWorker, roomId })
{ {
logger.info('create() [roomId:%s, forceH264:%s]', roomId); logger.info('create() [roomId:"%s"]', roomId);
// Router media codecs. // Router media codecs.
const mediaCodecs = config.mediasoup.router.mediaCodecs; const mediaCodecs = config.mediasoup.router.mediaCodecs;
@ -62,39 +62,118 @@ class Room extends EventEmitter
this._lobby = new Lobby(); this._lobby = new Lobby();
this._chatHistory = [];
this._fileHistory = [];
this._lastN = [];
this._peers = new Map();
// mediasoup Router instance.
this._mediasoupRouter = mediasoupRouter;
// mediasoup AudioLevelObserver.
this._audioLevelObserver = audioLevelObserver;
// Current active speaker.
this._currentActiveSpeaker = null;
this._handleLobby();
this._handleAudioLevelObserver();
}
close()
{
logger.debug('close()');
this._closed = true;
this._lobby.close();
this._peers.forEach((peer) =>
{
if (!peer.closed)
peer.close();
});
this._peers.clear();
// Close the mediasoup Router.
this._mediasoupRouter.close();
// Emit 'close' event.
this.emit('close');
}
handlePeer(peer)
{
logger.info('handlePeer() [peer:"%s"]', peer.id);
// This will allow reconnects to join despite lock
if (this._peers.has(peer.id))
{
logger.warn(
'handleConnection() | there is already a peer with same peerId [peer:"%s"]',
peer.id);
peer.close();
return;
}
else if (
this._locked ||
(config.requireSignInToAccess && !peer.authenticated)
)
{
this._parkPeer(peer);
return;
}
this._peerJoining(peer);
}
_handleLobby()
{
this._lobby.on('promotePeer', (promotedPeer) => this._lobby.on('promotePeer', (promotedPeer) =>
{ {
logger.info('promotePeer() [promotedPeer:"%o"]', promotedPeer); logger.info('promotePeer() [promotedPeer:"%s"]', promotedPeer.id);
const { peerId } = promotedPeer; const { id } = promotedPeer;
this._peerJoining({ ...promotedPeer }); this._peerJoining(promotedPeer);
Object.values(this._peers).forEach((peer) => this._peers.forEach((peer) =>
{ {
this._notification(peer.socket, 'promotedPeer', { peerId }); this._notification(peer.socket, 'promotedPeer', { peerId: id });
}); });
}); });
this._lobby.on('lobbyPeerDisplayNameChanged', (changedPeer) => this._lobby.on('lobbyPeerDisplayNameChanged', (changedPeer) =>
{ {
const { peerId, displayName } = changedPeer; const { id, displayName } = changedPeer;
Object.values(this._peers).forEach((peer) => this._peers.forEach((peer) =>
{ {
this._notification(peer.socket, 'lobbyPeerDisplayNameChanged', { peerId, displayName }); this._notification(peer.socket, 'lobbyPeerDisplayNameChanged', { peerId: id, displayName });
}); });
}); });
this._lobby.on('peerAuthenticated', (peer) =>
{
!this._locked && this._lobby.promotePeer(peer.id);
});
this._lobby.on('peerClosed', (closedPeer) => this._lobby.on('peerClosed', (closedPeer) =>
{ {
logger.info('peerClosed() [closedPeer:"%o"]', closedPeer); logger.info('peerClosed() [closedPeer:"%s"]', closedPeer.id);
const { peerId } = closedPeer; const { id } = closedPeer;
Object.values(this._peers).forEach((peer) => this._peers.forEach((peer) =>
{ {
this._notification(peer.socket, 'lobbyPeerClosed', { peerId }); this._notification(peer.socket, 'lobbyPeerClosed', { peerId: id });
}); });
}); });
@ -107,34 +186,17 @@ class Room extends EventEmitter
this.selfDestructCountdown(); this.selfDestructCountdown();
} }
}); });
}
this._chatHistory = []; _handleAudioLevelObserver()
{
this._fileHistory = [];
this._lastN = [];
this._peers = {};
// mediasoup Router instance.
// @type {mediasoup.Router}
this._mediasoupRouter = mediasoupRouter;
// mediasoup AudioLevelObserver.
// @type {mediasoup.AudioLevelObserver}
this._audioLevelObserver = audioLevelObserver;
// Set audioLevelObserver events. // Set audioLevelObserver events.
this._audioLevelObserver.on('volumes', (volumes) => this._audioLevelObserver.on('volumes', (volumes) =>
{ {
const { producer, volume } = volumes[0]; const { producer, volume } = volumes[0];
// logger.debug(
// 'audioLevelObserver "volumes" event [producerId:%s, volume:%s]',
// producer.id, volume);
// Notify all Peers. // Notify all Peers.
Object.values(this._peers).forEach((peer) => this._peers.forEach((peer) =>
{ {
this._notification(peer.socket, 'activeSpeaker', { this._notification(peer.socket, 'activeSpeaker', {
peerId : producer.appData.peerId, peerId : producer.appData.peerId,
@ -145,18 +207,21 @@ class Room extends EventEmitter
this._audioLevelObserver.on('silence', () => this._audioLevelObserver.on('silence', () =>
{ {
// logger.debug('audioLevelObserver "silence" event');
// Notify all Peers. // Notify all Peers.
Object.values(this._peers).forEach((peer) => this._peers.forEach((peer) =>
{ {
this._notification(peer.socket, 'activeSpeaker', { peerId: null }); this._notification(peer.socket, 'activeSpeaker', { peerId: null });
}); });
}); });
}
// Current active speaker. logStatus()
// @type {mediasoup.Peer} {
this._currentActiveSpeaker = null; logger.info(
'logStatus() [room id:"%s", peers:"%s"]',
this._roomId,
this._peers.size
);
} }
get id() get id()
@ -173,10 +238,10 @@ class Room extends EventEmitter
if (this._closed) if (this._closed)
return; return;
if (this.checkEmpty() && this._lobby.checkEmpty()) if (this.checkEmpty())
{ {
logger.info( logger.info(
'Room deserted for some time, closing the room [roomId:%s]', 'Room deserted for some time, closing the room [roomId:"%s"]',
this._roomId); this._roomId);
this.close(); this.close();
} }
@ -185,170 +250,68 @@ class Room extends EventEmitter
}, 10000); }, 10000);
} }
close()
{
logger.debug('close()');
this._closed = true;
this._lobby.close();
Object.values(this._peers).forEach((peer) =>
{
if (peer.socket)
peer.socket.disconnect();
});
this._peers = {};
// Close the mediasoup Router.
this._mediasoupRouter.close();
// Emit 'close' event.
this.emit('close');
}
logStatus()
{
logger.info(
'logStatus() [room id:"%s", peers:%o]',
this._roomId,
this._peers
);
}
// checks both room and lobby // checks both room and lobby
checkEmpty() checkEmpty()
{ {
if ((Object.keys(this._peers).length == 0) && (this._lobby.checkEmpty())) if ((this._peers.size == 0) && (this._lobby.checkEmpty()))
return true; return true;
else else
return false; return false;
} }
handleConnection({ peerId, consume, socket }) _parkPeer(parkPeer)
{ {
logger.info('handleConnection() [peerId:"%s"]', peerId); this._lobby.parkPeer(parkPeer);
// This will allow reconnects to join despite lock this._peers.forEach((peer) =>
if (this._peers[peerId])
{ {
logger.warn( this._notification(peer.socket, 'parkedPeer', { peerId: parkPeer.id });
'handleConnection() | there is already a peer with same peerId, ' +
'closing the previous one [peerId:"%s"]',
peerId);
const peer = this._peers[peerId];
peer.socket.disconnect();
delete this._peers[peerId];
}
else if (this._locked) // Don't allow connections to a locked room
this._parkPeer({ peerId, consume, socket })
else if (config.requireSignInToAccess) // Only allow signed in users directly into room
{
const { passport } = socket.handshake.session;
if (passport && passport.user)
this._peerJoining({ peerId, consume, socket });
else
this._parkPeer({ peerId, consume, socket })
}
else
this._peerJoining({ peerId, consume, socket });
}
_parkPeer({ peerId, consume, socket })
{
this._lobby.parkPeer({ peerId, consume, socket });
Object.values(this._peers).forEach((peer) =>
{
this._notification(peer.socket, 'parkedPeer', { peerId });
}); });
} }
_peerJoining({ peerId, consume, socket }) _peerJoining(peer)
{ {
socket.join(this._roomId); peer.socket.join(this._roomId);
const peer = { id: peerId, socket: socket }; const index = this._lastN.indexOf(peer.id);
const index = this._lastN.indexOf(peerId);
if (index === -1) // We don't have this peer, add to end if (index === -1) // We don't have this peer, add to end
{ {
this._lastN.push(peerId); this._lastN.push(peer.id);
} }
this._peers[peerId] = peer; this._peers.set(peer.id, peer);
this._handlePeer({ peer, consume }); this._handlePeer(peer);
this._notification(socket, 'roomReady'); this._notification(peer.socket, 'roomReady');
} }
isLocked() _handlePeer(peer)
{
return this._locked;
}
peerAuthenticated(peerId)
{
logger.debug('peerAuthenticated() | [peerId:"%s"]', peerId);
if (!this._locked)
{
if (!this._peers[peerId])
{
this._lobby.promotePeer(peerId);
}
}
}
_handlePeer({ peer, consume })
{ {
logger.debug('_handlePeer() [peer:"%s"]', peer.id); logger.debug('_handlePeer() [peer:"%s"]', peer.id);
peer.data = {};
// Not joined after a custom protoo 'join' request is later received.
peer.data.consume = consume;
peer.data.joined = false;
peer.data.displayName = undefined;
peer.data.device = undefined;
peer.data.rtpCapabilities = undefined;
peer.data.raiseHandState = false;
// Have mediasoup related maps ready even before the Peer joins since we
// allow creating Transports before joining.
peer.data.transports = new Map();
peer.data.producers = new Map();
peer.data.consumers = new Map();
peer.socket.on('request', (request, cb) => peer.socket.on('request', (request, cb) =>
{ {
logger.debug( logger.debug(
'Peer "request" event [method:%s, peerId:%s]', 'Peer "request" event [method:"%s", peerId:"%s"]',
request.method, peer.id); request.method, peer.id);
this._handleSocketRequest(peer, request, cb) this._handleSocketRequest(peer, request, cb)
.catch((error) => .catch((error) =>
{ {
logger.error('request failed:%o', error); logger.error('"request" failed [error:"%o"]', error);
cb(error); cb(error);
}); });
}); });
peer.socket.on('disconnect', () => peer.on('close', () =>
{ {
if (this._closed) if (this._closed)
return; return;
logger.debug('Peer "disconnect" event [peerId:%s]', peer.id);
// If the Peer was joined, notify all Peers. // If the Peer was joined, notify all Peers.
if (peer.data.joined) if (peer.joined)
{ {
this._notification(peer.socket, 'peerClosed', { peerId: peer.id }, true); this._notification(peer.socket, 'peerClosed', { peerId: peer.id }, true);
} }
@ -360,14 +323,7 @@ class Room extends EventEmitter
this._lastN.splice(index, 1); this._lastN.splice(index, 1);
} }
// Iterate and close all mediasoup Transport associated to this Peer, so all this._peers.delete(peer.id);
// its Producers and Consumers will also be closed.
for (const transport of peer.data.transports.values())
{
transport.close();
}
delete this._peers[peer.id];
// If this is the last Peer in the room and // If this is the last Peer in the room and
// lobby is empty, close the room after a while. // lobby is empty, close the room after a while.
@ -382,7 +338,6 @@ class Room extends EventEmitter
{ {
switch (request.method) switch (request.method)
{ {
case 'getRouterRtpCapabilities': case 'getRouterRtpCapabilities':
{ {
cb(null, this._mediasoupRouter.rtpCapabilities); cb(null, this._mediasoupRouter.rtpCapabilities);
@ -393,7 +348,7 @@ class Room extends EventEmitter
case 'join': case 'join':
{ {
// Ensure the Peer is not already joined. // Ensure the Peer is not already joined.
if (peer.data.joined) if (peer.joined)
throw new Error('Peer already joined'); throw new Error('Peer already joined');
const { const {
@ -403,30 +358,24 @@ class Room extends EventEmitter
rtpCapabilities rtpCapabilities
} = request.data; } = request.data;
// Store client data into the protoo Peer data object. // Store client data into the Peer data object.
peer.data.displayName = displayName; peer.displayName = displayName;
peer.data.picture = picture; peer.picture = picture;
peer.data.device = device; peer.device = device;
peer.data.rtpCapabilities = rtpCapabilities; peer.rtpCapabilities = rtpCapabilities;
// Tell the new Peer about already joined Peers. // Tell the new Peer about already joined Peers.
// And also create Consumers for existing Producers. // And also create Consumers for existing Producers.
const peerInfos = []; const peerInfos = [];
Object.values(this._peers).forEach((joinedPeer) => this._peers.forEach((joinedPeer) =>
{ {
if (joinedPeer.data.joined) if (joinedPeer.joined)
{ {
peerInfos.push( peerInfos.push(joinedPeer.peerInfo);
{
id : joinedPeer.id,
displayName : joinedPeer.data.displayName,
picture : joinedPeer.data.picture,
device : joinedPeer.data.device
});
for (const producer of joinedPeer.data.producers.values()) joinedPeer.producers.forEach((producer) =>
{ {
this._createConsumer( this._createConsumer(
{ {
@ -434,14 +383,14 @@ class Room extends EventEmitter
producerPeer : joinedPeer, producerPeer : joinedPeer,
producer producer
}); });
} });
} }
}); });
cb(null, { peers: peerInfos }); cb(null, { peers: peerInfos });
// Mark the new Peer as joined. // Mark the new Peer as joined.
peer.data.joined = true; peer.joined = true;
this._notification( this._notification(
peer.socket, peer.socket,
@ -456,8 +405,8 @@ class Room extends EventEmitter
); );
logger.debug( logger.debug(
'peer joined [peeerId: %s, displayName: %s, picture: %s, device: %o]', 'peer joined [peer: "%s", displayName: "%s", picture: "%s"]',
peer.id, displayName, picture, device); peer.id, displayName, picture);
break; break;
} }
@ -483,8 +432,8 @@ class Room extends EventEmitter
appData : { producing, consuming } appData : { producing, consuming }
}); });
// Store the WebRtcTransport into the protoo Peer data Object. // Store the WebRtcTransport into the Peer data Object.
peer.data.transports.set(transport.id, transport); peer.addTransport(transport.id, transport);
cb( cb(
null, null,
@ -508,7 +457,7 @@ class Room extends EventEmitter
case 'connectWebRtcTransport': case 'connectWebRtcTransport':
{ {
const { transportId, dtlsParameters } = request.data; const { transportId, dtlsParameters } = request.data;
const transport = peer.data.transports.get(transportId); const transport = peer.getTransport(transportId);
if (!transport) if (!transport)
throw new Error(`transport with id "${transportId}" not found`); throw new Error(`transport with id "${transportId}" not found`);
@ -523,7 +472,7 @@ class Room extends EventEmitter
case 'restartIce': case 'restartIce':
{ {
const { transportId } = request.data; const { transportId } = request.data;
const transport = peer.data.transports.get(transportId); const transport = peer.getTransport(transportId);
if (!transport) if (!transport)
throw new Error(`transport with id "${transportId}" not found`); throw new Error(`transport with id "${transportId}" not found`);
@ -538,12 +487,12 @@ class Room extends EventEmitter
case 'produce': case 'produce':
{ {
// Ensure the Peer is joined. // Ensure the Peer is joined.
if (!peer.data.joined) if (!peer.joined)
throw new Error('Peer not yet joined'); throw new Error('Peer not yet joined');
const { transportId, kind, rtpParameters } = request.data; const { transportId, kind, rtpParameters } = request.data;
let { appData } = request.data; let { appData } = request.data;
const transport = peer.data.transports.get(transportId); const transport = peer.getTransport(transportId);
if (!transport) if (!transport)
throw new Error(`transport with id "${transportId}" not found`); throw new Error(`transport with id "${transportId}" not found`);
@ -555,31 +504,27 @@ class Room extends EventEmitter
const producer = const producer =
await transport.produce({ kind, rtpParameters, appData }); await transport.produce({ kind, rtpParameters, appData });
// Store the Producer into the protoo Peer data Object. // Store the Producer into the Peer data Object.
peer.data.producers.set(producer.id, producer); peer.addProducer(producer.id, producer);
// Set Producer events. // Set Producer events.
producer.on('score', (score) => producer.on('score', (score) =>
{ {
// logger.debug(
// 'producer "score" event [producerId:%s, score:%o]',
// producer.id, score);
this._notification(peer.socket, 'producerScore', { producerId: producer.id, score }); this._notification(peer.socket, 'producerScore', { producerId: producer.id, score });
}); });
producer.on('videoorientationchange', (videoOrientation) => producer.on('videoorientationchange', (videoOrientation) =>
{ {
logger.debug( logger.debug(
'producer "videoorientationchange" event [producerId:%s, videoOrientation:%o]', 'producer "videoorientationchange" event [producerId:"%s", videoOrientation:"%o"]',
producer.id, videoOrientation); producer.id, videoOrientation);
}); });
cb(null, { id: producer.id }); cb(null, { id: producer.id });
Object.values(this._peers).forEach((otherPeer) => this._peers.forEach((otherPeer) =>
{ {
if (otherPeer.data.joined && otherPeer !== peer) if (otherPeer.joined && otherPeer !== peer)
{ {
this._createConsumer( this._createConsumer(
{ {
@ -603,11 +548,11 @@ class Room extends EventEmitter
case 'closeProducer': case 'closeProducer':
{ {
// Ensure the Peer is joined. // Ensure the Peer is joined.
if (!peer.data.joined) if (!peer.joined)
throw new Error('Peer not yet joined'); throw new Error('Peer not yet joined');
const { producerId } = request.data; const { producerId } = request.data;
const producer = peer.data.producers.get(producerId); const producer = peer.getProducer(producerId);
if (!producer) if (!producer)
throw new Error(`producer with id "${producerId}" not found`); throw new Error(`producer with id "${producerId}" not found`);
@ -615,7 +560,7 @@ class Room extends EventEmitter
producer.close(); producer.close();
// Remove from its map. // Remove from its map.
peer.data.producers.delete(producer.id); peer.removeProducer(producer.id);
cb(); cb();
@ -625,11 +570,11 @@ class Room extends EventEmitter
case 'pauseProducer': case 'pauseProducer':
{ {
// Ensure the Peer is joined. // Ensure the Peer is joined.
if (!peer.data.joined) if (!peer.joined)
throw new Error('Peer not yet joined'); throw new Error('Peer not yet joined');
const { producerId } = request.data; const { producerId } = request.data;
const producer = peer.data.producers.get(producerId); const producer = peer.getProducer(producerId);
if (!producer) if (!producer)
throw new Error(`producer with id "${producerId}" not found`); throw new Error(`producer with id "${producerId}" not found`);
@ -644,11 +589,11 @@ class Room extends EventEmitter
case 'resumeProducer': case 'resumeProducer':
{ {
// Ensure the Peer is joined. // Ensure the Peer is joined.
if (!peer.data.joined) if (!peer.joined)
throw new Error('Peer not yet joined'); throw new Error('Peer not yet joined');
const { producerId } = request.data; const { producerId } = request.data;
const producer = peer.data.producers.get(producerId); const producer = peer.getProducer(producerId);
if (!producer) if (!producer)
throw new Error(`producer with id "${producerId}" not found`); throw new Error(`producer with id "${producerId}" not found`);
@ -663,11 +608,11 @@ class Room extends EventEmitter
case 'pauseConsumer': case 'pauseConsumer':
{ {
// Ensure the Peer is joined. // Ensure the Peer is joined.
if (!peer.data.joined) if (!peer.joined)
throw new Error('Peer not yet joined'); throw new Error('Peer not yet joined');
const { consumerId } = request.data; const { consumerId } = request.data;
const consumer = peer.data.consumers.get(consumerId); const consumer = peer.getConsumer(consumerId);
if (!consumer) if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`); throw new Error(`consumer with id "${consumerId}" not found`);
@ -682,11 +627,11 @@ class Room extends EventEmitter
case 'resumeConsumer': case 'resumeConsumer':
{ {
// Ensure the Peer is joined. // Ensure the Peer is joined.
if (!peer.data.joined) if (!peer.joined)
throw new Error('Peer not yet joined'); throw new Error('Peer not yet joined');
const { consumerId } = request.data; const { consumerId } = request.data;
const consumer = peer.data.consumers.get(consumerId); const consumer = peer.getConsumer(consumerId);
if (!consumer) if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`); throw new Error(`consumer with id "${consumerId}" not found`);
@ -701,11 +646,11 @@ class Room extends EventEmitter
case 'setConsumerPreferedLayers': case 'setConsumerPreferedLayers':
{ {
// Ensure the Peer is joined. // Ensure the Peer is joined.
if (!peer.data.joined) if (!peer.joined)
throw new Error('Peer not yet joined'); throw new Error('Peer not yet joined');
const { consumerId, spatialLayer, temporalLayer } = request.data; const { consumerId, spatialLayer, temporalLayer } = request.data;
const consumer = peer.data.consumers.get(consumerId); const consumer = peer.getConsumer(consumerId);
if (!consumer) if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`); throw new Error(`consumer with id "${consumerId}" not found`);
@ -720,11 +665,11 @@ class Room extends EventEmitter
case 'requestConsumerKeyFrame': case 'requestConsumerKeyFrame':
{ {
// Ensure the Peer is joined. // Ensure the Peer is joined.
if (!peer.data.joined) if (!peer.joined)
throw new Error('Peer not yet joined'); throw new Error('Peer not yet joined');
const { consumerId } = request.data; const { consumerId } = request.data;
const consumer = peer.data.consumers.get(consumerId); const consumer = peer.getConsumer(consumerId);
if (!consumer) if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`); throw new Error(`consumer with id "${consumerId}" not found`);
@ -739,7 +684,7 @@ class Room extends EventEmitter
case 'getTransportStats': case 'getTransportStats':
{ {
const { transportId } = request.data; const { transportId } = request.data;
const transport = peer.data.transports.get(transportId); const transport = peer.getTransport(transportId);
if (!transport) if (!transport)
throw new Error(`transport with id "${transportId}" not found`); throw new Error(`transport with id "${transportId}" not found`);
@ -754,7 +699,7 @@ class Room extends EventEmitter
case 'getProducerStats': case 'getProducerStats':
{ {
const { producerId } = request.data; const { producerId } = request.data;
const producer = peer.data.producers.get(producerId); const producer = peer.getProducer(producerId);
if (!producer) if (!producer)
throw new Error(`producer with id "${producerId}" not found`); throw new Error(`producer with id "${producerId}" not found`);
@ -769,7 +714,7 @@ class Room extends EventEmitter
case 'getConsumerStats': case 'getConsumerStats':
{ {
const { consumerId } = request.data; const { consumerId } = request.data;
const consumer = peer.data.consumers.get(consumerId); const consumer = peer.getConsumer(consumerId);
if (!consumer) if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`); throw new Error(`consumer with id "${consumerId}" not found`);
@ -784,13 +729,13 @@ class Room extends EventEmitter
case 'changeDisplayName': case 'changeDisplayName':
{ {
// Ensure the Peer is joined. // Ensure the Peer is joined.
if (!peer.data.joined) if (!peer.joined)
throw new Error('Peer not yet joined'); throw new Error('Peer not yet joined');
const { displayName } = request.data; const { displayName } = request.data;
const oldDisplayName = peer.data.displayName; const oldDisplayName = peer.displayName;
peer.data.displayName = displayName; peer.displayName = displayName;
// Spread to others // Spread to others
this._notification(peer.socket, 'changeDisplayName', { this._notification(peer.socket, 'changeDisplayName', {
@ -808,7 +753,7 @@ class Room extends EventEmitter
case 'changeProfilePicture': case 'changeProfilePicture':
{ {
// Ensure the Peer is joined. // Ensure the Peer is joined.
if (!peer.data.joined) if (!peer.joined)
throw new Error('Peer not yet joined'); throw new Error('Peer not yet joined');
const { picture } = request.data; const { picture } = request.data;
@ -973,14 +918,14 @@ class Room extends EventEmitter
case 'raiseHand': case 'raiseHand':
{ {
const { raiseHandState } = request.data; const { raisedHand } = request.data;
peer.data.raiseHandState = raiseHandState; peer.raisedHand = raisedHand;
// Spread to others // Spread to others
this._notification(peer.socket, 'raiseHand', { this._notification(peer.socket, 'raiseHand', {
peerId : peer.id, peerId : peer.id,
raiseHandState : raiseHandState raisedHand : raisedHand
}, true); }, true);
// Return no error // Return no error
@ -1005,6 +950,13 @@ class Room extends EventEmitter
*/ */
async _createConsumer({ consumerPeer, producerPeer, producer }) async _createConsumer({ consumerPeer, producerPeer, producer })
{ {
logger.debug(
'_createConsumer() [consumerPeer:"%s", producerPeer:"%s", producer:"%s"]',
consumerPeer.id,
producerPeer.id,
producer.id
);
// Optimization: // Optimization:
// - Create the server-side Consumer. If video, do it paused. // - Create the server-side Consumer. If video, do it paused.
// - Tell its Peer about it and wait for its response. // - Tell its Peer about it and wait for its response.
@ -1014,11 +966,11 @@ class Room extends EventEmitter
// NOTE: Don't create the Consumer if the remote Peer cannot consume it. // NOTE: Don't create the Consumer if the remote Peer cannot consume it.
if ( if (
!consumerPeer.data.rtpCapabilities || !consumerPeer.rtpCapabilities ||
!this._mediasoupRouter.canConsume( !this._mediasoupRouter.canConsume(
{ {
producerId : producer.id, producerId : producer.id,
rtpCapabilities : consumerPeer.data.rtpCapabilities rtpCapabilities : consumerPeer.rtpCapabilities
}) })
) )
{ {
@ -1026,8 +978,7 @@ class Room extends EventEmitter
} }
// Must take the Transport the remote Peer is using for consuming. // Must take the Transport the remote Peer is using for consuming.
const transport = Array.from(consumerPeer.data.transports.values()) const transport = consumerPeer.getConsumerTransport();
.find((t) => t.appData.consuming);
// This should not happen. // This should not happen.
if (!transport) if (!transport)
@ -1045,31 +996,31 @@ class Room extends EventEmitter
consumer = await transport.consume( consumer = await transport.consume(
{ {
producerId : producer.id, producerId : producer.id,
rtpCapabilities : consumerPeer.data.rtpCapabilities, rtpCapabilities : consumerPeer.rtpCapabilities,
paused : producer.kind === 'video' paused : producer.kind === 'video'
}); });
} }
catch (error) catch (error)
{ {
logger.warn('_createConsumer() | transport.consume():%o', error); logger.warn('_createConsumer() | [error:"%o"]', error);
return; return;
} }
// Store the Consumer into the protoo consumerPeer data Object. // Store the Consumer into the consumerPeer data Object.
consumerPeer.data.consumers.set(consumer.id, consumer); consumerPeer.addConsumer(consumer.id, consumer);
// Set Consumer events. // Set Consumer events.
consumer.on('transportclose', () => consumer.on('transportclose', () =>
{ {
// Remove from its map. // Remove from its map.
consumerPeer.data.consumers.delete(consumer.id); consumerPeer.removeConsumer(consumer.id);
}); });
consumer.on('producerclose', () => consumer.on('producerclose', () =>
{ {
// Remove from its map. // Remove from its map.
consumerPeer.data.consumers.delete(consumer.id); consumerPeer.removeConsumer(consumer.id);
this._notification(consumerPeer.socket, 'consumerClosed', { consumerId: consumer.id }); this._notification(consumerPeer.socket, 'consumerClosed', { consumerId: consumer.id });
}); });
@ -1086,10 +1037,6 @@ class Room extends EventEmitter
consumer.on('score', (score) => consumer.on('score', (score) =>
{ {
// logger.debug(
// 'consumer "score" event [consumerId:%s, score:%o]',
// consumer.id, score);
this._notification(consumerPeer.socket, 'consumerScore', { consumerId: consumer.id, score }); this._notification(consumerPeer.socket, 'consumerScore', { consumerId: consumer.id, score });
}); });
@ -1106,7 +1053,7 @@ class Room extends EventEmitter
); );
}); });
// Send a protoo request to the remote Peer with Consumer parameters. // Send a request to the remote Peer with Consumer parameters.
try try
{ {
await this._request( await this._request(
@ -1140,7 +1087,7 @@ class Room extends EventEmitter
} }
catch (error) catch (error)
{ {
logger.warn('_createConsumer() | failed:%o', error); logger.warn('_createConsumer() | [error:"%o"]', error);
} }
} }

View File

@ -14,6 +14,7 @@ const mediasoup = require('mediasoup');
const AwaitQueue = require('awaitqueue'); const AwaitQueue = require('awaitqueue');
const Logger = require('./lib/Logger'); const Logger = require('./lib/Logger');
const Room = require('./lib/Room'); const Room = require('./lib/Room');
const Peer = require('./lib/Peer');
const base64 = require('base-64'); const base64 = require('base-64');
const helmet = require('helmet'); const helmet = require('helmet');
const httpHelper = require('./httpHelper'); const httpHelper = require('./httpHelper');
@ -44,6 +45,9 @@ let nextMediasoupWorkerIdx = 0;
// Map of Room instances indexed by roomId. // Map of Room instances indexed by roomId.
const rooms = new Map(); const rooms = new Map();
// Map of Peer instances indexed by peerId.
const peers = new Map();
// TLS server configuration. // TLS server configuration.
const tls = const tls =
{ {
@ -237,8 +241,7 @@ async function setupAuth(oidcIssuer)
{ {
passport.authenticate('oidc', { passport.authenticate('oidc', {
state : base64.encode(JSON.stringify({ state : base64.encode(JSON.stringify({
roomId : req.query.roomId, id : req.query.id
peerId : req.query.peerId
})) }))
})(req, res, next); })(req, res, next);
}); });
@ -278,9 +281,9 @@ async function setupAuth(oidcIssuer)
photo = '/static/media/buddy.403cb9f6.svg'; photo = '/static/media/buddy.403cb9f6.svg';
} }
const room = rooms.get(state.roomId); const peer = peers.get(state.id);
room && room.peerAuthenticated(state.peerId); peer && (peer.authenticated = true);
res.send(httpHelper({ res.send(httpHelper({
success : true, success : true,
@ -350,8 +353,6 @@ async function runWebSocketServer()
{ {
const { roomId, peerId } = socket.handshake.query; const { roomId, peerId } = socket.handshake.query;
logger.info('socket.io "connection" | [session:"%o"]', socket.handshake.session);
if (!roomId || !peerId) if (!roomId || !peerId)
{ {
logger.warn('connection request without roomId and/or peerId'); logger.warn('connection request without roomId and/or peerId');
@ -367,12 +368,17 @@ async function runWebSocketServer()
queue.push(async () => queue.push(async () =>
{ {
const room = await getOrCreateRoom({ roomId }); const room = await getOrCreateRoom({ roomId });
const peer = new Peer({ id: peerId, socket });
room.handleConnection({ peerId, socket }); peers.set(peerId, peer);
peer.on('close', () => peers.delete(peerId));
room.handlePeer(peer);
}) })
.catch((error) => .catch((error) =>
{ {
logger.error('room creation or room joining failed:%o', error); logger.error('room creation or room joining failed [error:"%o"]', error);
socket.disconnect(true); socket.disconnect(true);
@ -435,7 +441,7 @@ async function getOrCreateRoom({ roomId })
// If the Room does not exist create a new one. // If the Room does not exist create a new one.
if (!room) if (!room)
{ {
logger.info('creating a new Room [roomId:%s]', roomId); logger.info('creating a new Room [roomId:"%s"]', roomId);
const mediasoupWorker = getMediasoupWorker(); const mediasoupWorker = getMediasoupWorker();