Streaming: Refactor to use metrics.$name instead of destructuring (#31566)

This commit is contained in:
Emelia Smith 2024-08-26 10:08:21 +02:00 committed by GitHub
parent f9f4006a1b
commit c245a2044e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 34 additions and 36 deletions

View File

@ -292,6 +292,9 @@ const CHANNEL_NAMES = [
const startServer = async () => { const startServer = async () => {
const pgPool = new pg.Pool(pgConfigFromEnv(process.env)); const pgPool = new pg.Pool(pgConfigFromEnv(process.env));
const metrics = setupMetrics(CHANNEL_NAMES, pgPool);
const server = http.createServer(); const server = http.createServer();
const wss = new WebSocketServer({ noServer: true }); const wss = new WebSocketServer({ noServer: true });
@ -388,16 +391,6 @@ const startServer = async () => {
const redisClient = await createRedisClient(redisConfig); const redisClient = await createRedisClient(redisConfig);
const { redisPrefix } = redisConfig; const { redisPrefix } = redisConfig;
const metrics = setupMetrics(CHANNEL_NAMES, pgPool);
// TODO: migrate all metrics to metrics.X.method() instead of just X.method()
const {
connectedClients,
connectedChannels,
redisSubscriptions,
redisMessagesReceived,
messagesSent,
} = metrics;
// When checking metrics in the browser, the favicon is requested this // When checking metrics in the browser, the favicon is requested this
// prevents the request from falling through to the API Router, which would // prevents the request from falling through to the API Router, which would
// error for this endpoint: // error for this endpoint:
@ -408,15 +401,7 @@ const startServer = async () => {
res.end('OK'); res.end('OK');
}); });
app.get('/metrics', async (req, res) => { app.get('/metrics', metrics.requestHandler);
try {
res.set('Content-Type', metrics.register.contentType);
res.end(await metrics.register.metrics());
} catch (ex) {
req.log.error(ex);
res.status(500).end();
}
});
/** /**
* @param {string[]} channels * @param {string[]} channels
@ -443,7 +428,7 @@ const startServer = async () => {
* @param {string} message * @param {string} message
*/ */
const onRedisMessage = (channel, message) => { const onRedisMessage = (channel, message) => {
redisMessagesReceived.inc(); metrics.redisMessagesReceived.inc();
const callbacks = subs[channel]; const callbacks = subs[channel];
@ -481,7 +466,7 @@ const startServer = async () => {
if (err) { if (err) {
logger.error(`Error subscribing to ${channel}`); logger.error(`Error subscribing to ${channel}`);
} else if (typeof count === 'number') { } else if (typeof count === 'number') {
redisSubscriptions.set(count); metrics.redisSubscriptions.set(count);
} }
}); });
} }
@ -508,7 +493,7 @@ const startServer = async () => {
if (err) { if (err) {
logger.error(`Error unsubscribing to ${channel}`); logger.error(`Error unsubscribing to ${channel}`);
} else if (typeof count === 'number') { } else if (typeof count === 'number') {
redisSubscriptions.set(count); metrics.redisSubscriptions.set(count);
} }
}); });
delete subs[channel]; delete subs[channel];
@ -688,13 +673,13 @@ const startServer = async () => {
unsubscribe(`${redisPrefix}${accessTokenChannelId}`, listener); unsubscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
unsubscribe(`${redisPrefix}${systemChannelId}`, listener); unsubscribe(`${redisPrefix}${systemChannelId}`, listener);
connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2); metrics.connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2);
}); });
subscribe(`${redisPrefix}${accessTokenChannelId}`, listener); subscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
subscribe(`${redisPrefix}${systemChannelId}`, listener); subscribe(`${redisPrefix}${systemChannelId}`, listener);
connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2); metrics.connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2);
}; };
/** /**
@ -790,7 +775,7 @@ const startServer = async () => {
// TODO: Replace "string"-based delete payloads with object payloads: // TODO: Replace "string"-based delete payloads with object payloads:
const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload; const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
messagesSent.labels({ type: destinationType }).inc(1); metrics.messagesSent.labels({ type: destinationType }).inc(1);
log.debug({ event, payload }, `Transmitting ${event} to ${req.accountId}`); log.debug({ event, payload }, `Transmitting ${event} to ${req.accountId}`);
@ -1027,11 +1012,11 @@ const startServer = async () => {
const streamToHttp = (req, res) => { const streamToHttp = (req, res) => {
const channelName = channelNameFromPath(req); const channelName = channelNameFromPath(req);
connectedClients.labels({ type: 'eventsource' }).inc(); metrics.connectedClients.labels({ type: 'eventsource' }).inc();
// In theory we'll always have a channel name, but channelNameFromPath can return undefined: // In theory we'll always have a channel name, but channelNameFromPath can return undefined:
if (typeof channelName === 'string') { if (typeof channelName === 'string') {
connectedChannels.labels({ type: 'eventsource', channel: channelName }).inc(); metrics.connectedChannels.labels({ type: 'eventsource', channel: channelName }).inc();
} }
res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Content-Type', 'text/event-stream');
@ -1047,10 +1032,10 @@ const startServer = async () => {
// We decrement these counters here instead of in streamHttpEnd as in that // We decrement these counters here instead of in streamHttpEnd as in that
// method we don't have knowledge of the channel names // method we don't have knowledge of the channel names
connectedClients.labels({ type: 'eventsource' }).dec(); metrics.connectedClients.labels({ type: 'eventsource' }).dec();
// In theory we'll always have a channel name, but channelNameFromPath can return undefined: // In theory we'll always have a channel name, but channelNameFromPath can return undefined:
if (typeof channelName === 'string') { if (typeof channelName === 'string') {
connectedChannels.labels({ type: 'eventsource', channel: channelName }).dec(); metrics.connectedChannels.labels({ type: 'eventsource', channel: channelName }).dec();
} }
clearInterval(heartbeat); clearInterval(heartbeat);
@ -1324,7 +1309,7 @@ const startServer = async () => {
const stopHeartbeat = subscriptionHeartbeat(channelIds); const stopHeartbeat = subscriptionHeartbeat(channelIds);
const listener = streamFrom(channelIds, request, logger, onSend, undefined, 'websocket', options.needsFiltering); const listener = streamFrom(channelIds, request, logger, onSend, undefined, 'websocket', options.needsFiltering);
connectedChannels.labels({ type: 'websocket', channel: channelName }).inc(); metrics.connectedChannels.labels({ type: 'websocket', channel: channelName }).inc();
subscriptions[channelIds.join(';')] = { subscriptions[channelIds.join(';')] = {
channelName, channelName,
@ -1363,7 +1348,7 @@ const startServer = async () => {
unsubscribe(`${redisPrefix}${channelId}`, subscription.listener); unsubscribe(`${redisPrefix}${channelId}`, subscription.listener);
}); });
connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec(); metrics.connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec();
subscription.stopHeartbeat(); subscription.stopHeartbeat();
delete subscriptions[channelIds.join(';')]; delete subscriptions[channelIds.join(';')];
@ -1421,7 +1406,7 @@ const startServer = async () => {
}, },
}; };
connectedChannels.labels({ type: 'websocket', channel: 'system' }).inc(2); metrics.connectedChannels.labels({ type: 'websocket', channel: 'system' }).inc(2);
}; };
/** /**
@ -1433,7 +1418,7 @@ const startServer = async () => {
// Note: url.parse could throw, which would terminate the connection, so we // Note: url.parse could throw, which would terminate the connection, so we
// increment the connected clients metric straight away when we establish // increment the connected clients metric straight away when we establish
// the connection, without waiting: // the connection, without waiting:
connectedClients.labels({ type: 'websocket' }).inc(); metrics.connectedClients.labels({ type: 'websocket' }).inc();
// Setup connection keep-alive state: // Setup connection keep-alive state:
ws.isAlive = true; ws.isAlive = true;
@ -1459,7 +1444,7 @@ const startServer = async () => {
}); });
// Decrement the metrics for connected clients: // Decrement the metrics for connected clients:
connectedClients.labels({ type: 'websocket' }).dec(); metrics.connectedClients.labels({ type: 'websocket' }).dec();
// We need to unassign the session object as to ensure it correctly gets // We need to unassign the session object as to ensure it correctly gets
// garbage collected, without doing this we could accidentally hold on to // garbage collected, without doing this we could accidentally hold on to

View File

@ -4,12 +4,12 @@ import metrics from 'prom-client';
/** /**
* @typedef StreamingMetrics * @typedef StreamingMetrics
* @property {metrics.Registry} register
* @property {metrics.Gauge<"type">} connectedClients * @property {metrics.Gauge<"type">} connectedClients
* @property {metrics.Gauge<"type" | "channel">} connectedChannels * @property {metrics.Gauge<"type" | "channel">} connectedChannels
* @property {metrics.Gauge} redisSubscriptions * @property {metrics.Gauge} redisSubscriptions
* @property {metrics.Counter} redisMessagesReceived * @property {metrics.Counter} redisMessagesReceived
* @property {metrics.Counter<"type">} messagesSent * @property {metrics.Counter<"type">} messagesSent
* @property {import('express').RequestHandler<{}>} requestHandler
*/ */
/** /**
@ -92,8 +92,21 @@ export function setupMetrics(channels, pgPool) {
messagesSent.inc({ type: 'websocket' }, 0); messagesSent.inc({ type: 'websocket' }, 0);
messagesSent.inc({ type: 'eventsource' }, 0); messagesSent.inc({ type: 'eventsource' }, 0);
/**
* @type {import('express').RequestHandler<{}>}
*/
const requestHandler = (req, res) => {
metrics.register.metrics().then((output) => {
res.set('Content-Type', metrics.register.contentType);
res.end(output);
}).catch((err) => {
req.log.error(err, "Error collecting metrics");
res.status(500).end();
});
};
return { return {
register: metrics.register, requestHandler,
connectedClients, connectedClients,
connectedChannels, connectedChannels,
redisSubscriptions, redisSubscriptions,