diff --git a/server/config/config.example.js b/server/config/config.example.js index b346a18..64fe101 100644 --- a/server/config/config.example.js +++ b/server/config/config.example.js @@ -118,8 +118,10 @@ module.exports = // Can have multiple listening interfaces // { ip: '::/0', announcedIp: null } ], - maxIncomingBitrate : 1500000, - initialAvailableOutgoingBitrate : 1000000 + initialAvailableOutgoingBitrate : 1000000, + minimumAvailableOutgoingBitrate : 600000, + // Additional options that are not part of WebRtcTransportOptions. + maxIncomingBitrate : 1500000 } } }; diff --git a/server/lib/Room.js b/server/lib/Room.js index 329cb18..f25f31d 100644 --- a/server/lib/Room.js +++ b/server/lib/Room.js @@ -68,7 +68,7 @@ class Room extends EventEmitter this._lastN = []; - this._peers = new Map(); + this._peers = {}; // mediasoup Router instance. this._mediasoupRouter = mediasoupRouter; @@ -96,13 +96,17 @@ class Room extends EventEmitter this._lobby.close(); - this._peers.forEach((peer) => + // Close the peers. + for (const peer in this._peers) { - if (!peer.closed) - peer.close(); - }); + if (Object.prototype.hasOwnProperty.call(this._peers, peer)) + { + if (!peer.closed) + peer.close(); + } + } - this._peers.clear(); + this._peers = null; // Close the mediasoup Router. this._mediasoupRouter.close(); @@ -116,7 +120,7 @@ class Room extends EventEmitter logger.info('handlePeer() [peer:"%s"]', peer.id); // This will allow reconnects to join despite lock - if (this._peers.has(peer.id)) + if (this._peers[peer.id]) { logger.warn( 'handleConnection() | there is already a peer with same peerId [peer:"%s"]', @@ -168,10 +172,10 @@ class Room extends EventEmitter this._peerJoining(promotedPeer); - this._peers.forEach((peer) => + for (const peer of this._getJoinedPeers()) { this._notification(peer.socket, 'lobby:promotedPeer', { peerId: id }); - }); + } }); this._lobby.on('peerAuthenticated', (peer) => @@ -183,20 +187,20 @@ class Room extends EventEmitter { const { id, displayName } = changedPeer; - this._peers.forEach((peer) => + for (const peer of this._getJoinedPeers()) { this._notification(peer.socket, 'lobby:changeDisplayName', { peerId: id, displayName }); - }); + } }); this._lobby.on('changePicture', (changedPeer) => { const { id, picture } = changedPeer; - this._peers.forEach((peer) => + for (const peer of this._getJoinedPeers()) { this._notification(peer.socket, 'lobby:changePicture', { peerId: id, picture }); - }); + } }); this._lobby.on('peerClosed', (closedPeer) => @@ -205,10 +209,10 @@ class Room extends EventEmitter const { id } = closedPeer; - this._peers.forEach((peer) => + for (const peer of this._getJoinedPeers()) { this._notification(peer.socket, 'lobby:peerClosed', { peerId: id }); - }); + } }); // If nobody left in lobby we should check if room is empty too and initiating @@ -230,22 +234,29 @@ class Room extends EventEmitter const { producer, volume } = volumes[0]; // Notify all Peers. - this._peers.forEach((peer) => + for (const peer of this._getJoinedPeers()) { - this._notification(peer.socket, 'activeSpeaker', { - peerId : producer.appData.peerId, - volume : volume - }); - }); + this._notification( + peer.socket, + 'activeSpeaker', + { + peerId : producer.appData.peerId, + volume : volume + }); + } }); this._audioLevelObserver.on('silence', () => { // Notify all Peers. - this._peers.forEach((peer) => + for (const peer of this._getJoinedPeers()) { - this._notification(peer.socket, 'activeSpeaker', { peerId: null }); - }); + this._notification( + peer.socket, + 'activeSpeaker', + { peerId: null } + ); + } }); } @@ -254,7 +265,7 @@ class Room extends EventEmitter logger.info( 'logStatus() [room id:"%s", peers:"%s"]', this._roomId, - this._peers.size + Object.keys(this._peers).length ); } @@ -262,7 +273,7 @@ class Room extends EventEmitter { return { roomId : this._roomId, - peers : this._peers.size + peers : Object.keys(this._peers).length }; } @@ -295,17 +306,17 @@ class Room extends EventEmitter // checks both room and lobby checkEmpty() { - return this._peers.size === 0; + return Object.keys(this._peers).length === 0; } _parkPeer(parkPeer) { this._lobby.parkPeer(parkPeer); - this._peers.forEach((peer) => + for (const peer of this._getJoinedPeers()) { this._notification(peer.socket, 'parkedPeer', { peerId: parkPeer.id }); - }); + } } _peerJoining(peer) @@ -319,7 +330,7 @@ class Room extends EventEmitter this._lastN.push(peer.id); } - this._peers.set(peer.id, peer); + this._peers[peer.id] = peer; this._handlePeer(peer); this._notification(peer.socket, 'roomReady'); @@ -362,7 +373,7 @@ class Room extends EventEmitter this._lastN.splice(index, 1); } - this._peers.delete(peer.id); + delete this._peers[peer.id]; // If this is the last Peer in the room and // lobby is empty, close the room after a while. @@ -452,41 +463,47 @@ class Room extends EventEmitter // Tell the new Peer about already joined Peers. // And also create Consumers for existing Producers. - const peerInfos = []; + const joinedPeers = + [ + ...this._getJoinedPeers() + ]; - this._peers.forEach((joinedPeer) => - { - if (joinedPeer.joined) - { - peerInfos.push(joinedPeer.peerInfo); - - joinedPeer.producers.forEach((producer) => - { - this._createConsumer( - { - consumerPeer : peer, - producerPeer : joinedPeer, - producer - }); - }); - } - }); + const peerInfos = joinedPeers + .filter((joinedPeer) => joinedPeer.id !== peer.id) + .map((joinedPeer) => (joinedPeer.peerInfo)); cb(null, { peers: peerInfos }); // Mark the new Peer as joined. peer.joined = true; - this._notification( - peer.socket, - 'newPeer', + for (const joinedPeer of joinedPeers) + { + // Create Consumers for existing Producers. + for (const producer of joinedPeer.producers.values()) { - id : peer.id, - displayName : displayName, - picture : picture - }, - true - ); + this._createConsumer( + { + consumerPeer : peer, + producerPeer : joinedPeer, + producer + }); + } + } + + // Notify the new Peer to all other Peers. + for (const otherPeer of this._getJoinedPeers({ excludePeer: peer })) + { + this._notification( + otherPeer.socket, + 'newPeer', + { + id : peer.id, + displayName : displayName, + picture : picture + } + ); + } logger.debug( 'peer joined [peer: "%s", displayName: "%s", picture: "%s"]', @@ -501,20 +518,28 @@ class Room extends EventEmitter // initiate mediasoup Transports and be ready when he later joins. const { forceTcp, producing, consuming } = request.data; - const { - maxIncomingBitrate, - initialAvailableOutgoingBitrate - } = config.mediasoup.webRtcTransport; + + const webRtcTransportOptions = + { + ...config.mediasoup.webRtcTransport, + appData : { producing, consuming } + }; + + if (forceTcp) + { + webRtcTransportOptions.enableUdp = false; + webRtcTransportOptions.enableTcp = true; + } const transport = await this._mediasoupRouter.createWebRtcTransport( - { - listenIps : config.mediasoup.webRtcTransport.listenIps, - enableUdp : !forceTcp, - enableTcp : true, - preferUdp : true, - initialAvailableOutgoingBitrate, - appData : { producing, consuming } - }); + webRtcTransportOptions + ); + + transport.on('dtlsstatechange', (dtlsState) => + { + if (dtlsState === 'failed' || dtlsState === 'closed') + logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState); + }); // Store the WebRtcTransport into the Peer data Object. peer.addTransport(transport.id, transport); @@ -528,6 +553,8 @@ class Room extends EventEmitter dtlsParameters : transport.dtlsParameters }); + const { maxIncomingBitrate } = config.mediasoup.webRtcTransport; + // If set, apply max incoming bitrate limit. if (maxIncomingBitrate) { @@ -606,18 +633,16 @@ class Room extends EventEmitter cb(null, { id: producer.id }); - this._peers.forEach((otherPeer) => + // Optimization: Create a server-side Consumer for each Peer. + for (const otherPeer of this._getJoinedPeers({ excludePeer: peer })) { - if (otherPeer.joined && otherPeer !== peer) - { - this._createConsumer( - { - consumerPeer : otherPeer, - producerPeer : peer, - producer - }); - } - }); + this._createConsumer( + { + consumerPeer : otherPeer, + producerPeer : peer, + producer + }); + } // Add into the audioLevelObserver. if (kind === 'audio') @@ -746,6 +771,25 @@ class Room extends EventEmitter break; } + case 'setConsumerPriority': + { + // Ensure the Peer is joined. + if (!peer.joined) + throw new Error('Peer not yet joined'); + + const { consumerId, priority } = request.data; + const consumer = peer.getConsumer(consumerId); + + if (!consumer) + throw new Error(`consumer with id "${consumerId}" not found`); + + await consumer.setPriority(priority); + + cb(); + + break; + } + case 'requestConsumerKeyFrame': { // Ensure the Peer is joined. @@ -1149,7 +1193,7 @@ class Room extends EventEmitter 'newConsumer', { peerId : producerPeer.id, - kind : producer.kind, + kind : consumer.kind, producerId : producer.id, id : consumer.id, rtpParameters : consumer.rtpParameters, @@ -1161,8 +1205,7 @@ class Room extends EventEmitter // Now that we got the positive response from the remote Peer and, if // video, resume the Consumer to ask for an efficient key frame. - if (producer.kind === 'video') - await consumer.resume(); + await consumer.resume(); this._notification( consumerPeer.socket, @@ -1179,6 +1222,15 @@ class Room extends EventEmitter } } + /** + * Helper to get the list of joined peers. + */ + _getJoinedPeers({ excludePeer = undefined } = {}) + { + return Object.values(this._peers) + .filter((peer) => peer.joined && peer !== excludePeer); + } + _timeoutCallback(callback) { let called = false;