Press n or j to go to the next uncovered block, b, p or k for the previous block.
import { Handler } from "aws-lambda";
import {
  CreateEventSourceMappingCommand,
  CreateEventSourceMappingCommandInput,
  GetEventSourceMappingCommand,
  LambdaClient,
} from "@aws-sdk/client-lambda";
import { randomUUID } from "crypto";

/** Delay, in milliseconds, between two consecutive state checks of a mapping. */
const POLL_INTERVAL_MS = 10000;

/**
 * Repeatedly fetches the event source mapping identified by `uuid` and
 * resolves once its reported `State` equals "Enabled".
 *
 * NOTE(review): the loop has no retry cap or deadline; a mapping that never
 * reports "Enabled" keeps this polling until something outside this function
 * stops it (presumably the Lambda timeout — confirm).
 *
 * @param lambdaClient Client used to issue the GetEventSourceMapping calls.
 * @param uuid Identifier returned when the mapping was created.
 * @throws Whatever `lambdaClient.send` rejects with.
 */
async function waitUntilEnabled(
  lambdaClient: LambdaClient,
  uuid: string | undefined,
): Promise<void> {
  for (;;) {
    const mapping = await lambdaClient.send(
      new GetEventSourceMappingCommand({ UUID: uuid }),
    );
    if (mapping.State === "Enabled") {
      break;
    }
    console.log(`Waiting for mapping ${uuid} to be enabled...`);
    await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
  }
  console.log(`Mapping ${uuid} is now enabled.`);
}

/**
 * Creates one self-managed Kafka event source mapping per (trigger, unique
 * topic) pair described by the incoming event, then waits for every mapping
 * that was created with `Enabled: true` to report the "Enabled" state.
 *
 * Fields read from `event`: `triggers`, `consumerGroupPrefix`, `brokerString`
 * (comma-separated), `subnets`, `securityGroup`, `startingPosition`.
 * Fields read from each trigger: `function`, `topics`, `batchSize`, `enabled`.
 *
 * The outcome is always reported through `callback`: `{ statusCode: 200 }` on
 * success, or the caught error together with `{ statusCode: 500 }` on failure.
 */
export const handler: Handler = async (event, _, callback) => {
  console.log("request:", JSON.stringify(event, undefined, 2));

  const response = {
    statusCode: 200,
  };
  let errorResponse: Error | string | null = null;

  try {
    const lambdaClient = new LambdaClient({
      region: process.env.region,
    });
    const uuidsToCheck: Array<string | undefined> = [];

    for (const trigger of event.triggers) {
      // De-duplicate topics so a repeated topic yields a single mapping.
      for (const topic of [...new Set(trigger.topics)]) {
        // Each mapping gets its own consumer group: shared prefix + random UUID.
        const consumerGroupId = `${event.consumerGroupPrefix}${randomUUID()}`;
        console.log(
          `Creating a mapping to trigger ${trigger.function} off ${topic} with consumer group ID ${consumerGroupId}`,
        );

        // One VPC_SUBNET entry per configured subnet, so the list is not tied
        // to a fixed subnet count and never contains an "undefined" URI.
        const subnetAccess = (event.subnets as string[]).map((subnet) => ({
          Type: "VPC_SUBNET",
          URI: `${subnet}`,
        }));

        const createEventSourceMappingParams = {
          BatchSize: trigger.batchSize || 1000,
          Enabled: trigger.enabled ?? true,
          FunctionName: trigger.function, // assuming this ARN is provided in the event
          SelfManagedEventSource: {
            Endpoints: {
              KAFKA_BOOTSTRAP_SERVERS: event.brokerString.split(","),
            },
          },
          SelfManagedKafkaEventSourceConfig: {
            ConsumerGroupId: consumerGroupId,
          },
          SourceAccessConfigurations: [
            ...subnetAccess,
            {
              Type: "VPC_SECURITY_GROUP",
              URI: `security_group:${event.securityGroup}`,
            },
          ],
          StartingPosition: event.startingPosition || "TRIM_HORIZON",
          Topics: [topic],
        };
        console.log(JSON.stringify(createEventSourceMappingParams, null, 2));

        const command = new CreateEventSourceMappingCommand(
          createEventSourceMappingParams as CreateEventSourceMappingCommandInput,
        );
        const result = await lambdaClient.send(command);
        console.log(result);

        // Only mappings requested as enabled are waited on below.
        if (createEventSourceMappingParams.Enabled) {
          uuidsToCheck.push(result.UUID);
        }
      }
    }

    // Mappings are awaited one after another, in creation order.
    for (const uuid of uuidsToCheck) {
      await waitUntilEnabled(lambdaClient, uuid);
    }
  } catch (error: unknown) {
    response.statusCode = 500;
    // The caught value is forwarded to the callback unchanged.
    errorResponse = error as Error | string;
  } finally {
    callback(errorResponse, response);
  }
};