diff --git a/server/lib/promExporter.js b/server/lib/promExporter.js index e54f241..cb34f21 100644 --- a/server/lib/promExporter.js +++ b/server/lib/promExporter.js @@ -4,225 +4,281 @@ const mediasoup = require('mediasoup'); const prom = require('prom-client'); const Logger = require('./Logger'); -const Peer = require('./Peer'); -const Room = require('./Room'); const logger = new Logger('prom'); const resolver = new Resolver(); - const workers = new Map(); -const label_names = [ +const labelNames = [ 'pid', 'room_id', 'peer_id', 'display_name', 'user_agent', 'transport_id', 'proto', 'local_addr', 'remote_addr', 'id', 'kind', 'codec', 'type' ]; const metadata = { - 'byteCount': { metric_type: prom.Counter, unit: 'bytes' }, - 'score': { metric_type: prom.Gauge } -} + 'byteCount' : { metricType: prom.Counter, unit: 'bytes' }, + 'score' : { metricType: prom.Gauge } +}; -common_labels = function(both, fn) { - for (let [room_id, room] of rooms) { - for (let [peer_id, peer] of peers) { - if (fn(peer)) { - let display_name = peer._displayName; +module.exports = async function(rooms, peers, config) +{ + const collect = async function(registry) + { + const newMetrics = function(subsystem) + { + const namespace = 'mediasoup'; + const metrics = new Map(); - let user_agent = peer._socket.client.request.headers['user-agent']; - let kind = both.kind; - let codec = both.rtpParameters.codecs[0].mimeType.split('/')[1]; - return { room_id, peer_id, display_name, user_agent, kind, codec }; - } - } - } - throw new Error('cannot find common labels'); -} + for (const key in metadata) + { + if (Object.prototype.hasOwnProperty.call(metadata, key)) + { + const value = metadata[key]; + const name = key.split(/(?=[A-Z])/).join('_') + .toLowerCase(); + const unit = value.unit; + const metricType = value.metricType; + let s = `${namespace}_${subsystem}_${name}`; -set_value = function(key, m, labels, v) { - logger.debug(`set_value key=${key} v=${v}`); - switch (metadata[key].metric_type) { - case prom.Counter: - m.inc(labels, v); - break; - case prom.Gauge: - m.set(labels, v); - break; - default: - throw new Error(`unexpected metric: ${m}`); - } -} + if (unit) + { + s += `_${unit}`; + } + const m = new metricType({ + name : s, help : `${subsystem}.${key}`, labelNames : labelNames, registers : [ registry ] }); -collect = async function(registry, rooms, peers) { - - metrics = function(subsystem) { - let namespace = 'mediasoup'; - let metrics = new Map(); - for (let key in metadata) { - value = metadata[key]; - let name = key.split(/(?=[A-Z])/).join('_').toLowerCase(); - let unit = value.unit; - let metric_type = value.metric_type; - let s = `${namespace}_${subsystem}_${name}`; - if (unit) { - s += `_${unit}`; - } - m = new metric_type({name: s, help: `${subsystem}.${key}`, - labelNames: label_names, registers: [registry]}); - metrics.set(key, m); - } - return metrics; - } - - logger.debug('collect'); - const m_rooms = new prom.Gauge({name: 'edumeet_rooms', help: '#rooms', - registers: [registry]}); - m_rooms.set(rooms.size); - const m_peers = new prom.Gauge({name: 'edumeet_peers', help: '#peers', - labelNames: ['room_id'], registers: [registry]}); - for (let [room_id, room] of rooms) { - m_peers.labels(room_id).set(Object.keys(room._peers).length); - } - - const m_consumer = metrics('consumer'); - const m_producer = metrics('producer'); - for (let [pid, worker] of workers) { - logger.debug(`visiting worker ${pid}`); - for (let router of worker._routers) { - logger.debug(`visiting router ${router.id}`); - for (let [transport_id, transport] of router._transports) { - logger.debug(`visiting transport ${transport_id}`); - let transport_j = await transport.dump(); - if (transport_j.iceState != 'completed') { - logger.debug(`skipping transport ${transport_id}}: ${transport_j.iceState}`); - continue; + metrics.set(key, m); } - let ice_selected_tuple = transport_j.iceSelectedTuple; - let proto = ice_selected_tuple.protocol - let local_addr = await addr(ice_selected_tuple.localIp, - ice_selected_tuple.localPort); - let remote_addr = await addr(ice_selected_tuple.remoteIp, - ice_selected_tuple.remotePort); - for (let [producer_id, producer] of transport._producers) { - logger.debug(`visiting producer ${producer_id}`); - let { room_id, peer_id, display_name, user_agent, kind, codec } = - common_labels(producer, peer => peer._producers.has(producer_id)); - let a = await producer.getStats(); - for (let x of a) { - let type = x.type; - let labels = { - 'pid': pid, - 'room_id': room_id, - 'peer_id': peer_id, - 'display_name': display_name, - 'user_agent': user_agent, - 'transport_id': quiet(transport_id), - 'proto': proto, - 'local_addr': local_addr, - 'remote_addr': remote_addr, - 'id': quiet(producer_id), - 'kind': kind, - 'codec': codec, - 'type': type - } - for (let [key, m] of m_producer) { - set_value(key, m, labels, x[key]); - } + } + + return metrics; + }; + + const commonLabels = function(both, fn) + { + for (const roomId of rooms.keys()) + { + for (const [ peerId, peer ] of peers) + { + if (fn(peer)) + { + const displayName = peer._displayName; + const userAgent = peer._socket.client.request.headers['user-agent']; + const kind = both.kind; + const codec = both.rtpParameters.codecs[0].mimeType.split('/')[1]; + + return { roomId, peerId, displayName, userAgent, kind, codec }; } } - for (let [consumer_id, consumer] of transport._consumers) { - logger.debug(`visiting consumer ${consumer_id}`); - let { room_id, peer_id, display_name, user_agent, kind, codec } = - common_labels(consumer, peer => peer._consumers.has(consumer_id)); - let a = await consumer.getStats(); - for (let x of a) { - if (x.type == 'inbound-rtp') { - continue; + } + throw new Error('cannot find common labels'); + }; + + const addr = async function(ip, port) + { + if (config.deidentify) + { + const a = ip.split('.'); + + for (let i = 0; i < a.length - 2; i++) + { + a[i] = 'xx'; + } + + return `${a.join('.')}:${port}`; + } + else if (config.numeric) + { + return `${ip}:${port}`; + } + else + { + try + { + const a = await resolver.reverse(ip); + + ip = a[0]; + } + catch (err) + { + logger.error(`reverse DNS query failed: ${ip} ${err.code}`); + } + + return `${ip}:${port}`; + } + }; + + const quiet = function(s) + { + return config.quiet ? '' : s; + }; + + const setValue = function(key, m, labels, v) + { + logger.debug(`setValue key=${key} v=${v}`); + switch (metadata[key].metricType) + { + case prom.Counter: + m.inc(labels, v); + break; + case prom.Gauge: + m.set(labels, v); + break; + default: + throw new Error(`unexpected metric: ${m}`); + } + }; + + logger.debug('collect'); + const mRooms = new prom.Gauge({ name: 'edumeet_rooms', help: '#rooms', registers: [ registry ] }); + + mRooms.set(rooms.size); + const mPeers = new prom.Gauge({ name: 'edumeet_peers', help: '#peers', labelNames: [ 'room_id' ], registers: [ registry ] }); + + for (const [ roomId, room ] of rooms) + { + mPeers.labels(roomId).set(Object.keys(room._peers).length); + } + + const mConsumer = newMetrics('consumer'); + const mProducer = newMetrics('producer'); + + for (const [ pid, worker ] of workers) + { + logger.debug(`visiting worker ${pid}`); + for (const router of worker._routers) + { + logger.debug(`visiting router ${router.id}`); + for (const [ transportId, transport ] of router._transports) + { + logger.debug(`visiting transport ${transportId}`); + const transportJson = await transport.dump(); + + if (transportJson.iceState != 'completed') + { + logger.debug(`skipping transport ${transportId}}: ${transportJson.iceState}`); + continue; + } + const iceSelectedTuple = transportJson.iceSelectedTuple; + const proto = iceSelectedTuple.protocol; + const localAddr = await addr(iceSelectedTuple.localIp, + iceSelectedTuple.localPort); + const remoteAddr = await addr(iceSelectedTuple.remoteIp, + iceSelectedTuple.remotePort); + + for (const [ producerId, producer ] of transport._producers) + { + logger.debug(`visiting producer ${producerId}`); + const { roomId, peerId, displayName, userAgent, kind, codec } = + commonLabels(producer, (peer) => peer._producers.has(producerId)); + const a = await producer.getStats(); + + for (const x of a) + { + const type = x.type; + const labels = { + 'pid' : pid, + 'room_id' : roomId, + 'peer_id' : peerId, + 'display_name' : displayName, + 'user_agent' : userAgent, + 'transport_id' : quiet(transportId), + 'proto' : proto, + 'local_addr' : localAddr, + 'remote_addr' : remoteAddr, + 'id' : quiet(producerId), + 'kind' : kind, + 'codec' : codec, + 'type' : type + }; + + for (const [ key, m ] of mProducer) + { + setValue(key, m, labels, x[key]); + } } - let type = x.type; - let labels = { - 'pid': pid, - 'room_id': room_id, - 'peer_id': peer_id, - 'display_name': display_name, - 'user_agent': user_agent, - 'transport_id': quiet(transport_id), - 'proto': proto, - 'local_addr': local_addr, - 'remote_addr': remote_addr, - 'id': quiet(consumer_id), - 'kind': kind, - 'codec': codec, - 'type': type - } - for (let [key, m] of m_consumer) { - set_value(key, m, labels, x[key]); + } + for (const [ consumerId, consumer ] of transport._consumers) + { + logger.debug(`visiting consumer ${consumerId}`); + const { roomId, peerId, displayName, userAgent, kind, codec } = + commonLabels(consumer, (peer) => peer._consumers.has(consumerId)); + const a = await consumer.getStats(); + + for (const x of a) + { + if (x.type == 'inbound-rtp') + { + continue; + } + const type = x.type; + const labels = + { + 'pid' : pid, + 'room_id' : roomId, + 'peer_id' : peerId, + 'display_name' : displayName, + 'user_agent' : userAgent, + 'transport_id' : quiet(transportId), + 'proto' : proto, + 'local_addr' : localAddr, + 'remote_addr' : remoteAddr, + 'id' : quiet(consumerId), + 'kind' : kind, + 'codec' : codec, + 'type' : type + }; + + for (const [ key, m ] of mConsumer) + { + setValue(key, m, labels, x[key]); + } } } } } } - } -} + }; -module.exports = async function(rooms, peers, config) { - - addr = async function(ip, port) { - if (config.deidentify) { - let a = ip.split('.') - for (let i = 0; i < a.length - 2; i++) { - a[i] = 'xx'; - } - return `${a.join('.')}:${port}`; - } - else if (config.numeric) { - return `${ip}:${port}`; - } - else { - try { - let a = await resolver.reverse(ip); - ip = a[0]; - } - catch (err) { - logger.error(`reverse DNS query failed: ${ip} ${err.code}`); - } - return `${ip}:${port}`; - } - } - - quiet = function(s) { - return config.quiet ? '' : s; - } - - try { + try + { logger.debug(`config.deidentify=${config.deidentify}`); logger.debug(`config.numeric=${config.numeric}`); logger.debug(`config.port=${config.port}`); logger.debug(`config.quiet=${config.quiet}`); - mediasoup.observer.on('newworker', worker => { + mediasoup.observer.on('newworker', (worker) => + { logger.debug(`observing newworker ${worker.pid} #${workers.size}`); workers.set(worker.pid, worker); - worker.observer.on('close', () => { + worker.observer.on('close', () => + { logger.debug(`observing close worker ${worker.pid} #${workers.size - 1}`); workers.delete(worker.pid); }); }); - let app = express(); - app.get('/', async (req, res) => { + const app = express(); + + app.get('/', async (req, res) => + { logger.debug(`GET ${req.originalUrl}`); - let registry = new prom.Registry(); - await collect(registry, rooms, peers); + const registry = new prom.Registry(); + + await collect(registry); res.set('Content-Type', registry.contentType); - let data = registry.metrics(); + const data = registry.metrics(); + res.end(data); }); - let server = app.listen(config.port || 8889, () => { - address = server.address(); + const server = app.listen(config.port || 8889, () => + { + const address = server.address(); + logger.info(`listening ${address.address}:${address.port}`); }); } - catch (err) { + catch (err) + { logger.error(err); } -} +}; diff --git a/server/server.js b/server/server.js index 82aa9ab..dbf9a8e 100755 --- a/server/server.js +++ b/server/server.js @@ -134,9 +134,10 @@ async function run() await interactiveServer(rooms, peers); // start Prometheus exporter - if (config.prometheus) { + if (config.prometheus) + { await promExporter(rooms, peers, config.prometheus); - } + } if (typeof(config.auth) === 'undefined') {