From 39c3f7c36d1610563678a3815207d85bc3be22d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5var=20Aamb=C3=B8=20Fosstveit?= Date: Fri, 25 Oct 2019 13:34:28 +0200 Subject: [PATCH] Refactored a bit of code and added a Peer class to the server. --- README.md | 5 + app/src/RoomClient.js | 80 ++++---- app/src/index.js | 5 +- server/lib/Lobby.js | 84 ++++---- server/lib/Peer.js | 268 +++++++++++++++++++++++++ server/lib/Room.js | 441 +++++++++++++++++++----------------------- server/server.js | 24 ++- 7 files changed, 571 insertions(+), 336 deletions(-) create mode 100644 server/lib/Peer.js diff --git a/README.md b/README.md index 82137d3..c699a76 100644 --- a/README.md +++ b/README.md @@ -110,3 +110,8 @@ This started as a fork of the [work](https://github.com/versatica/mediasoup-demo ## License MIT + + +Contributions to this work were made on behalf of the GÉANT project, a project that has received funding from the European Union’s Horizon 2020 research and innovation programme under Grant Agreement No. 731122 (GN4-2). On behalf of GÉANT project, GÉANT Association is the sole owner of the copyright in all material which was developed by a member of the GÉANT project. + +GÉANT Vereniging (Association) is registered with the Chamber of Commerce in Amsterdam with registration number 40535155 and operates in the UK as a branch of GÉANT Vereniging. Registered office: Hoekenrode 3, 1102BR Amsterdam, The Netherlands. UK branch address: City House, 126-130 Hills Road, Cambridge CB2 1PQ, UK. \ No newline at end of file diff --git a/app/src/RoomClient.js b/app/src/RoomClient.js index dac26a0..2b6638e 100644 --- a/app/src/RoomClient.js +++ b/app/src/RoomClient.js @@ -77,11 +77,11 @@ export default class RoomClient } constructor( - { roomId, peerId, accessCode, device, useSimulcast, produce, consume, forceTcp }) + { roomId, peerId, accessCode, device, useSimulcast, produce, forceTcp }) { logger.debug( - 'constructor() [roomId: "%s", peerId: "%s", device: "%s", useSimulcast: "%s", produce: "%s", consume: "%s", forceTcp: "%s"]', - roomId, peerId, device.flag, useSimulcast, produce, consume, forceTcp); + 'constructor() [roomId: "%s", peerId: "%s", device: "%s", useSimulcast: "%s", produce: "%s", forceTcp: "%s"]', + roomId, peerId, device.flag, useSimulcast, produce, forceTcp); this._signalingUrl = getSignalingUrl(peerId, roomId); @@ -94,9 +94,6 @@ export default class RoomClient // Whether we should produce. this._produce = produce; - // Whether we should consume. - this._consume = consume; - // Wheter we force TCP this._forceTcp = forceTcp; @@ -308,7 +305,7 @@ export default class RoomClient login() { - const url = `/auth/login?roomId=${this._roomId}&peerId=${this._peerId}`; + const url = `/auth/login?id=${this._peerId}`; this._loginWindow = window.open(url, 'loginWindow'); } @@ -1705,44 +1702,41 @@ export default class RoomClient }); } - if (this._consume) - { - const transportInfo = await this.sendRequest( - 'createWebRtcTransport', - { - forceTcp : this._forceTcp, - producing : false, - consuming : true - }); + const transportInfo = await this.sendRequest( + 'createWebRtcTransport', + { + forceTcp : this._forceTcp, + producing : false, + consuming : true + }); - const { + const { + id, + iceParameters, + iceCandidates, + dtlsParameters + } = transportInfo; + + this._recvTransport = this._mediasoupDevice.createRecvTransport( + { id, iceParameters, iceCandidates, dtlsParameters - } = transportInfo; + }); - this._recvTransport = this._mediasoupDevice.createRecvTransport( - { - id, - iceParameters, - iceCandidates, - dtlsParameters - }); - - this._recvTransport.on( - 'connect', ({ dtlsParameters }, callback, errback) => // eslint-disable-line no-shadow - { - this.sendRequest( - 'connectWebRtcTransport', - { - transportId : this._recvTransport.id, - dtlsParameters - }) - .then(callback) - .catch(errback); - }); - } + this._recvTransport.on( + 'connect', ({ dtlsParameters }, callback, errback) => // eslint-disable-line no-shadow + { + this.sendRequest( + 'connectWebRtcTransport', + { + transportId : this._recvTransport.id, + dtlsParameters + }) + .then(callback) + .catch(errback); + }); // Set our media capabilities. store.dispatch(stateActions.setMediaCapabilities( @@ -1760,11 +1754,11 @@ export default class RoomClient displayName : displayName, picture : picture, device : this._device, - rtpCapabilities : this._consume - ? this._mediasoupDevice.rtpCapabilities - : undefined + rtpCapabilities : this._mediasoupDevice.rtpCapabilities }); - + + logger.debug('_joinRoom() joined, got peers [peers:"%o"]', peers); + for (const peer of peers) { store.dispatch( diff --git a/app/src/index.js b/app/src/index.js index 3561578..a412bb6 100644 --- a/app/src/index.js +++ b/app/src/index.js @@ -19,7 +19,7 @@ import * as serviceWorker from './serviceWorker'; import './index.css'; -if (process.env.REACT_APP_DEBUG === '*') +if (process.env.REACT_APP_DEBUG === '*' || process.env.NODE_ENV !== 'production') { debug.enable('* -engine* -socket* -RIE* *WARN* *ERROR*'); } @@ -64,7 +64,6 @@ function run() const accessCode = parameters.get('code'); const produce = parameters.get('produce') !== 'false'; - const consume = parameters.get('consume') !== 'false'; const useSimulcast = parameters.get('simulcast') === 'true'; const forceTcp = parameters.get('forceTcp') === 'true'; @@ -85,7 +84,7 @@ function run() ); roomClient = new RoomClient( - { roomId, peerId, accessCode, device, useSimulcast, produce, consume, forceTcp }); + { roomId, peerId, accessCode, device, useSimulcast, produce, forceTcp }); global.CLIENT = roomClient; diff --git a/server/lib/Lobby.js b/server/lib/Lobby.js index 1d7a7d3..c2c22f0 100644 --- a/server/lib/Lobby.js +++ b/server/lib/Lobby.js @@ -14,7 +14,7 @@ class Lobby extends EventEmitter // Closed flag. this._closed = false; - this._peers = {}; + this._peers = new Map(); } close() @@ -23,110 +23,126 @@ class Lobby extends EventEmitter this._closed = true; - Object.values(this._peers).forEach((peer) => + this._peers.forEach((peer) => { - if (peer.socket) - peer.socket.disconnect(); + if (!peer.closed) + peer.close(); }); - this._peers = {}; + this._peers.clear(); } checkEmpty() { logger.info('checkEmpty()'); - if (Object.keys(this._peers).length == 0) - return true - else return false; + + return this._peers.size === 0; } peerList() { logger.info('peerList()'); - return Object.values(this._peers).map((peer) => + return Array.from(this._peers.values()).map((peer) => ({ - peerId : peer.peerId, + peerId : peer.id, displayName : peer.displayName })); } hasPeer(peerId) { - return Boolean(this._peers[peerId]); + return this._peers.has(peerId); } promoteAllPeers() { logger.info('promoteAllPeers()'); - Object.values(this._peers).forEach((peer) => + this._peers.forEach((peer) => { if (peer.socket) - this.promotePeer(peer.peerId); + this.promotePeer(peer.id); }); } promotePeer(peerId) { - logger.info('promotePeer() [peerId: %s]', peerId); + logger.info('promotePeer() [peer:"%s"]', peerId); - const peer = this._peers[peerId]; + const peer = this._peers.get(peerId); if (peer) { this.emit('promotePeer', peer); - delete this._peers[peerId]; + this._peers.delete(peerId); } } - parkPeer({ peerId, consume, socket }) + parkPeer(peer) { - logger.info('parkPeer()'); + logger.info('parkPeer() [peer:"%s"]', peer.id); - if ( this._closed ) return; + if (this._closed) + return; - const peer = { peerId, socket, consume }; + peer.socket.emit('notification', { method: 'enteredLobby', data: {} }); - socket.emit('notification', { method: 'enteredLobby', data: {} }); + this._peers.set(peer.id, peer); - this._peers[peerId] = peer; + peer.on('authenticationChange', () => + { + logger.info('parkPeer() | authenticationChange [peer:"%s"]', peer.id); - socket.on('request', (request, cb) => + peer.authenticated && this.emit('peerAuthenticated', peer); + }); + + peer.socket.on('request', (request, cb) => { logger.debug( - 'Peer "request" event [method:%s, peerId:%s]', - request.method, peer.peerId); + 'Peer "request" event [method:"%s", peer:"%s"]', + request.method, peer.id); - if (this._closed) return; + if (this._closed) + return; + this._handleSocketRequest(peer, request, cb) .catch((error) => { - logger.error('request failed:%o', error); + logger.error('request failed [error:"%o"]', error); cb(error); }); }); - socket.on('disconnect', () => + peer.socket.on('disconnect', () => { - logger.debug('Peer "close" event [peerId:%s]', peer.peerId); + logger.debug('Peer "close" event [peer:"%s"]', peer.id); - if (this._closed) return; + if (this._closed) + return; this.emit('peerClosed', peer); - delete this._peers[peer.peerId]; + this._peers.delete(peer.id); - if ( this.checkEmpty() ) this.emit('lobbyEmpty'); + if (this.checkEmpty()) + this.emit('lobbyEmpty'); }); } async _handleSocketRequest(peer, request, cb) { - logger.debug('_handleSocketRequest [peer:%o], [request:%o]', peer, request); - if (this._closed) return; + logger.debug( + '_handleSocketRequest [peer:"%s"], [request:"%s"]', + peer.id, + request.method + ); + + if (this._closed) + return; + switch (request.method) { case 'changeDisplayName': diff --git a/server/lib/Peer.js b/server/lib/Peer.js new file mode 100644 index 0000000..f71aafd --- /dev/null +++ b/server/lib/Peer.js @@ -0,0 +1,268 @@ +const EventEmitter = require('events').EventEmitter; +const Logger = require('./Logger'); + +const logger = new Logger('Peer'); + +class Peer extends EventEmitter +{ + constructor({ id, socket }) + { + logger.info('constructor() [id:"%s", socket:"%s"]', id, socket.id); + super(); + + this._id = id; + + this._socket = socket; + + this._closed = false; + + this._joined = false; + + this._authenticated = false; + + this._displayName = false; + + this._picture = null; + + this._device = null; + + this._rtpCapabilities = null; + + this._raisedHand = false; + + this._transports = new Map(); + + this._producers = new Map(); + + this._consumers = new Map(); + + this._handlePeer(); + } + + close() + { + logger.info('close()'); + + this._closed = true; + + // Iterate and close all mediasoup Transport associated to this Peer, so all + // its Producers and Consumers will also be closed. + this.transports.forEach((transport) => + { + transport.close(); + }); + + if (this._socket) + this._socket.disconnect(true); + + this.emit('close'); + } + + _handlePeer() + { + this.authenticated = + this.socket.handshake.session.passport && + this.socket.handshake.session.passport.user; + + this.socket.use((packet, next) => + { + this.authenticated = + this.socket.handshake.session.passport && + this.socket.handshake.session.passport.user; + + return next(); + }); + + this.socket.on('disconnect', () => + { + if (this.closed) + return; + + logger.debug('"disconnect" event [id:%s]', this.id); + + this.close(); + }); + } + + get id() + { + return this._id; + } + + set id(id) + { + this._id = id; + } + + get socket() + { + return this._socket; + } + + set socket(socket) + { + this._socket = socket; + } + + get closed() + { + return this._closed; + } + + get joined() + { + return this._joined; + } + + set joined(joined) + { + this._joined = joined; + } + + get authenticated() + { + return this._authenticated; + } + + set authenticated(authenticated) + { + if (authenticated !== this._authenticated) + { + logger.info('authenticated() | authenticationChange [peer:"%s", authenticated:"%s"]', this.id, authenticated); + + this._authenticated = authenticated; + this.emit('authenticationChange'); + } + } + + get displayName() + { + return this._displayName; + } + + set displayName(displayName) + { + this._displayName = displayName; + } + + get picture() + { + return this._picture; + } + + set picture(picture) + { + this._picture = picture; + } + + get device() + { + return this._device; + } + + set device(device) + { + this._device = device; + } + + get rtpCapabilities() + { + return this._rtpCapabilities; + } + + set rtpCapabilities(rtpCapabilities) + { + this._rtpCapabilities = rtpCapabilities; + } + + get raisedHand() + { + return this._raisedHand; + } + + set raisedHand(raisedHand) + { + this._raisedHand = raisedHand; + } + + get transports() + { + return this._transports; + } + + get producers() + { + return this._producers; + } + + get consumers() + { + return this._consumers; + } + + addTransport(id, transport) + { + this.transports.set(id, transport); + } + + getTransport(id) + { + return this.transports.get(id); + } + + getConsumerTransport() + { + return Array.from(this.transports.values()) + .find((t) => t.appData.consuming); + } + + removeTransport(id) + { + this.transports.delete(id); + } + + addProducer(id, producer) + { + this.producers.set(id, producer); + } + + getProducer(id) + { + return this.producers.get(id); + } + + removeProducer(id) + { + this.producers.delete(id); + } + + addConsumer(id, consumer) + { + this.consumers.set(id, consumer); + } + + getConsumer(id) + { + return this.consumers.get(id); + } + + removeConsumer(id) + { + this.consumers.delete(id); + } + + get peerInfo() + { + const peerInfo = + { + id : this.id, + displayName : this.displayName, + picture : this.picture, + device : this.device + }; + + return peerInfo; + } +} + +module.exports = Peer; diff --git a/server/lib/Room.js b/server/lib/Room.js index 6f7a379..b8c8bf8 100644 --- a/server/lib/Room.js +++ b/server/lib/Room.js @@ -18,7 +18,7 @@ class Room extends EventEmitter */ static async create({ mediasoupWorker, roomId }) { - logger.info('create() [roomId:%s, forceH264:%s]', roomId); + logger.info('create() [roomId:"%s"]', roomId); // Router media codecs. const mediaCodecs = config.mediasoup.router.mediaCodecs; @@ -62,39 +62,118 @@ class Room extends EventEmitter this._lobby = new Lobby(); + this._chatHistory = []; + + this._fileHistory = []; + + this._lastN = []; + + this._peers = new Map(); + + // mediasoup Router instance. + this._mediasoupRouter = mediasoupRouter; + + // mediasoup AudioLevelObserver. + this._audioLevelObserver = audioLevelObserver; + + // Current active speaker. + this._currentActiveSpeaker = null; + + this._handleLobby(); + this._handleAudioLevelObserver(); + } + + close() + { + logger.debug('close()'); + + this._closed = true; + + this._lobby.close(); + + this._peers.forEach((peer) => + { + if (!peer.closed) + peer.close(); + }); + + this._peers.clear(); + + // Close the mediasoup Router. + this._mediasoupRouter.close(); + + // Emit 'close' event. + this.emit('close'); + } + + handlePeer(peer) + { + logger.info('handlePeer() [peer:"%s"]', peer.id); + + // This will allow reconnects to join despite lock + if (this._peers.has(peer.id)) + { + logger.warn( + 'handleConnection() | there is already a peer with same peerId [peer:"%s"]', + peer.id); + + peer.close(); + + return; + } + else if ( + this._locked || + (config.requireSignInToAccess && !peer.authenticated) + ) + { + this._parkPeer(peer); + + return; + } + + this._peerJoining(peer); + } + + _handleLobby() + { this._lobby.on('promotePeer', (promotedPeer) => { - logger.info('promotePeer() [promotedPeer:"%o"]', promotedPeer); + logger.info('promotePeer() [promotedPeer:"%s"]', promotedPeer.id); - const { peerId } = promotedPeer; + const { id } = promotedPeer; - this._peerJoining({ ...promotedPeer }); + this._peerJoining(promotedPeer); - Object.values(this._peers).forEach((peer) => + this._peers.forEach((peer) => { - this._notification(peer.socket, 'promotedPeer', { peerId }); + this._notification(peer.socket, 'promotedPeer', { peerId: id }); }); }); this._lobby.on('lobbyPeerDisplayNameChanged', (changedPeer) => { - const { peerId, displayName } = changedPeer; + const { id, displayName } = changedPeer; - Object.values(this._peers).forEach((peer) => + this._peers.forEach((peer) => { - this._notification(peer.socket, 'lobbyPeerDisplayNameChanged', { peerId, displayName }); + this._notification(peer.socket, 'lobbyPeerDisplayNameChanged', { peerId: id, displayName }); }); }); + this._lobby.on('peerAuthenticated', (peer) => + { + !this._locked && this._lobby.promotePeer(peer.id); + }); + this._lobby.on('peerClosed', (closedPeer) => { - logger.info('peerClosed() [closedPeer:"%o"]', closedPeer); + logger.info('peerClosed() [closedPeer:"%s"]', closedPeer.id); - const { peerId } = closedPeer; + const { id } = closedPeer; - Object.values(this._peers).forEach((peer) => + this._peers.forEach((peer) => { - this._notification(peer.socket, 'lobbyPeerClosed', { peerId }); + this._notification(peer.socket, 'lobbyPeerClosed', { peerId: id }); }); }); @@ -107,34 +186,17 @@ class Room extends EventEmitter this.selfDestructCountdown(); } }); + } - this._chatHistory = []; - - this._fileHistory = []; - - this._lastN = []; - - this._peers = {}; - - // mediasoup Router instance. - // @type {mediasoup.Router} - this._mediasoupRouter = mediasoupRouter; - - // mediasoup AudioLevelObserver. - // @type {mediasoup.AudioLevelObserver} - this._audioLevelObserver = audioLevelObserver; - + _handleAudioLevelObserver() + { // Set audioLevelObserver events. this._audioLevelObserver.on('volumes', (volumes) => { const { producer, volume } = volumes[0]; - // logger.debug( - // 'audioLevelObserver "volumes" event [producerId:%s, volume:%s]', - // producer.id, volume); - // Notify all Peers. - Object.values(this._peers).forEach((peer) => + this._peers.forEach((peer) => { this._notification(peer.socket, 'activeSpeaker', { peerId : producer.appData.peerId, @@ -145,18 +207,21 @@ class Room extends EventEmitter this._audioLevelObserver.on('silence', () => { - // logger.debug('audioLevelObserver "silence" event'); - // Notify all Peers. - Object.values(this._peers).forEach((peer) => + this._peers.forEach((peer) => { this._notification(peer.socket, 'activeSpeaker', { peerId: null }); }); }); + } - // Current active speaker. - // @type {mediasoup.Peer} - this._currentActiveSpeaker = null; + logStatus() + { + logger.info( + 'logStatus() [room id:"%s", peers:"%s"]', + this._roomId, + this._peers.size + ); } get id() @@ -173,10 +238,10 @@ class Room extends EventEmitter if (this._closed) return; - if (this.checkEmpty() && this._lobby.checkEmpty()) + if (this.checkEmpty()) { logger.info( - 'Room deserted for some time, closing the room [roomId:%s]', + 'Room deserted for some time, closing the room [roomId:"%s"]', this._roomId); this.close(); } @@ -185,170 +250,68 @@ class Room extends EventEmitter }, 10000); } - close() - { - logger.debug('close()'); - - this._closed = true; - - this._lobby.close(); - - Object.values(this._peers).forEach((peer) => - { - if (peer.socket) - peer.socket.disconnect(); - }); - - this._peers = {}; - - // Close the mediasoup Router. - this._mediasoupRouter.close(); - - // Emit 'close' event. - this.emit('close'); - } - - logStatus() - { - logger.info( - 'logStatus() [room id:"%s", peers:%o]', - this._roomId, - this._peers - ); - } - // checks both room and lobby checkEmpty() { - if ((Object.keys(this._peers).length == 0) && (this._lobby.checkEmpty())) + if ((this._peers.size == 0) && (this._lobby.checkEmpty())) return true; else return false; } - handleConnection({ peerId, consume, socket }) + _parkPeer(parkPeer) { - logger.info('handleConnection() [peerId:"%s"]', peerId); + this._lobby.parkPeer(parkPeer); - // This will allow reconnects to join despite lock - if (this._peers[peerId]) + this._peers.forEach((peer) => { - logger.warn( - 'handleConnection() | there is already a peer with same peerId, ' + - 'closing the previous one [peerId:"%s"]', - peerId); - - const peer = this._peers[peerId]; - - peer.socket.disconnect(); - delete this._peers[peerId]; - } - else if (this._locked) // Don't allow connections to a locked room - this._parkPeer({ peerId, consume, socket }) - else if (config.requireSignInToAccess) // Only allow signed in users directly into room - { - const { passport } = socket.handshake.session; - - if (passport && passport.user) - this._peerJoining({ peerId, consume, socket }); - else - this._parkPeer({ peerId, consume, socket }) - } - else - this._peerJoining({ peerId, consume, socket }); - } - - _parkPeer({ peerId, consume, socket }) - { - this._lobby.parkPeer({ peerId, consume, socket }); - - Object.values(this._peers).forEach((peer) => - { - this._notification(peer.socket, 'parkedPeer', { peerId }); + this._notification(peer.socket, 'parkedPeer', { peerId: parkPeer.id }); }); } - _peerJoining({ peerId, consume, socket }) + _peerJoining(peer) { - socket.join(this._roomId); + peer.socket.join(this._roomId); - const peer = { id: peerId, socket: socket }; - - const index = this._lastN.indexOf(peerId); + const index = this._lastN.indexOf(peer.id); if (index === -1) // We don't have this peer, add to end { - this._lastN.push(peerId); + this._lastN.push(peer.id); } - this._peers[peerId] = peer; + this._peers.set(peer.id, peer); - this._handlePeer({ peer, consume }); - this._notification(socket, 'roomReady'); + this._handlePeer(peer); + this._notification(peer.socket, 'roomReady'); } - isLocked() - { - return this._locked; - } - - peerAuthenticated(peerId) - { - logger.debug('peerAuthenticated() | [peerId:"%s"]', peerId); - - if (!this._locked) - { - if (!this._peers[peerId]) - { - this._lobby.promotePeer(peerId); - } - } - } - - _handlePeer({ peer, consume }) + _handlePeer(peer) { logger.debug('_handlePeer() [peer:"%s"]', peer.id); - peer.data = {}; - - // Not joined after a custom protoo 'join' request is later received. - peer.data.consume = consume; - peer.data.joined = false; - peer.data.displayName = undefined; - peer.data.device = undefined; - peer.data.rtpCapabilities = undefined; - peer.data.raiseHandState = false; - - // Have mediasoup related maps ready even before the Peer joins since we - // allow creating Transports before joining. - peer.data.transports = new Map(); - peer.data.producers = new Map(); - peer.data.consumers = new Map(); - peer.socket.on('request', (request, cb) => { logger.debug( - 'Peer "request" event [method:%s, peerId:%s]', + 'Peer "request" event [method:"%s", peerId:"%s"]', request.method, peer.id); this._handleSocketRequest(peer, request, cb) .catch((error) => { - logger.error('request failed:%o', error); + logger.error('"request" failed [error:"%o"]', error); cb(error); }); }); - peer.socket.on('disconnect', () => + peer.on('close', () => { if (this._closed) return; - logger.debug('Peer "disconnect" event [peerId:%s]', peer.id); - // If the Peer was joined, notify all Peers. - if (peer.data.joined) + if (peer.joined) { this._notification(peer.socket, 'peerClosed', { peerId: peer.id }, true); } @@ -360,14 +323,7 @@ class Room extends EventEmitter this._lastN.splice(index, 1); } - // Iterate and close all mediasoup Transport associated to this Peer, so all - // its Producers and Consumers will also be closed. - for (const transport of peer.data.transports.values()) - { - transport.close(); - } - - delete this._peers[peer.id]; + this._peers.delete(peer.id); // If this is the last Peer in the room and // lobby is empty, close the room after a while. @@ -382,7 +338,6 @@ class Room extends EventEmitter { switch (request.method) { - case 'getRouterRtpCapabilities': { cb(null, this._mediasoupRouter.rtpCapabilities); @@ -393,7 +348,7 @@ class Room extends EventEmitter case 'join': { // Ensure the Peer is not already joined. - if (peer.data.joined) + if (peer.joined) throw new Error('Peer already joined'); const { @@ -403,30 +358,24 @@ class Room extends EventEmitter rtpCapabilities } = request.data; - // Store client data into the protoo Peer data object. - peer.data.displayName = displayName; - peer.data.picture = picture; - peer.data.device = device; - peer.data.rtpCapabilities = rtpCapabilities; + // Store client data into the Peer data object. + peer.displayName = displayName; + peer.picture = picture; + peer.device = device; + peer.rtpCapabilities = rtpCapabilities; // Tell the new Peer about already joined Peers. // And also create Consumers for existing Producers. const peerInfos = []; - Object.values(this._peers).forEach((joinedPeer) => + this._peers.forEach((joinedPeer) => { - if (joinedPeer.data.joined) + if (joinedPeer.joined) { - peerInfos.push( - { - id : joinedPeer.id, - displayName : joinedPeer.data.displayName, - picture : joinedPeer.data.picture, - device : joinedPeer.data.device - }); + peerInfos.push(joinedPeer.peerInfo); - for (const producer of joinedPeer.data.producers.values()) + joinedPeer.producers.forEach((producer) => { this._createConsumer( { @@ -434,14 +383,14 @@ class Room extends EventEmitter producerPeer : joinedPeer, producer }); - } + }); } }); cb(null, { peers: peerInfos }); // Mark the new Peer as joined. - peer.data.joined = true; + peer.joined = true; this._notification( peer.socket, @@ -456,8 +405,8 @@ class Room extends EventEmitter ); logger.debug( - 'peer joined [peeerId: %s, displayName: %s, picture: %s, device: %o]', - peer.id, displayName, picture, device); + 'peer joined [peer: "%s", displayName: "%s", picture: "%s"]', + peer.id, displayName, picture); break; } @@ -483,8 +432,8 @@ class Room extends EventEmitter appData : { producing, consuming } }); - // Store the WebRtcTransport into the protoo Peer data Object. - peer.data.transports.set(transport.id, transport); + // Store the WebRtcTransport into the Peer data Object. + peer.addTransport(transport.id, transport); cb( null, @@ -508,7 +457,7 @@ class Room extends EventEmitter case 'connectWebRtcTransport': { const { transportId, dtlsParameters } = request.data; - const transport = peer.data.transports.get(transportId); + const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); @@ -523,7 +472,7 @@ class Room extends EventEmitter case 'restartIce': { const { transportId } = request.data; - const transport = peer.data.transports.get(transportId); + const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); @@ -538,12 +487,12 @@ class Room extends EventEmitter case 'produce': { // Ensure the Peer is joined. - if (!peer.data.joined) + if (!peer.joined) throw new Error('Peer not yet joined'); const { transportId, kind, rtpParameters } = request.data; let { appData } = request.data; - const transport = peer.data.transports.get(transportId); + const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); @@ -555,31 +504,27 @@ class Room extends EventEmitter const producer = await transport.produce({ kind, rtpParameters, appData }); - // Store the Producer into the protoo Peer data Object. - peer.data.producers.set(producer.id, producer); + // Store the Producer into the Peer data Object. + peer.addProducer(producer.id, producer); // Set Producer events. producer.on('score', (score) => { - // logger.debug( - // 'producer "score" event [producerId:%s, score:%o]', - // producer.id, score); - this._notification(peer.socket, 'producerScore', { producerId: producer.id, score }); }); producer.on('videoorientationchange', (videoOrientation) => { logger.debug( - 'producer "videoorientationchange" event [producerId:%s, videoOrientation:%o]', + 'producer "videoorientationchange" event [producerId:"%s", videoOrientation:"%o"]', producer.id, videoOrientation); }); cb(null, { id: producer.id }); - Object.values(this._peers).forEach((otherPeer) => + this._peers.forEach((otherPeer) => { - if (otherPeer.data.joined && otherPeer !== peer) + if (otherPeer.joined && otherPeer !== peer) { this._createConsumer( { @@ -603,11 +548,11 @@ class Room extends EventEmitter case 'closeProducer': { // Ensure the Peer is joined. - if (!peer.data.joined) + if (!peer.joined) throw new Error('Peer not yet joined'); const { producerId } = request.data; - const producer = peer.data.producers.get(producerId); + const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); @@ -615,7 +560,7 @@ class Room extends EventEmitter producer.close(); // Remove from its map. - peer.data.producers.delete(producer.id); + peer.removeProducer(producer.id); cb(); @@ -625,11 +570,11 @@ class Room extends EventEmitter case 'pauseProducer': { // Ensure the Peer is joined. - if (!peer.data.joined) + if (!peer.joined) throw new Error('Peer not yet joined'); const { producerId } = request.data; - const producer = peer.data.producers.get(producerId); + const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); @@ -644,11 +589,11 @@ class Room extends EventEmitter case 'resumeProducer': { // Ensure the Peer is joined. - if (!peer.data.joined) + if (!peer.joined) throw new Error('Peer not yet joined'); const { producerId } = request.data; - const producer = peer.data.producers.get(producerId); + const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); @@ -663,11 +608,11 @@ class Room extends EventEmitter case 'pauseConsumer': { // Ensure the Peer is joined. - if (!peer.data.joined) + if (!peer.joined) throw new Error('Peer not yet joined'); const { consumerId } = request.data; - const consumer = peer.data.consumers.get(consumerId); + const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); @@ -682,11 +627,11 @@ class Room extends EventEmitter case 'resumeConsumer': { // Ensure the Peer is joined. - if (!peer.data.joined) + if (!peer.joined) throw new Error('Peer not yet joined'); const { consumerId } = request.data; - const consumer = peer.data.consumers.get(consumerId); + const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); @@ -701,11 +646,11 @@ class Room extends EventEmitter case 'setConsumerPreferedLayers': { // Ensure the Peer is joined. - if (!peer.data.joined) + if (!peer.joined) throw new Error('Peer not yet joined'); const { consumerId, spatialLayer, temporalLayer } = request.data; - const consumer = peer.data.consumers.get(consumerId); + const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); @@ -720,11 +665,11 @@ class Room extends EventEmitter case 'requestConsumerKeyFrame': { // Ensure the Peer is joined. - if (!peer.data.joined) + if (!peer.joined) throw new Error('Peer not yet joined'); const { consumerId } = request.data; - const consumer = peer.data.consumers.get(consumerId); + const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); @@ -739,7 +684,7 @@ class Room extends EventEmitter case 'getTransportStats': { const { transportId } = request.data; - const transport = peer.data.transports.get(transportId); + const transport = peer.getTransport(transportId); if (!transport) throw new Error(`transport with id "${transportId}" not found`); @@ -754,7 +699,7 @@ class Room extends EventEmitter case 'getProducerStats': { const { producerId } = request.data; - const producer = peer.data.producers.get(producerId); + const producer = peer.getProducer(producerId); if (!producer) throw new Error(`producer with id "${producerId}" not found`); @@ -769,7 +714,7 @@ class Room extends EventEmitter case 'getConsumerStats': { const { consumerId } = request.data; - const consumer = peer.data.consumers.get(consumerId); + const consumer = peer.getConsumer(consumerId); if (!consumer) throw new Error(`consumer with id "${consumerId}" not found`); @@ -784,13 +729,13 @@ class Room extends EventEmitter case 'changeDisplayName': { // Ensure the Peer is joined. - if (!peer.data.joined) + if (!peer.joined) throw new Error('Peer not yet joined'); const { displayName } = request.data; - const oldDisplayName = peer.data.displayName; + const oldDisplayName = peer.displayName; - peer.data.displayName = displayName; + peer.displayName = displayName; // Spread to others this._notification(peer.socket, 'changeDisplayName', { @@ -808,7 +753,7 @@ class Room extends EventEmitter case 'changeProfilePicture': { // Ensure the Peer is joined. - if (!peer.data.joined) + if (!peer.joined) throw new Error('Peer not yet joined'); const { picture } = request.data; @@ -973,14 +918,14 @@ class Room extends EventEmitter case 'raiseHand': { - const { raiseHandState } = request.data; + const { raisedHand } = request.data; - peer.data.raiseHandState = raiseHandState; + peer.raisedHand = raisedHand; // Spread to others this._notification(peer.socket, 'raiseHand', { - peerId : peer.id, - raiseHandState : raiseHandState + peerId : peer.id, + raisedHand : raisedHand }, true); // Return no error @@ -1005,6 +950,13 @@ class Room extends EventEmitter */ async _createConsumer({ consumerPeer, producerPeer, producer }) { + logger.debug( + '_createConsumer() [consumerPeer:"%s", producerPeer:"%s", producer:"%s"]', + consumerPeer.id, + producerPeer.id, + producer.id + ); + // Optimization: // - Create the server-side Consumer. If video, do it paused. // - Tell its Peer about it and wait for its response. @@ -1014,11 +966,11 @@ class Room extends EventEmitter // NOTE: Don't create the Consumer if the remote Peer cannot consume it. if ( - !consumerPeer.data.rtpCapabilities || + !consumerPeer.rtpCapabilities || !this._mediasoupRouter.canConsume( { producerId : producer.id, - rtpCapabilities : consumerPeer.data.rtpCapabilities + rtpCapabilities : consumerPeer.rtpCapabilities }) ) { @@ -1026,8 +978,7 @@ class Room extends EventEmitter } // Must take the Transport the remote Peer is using for consuming. - const transport = Array.from(consumerPeer.data.transports.values()) - .find((t) => t.appData.consuming); + const transport = consumerPeer.getConsumerTransport(); // This should not happen. if (!transport) @@ -1045,31 +996,31 @@ class Room extends EventEmitter consumer = await transport.consume( { producerId : producer.id, - rtpCapabilities : consumerPeer.data.rtpCapabilities, + rtpCapabilities : consumerPeer.rtpCapabilities, paused : producer.kind === 'video' }); } catch (error) { - logger.warn('_createConsumer() | transport.consume():%o', error); + logger.warn('_createConsumer() | [error:"%o"]', error); return; } - // Store the Consumer into the protoo consumerPeer data Object. - consumerPeer.data.consumers.set(consumer.id, consumer); + // Store the Consumer into the consumerPeer data Object. + consumerPeer.addConsumer(consumer.id, consumer); // Set Consumer events. consumer.on('transportclose', () => { // Remove from its map. - consumerPeer.data.consumers.delete(consumer.id); + consumerPeer.removeConsumer(consumer.id); }); consumer.on('producerclose', () => { // Remove from its map. - consumerPeer.data.consumers.delete(consumer.id); + consumerPeer.removeConsumer(consumer.id); this._notification(consumerPeer.socket, 'consumerClosed', { consumerId: consumer.id }); }); @@ -1086,10 +1037,6 @@ class Room extends EventEmitter consumer.on('score', (score) => { - // logger.debug( - // 'consumer "score" event [consumerId:%s, score:%o]', - // consumer.id, score); - this._notification(consumerPeer.socket, 'consumerScore', { consumerId: consumer.id, score }); }); @@ -1106,7 +1053,7 @@ class Room extends EventEmitter ); }); - // Send a protoo request to the remote Peer with Consumer parameters. + // Send a request to the remote Peer with Consumer parameters. try { await this._request( @@ -1140,7 +1087,7 @@ class Room extends EventEmitter } catch (error) { - logger.warn('_createConsumer() | failed:%o', error); + logger.warn('_createConsumer() | [error:"%o"]', error); } } diff --git a/server/server.js b/server/server.js index 839029f..3149793 100755 --- a/server/server.js +++ b/server/server.js @@ -14,6 +14,7 @@ const mediasoup = require('mediasoup'); const AwaitQueue = require('awaitqueue'); const Logger = require('./lib/Logger'); const Room = require('./lib/Room'); +const Peer = require('./lib/Peer'); const base64 = require('base-64'); const helmet = require('helmet'); const httpHelper = require('./httpHelper'); @@ -44,6 +45,9 @@ let nextMediasoupWorkerIdx = 0; // Map of Room instances indexed by roomId. const rooms = new Map(); +// Map of Peer instances indexed by peerId. +const peers = new Map(); + // TLS server configuration. const tls = { @@ -237,8 +241,7 @@ async function setupAuth(oidcIssuer) { passport.authenticate('oidc', { state : base64.encode(JSON.stringify({ - roomId : req.query.roomId, - peerId : req.query.peerId + id : req.query.id })) })(req, res, next); }); @@ -278,9 +281,9 @@ async function setupAuth(oidcIssuer) photo = '/static/media/buddy.403cb9f6.svg'; } - const room = rooms.get(state.roomId); + const peer = peers.get(state.id); - room && room.peerAuthenticated(state.peerId); + peer && (peer.authenticated = true); res.send(httpHelper({ success : true, @@ -350,8 +353,6 @@ async function runWebSocketServer() { const { roomId, peerId } = socket.handshake.query; - logger.info('socket.io "connection" | [session:"%o"]', socket.handshake.session); - if (!roomId || !peerId) { logger.warn('connection request without roomId and/or peerId'); @@ -367,12 +368,17 @@ async function runWebSocketServer() queue.push(async () => { const room = await getOrCreateRoom({ roomId }); + const peer = new Peer({ id: peerId, socket }); - room.handleConnection({ peerId, socket }); + peers.set(peerId, peer); + + peer.on('close', () => peers.delete(peerId)); + + room.handlePeer(peer); }) .catch((error) => { - logger.error('room creation or room joining failed:%o', error); + logger.error('room creation or room joining failed [error:"%o"]', error); socket.disconnect(true); @@ -435,7 +441,7 @@ async function getOrCreateRoom({ roomId }) // If the Room does not exist create a new one. if (!room) { - logger.info('creating a new Room [roomId:%s]', roomId); + logger.info('creating a new Room [roomId:"%s"]', roomId); const mediasoupWorker = getMediasoupWorker();