'use strict'; const EventEmitter = require('events').EventEmitter; const Logger = require('./Logger'); const config = require('../config/config'); const MAX_BITRATE = config.mediasoup.maxBitrate || 1000000; const MIN_BITRATE = Math.min(50000, MAX_BITRATE); const BITRATE_FACTOR = 0.75; const logger = new Logger('Room'); class Room extends EventEmitter { constructor(roomId, mediaServer, io) { logger.info('constructor() [roomId:"%s"]', roomId); super(); this.setMaxListeners(Infinity); // Room ID. this._roomId = roomId; // Closed flag. this._closed = false; // Locked flag. this._locked = false; this._chatHistory = []; this._fileHistory = []; this._lastN = []; this._io = io; this._signalingPeers = new Map(); try { // mediasoup Room instance. this._mediaRoom = mediaServer.Room(config.mediasoup.mediaCodecs); } catch (error) { this.close(); throw error; } // Current max bitrate for all the participants. this._maxBitrate = MAX_BITRATE; // Current active speaker. // @type {mediasoup.Peer} this._currentActiveSpeaker = null; this._handleMediaRoom(); } get id() { return this._roomId; } close() { logger.debug('close()'); this._closed = true; // Close the signalingPeers if (this._signalingPeers) for (let peer of this._signalingPeers) { if (peer.socket) peer.socket.disconnect(); }; this._signalingPeers.clear(); // Close the mediasoup Room. if (this._mediaRoom) this._mediaRoom.close(); // Emit 'close' event. this.emit('close'); } logStatus() { if (!this._mediaRoom) return; logger.info( 'logStatus() [room id:"%s", peers:%s, mediasoup peers:%s]', this._roomId, this._signalingPeers.length, this._mediaRoom.peers.length); } handleConnection(peerName, socket) { logger.info('handleConnection() [peerName:"%s"]', peerName); // This will allow reconnects to join despite lock if (this._signalingPeers.has(peerName)) { logger.warn( 'handleConnection() | there is already a peer with same peerName, ' + 'closing the previous one [peerName:"%s"]', peerName); const signalingPeer = this._signalingPeers.get(peerName); signalingPeer.socket.disconnect(); this._signalingPeers.delete(peerName); } else if (this._locked) // Don't allow connections to a locked room { socket.emit('room-locked'); socket.disconnect(true); return; } socket.join(this._roomId); const signalingPeer = { peerName : peerName, socket : socket }; const index = this._lastN.indexOf(peerName); if (index === -1) // We don't have this peer, add to end { this._lastN.push(peerName); } this._signalingPeers.set(peerName, signalingPeer); this._handleSignalingPeer(signalingPeer); socket.emit('room-ready'); } isLocked() { return this._locked; } authCallback(data) { logger.debug('authCallback()'); const { peerName, name, picture } = data; const signalingPeer = this._signalingPeers.get(peerName); if (signalingPeer) { signalingPeer.socket.emit('auth', { name : name, picture : picture }); } } _handleMediaRoom() { logger.debug('_handleMediaRoom()'); const activeSpeakerDetector = this._mediaRoom.createActiveSpeakerDetector(); activeSpeakerDetector.on('activespeakerchange', (activePeer) => { if (activePeer) { logger.info('new active speaker [peerName:"%s"]', activePeer.name); this._currentActiveSpeaker = activePeer; const index = this._lastN.indexOf(activePeer.name); if (index > -1) // We have this speaker in the list, move to front { this._lastN.splice(index, 1); this._lastN = [activePeer.name].concat(this._lastN); } const activeVideoProducer = activePeer.producers .find((producer) => producer.kind === 'video'); for (const peer of this._mediaRoom.peers) { for (const consumer of peer.consumers) { if (consumer.kind !== 'video') continue; if (consumer.source === activeVideoProducer) { consumer.setPreferredProfile('high'); } else { consumer.setPreferredProfile('low'); } } } } else { logger.info('no active speaker'); this._currentActiveSpeaker = null; for (const peer of this._mediaRoom.peers) { for (const consumer of peer.consumers) { if (consumer.kind !== 'video') continue; consumer.setPreferredProfile('low'); } } } this._io.to(this._roomId).emit('active-speaker', { peerName : activePeer ? activePeer.name : null }); }); } _handleSignalingPeer(signalingPeer) { logger.debug('_handleSignalingPeer() [peer:"%s"]', signalingPeer.id); signalingPeer.socket.on('mediasoup-request', (request, cb) => { const mediasoupRequest = request; this._handleMediasoupClientRequest( signalingPeer, mediasoupRequest, cb); }); signalingPeer.socket.on('mediasoup-notification', (request, cb) => { // Return no error cb(null); const mediasoupNotification = request; this._handleMediasoupClientNotification( signalingPeer, mediasoupNotification); }); signalingPeer.socket.on('change-display-name', (request, cb) => { // Return no error cb(null); const { displayName } = request; const mediaPeer = this._mediaRoom.getPeerByName(signalingPeer.peerName); const oldDisplayName = mediaPeer.appData.displayName; mediaPeer.appData.displayName = displayName; signalingPeer.socket.broadcast.to(this._roomId).emit( 'display-name-changed', { peerName : signalingPeer.peerName, displayName : displayName, oldDisplayName : oldDisplayName } ); }); signalingPeer.socket.on('change-profile-picture', (request, cb) => { // Return no error cb(null); signalingPeer.socket.broadcast.to(this._roomId).emit( 'profile-picture-changed', { peerName : signalingPeer.peerName, picture : request.picture } ); }); signalingPeer.socket.on('chat-message', (request, cb) => { // Return no error cb(null); const { chatMessage } = request; this._chatHistory.push(chatMessage); // Spread to others signalingPeer.socket.broadcast.to(this._roomId).emit( 'chat-message-receive', { peerName : signalingPeer.peerName, chatMessage : chatMessage } ); }); signalingPeer.socket.on('server-history', (request, cb) => { // Return to sender cb( null, { chatHistory : this._chatHistory, fileHistory : this._fileHistory, lastN : this._lastN } ); }); signalingPeer.socket.on('lock-room', (request, cb) => { // Return no error cb(null); this._locked = true; // Spread to others signalingPeer.socket.broadcast.to(this._roomId).emit( 'lock-room', { peerName : signalingPeer.peerName } ); }); signalingPeer.socket.on('unlock-room', (request, cb) => { // Return no error cb(null); this._locked = false; // Spread to others signalingPeer.socket.broadcast.to(this._roomId).emit( 'unlock-room', { peerName : signalingPeer.peerName } ); }); signalingPeer.socket.on('send-file', (request, cb) => { // Return no error cb(null); const fileData = request.file; this._fileHistory.push(fileData); // Spread to others signalingPeer.socket.broadcast.to(this._roomId).emit( 'file-receive', { peerName : signalingPeer.peerName, file : fileData } ); }); signalingPeer.socket.on('raisehand-message', (request, cb) => { // Return no error cb(null); const { raiseHandState } = request; const { mediaPeer } = signalingPeer; mediaPeer.appData.raiseHandState = raiseHandState; // Spread to others signalingPeer.socket.broadcast.to(this._roomId).emit( 'raisehand-message', { peerName : signalingPeer.peerName, raiseHandState : raiseHandState }, ); }); signalingPeer.socket.on('request-consumer-keyframe', (request, cb) => { cb(null); const { consumerId } = request; const mediaPeer = this._mediaRoom.getPeerByName(signalingPeer.peerName); const consumer = mediaPeer.consumers .find((_consumer) => _consumer.id === consumerId); if (!consumer) { logger.warn('consumer with id "%s" not found', consumerId); return; } consumer.requestKeyFrame(); }); signalingPeer.socket.on('disconnect', () => { logger.debug('Peer "close" event [peer:"%s"]', signalingPeer.peerName); const mediaPeer = this._mediaRoom.getPeerByName(signalingPeer.peerName); if (mediaPeer && !mediaPeer.closed) mediaPeer.close(); const index = this._lastN.indexOf(signalingPeer.peerName); if (index > -1) // We have this peer in the list, remove { this._lastN.splice(index, 1); } // If this is the latest peer in the room, close the room. // However wait a bit (for reconnections). setTimeout(() => { if (this._mediaRoom && this._mediaRoom.closed) return; if (this._mediaRoom.peers.length === 0) { logger.info( 'last peer in the room left, closing the room [roomId:"%s"]', this._roomId); this.close(); } }, 5000); }); } _handleMediaPeer(signalingPeer, mediaPeer) { mediaPeer.on('notify', (notification) => { signalingPeer.socket.emit('mediasoup-notification', notification); }); mediaPeer.on('newtransport', (transport) => { logger.info( 'mediaPeer "newtransport" event [id:%s, direction:%s]', transport.id, transport.direction); // Update peers max sending bitrate. if (transport.direction === 'send') { this._updateMaxBitrate(); transport.on('close', () => { this._updateMaxBitrate(); }); } this._handleMediaTransport(transport); }); mediaPeer.on('newproducer', (producer) => { logger.info('mediaPeer "newproducer" event [id:%s]', producer.id); this._handleMediaProducer(producer); }); mediaPeer.on('newconsumer', (consumer) => { logger.info('mediaPeer "newconsumer" event [id:%s]', consumer.id); this._handleMediaConsumer(consumer); }); // Also handle already existing Consumers. for (const consumer of mediaPeer.consumers) { logger.info('mediaPeer existing "consumer" [id:%s]', consumer.id); this._handleMediaConsumer(consumer); } // Notify about the existing active speaker. if (this._currentActiveSpeaker) { signalingPeer.socket.emit( 'active-speaker', { peerName : this._currentActiveSpeaker.name }); } } _handleMediaTransport(transport) { transport.on('close', (originator) => { logger.info( 'Transport "close" event [originator:%s]', originator); }); } _handleMediaProducer(producer) { producer.on('close', (originator) => { logger.info( 'Producer "close" event [originator:%s]', originator); }); producer.on('pause', (originator) => { logger.info( 'Producer "pause" event [originator:%s]', originator); }); producer.on('resume', (originator) => { logger.info( 'Producer "resume" event [originator:%s]', originator); }); } _handleMediaConsumer(consumer) { consumer.on('close', (originator) => { logger.info( 'Consumer "close" event [originator:%s]', originator); }); consumer.on('pause', (originator) => { logger.info( 'Consumer "pause" event [originator:%s]', originator); }); consumer.on('resume', (originator) => { logger.info( 'Consumer "resume" event [originator:%s]', originator); }); consumer.on('effectiveprofilechange', (profile) => { logger.info( 'Consumer "effectiveprofilechange" event [profile:%s]', profile); }); // If video, initially make it 'low' profile unless this is for the current // active speaker. if (consumer.kind === 'video' && consumer.peer !== this._currentActiveSpeaker) consumer.setPreferredProfile('low'); } _handleMediasoupClientRequest(signalingPeer, request, cb) { logger.debug( 'mediasoup-client request [method:%s, peer:"%s"]', request.method, signalingPeer.peerName); switch (request.method) { case 'queryRoom': { this._mediaRoom.receiveRequest(request) .then((response) => cb(null, response)) .catch((error) => cb(error.toString())); break; } case 'join': { // TODO: Handle appData. Yes? const { peerName } = request; if (peerName !== signalingPeer.peerName) { cb('that is not your corresponding mediasoup Peer name'); break; } else if (signalingPeer.mediaPeer) { cb('already have a mediasoup Peer'); break; } this._mediaRoom.receiveRequest(request) .then((response) => { cb(null, response); // Get the newly created mediasoup Peer. const mediaPeer = this._mediaRoom.getPeerByName(peerName); signalingPeer.mediaPeer = mediaPeer; this._handleMediaPeer(signalingPeer, mediaPeer); }) .catch((error) => { cb(error.toString()); }); break; } default: { const { mediaPeer } = signalingPeer; if (!mediaPeer) { logger.error( 'cannot handle mediasoup request, no mediasoup Peer [method:"%s"]', request.method); cb('no mediasoup Peer'); } mediaPeer.receiveRequest(request) .then((response) => cb(null, response)) .catch((error) => cb(error.toString())); } } } _handleMediasoupClientNotification(signalingPeer, notification) { logger.debug( 'mediasoup-client notification [method:%s, peer:"%s"]', notification.method, signalingPeer.peerName); // NOTE: mediasoup-client just sends notifications with target 'peer', // so first of all, get the mediasoup Peer. const { mediaPeer } = signalingPeer; if (!mediaPeer) { logger.error( 'cannot handle mediasoup notification, no mediasoup Peer [method:"%s"]', notification.method); return; } mediaPeer.receiveNotification(notification); } _updateMaxBitrate() { if (this._mediaRoom.closed) return; const numPeers = this._mediaRoom.peers.length; const previousMaxBitrate = this._maxBitrate; let newMaxBitrate; if (numPeers <= 2) { newMaxBitrate = MAX_BITRATE; } else { newMaxBitrate = Math.round(MAX_BITRATE / ((numPeers - 1) * BITRATE_FACTOR)); if (newMaxBitrate < MIN_BITRATE) newMaxBitrate = MIN_BITRATE; } this._maxBitrate = newMaxBitrate; for (const peer of this._mediaRoom.peers) { for (const transport of peer.transports) { if (transport.direction === 'send') { transport.setMaxBitrate(newMaxBitrate) .catch((error) => { logger.error('transport.setMaxBitrate() failed: %s', String(error)); }); } } } logger.info( '_updateMaxBitrate() [num peers:%s, before:%skbps, now:%skbps]', numPeers, Math.round(previousMaxBitrate / 1000), Math.round(newMaxBitrate / 1000)); } } module.exports = Room;