add fail fast to the common doorman lambda
This commit is contained in:
parent
4817ed93d2
commit
7a2ebb92be
@ -12,10 +12,11 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"@aws-sdk/client-dynamodb": "^3.609.0",
|
||||
"@twilio-labs/serverless-runtime-types": "^3.0.0",
|
||||
"@twilio-labs/serverless-runtime-types": "^4.0.1",
|
||||
"@twilio/runtime-handler": "1.3.0",
|
||||
"discord.js": "^14.16.3",
|
||||
"prom-client": "^15.1.3",
|
||||
"promise.timeout": "^1.2.0",
|
||||
"twilio": "^3.56",
|
||||
"winston": "^3.17.0",
|
||||
"winston-loki": "^6.1.3"
|
||||
|
||||
@ -4,19 +4,27 @@ import { DoormanLambdaContext } from "./DoormanHandlerContext";
|
||||
import { shouldBlockRequest } from "../utils/blockUserAgent";
|
||||
import { RequestOptions } from "https";
|
||||
import { FastHttpPushgateway } from "../metrics/FastHttpPromGateway";
|
||||
import '@twilio-labs/serverless-runtime-types';
|
||||
import VoiceResponse from 'twilio/lib/twiml/VoiceResponse';
|
||||
|
||||
import { createLogger, format, Logger, transports } from "winston";
|
||||
import LokiTransport from "winston-loki";
|
||||
import pTimeout, { TimeoutError } from "promise.timeout";
|
||||
|
||||
export type BaseEvent = { request: { cookies: {}; headers: {}; }; }
|
||||
|
||||
export type CallbackResult = Parameters<ServerlessCallback>;
|
||||
export type FailFastCallback = () => CallbackResult;
|
||||
|
||||
export type DoormanLambda<T extends DoormanLambdaContext, U extends BaseEvent> = (
|
||||
context: Parameters<ServerlessFunctionSignature<T, U>>[0],
|
||||
event: Parameters<ServerlessFunctionSignature<T, U>>[1],
|
||||
callback: Parameters<ServerlessFunctionSignature<T, U>>[2],
|
||||
metricsRegistry: Registry<PrometheusContentType>,
|
||||
logger: Logger,
|
||||
) => void;
|
||||
// this is optional, but if called this method should provide an alternative callback provider that can fail fast
|
||||
failFastCallback: (fn: FailFastCallback) => void,
|
||||
) => Promise<void>;
|
||||
|
||||
export enum CommonMetrics {
|
||||
RUNTIME = "FunctionRuntime",
|
||||
@ -25,6 +33,9 @@ export enum CommonMetrics {
|
||||
LOKI_ERROR = "LokiError",
|
||||
LOKI_LOG_AFTER_CLOSE = "LokiLogAfterClose",
|
||||
TWILIO_ISOLATION_BUSTED = "TwilioIsolationBusted",
|
||||
INNER_HANDLER_TIMEOUT = "InnerHandlerTimeout",
|
||||
INNER_HANDLER_MISSING_FAIL_FAST = "InnerHandlerMissingFailFast",
|
||||
RETURN_FALLBACK_RESPONSE = "ReturnFallbackResponse",
|
||||
};
|
||||
|
||||
export function getMetricFromRegistry<T>(metricsRegistry: Registry, metric: string): T {
|
||||
@ -42,8 +53,15 @@ export function gracefullyEndLogger(logger: Logger): Promise<void> {
|
||||
});
|
||||
}
|
||||
|
||||
// TODO: make this response a voice message?
|
||||
const REJECT_RESPONSE = new VoiceResponse();
|
||||
REJECT_RESPONSE.reject();
|
||||
|
||||
const FALLBACK_CALLBACK: CallbackResult = [null, REJECT_RESPONSE];
|
||||
|
||||
const MINIMUM_MS_TO_SEND_RESPONSE: number = 250;
|
||||
const FUNCTION_MAXIMUM_DURATION_MS: number = 10_000;
|
||||
const INNER_HANDLER_MAXIMUM_DURATION_MS: number = 8_500;
|
||||
|
||||
/**
|
||||
* A decorator for twilio handlers. It provides a metrics registry and
|
||||
@ -100,6 +118,21 @@ export function withMetrics<T extends DoormanLambdaContext, U extends BaseEvent>
|
||||
help: "Twilio invocation lasted longer than 10s",
|
||||
}));
|
||||
|
||||
metricsRegistry.registerMetric(new Counter({
|
||||
name: CommonMetrics.INNER_HANDLER_MISSING_FAIL_FAST,
|
||||
help: "Inner handler was timed out and we did not have any fallback response",
|
||||
}));
|
||||
|
||||
metricsRegistry.registerMetric(new Counter({
|
||||
name: CommonMetrics.INNER_HANDLER_TIMEOUT,
|
||||
help: "Inner handler was timed out",
|
||||
}));
|
||||
|
||||
metricsRegistry.registerMetric(new Counter({
|
||||
name: CommonMetrics.RETURN_FALLBACK_RESPONSE,
|
||||
help: "Common handler returned a fallback response",
|
||||
}));
|
||||
|
||||
// Create a Winston logger
|
||||
const logger = createLogger({
|
||||
level: 'info',
|
||||
@ -136,6 +169,7 @@ export function withMetrics<T extends DoormanLambdaContext, U extends BaseEvent>
|
||||
// Override the base console log with winston
|
||||
console.log = function (...args) {
|
||||
if (logger.writable) {
|
||||
// @ts-ignore
|
||||
return logger.info.apply(logger, [...args]);
|
||||
}
|
||||
getMetricFromRegistry<Counter>(metricsRegistry, CommonMetrics.LOKI_LOG_AFTER_CLOSE).inc(1);
|
||||
@ -143,6 +177,7 @@ export function withMetrics<T extends DoormanLambdaContext, U extends BaseEvent>
|
||||
};
|
||||
console.error = function (...args) {
|
||||
if (logger.writable) {
|
||||
// @ts-ignore
|
||||
return logger.error.apply(logger, [...args]);
|
||||
}
|
||||
getMetricFromRegistry<Counter>(metricsRegistry, CommonMetrics.LOKI_LOG_AFTER_CLOSE).inc(1);
|
||||
@ -150,6 +185,7 @@ export function withMetrics<T extends DoormanLambdaContext, U extends BaseEvent>
|
||||
};
|
||||
console.warn = function (...args) {
|
||||
if (logger.writable) {
|
||||
// @ts-ignore
|
||||
return logger.warn.apply(logger, [...args]);
|
||||
}
|
||||
getMetricFromRegistry<Counter>(metricsRegistry, CommonMetrics.LOKI_LOG_AFTER_CLOSE).inc(1);
|
||||
@ -157,6 +193,7 @@ export function withMetrics<T extends DoormanLambdaContext, U extends BaseEvent>
|
||||
};
|
||||
console.info = function (...args) {
|
||||
if (logger.writable) {
|
||||
// @ts-ignore
|
||||
return logger.info.apply(logger, [...args]);
|
||||
}
|
||||
getMetricFromRegistry<Counter>(metricsRegistry, CommonMetrics.LOKI_LOG_AFTER_CLOSE).inc(1);
|
||||
@ -175,6 +212,12 @@ export function withMetrics<T extends DoormanLambdaContext, U extends BaseEvent>
|
||||
callbackResult = [err, payload];
|
||||
}
|
||||
|
||||
// intercept the fail fast callback method, if it is provided
|
||||
let failFastCallbackMethod: FailFastCallback | undefined;
|
||||
const failFastCallbackProvider: (fn: FailFastCallback) => void = (fn) => {
|
||||
failFastCallbackMethod = fn;
|
||||
}
|
||||
|
||||
// block requests before we even call the handler
|
||||
if (shouldBlockRequest(event)) {
|
||||
getMetricFromRegistry<Counter>(metricsRegistry, CommonMetrics.BLOCKED_REQUEST).inc(1);
|
||||
@ -182,14 +225,34 @@ export function withMetrics<T extends DoormanLambdaContext, U extends BaseEvent>
|
||||
response.setStatusCode(200);
|
||||
callbackResult = [null, response];
|
||||
} else {
|
||||
await handler(context, event, tempCallback, metricsRegistry, logger);
|
||||
// wait for response for up to x ms, otherwise we call the failFastCallbackMethod
|
||||
try {
|
||||
await pTimeout(
|
||||
() => handler(context, event, tempCallback, metricsRegistry, logger, failFastCallbackProvider),
|
||||
INNER_HANDLER_MAXIMUM_DURATION_MS
|
||||
)();
|
||||
} catch (e) {
|
||||
console.log(e);
|
||||
if (e instanceof TimeoutError) {
|
||||
getMetricFromRegistry<Counter>(metricsRegistry, CommonMetrics.INNER_HANDLER_TIMEOUT).inc(1);
|
||||
|
||||
if (!failFastCallbackMethod) {
|
||||
getMetricFromRegistry<Counter>(metricsRegistry, CommonMetrics.INNER_HANDLER_MISSING_FAIL_FAST).inc(1);
|
||||
reject("Timeout, but no failfast result was given");
|
||||
return;
|
||||
}
|
||||
|
||||
callbackResult = failFastCallbackMethod();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!callbackResult) {
|
||||
reject("No callback was given");
|
||||
return;
|
||||
}
|
||||
|
||||
let statusCode: number | undefined = (callbackResult?.[1] as any)?.statusCode;
|
||||
let statusCode: number | undefined = (callbackResult[1] as any)?.statusCode;
|
||||
|
||||
if (statusCode && statusCode >= 400) {
|
||||
getMetricFromRegistry<Counter>(metricsRegistry, CommonMetrics.HTTP_CLIENT_ERROR).inc({
|
||||
@ -197,12 +260,16 @@ export function withMetrics<T extends DoormanLambdaContext, U extends BaseEvent>
|
||||
}, 1);
|
||||
}
|
||||
|
||||
resolve(callbackResult as Parameters<ServerlessCallback>);
|
||||
resolve(callbackResult);
|
||||
});
|
||||
|
||||
console.time("[CommonHandler] nested handler time");
|
||||
|
||||
const result = await handlerResponsePromise;
|
||||
const result = await handlerResponsePromise.catch((err) => {
|
||||
console.error("[CommonHandler] inner handler promise was rejected with reason: " + err);
|
||||
getMetricFromRegistry<Counter>(metricsRegistry, CommonMetrics.RETURN_FALLBACK_RESPONSE).inc(1);
|
||||
return FALLBACK_CALLBACK;
|
||||
});
|
||||
|
||||
console.timeEnd("[CommonHandler] nested handler time");
|
||||
|
||||
@ -214,7 +281,7 @@ export function withMetrics<T extends DoormanLambdaContext, U extends BaseEvent>
|
||||
getMetricFromRegistry<Counter>(metricsRegistry, CommonMetrics.TWILIO_ISOLATION_BUSTED).inc(1);
|
||||
}
|
||||
|
||||
// abandoning metrics, since there isn't enough time
|
||||
// abandoning metrics/logs, since there isn't enough time
|
||||
if (remainingTime <= MINIMUM_MS_TO_SEND_RESPONSE) {
|
||||
console.error("[CommonHandler] there is no time to send metrics / logs abandoning them");
|
||||
callback(...result);
|
||||
|
||||
@ -11,11 +11,12 @@
|
||||
"deploy": "twilio-run deploy --load-system-env --env .env.example --service-name buzzer --environment=prod --override-existing-project"
|
||||
},
|
||||
"dependencies": {
|
||||
"@twilio-labs/serverless-runtime-types": "^3.0.0",
|
||||
"@twilio-labs/serverless-runtime-types": "^4.0.1",
|
||||
"@twilio/runtime-handler": "1.3.0",
|
||||
"node-fetch": "^2.7.0",
|
||||
"prom-client": "^15.1.3",
|
||||
"prometheus-remote-write": "^0.5.0",
|
||||
"promise.timeout": "^1.2.0",
|
||||
"twilio": "^3.84.1",
|
||||
"winston": "^3.17.0",
|
||||
"winston-loki": "^6.1.3"
|
||||
|
||||
@ -19,7 +19,7 @@ import { getMetricFromRegistry, withMetrics } from '../../../doorman-api/src/com
|
||||
import { Counter, Summary } from 'prom-client';
|
||||
import { BuzzerActivatedMetrics, registerMetrics } from '../metrics/BuzzerActivatedMetrics';
|
||||
|
||||
export const handler: ServerlessFunctionSignature<TwilioContext, BuzzerDialEvent> = withMetrics('buzzer-activated', async function(context, event, callback, metricsRegistry) {
|
||||
export const handler: ServerlessFunctionSignature<TwilioContext, BuzzerDialEvent> = withMetrics('buzzer-activated', async function(context, event, callback, metricsRegistry, logger, failFastCallback) {
|
||||
// metrics
|
||||
registerMetrics(metricsRegistry);
|
||||
|
||||
@ -58,7 +58,7 @@ export const handler: ServerlessFunctionSignature<TwilioContext, BuzzerDialEvent
|
||||
|
||||
await notifyDiscord(context, msgs, config.discordUsers, config.discordUsers.map(() => ""), metricsRegistry);
|
||||
|
||||
let discordLock = false;
|
||||
let responseLock = false;
|
||||
let intervals: Timer[] = [];
|
||||
let timeouts: Timer[] = [];
|
||||
|
||||
@ -79,8 +79,8 @@ export const handler: ServerlessFunctionSignature<TwilioContext, BuzzerDialEvent
|
||||
clearInterval(intervals[0]);
|
||||
const twiml = doorOpenTwiml(config);
|
||||
|
||||
if (!discordLock) {
|
||||
discordLock = true;
|
||||
if (!responseLock) {
|
||||
responseLock = true;
|
||||
console.log(
|
||||
invokeId + " UnlockPromise: I was the fastest, so I will attempt to notify discord users before resolving with unlock"
|
||||
);
|
||||
@ -103,8 +103,8 @@ export const handler: ServerlessFunctionSignature<TwilioContext, BuzzerDialEvent
|
||||
timeouts.push(setTimeout(async () => {
|
||||
const twiml = dialFallbackTwiml(config);
|
||||
|
||||
if (!discordLock) {
|
||||
discordLock = true;
|
||||
if (!responseLock) {
|
||||
responseLock = true;
|
||||
getMetricFromRegistry<Counter>(metricsRegistry, BuzzerActivatedMetrics.DIAL_THROUGH)
|
||||
.inc({ door: config.door }, 1);
|
||||
|
||||
@ -120,14 +120,17 @@ export const handler: ServerlessFunctionSignature<TwilioContext, BuzzerDialEvent
|
||||
resolve(twiml);
|
||||
} else {
|
||||
console.log(
|
||||
invokeId + " GracefulFallbackPromise: dropping out of the race, unlock is already notifying discord users"
|
||||
invokeId + " GracefulFallbackPromise: dropping out of the race, another response is already underway"
|
||||
);
|
||||
}
|
||||
}, 7500));
|
||||
});
|
||||
|
||||
const ungracefulFallbackPromise = new Promise<VoiceResponse>((resolve, reject) => {
|
||||
timeouts.push(setTimeout(async () => {
|
||||
// provide a method to the decorator to use as a fast fallback that shouldn't have any await
|
||||
failFastCallback(() => {
|
||||
// prevent other responses
|
||||
responseLock = true;
|
||||
|
||||
getMetricFromRegistry<Counter>(metricsRegistry, BuzzerActivatedMetrics.RESULT_NOTIFICATION_FATE_UNKNOWN)
|
||||
.inc({ door: config.door }, 1);
|
||||
|
||||
@ -136,13 +139,12 @@ export const handler: ServerlessFunctionSignature<TwilioContext, BuzzerDialEvent
|
||||
|
||||
const twiml = dialFallbackTwiml(config);
|
||||
console.log(
|
||||
invokeId + " UngracefulFallbackPromise: Cutting it too close to timeout! Skipping notifying users and calling fallback"
|
||||
invokeId + " UngracefulFallback: Cutting it too close to timeout! Skipping notifying users and responding fallback"
|
||||
);
|
||||
resolve(twiml);
|
||||
}, 8500));
|
||||
return [null, twiml];
|
||||
});
|
||||
|
||||
const twiml = await Promise.race([unlockPromise, gracefulFallbackPromise, ungracefulFallbackPromise]);
|
||||
const twiml = await Promise.race([unlockPromise, gracefulFallbackPromise]);
|
||||
console.log(invokeId + " Race ended, clearing residual timers");
|
||||
timeouts.forEach(timeout => clearTimeout(timeout));
|
||||
intervals.forEach(interval => clearInterval(interval));
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user