diff --git a/streaming/index.js b/streaming/index.js index 8f66362178a..2267c469c00 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -292,6 +292,9 @@ const CHANNEL_NAMES = [ const startServer = async () => { const pgPool = new pg.Pool(pgConfigFromEnv(process.env)); + + const metrics = setupMetrics(CHANNEL_NAMES, pgPool); + const server = http.createServer(); const wss = new WebSocketServer({ noServer: true }); @@ -388,16 +391,6 @@ const startServer = async () => { const redisClient = await createRedisClient(redisConfig); const { redisPrefix } = redisConfig; - const metrics = setupMetrics(CHANNEL_NAMES, pgPool); - // TODO: migrate all metrics to metrics.X.method() instead of just X.method() - const { - connectedClients, - connectedChannels, - redisSubscriptions, - redisMessagesReceived, - messagesSent, - } = metrics; - // When checking metrics in the browser, the favicon is requested this // prevents the request from falling through to the API Router, which would // error for this endpoint: @@ -408,15 +401,7 @@ const startServer = async () => { res.end('OK'); }); - app.get('/metrics', async (req, res) => { - try { - res.set('Content-Type', metrics.register.contentType); - res.end(await metrics.register.metrics()); - } catch (ex) { - req.log.error(ex); - res.status(500).end(); - } - }); + app.get('/metrics', metrics.requestHandler); /** * @param {string[]} channels @@ -443,7 +428,7 @@ const startServer = async () => { * @param {string} message */ const onRedisMessage = (channel, message) => { - redisMessagesReceived.inc(); + metrics.redisMessagesReceived.inc(); const callbacks = subs[channel]; @@ -481,7 +466,7 @@ const startServer = async () => { if (err) { logger.error(`Error subscribing to ${channel}`); } else if (typeof count === 'number') { - redisSubscriptions.set(count); + metrics.redisSubscriptions.set(count); } }); } @@ -508,7 +493,7 @@ const startServer = async () => { if (err) { logger.error(`Error unsubscribing to ${channel}`); } else if (typeof count === 'number') { - redisSubscriptions.set(count); + metrics.redisSubscriptions.set(count); } }); delete subs[channel]; @@ -688,13 +673,13 @@ const startServer = async () => { unsubscribe(`${redisPrefix}${accessTokenChannelId}`, listener); unsubscribe(`${redisPrefix}${systemChannelId}`, listener); - connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2); + metrics.connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2); }); subscribe(`${redisPrefix}${accessTokenChannelId}`, listener); subscribe(`${redisPrefix}${systemChannelId}`, listener); - connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2); + metrics.connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2); }; /** @@ -790,7 +775,7 @@ const startServer = async () => { // TODO: Replace "string"-based delete payloads with object payloads: const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload; - messagesSent.labels({ type: destinationType }).inc(1); + metrics.messagesSent.labels({ type: destinationType }).inc(1); log.debug({ event, payload }, `Transmitting ${event} to ${req.accountId}`); @@ -1027,11 +1012,11 @@ const startServer = async () => { const streamToHttp = (req, res) => { const channelName = channelNameFromPath(req); - connectedClients.labels({ type: 'eventsource' }).inc(); + metrics.connectedClients.labels({ type: 'eventsource' }).inc(); // In theory we'll always have a channel name, but channelNameFromPath can return undefined: if (typeof channelName === 'string') { - connectedChannels.labels({ type: 'eventsource', channel: channelName }).inc(); + metrics.connectedChannels.labels({ type: 'eventsource', channel: channelName }).inc(); } res.setHeader('Content-Type', 'text/event-stream'); @@ -1047,10 +1032,10 @@ const startServer = async () => { // We decrement these counters here instead of in streamHttpEnd as in that // method we don't have knowledge of the channel names - connectedClients.labels({ type: 'eventsource' }).dec(); + metrics.connectedClients.labels({ type: 'eventsource' }).dec(); // In theory we'll always have a channel name, but channelNameFromPath can return undefined: if (typeof channelName === 'string') { - connectedChannels.labels({ type: 'eventsource', channel: channelName }).dec(); + metrics.connectedChannels.labels({ type: 'eventsource', channel: channelName }).dec(); } clearInterval(heartbeat); @@ -1324,7 +1309,7 @@ const startServer = async () => { const stopHeartbeat = subscriptionHeartbeat(channelIds); const listener = streamFrom(channelIds, request, logger, onSend, undefined, 'websocket', options.needsFiltering); - connectedChannels.labels({ type: 'websocket', channel: channelName }).inc(); + metrics.connectedChannels.labels({ type: 'websocket', channel: channelName }).inc(); subscriptions[channelIds.join(';')] = { channelName, @@ -1363,7 +1348,7 @@ const startServer = async () => { unsubscribe(`${redisPrefix}${channelId}`, subscription.listener); }); - connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec(); + metrics.connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec(); subscription.stopHeartbeat(); delete subscriptions[channelIds.join(';')]; @@ -1421,7 +1406,7 @@ const startServer = async () => { }, }; - connectedChannels.labels({ type: 'websocket', channel: 'system' }).inc(2); + metrics.connectedChannels.labels({ type: 'websocket', channel: 'system' }).inc(2); }; /** @@ -1433,7 +1418,7 @@ const startServer = async () => { // Note: url.parse could throw, which would terminate the connection, so we // increment the connected clients metric straight away when we establish // the connection, without waiting: - connectedClients.labels({ type: 'websocket' }).inc(); + metrics.connectedClients.labels({ type: 'websocket' }).inc(); // Setup connection keep-alive state: ws.isAlive = true; @@ -1459,7 +1444,7 @@ const startServer = async () => { }); // Decrement the metrics for connected clients: - connectedClients.labels({ type: 'websocket' }).dec(); + metrics.connectedClients.labels({ type: 'websocket' }).dec(); // We need to unassign the session object as to ensure it correctly gets // garbage collected, without doing this we could accidentally hold on to diff --git a/streaming/metrics.js b/streaming/metrics.js index a029d778fcf..bb6bce3f3c1 100644 --- a/streaming/metrics.js +++ b/streaming/metrics.js @@ -4,12 +4,12 @@ import metrics from 'prom-client'; /** * @typedef StreamingMetrics - * @property {metrics.Registry} register * @property {metrics.Gauge<"type">} connectedClients * @property {metrics.Gauge<"type" | "channel">} connectedChannels * @property {metrics.Gauge} redisSubscriptions * @property {metrics.Counter} redisMessagesReceived * @property {metrics.Counter<"type">} messagesSent + * @property {import('express').RequestHandler<{}>} requestHandler */ /** @@ -92,8 +92,21 @@ export function setupMetrics(channels, pgPool) { messagesSent.inc({ type: 'websocket' }, 0); messagesSent.inc({ type: 'eventsource' }, 0); + /** + * @type {import('express').RequestHandler<{}>} + */ + const requestHandler = (req, res) => { + metrics.register.metrics().then((output) => { + res.set('Content-Type', metrics.register.contentType); + res.end(output); + }).catch((err) => { + req.log.error(err, "Error collecting metrics"); + res.status(500).end(); + }); + }; + return { - register: metrics.register, + requestHandler, connectedClients, connectedChannels, redisSubscriptions,