All files / lib/lambda checkConsumerLag.ts

100% Statements 48/48
94.44% Branches 17/18
100% Functions 6/6
100% Lines 44/44

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104        1x 6x           6x 6x 6x 6x     6x 6x 6x 6x         6x       1x       5x 6x   5x 1x       4x 1x         3x 3x 1x       2x           2x         2x 2x     2x 1x   1x 1x 1x 1x 1x 1x         1x 1x 1x       1x         1x 1x 1x 1x   5x 5x   6x      
import { Handler } from "aws-lambda";
import { Kafka } from "kafkajs";
import { LambdaClient, ListEventSourceMappingsCommand } from "@aws-sdk/client-lambda";
 
/** The fields of a Lambda event source mapping that this module reads. */
interface KafkaEventSourceMapping {
  Topics?: string[] | undefined;
  SelfManagedKafkaEventSourceConfig?: { ConsumerGroupId?: string | undefined } | undefined;
}

/** One consumer group / topic pair whose lag has to be checked. */
interface ConsumerTarget {
  groupId: string;
  topic: string;
}

/** Admin client type, derived from the `Kafka` class that is already imported. */
type KafkaAdmin = ReturnType<Kafka["admin"]>;

/**
 * Picks the consumer group id of the single event source mapping that lists `topic`.
 *
 * @throws Error when no mapping, more than one mapping, or a mapping without a
 *   `SelfManagedKafkaEventSourceConfig.ConsumerGroupId` matches the topic.
 */
function selectConsumerGroupId(
  mappings: readonly KafkaEventSourceMapping[],
  functionName: string,
  topic: string,
): string {
  const matches = mappings.filter((mapping) => mapping.Topics?.includes(topic));
  if (matches.length === 0) {
    throw new Error(
      `ERROR: No event source mapping found for function ${functionName} and topic ${topic}`,
    );
  }
  if (matches.length > 1) {
    throw new Error(
      `ERROR: Multiple event source mappings found for function ${functionName} and topic ${topic}`,
    );
  }
  const groupId = matches[0]?.SelfManagedKafkaEventSourceConfig?.ConsumerGroupId;
  if (!groupId) {
    throw new Error(
      `ERROR: No ConsumerGroupId found for function ${functionName} and topic ${topic}`,
    );
  }
  return groupId;
}

/**
 * Reports whether `groupId` has committed the latest offset on every partition of `topic`.
 *
 * @throws Error when the admin client returns no offsets for the topic or for the group.
 */
async function isGroupCaughtUp(
  admin: KafkaAdmin,
  groupId: string,
  topic: string,
): Promise<boolean> {
  const topicOffsets = await admin.fetchTopicOffsets(topic);
  const groupOffsets = await admin.fetchOffsets({
    groupId,
    topics: [topic],
  });
  if (topicOffsets.length === 0) {
    throw new Error(`ERROR: No partition offsets found for topic ${topic}`);
  }

  // Index the group's committed offsets by partition so each partition is
  // compared with its own latest offset rather than only the first one.
  const committedByPartition = new Map<number, string>();
  for (const entry of groupOffsets) {
    for (const partitionOffset of entry.partitions) {
      committedByPartition.set(partitionOffset.partition, partitionOffset.offset);
    }
  }
  if (committedByPartition.size === 0) {
    throw new Error(`ERROR: No committed offsets found for group ${groupId} and topic ${topic}`);
  }

  let caughtUp = true;
  for (const { partition, offset: latestOffset } of topicOffsets) {
    const currentOffset = committedByPartition.get(partition);
    console.log(
      `Topic: ${topic}, Group: ${groupId}, Partition: ${partition}, Latest Offset: ${latestOffset}, Current Offset: ${currentOffset}`,
    );
    // No early exit: every partition is logged even after a lagging one is found.
    if (currentOffset !== latestOffset) {
      caughtUp = false;
    }
  }
  return caughtUp;
}

/**
 * Lambda handler that checks whether the Kafka consumer groups behind a set of
 * Lambda triggers are stable and fully caught up.
 *
 * Reads from the event:
 * - `triggers`: a list of `{ function, topics }`; each function's event source
 *   mappings are used to find the consumer group for each distinct topic.
 * - `brokerString`: comma-separated list of Kafka brokers.
 *
 * The result is delivered through `callback` with a response of
 * `{ statusCode, stable, current, ready }`:
 * - `stable`  - every described consumer group is in the "Stable" state.
 * - `current` - every group has committed the latest offset on every partition.
 * - `ready`   - both of the above.
 *
 * On any failure the callback receives the error together with a response whose
 * `statusCode` is 500 and whose flags are all `false`.
 */
export const handler: Handler = async (event, _, callback) => {
  const response = {
    statusCode: 200,
    stable: false,
    current: false,
    ready: false,
  };
  let errorResponse: Error | string | null = null;
  try {
    const lambdaClient = new LambdaClient({
      region: process.env.region,
    });

    const targets: ConsumerTarget[] = [];
    for (const trigger of event.triggers) {
      const topics = [...new Set<string>(trigger.topics)];
      if (topics.length === 0) {
        continue;
      }
      // The mappings depend only on the function, so list them once per
      // trigger instead of once per topic.
      const lambdaResponse = await lambdaClient.send(
        new ListEventSourceMappingsCommand({
          FunctionName: trigger.function,
        }),
      );
      const mappings: KafkaEventSourceMapping[] = lambdaResponse.EventSourceMappings ?? [];
      for (const topic of topics) {
        console.log(`Getting consumer groups for function: ${trigger.function} and topic ${topic}`);
        targets.push({
          groupId: selectConsumerGroupId(mappings, trigger.function, topic),
          topic,
        });
      }
    }

    const kafka = new Kafka({
      clientId: "consumerGroupResetter",
      brokers: event.brokerString?.split(",") ?? [],
      ssl: true,
    });
    const admin = kafka.admin();
    await admin.connect();

    let stable = false;
    let current = true;
    try {
      // Get status for each consumer group
      const info = await admin.describeGroups(targets.map((target) => target.groupId));
      stable = info.groups.every((group) => group.state.toString() === "Stable");

      // Compare topic and group offsets for each consumer group / topic pair
      for (const { groupId, topic } of targets) {
        if (!(await isGroupCaughtUp(admin, groupId, topic))) {
          current = false;
        }
      }
    } finally {
      // Release the admin connection even when one of the calls above fails.
      await admin.disconnect();
    }

    // Flags are only published once every step, including disconnect, succeeded,
    // so a failed run always reports all flags as false.
    response.stable = stable;
    response.current = current;
    response.ready = stable && current;
  } catch (error: unknown) {
    response.statusCode = 500;
    // Errors and strings are passed through as thrown; anything else is
    // wrapped so the callback always receives a value it accepts.
    errorResponse =
      error instanceof Error || typeof error === "string" ? error : new Error(String(error));
  } finally {
    callback(errorResponse, response);
  }
};