Merge branch 'feat-server-scaling' into develop

auto_join_3.3
Håvar Aambø Fosstveit 2020-05-04 23:44:23 +02:00
commit 5af33068b1
4 changed files with 175 additions and 19 deletions

View File

@ -246,9 +246,17 @@ module.exports =
}, },
// When truthy, the room will be open to all users when as long as there // When truthy, the room will be open to all users when as long as there
// are allready users in the room // are allready users in the room
activateOnHostJoin : true, activateOnHostJoin : true,
// If this is set to true, only signed-in users will be able
// to join a room directly. Non-signed-in users (guests) will
// always be put in the lobby regardless of room lock status.
// If false, there is no difference between guests and signed-in
// users when joining.
requireSignInToAccess : true,
// Room size before spreading to new router
routerScaleSize : 20,
// Mediasoup settings // Mediasoup settings
mediasoup : mediasoup :
{ {
numWorkers : Object.keys(os.cpus()).length, numWorkers : Object.keys(os.cpus()).length,
// mediasoup Worker settings. // mediasoup Worker settings.

View File

@ -39,6 +39,8 @@ class Peer extends EventEmitter
this._email = null; this._email = null;
this._routerId = null;
this._rtpCapabilities = null; this._rtpCapabilities = null;
this._raisedHand = false; this._raisedHand = false;
@ -238,6 +240,16 @@ class Peer extends EventEmitter
this._email = email; this._email = email;
} }
get routerId()
{
return this._routerId;
}
set routerId(routerId)
{
this._routerId = routerId;
}
get rtpCapabilities() get rtpCapabilities()
{ {
return this._rtpCapabilities; return this._rtpCapabilities;

View File

@ -31,6 +31,8 @@ const permissionsFromRoles =
...config.permissionsFromRoles ...config.permissionsFromRoles
}; };
const ROUTER_SCALE_SIZE = config.routerScaleSize || 20;
class Room extends EventEmitter class Room extends EventEmitter
{ {
/** /**
@ -38,32 +40,45 @@ class Room extends EventEmitter
* *
* @async * @async
* *
* @param {mediasoup.Worker} mediasoupWorker - The mediasoup Worker in which a new * @param {mediasoup.Worker} mediasoupWorkers - The mediasoup Worker in which a new
* mediasoup Router must be created. * mediasoup Router must be created.
* @param {String} roomId - Id of the Room instance. * @param {String} roomId - Id of the Room instance.
*/ */
static async create({ mediasoupWorker, roomId }) static async create({ mediasoupWorkers, roomId })
{ {
logger.info('create() [roomId:"%s"]', roomId); logger.info('create() [roomId:"%s"]', roomId);
// Router media codecs. // Router media codecs.
const mediaCodecs = config.mediasoup.router.mediaCodecs; const mediaCodecs = config.mediasoup.router.mediaCodecs;
// Create a mediasoup Router. const mediasoupRouters = new Map();
const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs });
// Create a mediasoup AudioLevelObserver. let firstRouter = null;
const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver(
for (const worker of mediasoupWorkers)
{
const router = await worker.createRouter({ mediaCodecs });
if (!firstRouter)
firstRouter = router;
mediasoupRouters.set(router.id, router);
}
// Create a mediasoup AudioLevelObserver on first router
const audioLevelObserver = await firstRouter.createAudioLevelObserver(
{ {
maxEntries : 1, maxEntries : 1,
threshold : -80, threshold : -80,
interval : 800 interval : 800
}); });
return new Room({ roomId, mediasoupRouter, audioLevelObserver }); firstRouter = null;
return new Room({ roomId, mediasoupRouters, audioLevelObserver });
} }
constructor({ roomId, mediasoupRouter, audioLevelObserver }) constructor({ roomId, mediasoupRouters, audioLevelObserver })
{ {
logger.info('constructor() [roomId:"%s"]', roomId); logger.info('constructor() [roomId:"%s"]', roomId);
@ -98,8 +113,13 @@ class Room extends EventEmitter
this._peers = {}; this._peers = {};
// mediasoup Router instance. // Array of mediasoup Router instances.
this._mediasoupRouter = mediasoupRouter; this._mediasoupRouters = mediasoupRouters;
// The router we are currently putting peers in
this._routerIterator = this._mediasoupRouters.values();
this._currentRouter = this._routerIterator.next().value;
// mediasoup AudioLevelObserver. // mediasoup AudioLevelObserver.
this._audioLevelObserver = audioLevelObserver; this._audioLevelObserver = audioLevelObserver;
@ -122,8 +142,14 @@ class Room extends EventEmitter
this._closed = true; this._closed = true;
this._chatHistory = null;
this._fileHistory = null;
this._lobby.close(); this._lobby.close();
this._lobby = null;
// Close the peers. // Close the peers.
for (const peer in this._peers) for (const peer in this._peers)
{ {
@ -133,8 +159,19 @@ class Room extends EventEmitter
this._peers = null; this._peers = null;
// Close the mediasoup Router. // Close the mediasoup Routers.
this._mediasoupRouter.close(); for (const router of this._mediasoupRouters.values())
{
router.close();
}
this._routerIterator = null;
this._currentRouter = null;
this._mediasoupRouters.clear();
this._audioLevelObserver = null;
// Emit 'close' event. // Emit 'close' event.
this.emit('close'); this.emit('close');
@ -401,6 +438,9 @@ class Room extends EventEmitter
this._peers[peer.id] = peer; this._peers[peer.id] = peer;
// Assign routerId
peer.routerId = await this._getRouterId();
this._handlePeer(peer); this._handlePeer(peer);
if (returning) if (returning)
@ -560,11 +600,14 @@ class Room extends EventEmitter
async _handleSocketRequest(peer, request, cb) async _handleSocketRequest(peer, request, cb)
{ {
const router =
this._mediasoupRouters.get(peer.routerId);
switch (request.method) switch (request.method)
{ {
case 'getRouterRtpCapabilities': case 'getRouterRtpCapabilities':
{ {
cb(null, this._mediasoupRouter.rtpCapabilities); cb(null, router.rtpCapabilities);
break; break;
} }
@ -673,7 +716,7 @@ class Room extends EventEmitter
webRtcTransportOptions.enableTcp = true; webRtcTransportOptions.enableTcp = true;
} }
const transport = await this._mediasoupRouter.createWebRtcTransport( const transport = await router.createWebRtcTransport(
webRtcTransportOptions webRtcTransportOptions
); );
@ -772,6 +815,19 @@ class Room extends EventEmitter
const producer = const producer =
await transport.produce({ kind, rtpParameters, appData }); await transport.produce({ kind, rtpParameters, appData });
const pipeRouters = this._getRoutersToPipeTo(peer.routerId);
for (const [ routerId, destinationRouter ] of this._mediasoupRouters)
{
if (pipeRouters.includes(routerId))
{
await router.pipeToRouter({
producerId : producer.id,
router : destinationRouter
});
}
}
// Store the Producer into the Peer data Object. // Store the Producer into the Peer data Object.
peer.addProducer(producer.id, producer); peer.addProducer(producer.id, producer);
@ -1379,6 +1435,8 @@ class Room extends EventEmitter
producer.id producer.id
); );
const router = this._mediasoupRouters.get(producerPeer.routerId);
// Optimization: // Optimization:
// - Create the server-side Consumer. If video, do it paused. // - Create the server-side Consumer. If video, do it paused.
// - Tell its Peer about it and wait for its response. // - Tell its Peer about it and wait for its response.
@ -1389,7 +1447,7 @@ class Room extends EventEmitter
// NOTE: Don't create the Consumer if the remote Peer cannot consume it. // NOTE: Don't create the Consumer if the remote Peer cannot consume it.
if ( if (
!consumerPeer.rtpCapabilities || !consumerPeer.rtpCapabilities ||
!this._mediasoupRouter.canConsume( !router.canConsume(
{ {
producerId : producer.id, producerId : producer.id,
rtpCapabilities : consumerPeer.rtpCapabilities rtpCapabilities : consumerPeer.rtpCapabilities
@ -1601,6 +1659,84 @@ class Room extends EventEmitter
socket.emit('notification', { method, data }); socket.emit('notification', { method, data });
} }
} }
async _pipeProducersToNewRouter()
{
const peersToPipe =
Object.values(this._peers)
.filter((peer) => peer.routerId !== this._currentRouter.id);
for (const peer of peersToPipe)
{
const srcRouter = this._mediasoupRouters.get(peer.routerId);
for (const producerId of peer.producers.keys())
{
await srcRouter.pipeToRouter({
producerId,
router : this._currentRouter
});
}
}
}
async _getRouterId()
{
if (this._currentRouter)
{
const routerLoad =
Object.values(this._peers)
.filter((peer) => peer.routerId === this._currentRouter.id).length;
if (routerLoad >= ROUTER_SCALE_SIZE)
{
this._currentRouter = this._routerIterator.next().value;
if (this._currentRouter)
{
await this._pipeProducersToNewRouter();
return this._currentRouter.id;
}
}
else
{
return this._currentRouter.id;
}
}
return this._getLeastLoadedRouter();
}
// Returns an array of router ids we need to pipe to
_getRoutersToPipeTo(originRouterId)
{
return Object.values(this._peers)
.map((peer) => peer.routerId)
.filter((routerId, index, self) =>
routerId !== originRouterId && self.indexOf(routerId) === index
);
}
_getLeastLoadedRouter()
{
let load = Infinity;
let id;
for (const routerId of this._mediasoupRouters.keys())
{
const routerLoad =
Object.values(this._peers).filter((peer) => peer.routerId === routerId).length;
if (routerLoad < load)
{
id = routerId;
load = routerLoad;
}
}
return id;
}
} }
module.exports = Room; module.exports = Room;

View File

@ -644,9 +644,9 @@ async function getOrCreateRoom({ roomId })
{ {
logger.info('creating a new Room [roomId:"%s"]', roomId); logger.info('creating a new Room [roomId:"%s"]', roomId);
const mediasoupWorker = getMediasoupWorker(); // const mediasoupWorker = getMediasoupWorker();
room = await Room.create({ mediasoupWorker, roomId }); room = await Room.create({ mediasoupWorkers, roomId });
rooms.set(roomId, room); rooms.set(roomId, room);