All files / lib/lambda attachmentArchiveBackfillStatus.ts

94.44% Statements 34/36
82.75% Branches 24/29
100% Functions 5/5
94.44% Lines 34/36

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118      1x 1x 1x               2x 2x       2x       2x 2x       2x                 3x 3x 3x   3x 3x                 3x 5x 1x     4x 2x     2x 2x 1x     2x     3x     3x 1x                   3x     1x 2x             2x     2x     2x         2x             2x              
import { ListExecutionsCommand, SFNClient } from "@aws-sdk/client-sfn";
import { GetQueueAttributesCommand, SQSClient } from "@aws-sdk/client-sqs";
 
const queueClient = new SQSClient({ region: process.env.region || process.env.AWS_REGION });
const stepFunctionsClient = new SFNClient({ region: process.env.region || process.env.AWS_REGION });
const STALE_ARCHIVE_EXECUTION_MAX_AGE_MS = 30 * 60 * 1000;
 
type AttachmentArchiveBackfillStatusEvent = {
  currentExecutionArn?: string;
  historicalBackfillStateMachineArn?: string;
};
 
function getArchiveRebuildQueueUrl() {
  const queueUrl = process.env.ATTACHMENT_ARCHIVE_REBUILD_QUEUE_URL;
  Iif (!queueUrl) {
    throw new Error("ATTACHMENT_ARCHIVE_REBUILD_QUEUE_URL must be defined");
  }
 
  return queueUrl;
}
 
function getArchiveStateMachineArn() {
  const stateMachineArn = process.env.ATTACHMENT_ARCHIVE_STATE_MACHINE_ARN;
  Iif (!stateMachineArn) {
    throw new Error("ATTACHMENT_ARCHIVE_STATE_MACHINE_ARN must be defined");
  }
 
  return stateMachineArn;
}
 
async function countRunningExecutions(
  stateMachineArn: string,
  currentExecutionArn?: string,
  maxAgeMs?: number,
) {
  let nextToken: string | undefined;
  let runningCount = 0;
  const staleExecutions: string[] = [];
  const now = Date.now();
 
  do {
    const response = await stepFunctionsClient.send(
      new ListExecutionsCommand({
        stateMachineArn,
        maxResults: 100,
        nextToken,
        statusFilter: "RUNNING",
      }),
    );
 
    runningCount += (response.executions || []).filter((execution) => {
      if (execution.executionArn === currentExecutionArn) {
        return false;
      }
 
      if (!maxAgeMs || !execution.startDate) {
        return true;
      }
 
      const isStale = now - execution.startDate.getTime() > maxAgeMs;
      if (isStale && execution.executionArn) {
        staleExecutions.push(execution.executionArn);
      }
 
      return !isStale;
    }).length;
 
    nextToken = response.nextToken;
  } while (nextToken);
 
  if (staleExecutions.length > 0) {
    console.warn(
      JSON.stringify({
        event: "attachment_archive_backfill_status_ignored_stale_executions",
        stateMachineArn,
        staleExecutionArns: staleExecutions,
        staleThresholdMinutes: STALE_ARCHIVE_EXECUTION_MAX_AGE_MS / 60000,
      }),
    );
  }
 
  return runningCount;
}
 
export const handler = async (event: AttachmentArchiveBackfillStatusEvent = {}) => {
  const queueAttributesResponse = await queueClient.send(
    new GetQueueAttributesCommand({
      AttributeNames: ["ApproximateNumberOfMessages", "ApproximateNumberOfMessagesNotVisible"],
      QueueUrl: getArchiveRebuildQueueUrl(),
    }),
  );
 
  const queueVisible = Number(
    queueAttributesResponse.Attributes?.ApproximateNumberOfMessages || "0",
  );
  const queueInflight = Number(
    queueAttributesResponse.Attributes?.ApproximateNumberOfMessagesNotVisible || "0",
  );
  const runningExecutions = await countRunningExecutions(
    getArchiveStateMachineArn(),
    undefined,
    STALE_ARCHIVE_EXECUTION_MAX_AGE_MS,
  );
  const otherHistoricalBackfillExecutions = event.historicalBackfillStateMachineArn
    ? await countRunningExecutions(
        event.historicalBackfillStateMachineArn,
        event.currentExecutionArn,
      )
    : 0;
 
  return {
    otherHistoricalBackfillExecutions,
    queueInflight,
    queueVisible,
    runningExecutions,
  };
};