diff --git a/prom.md b/prom.md new file mode 100644 index 0000000..47ae230 --- /dev/null +++ b/prom.md @@ -0,0 +1,59 @@ +# Prometheus exporter + +The goal of this version is to offer a few basic metrics for +initial testing. The set of supported metrics can be extended. + +The current implementation is +[unconventional](https://prometheus.io/docs/instrumenting/writing_exporters) +in that it creates new metrics each time but does not register a +custom collector. Reasons are that the exporter should +[clear out metrics](https://github.com/prometheus/client_python/issues/182) +for closed connections but that `prom-client` +[does not yet support](https://github.com/siimon/prom-client/issues/241) +custom collectors. + +This version has been ported from an earlier Python version that was not part +of `multiparty-meeting` but connected as an interactive client. + +## Configuration + +| `.env` | description | +|--------|-------| +| `DEBUG` | e.g. `*WARN*,*ERROR*,*:prom` for debugging | +| `PROM_DEIDENTIFY` | if set, deidentify IP addresses | +| `PROM_NUMERIC` | if set, show numerical IP addresses | +| `PROM_PORT` | if set, enable exporter on this port | +| `PROM_QUIET` | if set, include fewer labels | + +## License + +MIT License (without copyright notice) + +## Metrics + +| metric | value | +|--------|-------| +| `edumeet_peers`| | +| `edumeet_rooms`| | +| `mediasoup_consumer_byte_count_bytes`| [`byteCount`](https://mediasoup.org/documentation/v3/mediasoup/rtc-statistics/#Consumer-Statistics) | +| `mediasoup_consumer_score`| [`score`](https://mediasoup.org/documentation/v3/mediasoup/rtc-statistics/#Consumer-Statistics) | +| `mediasoup_producer_byte_count_bytes`| [`byteCount`](https://mediasoup.org/documentation/v3/mediasoup/rtc-statistics/#Producer-Statistics) | +| `mediasoup_producer_score`| [`score`](https://mediasoup.org/documentation/v3/mediasoup/rtc-statistics/#Producer-Statistics) | + +## Architecture + +``` ++-----------+ +---------------------------------------------+ +| workers | | server observer API | +| | sock | +------o------+----o-----+ +| +------+ | int. server | exporter | +| | | | | | +| mediasoup | | express socket.io | net | express | ++-----+-----+ +----+---------+-----+-----+-------+-----+----+ + ^ min-max ^ 443 ^ 443 ^ sock ^ PROM_PORT + | RTP | HTTPS | ws | | HTTP + | | | | | + | +-+---------+-+ +------+------+ +---+--------+ + +---------------+ app | | int. client | | Prometheus | + +-------------+ +-------------+ +------------+ +``` diff --git a/server/lib/promExporter.js b/server/lib/promExporter.js new file mode 100644 index 0000000..ccb2562 --- /dev/null +++ b/server/lib/promExporter.js @@ -0,0 +1,238 @@ +const { Resolver } = require('dns').promises; +const express = require('express'); +const mediasoup = require('mediasoup'); +const prom = require('prom-client'); + +const Logger = require('./Logger'); +const Peer = require('./Peer'); +const Room = require('./Room'); + +const logger = new Logger('prom'); +const resolver = new Resolver(); + +const workers = new Map(); + +const label_names = [ + 'pid', 'room_id', 'peer_id', 'display_name', 'user_agent', 'transport_id', + 'proto', 'local_addr', 'remote_addr', 'id', 'kind', 'codec', 'type' +]; + +const metadata = { + 'byteCount': { metric_type: prom.Counter, unit: 'bytes' }, + 'score': { metric_type: prom.Gauge } +} + +common_labels = function(both, fn) { + for (let [room_id, room] of rooms) { + for (let [peer_id, peer] of peers) { + if (fn(peer)) { + let display_name = peer._displayName; + + let user_agent = peer._socket.client.request.headers['user-agent']; + let kind = both.kind; + let codec = both.rtpParameters.codecs[0].mimeType.split('/')[1]; + return { room_id, peer_id, display_name, user_agent, kind, codec }; + } + } + } + throw new Error('cannot find generic labels'); +} + +set_value = function(key, m, labels, v) { + logger.debug(`set_value key=${key} v=${v}`); + switch (metadata[key].metric_type) { + case prom.Counter: + m.inc(labels, v); + break; + case prom.Gauge: + m.set(labels, v); + break; + default: + throw new Error(`unexpected metric: ${metric}`); + } +} + + +addr = async function(ip, port) { + if ('PROM_DEIDENTIFY' in process.env) { + let a = ip.split('.') + for (let i = 0; i < a.length - 2; i++) { + a[i] = 'xx'; + } + return a.join('.'); + } + else if ('PROM_NUMERIC' in process.env) { + return ip; + } + else { + try { + let a = await resolver.reverse(ip); + ip = a[0]; + } + catch (err) { + logger.error(`reverse DNS query failed: ${ip} ${err.code}`); + } + return `${ip}:${port}`; + } +} + +quiet = function(s) { + return 'PROM_QUIET' in process.env ? '' : s; +} + +collect = async function(registry, rooms, peers) { + + metrics = function(subsystem) { + let namespace = 'mediasoup'; + let metrics = new Map(); + for (let key in metadata) { + value = metadata[key]; + let name = key.split(/(?=[A-Z])/).join('_').toLowerCase(); + let unit = value.unit; + let metric_type = value.metric_type; + let s = `${namespace}_${subsystem}_${name}`; + if (unit) { + s += `_${unit}`; + } + m = new metric_type({name: s, help: `${subsystem}.${key}`, + labelNames: label_names, registers: [registry]}); + metrics.set(key, m); + } + return metrics; + } + + logger.debug('collect'); + const m_rooms = new prom.Gauge({name: 'edumeet_rooms', help: '#rooms', + registers: [registry]}); + m_rooms.set(rooms.size); + const m_peers = new prom.Gauge({name: 'edumeet_peers', help: '#peers', + labelNames: ['room_id'], registers: [registry]}); + for (let [room_id, room] of rooms) { + m_peers.labels(room_id).set(Object.keys(room._peers).length); + } + + const m_consumer = metrics('consumer'); + const m_producer = metrics('producer'); + for (let [pid, worker] of workers) { + logger.debug(`visiting worker ${pid}`); + for (let router of worker._routers) { + logger.debug(`visiting router ${router.id}`); + for (let [transport_id, transport] of router._transports) { + logger.debug(`visiting transport ${transport_id}`); + let transport_j = await transport.dump(); + if (transport_j.iceState != 'completed') { + logger.debug(`skipping transport ${transport_id}}: ${transport_j.iceState}`); + continue; + } + let ice_selected_tuple = transport_j.iceSelectedTuple; + let proto = ice_selected_tuple.protocol + let local_addr = await addr(ice_selected_tuple.localIp, + ice_selected_tuple.localPort); + let remote_addr = await addr(ice_selected_tuple.remoteIp, + ice_selected_tuple.remotePort); + for (let [producer_id, producer] of transport._producers) { + logger.debug(`visiting producer ${producer_id}`); + let { room_id, peer_id, display_name, user_agent, kind, codec } = + common_labels(producer, peer => peer._producers.has(producer_id)); + let a = await producer.getStats(); + for (let x of a) { + let type = x.type; + let labels = { + 'pid': pid, + 'room_id': room_id, + 'peer_id': peer_id, + 'display_name': display_name, + 'user_agent': user_agent, + 'transport_id': quiet(transport_id), + 'proto': proto, + 'local_addr': local_addr, + 'remote_addr': remote_addr, + 'id': quiet(producer_id), + 'kind': kind, + 'codec': codec, + 'type': type + } + for (let [key, m] of m_producer) { + set_value(key, m, labels, x[key]); + } + } + } + for (let [consumer_id, consumer] of transport._consumers) { + logger.debug(`visiting consumer ${consumer_id}`); + let { room_id, peer_id, display_name, user_agent, kind, codec } = + common_labels(consumer, peer => peer._consumers.has(consumer_id)); + let a = await consumer.getStats(); + for (let x of a) { + if (x.type == 'inbound-rtp') { + continue; + } + let type = x.type; + let labels = { + 'pid': pid, + 'room_id': room_id, + 'peer_id': peer_id, + 'display_name': display_name, + 'user_agent': user_agent, + 'transport_id': quiet(transport_id), + 'proto': proto, + 'local_addr': local_addr, + 'remote_addr': remote_addr, + 'id': quiet(consumer_id), + 'kind': kind, + 'codec': codec, + 'type': type + } + for (let [key, m] of m_consumer) { + set_value(key, m, labels, x[key]); + } + } + } + } + } + } +} + +module.exports = async function(rooms, peers) { + try { + logger.debug(`PROM_DEIDENTIFY=${process.env.PROM_DEIDENTIFY}`); + logger.debug(`PROM_NUMERIC=${process.env.PROM_NUMERIC}`); + logger.debug(`PROM_PORT=${process.env.PROM_PORT}`); + logger.debug(`PROM_QUIET=${process.env.PROM_QUIET}`); + let s_port = process.env.PROM_PORT; + if (!s_port) { + logger.info('exporter disabled'); + } + else { + let n_port = Number(s_port); + if (Number.isNaN(n_port)) { + throw new TypeError(`PROM_PORT has illegal value: ${s_port}`); + } + + mediasoup.observer.on('newworker', worker => { + logger.debug(`observing newworker ${worker.pid} #${workers.size}`); + workers.set(worker.pid, worker); + worker.observer.on('close', () => { + logger.debug(`observing close worker ${worker.pid} #${workers.size - 1}`); + workers.delete(worker.pid); + }); + }); + + let app = express(); + app.get('/', async (req, res) => { + logger.debug(`GET ${req.originalUrl}`); + let registry = new prom.Registry(); + await collect(registry, rooms, peers); + res.set('Content-Type', registry.contentType); + let data = registry.metrics(); + res.end(data); + }); + let server = app.listen(n_port, () => { + address = server.address(); + logger.info(`listening ${address.address}:${address.port}`); + }); + } + } + catch (err) { + logger.error(err); + } +} diff --git a/server/package.json b/server/package.json index d8b314d..d156a62 100644 --- a/server/package.json +++ b/server/package.json @@ -31,6 +31,7 @@ "passport": "^0.4.0", "passport-lti": "0.0.7", "pidusage": "^2.0.17", + "prom-client": ">=12.0.0", "redis": "^2.8.0", "socket.io": "^2.3.0", "spdy": "^4.0.1", diff --git a/server/server.js b/server/server.js index 70cb5bc..9253bec 100755 --- a/server/server.js +++ b/server/server.js @@ -34,6 +34,7 @@ const expressSession = require('express-session'); const RedisStore = require('connect-redis')(expressSession); const sharedSession = require('express-socket.io-session'); const interactiveServer = require('./lib/interactiveServer'); +const promExporter = require('./lib/promExporter'); /* eslint-disable no-console */ console.log('- process.env.DEBUG:', process.env.DEBUG); @@ -132,6 +133,9 @@ async function run() // Open the interactive server. await interactiveServer(rooms, peers); + // start Prometheus exporter + await promExporter(rooms, peers); + if (typeof(config.auth) === 'undefined') { logger.warn('Auth is not configured properly!');