master
Håvar Aambø Fosstveit 2020-03-19 21:21:01 +01:00
parent 545beb70ec
commit 7a884edcdd
2 changed files with 140 additions and 86 deletions

View File

@ -118,8 +118,10 @@ module.exports =
// Can have multiple listening interfaces // Can have multiple listening interfaces
// { ip: '::/0', announcedIp: null } // { ip: '::/0', announcedIp: null }
], ],
maxIncomingBitrate : 1500000, initialAvailableOutgoingBitrate : 1000000,
initialAvailableOutgoingBitrate : 1000000 minimumAvailableOutgoingBitrate : 600000,
// Additional options that are not part of WebRtcTransportOptions.
maxIncomingBitrate : 1500000
} }
} }
}; };

View File

@ -68,7 +68,7 @@ class Room extends EventEmitter
this._lastN = []; this._lastN = [];
this._peers = new Map(); this._peers = {};
// mediasoup Router instance. // mediasoup Router instance.
this._mediasoupRouter = mediasoupRouter; this._mediasoupRouter = mediasoupRouter;
@ -96,13 +96,17 @@ class Room extends EventEmitter
this._lobby.close(); this._lobby.close();
this._peers.forEach((peer) => // Close the peers.
for (const peer in this._peers)
{ {
if (!peer.closed) if (Object.prototype.hasOwnProperty.call(this._peers, peer))
peer.close(); {
}); if (!peer.closed)
peer.close();
}
}
this._peers.clear(); this._peers = null;
// Close the mediasoup Router. // Close the mediasoup Router.
this._mediasoupRouter.close(); this._mediasoupRouter.close();
@ -116,7 +120,7 @@ class Room extends EventEmitter
logger.info('handlePeer() [peer:"%s"]', peer.id); logger.info('handlePeer() [peer:"%s"]', peer.id);
// This will allow reconnects to join despite lock // This will allow reconnects to join despite lock
if (this._peers.has(peer.id)) if (this._peers[peer.id])
{ {
logger.warn( logger.warn(
'handleConnection() | there is already a peer with same peerId [peer:"%s"]', 'handleConnection() | there is already a peer with same peerId [peer:"%s"]',
@ -168,10 +172,10 @@ class Room extends EventEmitter
this._peerJoining(promotedPeer); this._peerJoining(promotedPeer);
this._peers.forEach((peer) => for (const peer of this._getJoinedPeers())
{ {
this._notification(peer.socket, 'lobby:promotedPeer', { peerId: id }); this._notification(peer.socket, 'lobby:promotedPeer', { peerId: id });
}); }
}); });
this._lobby.on('peerAuthenticated', (peer) => this._lobby.on('peerAuthenticated', (peer) =>
@ -183,20 +187,20 @@ class Room extends EventEmitter
{ {
const { id, displayName } = changedPeer; const { id, displayName } = changedPeer;
this._peers.forEach((peer) => for (const peer of this._getJoinedPeers())
{ {
this._notification(peer.socket, 'lobby:changeDisplayName', { peerId: id, displayName }); this._notification(peer.socket, 'lobby:changeDisplayName', { peerId: id, displayName });
}); }
}); });
this._lobby.on('changePicture', (changedPeer) => this._lobby.on('changePicture', (changedPeer) =>
{ {
const { id, picture } = changedPeer; const { id, picture } = changedPeer;
this._peers.forEach((peer) => for (const peer of this._getJoinedPeers())
{ {
this._notification(peer.socket, 'lobby:changePicture', { peerId: id, picture }); this._notification(peer.socket, 'lobby:changePicture', { peerId: id, picture });
}); }
}); });
this._lobby.on('peerClosed', (closedPeer) => this._lobby.on('peerClosed', (closedPeer) =>
@ -205,10 +209,10 @@ class Room extends EventEmitter
const { id } = closedPeer; const { id } = closedPeer;
this._peers.forEach((peer) => for (const peer of this._getJoinedPeers())
{ {
this._notification(peer.socket, 'lobby:peerClosed', { peerId: id }); this._notification(peer.socket, 'lobby:peerClosed', { peerId: id });
}); }
}); });
// If nobody left in lobby we should check if room is empty too and initiating // If nobody left in lobby we should check if room is empty too and initiating
@ -230,22 +234,29 @@ class Room extends EventEmitter
const { producer, volume } = volumes[0]; const { producer, volume } = volumes[0];
// Notify all Peers. // Notify all Peers.
this._peers.forEach((peer) => for (const peer of this._getJoinedPeers())
{ {
this._notification(peer.socket, 'activeSpeaker', { this._notification(
peerId : producer.appData.peerId, peer.socket,
volume : volume 'activeSpeaker',
}); {
}); peerId : producer.appData.peerId,
volume : volume
});
}
}); });
this._audioLevelObserver.on('silence', () => this._audioLevelObserver.on('silence', () =>
{ {
// Notify all Peers. // Notify all Peers.
this._peers.forEach((peer) => for (const peer of this._getJoinedPeers())
{ {
this._notification(peer.socket, 'activeSpeaker', { peerId: null }); this._notification(
}); peer.socket,
'activeSpeaker',
{ peerId: null }
);
}
}); });
} }
@ -254,7 +265,7 @@ class Room extends EventEmitter
logger.info( logger.info(
'logStatus() [room id:"%s", peers:"%s"]', 'logStatus() [room id:"%s", peers:"%s"]',
this._roomId, this._roomId,
this._peers.size Object.keys(this._peers).length
); );
} }
@ -262,7 +273,7 @@ class Room extends EventEmitter
{ {
return { return {
roomId : this._roomId, roomId : this._roomId,
peers : this._peers.size peers : Object.keys(this._peers).length
}; };
} }
@ -295,17 +306,17 @@ class Room extends EventEmitter
// checks both room and lobby // checks both room and lobby
checkEmpty() checkEmpty()
{ {
return this._peers.size === 0; return Object.keys(this._peers).length === 0;
} }
_parkPeer(parkPeer) _parkPeer(parkPeer)
{ {
this._lobby.parkPeer(parkPeer); this._lobby.parkPeer(parkPeer);
this._peers.forEach((peer) => for (const peer of this._getJoinedPeers())
{ {
this._notification(peer.socket, 'parkedPeer', { peerId: parkPeer.id }); this._notification(peer.socket, 'parkedPeer', { peerId: parkPeer.id });
}); }
} }
_peerJoining(peer) _peerJoining(peer)
@ -319,7 +330,7 @@ class Room extends EventEmitter
this._lastN.push(peer.id); this._lastN.push(peer.id);
} }
this._peers.set(peer.id, peer); this._peers[peer.id] = peer;
this._handlePeer(peer); this._handlePeer(peer);
this._notification(peer.socket, 'roomReady'); this._notification(peer.socket, 'roomReady');
@ -362,7 +373,7 @@ class Room extends EventEmitter
this._lastN.splice(index, 1); this._lastN.splice(index, 1);
} }
this._peers.delete(peer.id); delete this._peers[peer.id];
// If this is the last Peer in the room and // If this is the last Peer in the room and
// lobby is empty, close the room after a while. // lobby is empty, close the room after a while.
@ -452,41 +463,47 @@ class Room extends EventEmitter
// Tell the new Peer about already joined Peers. // Tell the new Peer about already joined Peers.
// And also create Consumers for existing Producers. // And also create Consumers for existing Producers.
const peerInfos = []; const joinedPeers =
[
...this._getJoinedPeers()
];
this._peers.forEach((joinedPeer) => const peerInfos = joinedPeers
{ .filter((joinedPeer) => joinedPeer.id !== peer.id)
if (joinedPeer.joined) .map((joinedPeer) => (joinedPeer.peerInfo));
{
peerInfos.push(joinedPeer.peerInfo);
joinedPeer.producers.forEach((producer) =>
{
this._createConsumer(
{
consumerPeer : peer,
producerPeer : joinedPeer,
producer
});
});
}
});
cb(null, { peers: peerInfos }); cb(null, { peers: peerInfos });
// Mark the new Peer as joined. // Mark the new Peer as joined.
peer.joined = true; peer.joined = true;
this._notification( for (const joinedPeer of joinedPeers)
peer.socket, {
'newPeer', // Create Consumers for existing Producers.
for (const producer of joinedPeer.producers.values())
{ {
id : peer.id, this._createConsumer(
displayName : displayName, {
picture : picture consumerPeer : peer,
}, producerPeer : joinedPeer,
true producer
); });
}
}
// Notify the new Peer to all other Peers.
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{
this._notification(
otherPeer.socket,
'newPeer',
{
id : peer.id,
displayName : displayName,
picture : picture
}
);
}
logger.debug( logger.debug(
'peer joined [peer: "%s", displayName: "%s", picture: "%s"]', 'peer joined [peer: "%s", displayName: "%s", picture: "%s"]',
@ -501,20 +518,28 @@ class Room extends EventEmitter
// initiate mediasoup Transports and be ready when he later joins. // initiate mediasoup Transports and be ready when he later joins.
const { forceTcp, producing, consuming } = request.data; const { forceTcp, producing, consuming } = request.data;
const {
maxIncomingBitrate, const webRtcTransportOptions =
initialAvailableOutgoingBitrate {
} = config.mediasoup.webRtcTransport; ...config.mediasoup.webRtcTransport,
appData : { producing, consuming }
};
if (forceTcp)
{
webRtcTransportOptions.enableUdp = false;
webRtcTransportOptions.enableTcp = true;
}
const transport = await this._mediasoupRouter.createWebRtcTransport( const transport = await this._mediasoupRouter.createWebRtcTransport(
{ webRtcTransportOptions
listenIps : config.mediasoup.webRtcTransport.listenIps, );
enableUdp : !forceTcp,
enableTcp : true, transport.on('dtlsstatechange', (dtlsState) =>
preferUdp : true, {
initialAvailableOutgoingBitrate, if (dtlsState === 'failed' || dtlsState === 'closed')
appData : { producing, consuming } logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState);
}); });
// Store the WebRtcTransport into the Peer data Object. // Store the WebRtcTransport into the Peer data Object.
peer.addTransport(transport.id, transport); peer.addTransport(transport.id, transport);
@ -528,6 +553,8 @@ class Room extends EventEmitter
dtlsParameters : transport.dtlsParameters dtlsParameters : transport.dtlsParameters
}); });
const { maxIncomingBitrate } = config.mediasoup.webRtcTransport;
// If set, apply max incoming bitrate limit. // If set, apply max incoming bitrate limit.
if (maxIncomingBitrate) if (maxIncomingBitrate)
{ {
@ -606,18 +633,16 @@ class Room extends EventEmitter
cb(null, { id: producer.id }); cb(null, { id: producer.id });
this._peers.forEach((otherPeer) => // Optimization: Create a server-side Consumer for each Peer.
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{ {
if (otherPeer.joined && otherPeer !== peer) this._createConsumer(
{ {
this._createConsumer( consumerPeer : otherPeer,
{ producerPeer : peer,
consumerPeer : otherPeer, producer
producerPeer : peer, });
producer }
});
}
});
// Add into the audioLevelObserver. // Add into the audioLevelObserver.
if (kind === 'audio') if (kind === 'audio')
@ -746,6 +771,25 @@ class Room extends EventEmitter
break; break;
} }
case 'setConsumerPriority':
{
// Ensure the Peer is joined.
if (!peer.joined)
throw new Error('Peer not yet joined');
const { consumerId, priority } = request.data;
const consumer = peer.getConsumer(consumerId);
if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`);
await consumer.setPriority(priority);
cb();
break;
}
case 'requestConsumerKeyFrame': case 'requestConsumerKeyFrame':
{ {
// Ensure the Peer is joined. // Ensure the Peer is joined.
@ -1149,7 +1193,7 @@ class Room extends EventEmitter
'newConsumer', 'newConsumer',
{ {
peerId : producerPeer.id, peerId : producerPeer.id,
kind : producer.kind, kind : consumer.kind,
producerId : producer.id, producerId : producer.id,
id : consumer.id, id : consumer.id,
rtpParameters : consumer.rtpParameters, rtpParameters : consumer.rtpParameters,
@ -1161,8 +1205,7 @@ class Room extends EventEmitter
// Now that we got the positive response from the remote Peer and, if // Now that we got the positive response from the remote Peer and, if
// video, resume the Consumer to ask for an efficient key frame. // video, resume the Consumer to ask for an efficient key frame.
if (producer.kind === 'video') await consumer.resume();
await consumer.resume();
this._notification( this._notification(
consumerPeer.socket, consumerPeer.socket,
@ -1179,6 +1222,15 @@ class Room extends EventEmitter
} }
} }
/**
* Helper to get the list of joined peers.
*/
_getJoinedPeers({ excludePeer = undefined } = {})
{
return Object.values(this._peers)
.filter((peer) => peer.joined && peer !== excludePeer);
}
_timeoutCallback(callback) _timeoutCallback(callback)
{ {
let called = false; let called = false;