All peers enter the same router up to config.routerScaleSize. Then go to the next one, and keep going until all routers are filled up to config.routerScaleSize. After that simple put peers into routers with least peers.

auto_join_3.3
Håvar Aambø Fosstveit 2020-05-04 23:33:51 +02:00
parent 698a57cb3e
commit 381f9cd733
2 changed files with 103 additions and 16 deletions

View File

@ -60,6 +60,8 @@ module.exports =
// When truthy, the room will be open to all users when the first // When truthy, the room will be open to all users when the first
// authenticated user has already joined the room. // authenticated user has already joined the room.
activateOnHostJoin : true, activateOnHostJoin : true,
// Room size before spreading to new router
routerScaleSize : 20,
// Mediasoup settings // Mediasoup settings
mediasoup : mediasoup :
{ {

View File

@ -5,7 +5,7 @@ const config = require('../config/config');
const logger = new Logger('Room'); const logger = new Logger('Room');
const ROUTER_SCALE_SIZE = 40; const ROUTER_SCALE_SIZE = config.routerScaleSize || 20;
class Room extends EventEmitter class Room extends EventEmitter
{ {
@ -25,23 +25,30 @@ class Room extends EventEmitter
// Router media codecs. // Router media codecs.
const mediaCodecs = config.mediasoup.router.mediaCodecs; const mediaCodecs = config.mediasoup.router.mediaCodecs;
const mediasoupRouters = []; const mediasoupRouters = new Map();
let firstRouter = null;
for (const worker of mediasoupWorkers) for (const worker of mediasoupWorkers)
{ {
const router = await worker.createRouter({ mediaCodecs }); const router = await worker.createRouter({ mediaCodecs });
mediasoupRouters.push(router); if (!firstRouter)
firstRouter = router;
mediasoupRouters.set(router.id, router);
} }
// Create a mediasoup AudioLevelObserver on first router // Create a mediasoup AudioLevelObserver on first router
const audioLevelObserver = await mediasoupRouters[0].createAudioLevelObserver( const audioLevelObserver = await firstRouter.createAudioLevelObserver(
{ {
maxEntries : 1, maxEntries : 1,
threshold : -80, threshold : -80,
interval : 800 interval : 800
}); });
firstRouter = null;
return new Room({ roomId, mediasoupRouters, audioLevelObserver }); return new Room({ roomId, mediasoupRouters, audioLevelObserver });
} }
@ -81,6 +88,11 @@ class Room extends EventEmitter
// Array of mediasoup Router instances. // Array of mediasoup Router instances.
this._mediasoupRouters = mediasoupRouters; this._mediasoupRouters = mediasoupRouters;
// The router we are currently putting peers in
this._routerIterator = this._mediasoupRouters.values();
this._currentRouter = this._routerIterator.next().value;
// mediasoup AudioLevelObserver. // mediasoup AudioLevelObserver.
this._audioLevelObserver = audioLevelObserver; this._audioLevelObserver = audioLevelObserver;
@ -102,8 +114,14 @@ class Room extends EventEmitter
this._closed = true; this._closed = true;
this._chatHistory = null;
this._fileHistory = null;
this._lobby.close(); this._lobby.close();
this._lobby = null;
// Close the peers. // Close the peers.
for (const peer in this._peers) for (const peer in this._peers)
{ {
@ -117,11 +135,19 @@ class Room extends EventEmitter
this._peers = null; this._peers = null;
// Close the mediasoup Routers. // Close the mediasoup Routers.
for (const router of this._mediasoupRouters) for (const router of this._mediasoupRouters.values())
{ {
router.close(); router.close();
} }
this._routerIterator = null;
this._currentRouter = null;
this._mediasoupRouters.clear();
this._audioLevelObserver = null;
// Emit 'close' event. // Emit 'close' event.
this.emit('close'); this.emit('close');
} }
@ -330,7 +356,7 @@ class Room extends EventEmitter
} }
} }
_peerJoining(peer) async _peerJoining(peer)
{ {
peer.socket.join(this._roomId); peer.socket.join(this._roomId);
@ -343,8 +369,8 @@ class Room extends EventEmitter
this._peers[peer.id] = peer; this._peers[peer.id] = peer;
// Assign least loaded router // Assign routerId
peer.routerId = this._getLeastLoadedRouter(); peer.routerId = await this._getRouterId();
this._handlePeer(peer); this._handlePeer(peer);
this._notification(peer.socket, 'roomReady'); this._notification(peer.socket, 'roomReady');
@ -428,7 +454,7 @@ class Room extends EventEmitter
async _handleSocketRequest(peer, request, cb) async _handleSocketRequest(peer, request, cb)
{ {
const router = const router =
this._mediasoupRouters.find((peerRouter) => peerRouter.id === peer.routerId); this._mediasoupRouters.get(peer.routerId);
switch (request.method) switch (request.method)
{ {
@ -632,9 +658,11 @@ class Room extends EventEmitter
const producer = const producer =
await transport.produce({ kind, rtpParameters, appData }); await transport.produce({ kind, rtpParameters, appData });
for (const destinationRouter of this._mediasoupRouters) const pipeRouters = this._getRoutersToPipeTo(peer.routerId);
for (const [ routerId, destinationRouter ] of this._mediasoupRouters)
{ {
if (destinationRouter !== router) if (pipeRouters.includes(routerId))
{ {
await router.pipeToRouter({ await router.pipeToRouter({
producerId : producer.id, producerId : producer.id,
@ -1117,8 +1145,7 @@ class Room extends EventEmitter
producer.id producer.id
); );
const router = this._mediasoupRouters.find((producerRouter) => const router = this._mediasoupRouters.get(producerPeer.routerId);
producerRouter.id === producerPeer.routerId);
// Optimization: // Optimization:
// - Create the server-side Consumer. If video, do it paused. // - Create the server-side Consumer. If video, do it paused.
@ -1324,19 +1351,77 @@ class Room extends EventEmitter
} }
} }
async _pipeProducersToNewRouter()
{
const peersToPipe =
Object.values(this._peers)
.filter((peer) => peer.routerId !== this._currentRouter.id);
for (const peer of peersToPipe)
{
const srcRouter = this._mediasoupRouters.get(peer.routerId);
for (const producerId of peer.producers.keys())
{
await srcRouter.pipeToRouter({
producerId,
router : this._currentRouter
});
}
}
}
async _getRouterId()
{
if (this._currentRouter)
{
const routerLoad =
Object.values(this._peers)
.filter((peer) => peer.routerId === this._currentRouter.id).length;
if (routerLoad >= ROUTER_SCALE_SIZE)
{
this._currentRouter = this._routerIterator.next().value;
if (this._currentRouter)
{
await this._pipeProducersToNewRouter();
return this._currentRouter.id;
}
}
else
{
return this._currentRouter.id;
}
}
return this._getLeastLoadedRouter();
}
// Returns an array of router ids we need to pipe to
_getRoutersToPipeTo(originRouterId)
{
return Object.values(this._peers)
.map((peer) => peer.routerId)
.filter((routerId, index, self) =>
routerId !== originRouterId && self.indexOf(routerId) === index
);
}
_getLeastLoadedRouter() _getLeastLoadedRouter()
{ {
let load = Infinity; let load = Infinity;
let id; let id;
for (const router of this._mediasoupRouters) for (const routerId of this._mediasoupRouters.keys())
{ {
const routerLoad = const routerLoad =
Object.values(this._peers).filter((peer) => peer.routerId === router.id).length; Object.values(this._peers).filter((peer) => peer.routerId === routerId).length;
if (routerLoad < load) if (routerLoad < load)
{ {
id = router.id; id = routerId;
load = routerLoad; load = routerLoad;
} }
} }