diff --git a/server/lib/Peer.js b/server/lib/Peer.js index cce62aa..6f886a6 100644 --- a/server/lib/Peer.js +++ b/server/lib/Peer.js @@ -30,6 +30,8 @@ class Peer extends EventEmitter this._email = null; + this._routerId = null; + this._rtpCapabilities = null; this._raisedHand = false; @@ -227,6 +229,16 @@ class Peer extends EventEmitter this._email = email; } + get routerId() + { + return this._routerId; + } + + set routerId(routerId) + { + this._routerId = routerId; + } + get rtpCapabilities() { return this._rtpCapabilities; diff --git a/server/lib/Room.js b/server/lib/Room.js index f25f31d..516af2a 100644 --- a/server/lib/Room.js +++ b/server/lib/Room.js @@ -12,32 +12,38 @@ class Room extends EventEmitter * * @async * - * @param {mediasoup.Worker} mediasoupWorker - The mediasoup Worker in which a new + * @param {mediasoup.Worker} mediasoupWorkers - The mediasoup Worker in which a new * mediasoup Router must be created. * @param {String} roomId - Id of the Room instance. */ - static async create({ mediasoupWorker, roomId }) + static async create({ mediasoupWorkers, roomId }) { logger.info('create() [roomId:"%s"]', roomId); // Router media codecs. const mediaCodecs = config.mediasoup.router.mediaCodecs; - // Create a mediasoup Router. - const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs }); + const mediasoupRouters = []; - // Create a mediasoup AudioLevelObserver. - const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver( + for (const worker of mediasoupWorkers) + { + const router = await worker.createRouter({ mediaCodecs }); + + mediasoupRouters.push(router); + } + + // Create a mediasoup AudioLevelObserver on first router + const audioLevelObserver = await mediasoupRouters[0].createAudioLevelObserver( { maxEntries : 1, threshold : -80, interval : 800 }); - return new Room({ roomId, mediasoupRouter, audioLevelObserver }); + return new Room({ roomId, mediasoupRouters, audioLevelObserver }); } - constructor({ roomId, mediasoupRouter, audioLevelObserver }) + constructor({ roomId, mediasoupRouters, audioLevelObserver }) { logger.info('constructor() [roomId:"%s"]', roomId); @@ -70,8 +76,8 @@ class Room extends EventEmitter this._peers = {}; - // mediasoup Router instance. - this._mediasoupRouter = mediasoupRouter; + // Array of mediasoup Router instances. + this._mediasoupRouters = mediasoupRouters; // mediasoup AudioLevelObserver. this._audioLevelObserver = audioLevelObserver; @@ -108,8 +114,11 @@ class Room extends EventEmitter this._peers = null; - // Close the mediasoup Router. - this._mediasoupRouter.close(); + // Close the mediasoup Routers. + for (const router of this._mediasoupRouters) + { + router.close(); + } // Emit 'close' event. this.emit('close'); @@ -332,6 +341,9 @@ class Room extends EventEmitter this._peers[peer.id] = peer; + // Assign least loaded router + peer.routerId = this._getLeastLoadedRouter(); + this._handlePeer(peer); this._notification(peer.socket, 'roomReady'); } @@ -413,11 +425,14 @@ class Room extends EventEmitter async _handleSocketRequest(peer, request, cb) { + const router = + this._mediasoupRouters.find((peerRouter) => peerRouter.id === peer.routerId); + switch (request.method) { case 'getRouterRtpCapabilities': { - cb(null, this._mediasoupRouter.rtpCapabilities); + cb(null, router.rtpCapabilities); break; } @@ -531,7 +546,7 @@ class Room extends EventEmitter webRtcTransportOptions.enableTcp = true; } - const transport = await this._mediasoupRouter.createWebRtcTransport( + const transport = await router.createWebRtcTransport( webRtcTransportOptions ); @@ -615,6 +630,17 @@ class Room extends EventEmitter const producer = await transport.produce({ kind, rtpParameters, appData }); + for (const destinationRouter of this._mediasoupRouters) + { + if (destinationRouter !== router) + { + await router.pipeToRouter({ + producerId : producer.id, + router : destinationRouter + }); + } + } + // Store the Producer into the Peer data Object. peer.addProducer(producer.id, producer); @@ -1089,6 +1115,9 @@ class Room extends EventEmitter producer.id ); + const router = this._mediasoupRouters.find((producerRouter) => + producerRouter.id === producerPeer.routerId); + // Optimization: // - Create the server-side Consumer. If video, do it paused. // - Tell its Peer about it and wait for its response. @@ -1099,7 +1128,7 @@ class Room extends EventEmitter // NOTE: Don't create the Consumer if the remote Peer cannot consume it. if ( !consumerPeer.rtpCapabilities || - !this._mediasoupRouter.canConsume( + !router.canConsume( { producerId : producer.id, rtpCapabilities : consumerPeer.rtpCapabilities @@ -1292,6 +1321,26 @@ class Room extends EventEmitter socket.emit('notification', { method, data }); } } + + _getLeastLoadedRouter() + { + let load = Infinity; + let id; + + for (const router of this._mediasoupRouters) + { + const routerLoad = + Object.values(this._peers).filter((peer) => peer.routerId === router.id).length; + + if (routerLoad < load) + { + id = router.id; + load = routerLoad; + } + } + + return id; + } } module.exports = Room; diff --git a/server/server.js b/server/server.js index 8faa837..e4f868a 100755 --- a/server/server.js +++ b/server/server.js @@ -570,9 +570,9 @@ async function getOrCreateRoom({ roomId }) { logger.info('creating a new Room [roomId:"%s"]', roomId); - const mediasoupWorker = getMediasoupWorker(); + // const mediasoupWorker = getMediasoupWorker(); - room = await Room.create({ mediasoupWorker, roomId }); + room = await Room.create({ mediasoupWorkers, roomId }); rooms.set(roomId, room);