diff --git a/server/config/config.example.js b/server/config/config.example.js index 740a9ae..3541327 100644 --- a/server/config/config.example.js +++ b/server/config/config.example.js @@ -60,6 +60,8 @@ module.exports = // When truthy, the room will be open to all users when the first // authenticated user has already joined the room. activateOnHostJoin : true, + // Room size before spreading to new router + routerScaleSize : 20, // Mediasoup settings mediasoup : { diff --git a/server/lib/Room.js b/server/lib/Room.js index 23f6d53..2dc2368 100644 --- a/server/lib/Room.js +++ b/server/lib/Room.js @@ -5,7 +5,7 @@ const config = require('../config/config'); const logger = new Logger('Room'); -const ROUTER_SCALE_SIZE = 40; +const ROUTER_SCALE_SIZE = config.routerScaleSize || 20; class Room extends EventEmitter { @@ -25,23 +25,30 @@ class Room extends EventEmitter // Router media codecs. const mediaCodecs = config.mediasoup.router.mediaCodecs; - const mediasoupRouters = []; + const mediasoupRouters = new Map(); + + let firstRouter = null; for (const worker of mediasoupWorkers) { const router = await worker.createRouter({ mediaCodecs }); - mediasoupRouters.push(router); + if (!firstRouter) + firstRouter = router; + + mediasoupRouters.set(router.id, router); } // Create a mediasoup AudioLevelObserver on first router - const audioLevelObserver = await mediasoupRouters[0].createAudioLevelObserver( + const audioLevelObserver = await firstRouter.createAudioLevelObserver( { maxEntries : 1, threshold : -80, interval : 800 }); + firstRouter = null; + return new Room({ roomId, mediasoupRouters, audioLevelObserver }); } @@ -81,6 +88,11 @@ class Room extends EventEmitter // Array of mediasoup Router instances. this._mediasoupRouters = mediasoupRouters; + // The router we are currently putting peers in + this._routerIterator = this._mediasoupRouters.values(); + + this._currentRouter = this._routerIterator.next().value; + // mediasoup AudioLevelObserver. this._audioLevelObserver = audioLevelObserver; @@ -102,8 +114,14 @@ class Room extends EventEmitter this._closed = true; + this._chatHistory = null; + + this._fileHistory = null; + this._lobby.close(); + this._lobby = null; + // Close the peers. for (const peer in this._peers) { @@ -117,11 +135,19 @@ class Room extends EventEmitter this._peers = null; // Close the mediasoup Routers. - for (const router of this._mediasoupRouters) + for (const router of this._mediasoupRouters.values()) { router.close(); } + this._routerIterator = null; + + this._currentRouter = null; + + this._mediasoupRouters.clear(); + + this._audioLevelObserver = null; + // Emit 'close' event. this.emit('close'); } @@ -330,7 +356,7 @@ class Room extends EventEmitter } } - _peerJoining(peer) + async _peerJoining(peer) { peer.socket.join(this._roomId); @@ -343,8 +369,8 @@ class Room extends EventEmitter this._peers[peer.id] = peer; - // Assign least loaded router - peer.routerId = this._getLeastLoadedRouter(); + // Assign routerId + peer.routerId = await this._getRouterId(); this._handlePeer(peer); this._notification(peer.socket, 'roomReady'); @@ -428,7 +454,7 @@ class Room extends EventEmitter async _handleSocketRequest(peer, request, cb) { const router = - this._mediasoupRouters.find((peerRouter) => peerRouter.id === peer.routerId); + this._mediasoupRouters.get(peer.routerId); switch (request.method) { @@ -632,9 +658,11 @@ class Room extends EventEmitter const producer = await transport.produce({ kind, rtpParameters, appData }); - for (const destinationRouter of this._mediasoupRouters) + const pipeRouters = this._getRoutersToPipeTo(peer.routerId); + + for (const [ routerId, destinationRouter ] of this._mediasoupRouters) { - if (destinationRouter !== router) + if (pipeRouters.includes(routerId)) { await router.pipeToRouter({ producerId : producer.id, @@ -1117,8 +1145,7 @@ class Room extends EventEmitter producer.id ); - const router = this._mediasoupRouters.find((producerRouter) => - producerRouter.id === producerPeer.routerId); + const router = this._mediasoupRouters.get(producerPeer.routerId); // Optimization: // - Create the server-side Consumer. If video, do it paused. @@ -1324,19 +1351,77 @@ class Room extends EventEmitter } } + async _pipeProducersToNewRouter() + { + const peersToPipe = + Object.values(this._peers) + .filter((peer) => peer.routerId !== this._currentRouter.id); + + for (const peer of peersToPipe) + { + const srcRouter = this._mediasoupRouters.get(peer.routerId); + + for (const producerId of peer.producers.keys()) + { + await srcRouter.pipeToRouter({ + producerId, + router : this._currentRouter + }); + } + } + } + + async _getRouterId() + { + if (this._currentRouter) + { + const routerLoad = + Object.values(this._peers) + .filter((peer) => peer.routerId === this._currentRouter.id).length; + + if (routerLoad >= ROUTER_SCALE_SIZE) + { + this._currentRouter = this._routerIterator.next().value; + + if (this._currentRouter) + { + await this._pipeProducersToNewRouter(); + + return this._currentRouter.id; + } + } + else + { + return this._currentRouter.id; + } + } + + return this._getLeastLoadedRouter(); + } + + // Returns an array of router ids we need to pipe to + _getRoutersToPipeTo(originRouterId) + { + return Object.values(this._peers) + .map((peer) => peer.routerId) + .filter((routerId, index, self) => + routerId !== originRouterId && self.indexOf(routerId) === index + ); + } + _getLeastLoadedRouter() { let load = Infinity; let id; - for (const router of this._mediasoupRouters) + for (const routerId of this._mediasoupRouters.keys()) { const routerLoad = - Object.values(this._peers).filter((peer) => peer.routerId === router.id).length; + Object.values(this._peers).filter((peer) => peer.routerId === routerId).length; if (routerLoad < load) { - id = router.id; + id = routerId; load = routerLoad; } }