multiparty-meeting/server/lib/Room.js

600 lines
12 KiB
JavaScript

'use strict';
const EventEmitter = require('events').EventEmitter;
const protooServer = require('protoo-server');
const Logger = require('./Logger');
const config = require('../config');
const MAX_BITRATE = config.mediasoup.maxBitrate || 1000000;
const MIN_BITRATE = Math.min(50000, MAX_BITRATE);
const BITRATE_FACTOR = 0.75;
const logger = new Logger('Room');
class Room extends EventEmitter
{
constructor(roomId, mediaServer)
{
logger.info('constructor() [roomId:"%s"]', roomId);
super();
this.setMaxListeners(Infinity);
// Room ID.
this._roomId = roomId;
// Closed flag.
this._closed = false;
this._chatHistory = [];
try
{
// Protoo Room instance.
this._protooRoom = new protooServer.Room();
// mediasoup Room instance.
this._mediaRoom = mediaServer.Room(config.mediasoup.mediaCodecs);
}
catch (error)
{
this.close();
throw error;
}
// Current max bitrate for all the participants.
this._maxBitrate = MAX_BITRATE;
// Current active speaker.
// @type {mediasoup.Peer}
this._currentActiveSpeaker = null;
this._handleMediaRoom();
}
get id()
{
return this._roomId;
}
close()
{
logger.debug('close()');
this._closed = true;
// Close the protoo Room.
if (this._protooRoom)
this._protooRoom.close();
// Close the mediasoup Room.
if (this._mediaRoom)
this._mediaRoom.close();
// Emit 'close' event.
this.emit('close');
}
logStatus()
{
if (!this._mediaRoom)
return;
logger.info(
'logStatus() [room id:"%s", protoo peers:%s, mediasoup peers:%s]',
this._roomId,
this._protooRoom.peers.length,
this._mediaRoom.peers.length);
}
handleConnection(peerName, transport)
{
logger.info('handleConnection() [peerName:"%s"]', peerName);
if (this._protooRoom.hasPeer(peerName))
{
logger.warn(
'handleConnection() | there is already a peer with same peerName, ' +
'closing the previous one [peerName:"%s"]',
peerName);
const protooPeer = this._protooRoom.getPeer(peerName);
protooPeer.close();
}
const protooPeer = this._protooRoom.createPeer(peerName, transport);
this._handleProtooPeer(protooPeer);
}
_handleMediaRoom()
{
logger.debug('_handleMediaRoom()');
const activeSpeakerDetector = this._mediaRoom.createActiveSpeakerDetector();
activeSpeakerDetector.on('activespeakerchange', (activePeer) =>
{
if (activePeer)
{
logger.info('new active speaker [peerName:"%s"]', activePeer.name);
this._currentActiveSpeaker = activePeer;
const activeVideoProducer = activePeer.producers
.find((producer) => producer.kind === 'video');
for (const peer of this._mediaRoom.peers)
{
for (const consumer of peer.consumers)
{
if (consumer.kind !== 'video')
continue;
if (consumer.source === activeVideoProducer)
{
consumer.setPreferredProfile('high');
}
else
{
consumer.setPreferredProfile('low');
}
}
}
}
else
{
logger.info('no active speaker');
this._currentActiveSpeaker = null;
for (const peer of this._mediaRoom.peers)
{
for (const consumer of peer.consumers)
{
if (consumer.kind !== 'video')
continue;
consumer.setPreferredProfile('low');
}
}
}
// Spread to others via protoo.
this._protooRoom.spread(
'active-speaker',
{
peerName : activePeer ? activePeer.name : null
});
});
}
_handleProtooPeer(protooPeer)
{
logger.debug('_handleProtooPeer() [peer:"%s"]', protooPeer.id);
protooPeer.on('request', (request, accept, reject) =>
{
logger.debug(
'protoo "request" event [method:%s, peer:"%s"]',
request.method, protooPeer.id);
switch (request.method)
{
case 'mediasoup-request':
{
const mediasoupRequest = request.data;
this._handleMediasoupClientRequest(
protooPeer, mediasoupRequest, accept, reject);
break;
}
case 'mediasoup-notification':
{
accept();
const mediasoupNotification = request.data;
this._handleMediasoupClientNotification(
protooPeer, mediasoupNotification);
break;
}
case 'change-display-name':
{
accept();
const { displayName } = request.data;
const { mediaPeer } = protooPeer.data;
const oldDisplayName = mediaPeer.appData.displayName;
mediaPeer.appData.displayName = displayName;
// Spread to others via protoo.
this._protooRoom.spread(
'display-name-changed',
{
peerName : protooPeer.id,
displayName : displayName,
oldDisplayName : oldDisplayName
},
[ protooPeer ]);
break;
}
case 'change-profile-picture':
{
accept();
this._protooRoom.spread('profile-picture-changed', {
peerName : protooPeer.id,
picture : request.data.picture
}, [ protooPeer ]);
break;
}
case 'chat-message':
{
accept();
const { chatMessage } = request.data;
this._chatHistory.push(chatMessage);
// Spread to others via protoo.
this._protooRoom.spread(
'chat-message-receive',
{
peerName : protooPeer.id,
chatMessage : chatMessage
},
[ protooPeer ]);
break;
}
case 'chat-history':
{
accept();
protooPeer.send(
'chat-history-receive',
{ chatHistory: this._chatHistory }
);
break;
}
case 'raisehand-message':
{
accept();
const { raiseHandState } = request.data;
const { mediaPeer } = protooPeer.data;
mediaPeer.appData.raiseHandState = request.data.raiseHandState;
// Spread to others via protoo.
this._protooRoom.spread(
'raisehand-message',
{
peerName : protooPeer.id,
raiseHandState : raiseHandState
},
[ protooPeer ]);
break;
}
default:
{
logger.error('unknown request.method "%s"', request.method);
reject(400, `unknown request.method "${request.method}"`);
}
}
});
protooPeer.on('close', () =>
{
logger.debug('protoo Peer "close" event [peer:"%s"]', protooPeer.id);
const { mediaPeer } = protooPeer.data;
if (mediaPeer && !mediaPeer.closed)
mediaPeer.close();
// If this is the latest peer in the room, close the room.
// However wait a bit (for reconnections).
setTimeout(() =>
{
if (this._mediaRoom && this._mediaRoom.closed)
return;
if (this._mediaRoom.peers.length === 0)
{
logger.info(
'last peer in the room left, closing the room [roomId:"%s"]',
this._roomId);
this.close();
}
}, 5000);
});
}
_handleMediaPeer(protooPeer, mediaPeer)
{
mediaPeer.on('notify', (notification) =>
{
protooPeer.send('mediasoup-notification', notification)
.catch(() => {});
});
mediaPeer.on('newtransport', (transport) =>
{
logger.info(
'mediaPeer "newtransport" event [id:%s, direction:%s]',
transport.id, transport.direction);
// Update peers max sending bitrate.
if (transport.direction === 'send')
{
this._updateMaxBitrate();
transport.on('close', () =>
{
this._updateMaxBitrate();
});
}
this._handleMediaTransport(transport);
});
mediaPeer.on('newproducer', (producer) =>
{
logger.info('mediaPeer "newproducer" event [id:%s]', producer.id);
this._handleMediaProducer(producer);
});
mediaPeer.on('newconsumer', (consumer) =>
{
logger.info('mediaPeer "newconsumer" event [id:%s]', consumer.id);
this._handleMediaConsumer(consumer);
});
// Also handle already existing Consumers.
for (const consumer of mediaPeer.consumers)
{
logger.info('mediaPeer existing "consumer" [id:%s]', consumer.id);
this._handleMediaConsumer(consumer);
}
// Notify about the existing active speaker.
if (this._currentActiveSpeaker)
{
protooPeer.send(
'active-speaker',
{
peerName : this._currentActiveSpeaker.name
})
.catch(() => {});
}
}
_handleMediaTransport(transport)
{
transport.on('close', (originator) =>
{
logger.info(
'Transport "close" event [originator:%s]', originator);
});
}
_handleMediaProducer(producer)
{
producer.on('close', (originator) =>
{
logger.info(
'Producer "close" event [originator:%s]', originator);
});
producer.on('pause', (originator) =>
{
logger.info(
'Producer "pause" event [originator:%s]', originator);
});
producer.on('resume', (originator) =>
{
logger.info(
'Producer "resume" event [originator:%s]', originator);
});
}
_handleMediaConsumer(consumer)
{
consumer.on('close', (originator) =>
{
logger.info(
'Consumer "close" event [originator:%s]', originator);
});
consumer.on('pause', (originator) =>
{
logger.info(
'Consumer "pause" event [originator:%s]', originator);
});
consumer.on('resume', (originator) =>
{
logger.info(
'Consumer "resume" event [originator:%s]', originator);
});
consumer.on('effectiveprofilechange', (profile) =>
{
logger.info(
'Consumer "effectiveprofilechange" event [profile:%s]', profile);
});
// If video, initially make it 'low' profile unless this is for the current
// active speaker.
if (consumer.kind === 'video' && consumer.peer !== this._currentActiveSpeaker)
consumer.setPreferredProfile('low');
}
_handleMediasoupClientRequest(protooPeer, request, accept, reject)
{
logger.debug(
'mediasoup-client request [method:%s, peer:"%s"]',
request.method, protooPeer.id);
switch (request.method)
{
case 'queryRoom':
{
this._mediaRoom.receiveRequest(request)
.then((response) => accept(response))
.catch((error) => reject(500, error.toString()));
break;
}
case 'join':
{
// TODO: Handle appData. Yes?
const { peerName } = request;
if (peerName !== protooPeer.id)
{
reject(403, 'that is not your corresponding mediasoup Peer name');
break;
}
else if (protooPeer.data.mediaPeer)
{
reject(500, 'already have a mediasoup Peer');
break;
}
this._mediaRoom.receiveRequest(request)
.then((response) =>
{
accept(response);
// Get the newly created mediasoup Peer.
const mediaPeer = this._mediaRoom.getPeerByName(peerName);
protooPeer.data.mediaPeer = mediaPeer;
this._handleMediaPeer(protooPeer, mediaPeer);
})
.catch((error) =>
{
reject(500, error.toString());
});
break;
}
default:
{
const { mediaPeer } = protooPeer.data;
if (!mediaPeer)
{
logger.error(
'cannot handle mediasoup request, no mediasoup Peer [method:"%s"]',
request.method);
reject(400, 'no mediasoup Peer');
}
mediaPeer.receiveRequest(request)
.then((response) => accept(response))
.catch((error) => reject(500, error.toString()));
}
}
}
_handleMediasoupClientNotification(protooPeer, notification)
{
logger.debug(
'mediasoup-client notification [method:%s, peer:"%s"]',
notification.method, protooPeer.id);
// NOTE: mediasoup-client just sends notifications with target 'peer',
// so first of all, get the mediasoup Peer.
const { mediaPeer } = protooPeer.data;
if (!mediaPeer)
{
logger.error(
'cannot handle mediasoup notification, no mediasoup Peer [method:"%s"]',
notification.method);
return;
}
mediaPeer.receiveNotification(notification);
}
_updateMaxBitrate()
{
if (this._mediaRoom.closed)
return;
const numPeers = this._mediaRoom.peers.length;
const previousMaxBitrate = this._maxBitrate;
let newMaxBitrate;
if (numPeers <= 2)
{
newMaxBitrate = MAX_BITRATE;
}
else
{
newMaxBitrate = Math.round(MAX_BITRATE / ((numPeers - 1) * BITRATE_FACTOR));
if (newMaxBitrate < MIN_BITRATE)
newMaxBitrate = MIN_BITRATE;
}
this._maxBitrate = newMaxBitrate;
for (const peer of this._mediaRoom.peers)
{
for (const transport of peer.transports)
{
if (transport.direction === 'send')
{
transport.setMaxBitrate(newMaxBitrate)
.catch((error) =>
{
logger.error('transport.setMaxBitrate() failed: %s', String(error));
});
}
}
}
logger.info(
'_updateMaxBitrate() [num peers:%s, before:%skbps, now:%skbps]',
numPeers,
Math.round(previousMaxBitrate / 1000),
Math.round(newMaxBitrate / 1000));
}
}
module.exports = Room;