diff --git a/prom.md b/prom.md new file mode 100644 index 0000000..51ba2e8 --- /dev/null +++ b/prom.md @@ -0,0 +1,55 @@ +# Prometheus exporter + +The goal of this version is to offer a few basic metrics for +initial testing. The set of supported metrics can be extended. + +The current implementation is partly +[unconventional](https://prometheus.io/docs/instrumenting/writing_exporters) +in that it creates new metrics each time but does not register a +custom collector. Reasons are that the exporter should +[clear out metrics](https://github.com/prometheus/client_python/issues/182) +for closed connections but that `prom-client` +[does not yet support](https://github.com/siimon/prom-client/issues/241) +custom collectors. + +This version has been ported from an earlier Python version that was not part +of `multiparty-meeting` but connected as an interactive client. + +## Configuration + +See `prometheus` in `server/config/config.example.js` for options and +applicable defaults. + +If `multiparty-meeting` was installed with +[`mm-ansible`](https://github.com/misi/mm-ansible) +it may be necessary to open the `iptables` firewall for incoming TCP traffic +on the allocated port (see `/etc/ferm/ferm.conf`). 
+ +## Metrics + +| metric | value | +|--------|-------| +| `edumeet_peers`| number of peers per room (label `room_id`) | +| `edumeet_rooms`| number of rooms | +| `mediasoup_consumer_byte_count_bytes`| [`byteCount`](https://mediasoup.org/documentation/v3/mediasoup/rtc-statistics/#Consumer-Statistics) | +| `mediasoup_consumer_score`| [`score`](https://mediasoup.org/documentation/v3/mediasoup/rtc-statistics/#Consumer-Statistics) | +| `mediasoup_producer_byte_count_bytes`| [`byteCount`](https://mediasoup.org/documentation/v3/mediasoup/rtc-statistics/#Producer-Statistics) | +| `mediasoup_producer_score`| [`score`](https://mediasoup.org/documentation/v3/mediasoup/rtc-statistics/#Producer-Statistics) | + +## Architecture + +``` ++-----------+ +---------------------------------------------+ +| workers | | server observer API | +| | sock | +------o------+----o-----+ +| +------+ | int. server | exporter | +| | | | | | +| mediasoup | | express socket.io | net | express | ++-----+-----+ +----+---------+-----+-----+-------+-----+----+ + ^ min-max ^ 443 ^ 443 ^ sock ^ PROM_PORT + | RTP | HTTPS | ws | | HTTP + | | | | | + | +-+---------+-+ +------+------+ +---+--------+ + +---------------+ app | | int. 
const { Resolver } = require('dns').promises;
const express = require('express');
const mediasoup = require('mediasoup');
const prom = require('prom-client');

const Logger = require('./Logger');

const logger = new Logger('prom');
const resolver = new Resolver();

// mediasoup workers currently alive, keyed by pid. Filled and pruned by the
// observer callbacks registered in the exported function.
const workers = new Map();

// Labels attached to every mediasoup_<subsystem>_* sample.
const labelNames = [
	'pid', 'room_id', 'peer_id', 'display_name', 'user_agent', 'transport_id',
	'proto', 'local_addr', 'remote_addr', 'id', 'kind', 'codec', 'type'
];

// Stats fields exported for each producer/consumer, the prom-client metric
// class used for them, and an optional unit appended to the metric name.
const metadata = {
	'byteCount' : { metricType: prom.Counter, unit: 'bytes' },
	'score'     : { metricType: prom.Gauge }
};

/**
 * Replaces every component of an address except the last two with 'xx'.
 *
 * Addresses containing a dot are split on '.', which also covers IPv4-mapped
 * IPv6 such as `::ffff:1.2.3.4`. Addresses without a dot are split on ':' so
 * that pure IPv6 addresses are masked as well instead of passing through
 * unchanged.
 *
 * @param {string} ip - Textual IP address.
 * @returns {string} The masked address.
 */
const maskAddress = function(ip)
{
	const separator = ip.includes('.') ? '.' : ':';
	const parts = ip.split(separator);

	for (let i = 0; i < parts.length - 2; i++)
	{
		parts[i] = 'xx';
	}

	return parts.join(separator);
};

/**
 * Starts the Prometheus exporter.
 *
 * Registers mediasoup observer callbacks to keep track of workers and serves
 * the metrics over HTTP on `GET /`. Each scrape builds all metrics in a fresh
 * registry, so nothing is carried over from one scrape to the next.
 *
 * @param {Map} rooms - Rooms keyed by room id; `room._peers` is read.
 * @param {Map} peers - Peers keyed by peer id.
 * @param {Object} config - The `prometheus` section of the server config.
 * @param {boolean} [config.deidentify] - Mask IP addresses in the labels.
 * @param {boolean} [config.numeric] - Report IP addresses without reverse DNS.
 * @param {number} [config.port=8889] - TCP port to listen on.
 * @param {boolean} [config.quiet] - Blank the transport and producer/consumer
 *   id labels.
 * @returns {Promise<void>} Setup errors are logged, not propagated.
 */
