All files / lib/lambda createTriggers.ts

90.62% Statements 29/32
70% Branches 7/10
50% Functions 1/2
93.54% Lines 29/31

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83                  1x 2x 2x     2x 2x 2x     2x 2x 2x 2x 2x     2x                                               2x 2x     2x 1x 1x 1x       1x 1x 1x 1x 1x 1x 1x           1x     1x 1x   2x      
import { Handler } from "aws-lambda";
import {
  CreateEventSourceMappingCommand,
  CreateEventSourceMappingCommandInput,
  GetEventSourceMappingCommand,
  LambdaClient,
} from "@aws-sdk/client-lambda";
import { randomUUID } from "crypto";
 
/** Delay, in milliseconds, between state checks while waiting on a mapping. */
const MAPPING_POLL_INTERVAL_MS = 10000;

/**
 * Polls a single event source mapping until its reported state is "Enabled".
 *
 * NOTE(review): the loop has no upper bound. A mapping that never reports
 * "Enabled" keeps this polling until the process is stopped externally
 * (presumably the function timeout — confirm against the deployment).
 *
 * @param lambdaClient - Client used to issue the GetEventSourceMapping calls.
 * @param uuid - Identifier returned when the mapping was created.
 */
const waitForMappingEnabled = async (
  lambdaClient: LambdaClient,
  uuid: string | undefined,
): Promise<void> => {
  let isEnabled = false;
  while (!isEnabled) {
    const mappingResult = await lambdaClient.send(
      new GetEventSourceMappingCommand({ UUID: uuid }),
    );
    if (mappingResult.State === "Enabled") {
      isEnabled = true;
    } else {
      console.log(`Waiting for mapping ${uuid} to be enabled...`);
      // Wait before checking again so the API is not polled in a tight loop.
      await new Promise((resolve) =>
        setTimeout(resolve, MAPPING_POLL_INTERVAL_MS),
      );
    }
  }
  console.log(`Mapping ${uuid} is now enabled.`);
};

/**
 * Creates one self-managed Kafka event source mapping per (trigger, topic)
 * pair described by the event, then waits for every mapping that was created
 * with `Enabled: true` to report the "Enabled" state.
 *
 * Fields read from `event`:
 * - `triggers`: iterable of `{ function, topics, batchSize?, enabled? }`;
 *   duplicate topics within a trigger are collapsed.
 * - `consumerGroupPrefix`: prepended to a random UUID to form each mapping's
 *   consumer group ID.
 * - `brokerString`: comma-separated list used as the Kafka bootstrap servers.
 * - `subnets`: every entry becomes a `VPC_SUBNET` source access configuration.
 * - `securityGroup`: sent as `security_group:<value>`.
 * - `startingPosition`: optional; falls back to "TRIM_HORIZON".
 *
 * The outcome is always reported through `callback`: `(null, { statusCode: 200 })`
 * on success, or `(error, { statusCode: 500 })` if anything in the body throws.
 */
export const handler: Handler = async (event, _, callback) => {
  console.log("request:", JSON.stringify(event, undefined, 2));
  const response = {
    statusCode: 200,
  };
  let errorResponse: Error | string | null = null;
  try {
    const lambdaClient = new LambdaClient({
      region: process.env.region,
    });
    const uuidsToCheck: (string | undefined)[] = [];
    for (const trigger of event.triggers) {
      // De-duplicate topics so the same topic is not mapped twice per trigger.
      for (const topic of [...new Set(trigger.topics)]) {
        const consumerGroupId = `${event.consumerGroupPrefix}${randomUUID()}`;
        console.log(
          `Creating a mapping to trigger ${trigger.function} off ${topic} with consumer group ID ${consumerGroupId}`,
        );
        const createEventSourceMappingParams = {
          BatchSize: trigger.batchSize || 1000,
          Enabled: trigger.enabled ?? true,
          FunctionName: trigger.function, // assuming this ARN is provided in the event
          SelfManagedEventSource: {
            Endpoints: {
              KAFKA_BOOTSTRAP_SERVERS: event.brokerString.split(","),
            },
          },
          SelfManagedKafkaEventSourceConfig: {
            ConsumerGroupId: consumerGroupId,
          },
          SourceAccessConfigurations: [
            // One entry per supplied subnet, rather than assuming exactly
            // three, so shorter lists do not yield "undefined" URIs and
            // longer lists are not silently truncated.
            ...event.subnets.map((subnet: unknown) => ({
              Type: "VPC_SUBNET",
              URI: String(subnet),
            })),
            {
              Type: "VPC_SECURITY_GROUP",
              URI: `security_group:${event.securityGroup}`,
            },
          ],
          StartingPosition: event.startingPosition || "TRIM_HORIZON",
          Topics: [topic],
        };
        console.log(JSON.stringify(createEventSourceMappingParams, null, 2));
        const command = new CreateEventSourceMappingCommand(
          createEventSourceMappingParams as CreateEventSourceMappingCommandInput,
        );
        const result = await lambdaClient.send(command);
        console.log(result);
        // Only mappings requested as enabled are expected to reach "Enabled".
        if (createEventSourceMappingParams.Enabled) {
          uuidsToCheck.push(result.UUID);
        }
      }
    }
    // Wait for the mappings one at a time, in creation order.
    for (const uuid of uuidsToCheck) {
      await waitForMappingEnabled(lambdaClient, uuid);
    }
  } catch (error: unknown) {
    response.statusCode = 500;
    // Pass Error and string values through untouched; wrap anything else so
    // the callback always receives a value of a type it accepts.
    errorResponse =
      error instanceof Error || typeof error === "string"
        ? error
        : new Error(JSON.stringify(error));
  } finally {
    callback(errorResponse, response);
  }
};