module.exports = async function(rooms, peers, config)
{
	// Blanks a label value when the `quiet` option is set.
	const quiet = function(s)
	{
		return config.quiet ? '' : s;
	};

	/**
	 * Formats an endpoint as `host:port` according to the `deidentify` and
	 * `numeric` options; by default the IP is replaced by its reverse DNS name.
	 */
	const addr = async function(ip, port)
	{
		if (config.deidentify)
		{
			return `${maskAddress(ip)}:${port}`;
		}

		if (config.numeric)
		{
			return `${ip}:${port}`;
		}

		let host = ip;

		try
		{
			const names = await resolver.reverse(ip);

			// An empty answer keeps the numeric address instead of 'undefined'.
			if (names.length > 0)
			{
				host = names[0];
			}
		}
		catch (err)
		{
			// Best effort: fall back to the numeric address.
			logger.error(`reverse DNS query failed: ${ip} ${err.code}`);
		}

		return `${host}:${port}`;
	};

	/**
	 * Returns the id of the room that lists `peerId` in its `_peers`.
	 *
	 * NOTE(review): assumes `room._peers` is an object keyed by peer id -
	 * confirm against Room. If no room lists the peer, the first room id is
	 * returned, which is what was previously reported for every peer.
	 */
	const roomIdOf = function(peerId)
	{
		let fallback;

		for (const [ roomId, room ] of rooms)
		{
			if (Object.prototype.hasOwnProperty.call(room._peers, peerId))
			{
				return roomId;
			}

			if (fallback === undefined)
			{
				fallback = roomId;
			}
		}

		return fallback;
	};

	/**
	 * Finds the peer for which `fn` returns true and derives the labels that
	 * producers and consumers have in common.
	 *
	 * @param {Object} both - A producer or consumer (`kind`, `rtpParameters`).
	 * @param {Function} fn - Predicate selecting the owning peer.
	 * @throws {Error} If no peer matches or there is no room at all.
	 */
	const commonLabels = function(both, fn)
	{
		for (const [ peerId, peer ] of peers)
		{
			if (!fn(peer))
			{
				continue;
			}

			const roomId = roomIdOf(peerId);

			if (roomId === undefined)
			{
				break;
			}

			const displayName = peer._displayName;
			const userAgent = peer._socket.client.request.headers['user-agent'];
			const kind = both.kind;
			const codec = both.rtpParameters.codecs[0].mimeType.split('/')[1];

			return { roomId, peerId, displayName, userAgent, kind, codec };
		}

		throw new Error('cannot find common labels');
	};

	/**
	 * Creates one metric per `metadata` entry in `registry`, named
	 * `mediasoup_<subsystem>_<snake_case key>[_<unit>]`.
	 *
	 * @returns {Map} Metrics keyed by the stats field they export.
	 */
	const newMetrics = function(registry, subsystem)
	{
		const namespace = 'mediasoup';
		const metrics = new Map();

		for (const key of Object.keys(metadata))
		{
			const { unit, metricType } = metadata[key];
			const snake = key.split(/(?=[A-Z])/).join('_')
				.toLowerCase();
			let name = `${namespace}_${subsystem}_${snake}`;

			if (unit)
			{
				name += `_${unit}`;
			}

			metrics.set(key, new metricType({
				name,
				help       : `${subsystem}.${key}`,
				labelNames : labelNames,
				registers  : [ registry ]
			}));
		}

		return metrics;
	};

	/**
	 * Records `v` on metric `m`: counters are incremented, gauges are set.
	 */
	const setValue = function(key, m, labels, v)
	{
		logger.debug(`setValue key=${key} v=${v}`);

		// A stats entry may lack the field. Counter.inc() would then count 1
		// and Gauge.set() would throw, so skip anything that is not a number.
		if (!Number.isFinite(v))
		{
			return;
		}

		switch (metadata[key].metricType)
		{
			case prom.Counter:
				m.inc(labels, v);
				break;
			case prom.Gauge:
				m.set(labels, v);
				break;
			default:
				throw new Error(`unexpected metric: ${m}`);
		}
	};

	/**
	 * Exports the stats of one producer or consumer.
	 *
	 * @param {Map} metrics - Result of `newMetrics` for the subsystem.
	 * @param {string} id - Producer or consumer id.
	 * @param {Object} source - The producer or consumer.
	 * @param {Function} owns - Predicate selecting the owning peer.
	 * @param {Object} transportLabels - Labels shared by the whole transport.
	 * @param {Array<string>} skippedTypes - Stats types that are not exported.
	 */
	const exportStats = async function(
		metrics, id, source, owns, transportLabels, skippedTypes)
	{
		const { roomId, peerId, displayName, userAgent, kind, codec } =
			commonLabels(source, owns);
		const stats = await source.getStats();

		for (const stat of stats)
		{
			if (skippedTypes.includes(stat.type))
			{
				continue;
			}

			const labels = Object.assign({}, transportLabels, {
				'room_id'      : roomId,
				'peer_id'      : peerId,
				'display_name' : displayName,
				'user_agent'   : userAgent,
				'id'           : quiet(id),
				'kind'         : kind,
				'codec'        : codec,
				'type'         : stat.type
			});

			for (const [ key, m ] of metrics)
			{
				setValue(key, m, labels, stat[key]);
			}
		}
	};

	/**
	 * Fills `registry` with the room/peer gauges and the stats of all
	 * producers and consumers on transports whose ICE state is 'completed'.
	 */
	const collect = async function(registry)
	{
		logger.debug('collect');

		const mRooms = new prom.Gauge({
			name      : 'edumeet_rooms',
			help      : '#rooms',
			registers : [ registry ]
		});

		mRooms.set(rooms.size);

		const mPeers = new prom.Gauge({
			name       : 'edumeet_peers',
			help       : '#peers',
			labelNames : [ 'room_id' ],
			registers  : [ registry ]
		});

		for (const [ roomId, room ] of rooms)
		{
			mPeers.labels(roomId).set(Object.keys(room._peers).length);
		}

		const mConsumer = newMetrics(registry, 'consumer');
		const mProducer = newMetrics(registry, 'producer');

		for (const [ pid, worker ] of workers)
		{
			logger.debug(`visiting worker ${pid}`);
			for (const router of worker._routers)
			{
				logger.debug(`visiting router ${router.id}`);
				for (const [ transportId, transport ] of router._transports)
				{
					logger.debug(`visiting transport ${transportId}`);
					const transportJson = await transport.dump();

					if (transportJson.iceState !== 'completed')
					{
						logger.debug(`skipping transport ${transportId}: ${transportJson.iceState}`);
						continue;
					}

					const tuple = transportJson.iceSelectedTuple;
					const [ localAddr, remoteAddr ] = await Promise.all([
						addr(tuple.localIp, tuple.localPort),
						addr(tuple.remoteIp, tuple.remotePort)
					]);
					const transportLabels = {
						'pid'          : pid,
						'transport_id' : quiet(transportId),
						'proto'        : tuple.protocol,
						'local_addr'   : localAddr,
						'remote_addr'  : remoteAddr
					};

					for (const [ producerId, producer ] of transport._producers)
					{
						logger.debug(`visiting producer ${producerId}`);
						await exportStats(
							mProducer, producerId, producer,
							(peer) => peer._producers.has(producerId),
							transportLabels, []);
					}

					for (const [ consumerId, consumer ] of transport._consumers)
					{
						logger.debug(`visiting consumer ${consumerId}`);
						// 'inbound-rtp' entries of a consumer are not exported.
						await exportStats(
							mConsumer, consumerId, consumer,
							(peer) => peer._consumers.has(consumerId),
							transportLabels, [ 'inbound-rtp' ]);
					}
				}
			}
		}
	};

	try
	{
		logger.debug(`config.deidentify=${config.deidentify}`);
		logger.debug(`config.numeric=${config.numeric}`);
		logger.debug(`config.port=${config.port}`);
		logger.debug(`config.quiet=${config.quiet}`);

		mediasoup.observer.on('newworker', (worker) =>
		{
			logger.debug(`observing newworker ${worker.pid} #${workers.size}`);
			workers.set(worker.pid, worker);
			worker.observer.on('close', () =>
			{
				logger.debug(`observing close worker ${worker.pid} #${workers.size - 1}`);
				workers.delete(worker.pid);
			});
		});

		const app = express();

		app.get('/', async (req, res) =>
		{
			logger.debug(`GET ${req.originalUrl}`);
			const registry = new prom.Registry();

			try
			{
				await collect(registry);

				// metrics() returns a string in prom-client 12 and a Promise
				// from 13 on; awaiting works for both.
				const data = await registry.metrics();

				res.set('Content-Type', registry.contentType);
				res.end(data);
			}
			catch (err)
			{
				// Express does not catch rejections of async handlers; answer
				// here so that the scrape fails fast instead of hanging.
				logger.error(`collect failed: ${err.stack || err}`);
				res.status(500).end();
			}
		});

		const server = app.listen(config.port || 8889, () =>
		{
			const address = server.address();

			logger.info(`listening ${address.address}:${address.port}`);
		});

		// Bind failures (e.g. EADDRINUSE) are emitted as events and are not
		// caught by the surrounding try/catch; without a listener they would
		// terminate the whole process.
		server.on('error', (err) =>
		{
			logger.error(`listen failed: ${err.message}`);
		});
	}
	catch (err)
	{
		logger.error(err);
	}
};