import {
CreateEventSourceMappingCommand,
CreateEventSourceMappingCommandInput,
GetEventSourceMappingCommand,
LambdaClient,
} from "@aws-sdk/client-lambda";
import { Handler } from "aws-lambda";
import { randomUUID } from "crypto";
/**
 * Creates one self-managed Kafka event source mapping per (trigger, topic)
 * pair described in the event, then polls every mapping that was created
 * enabled until its reported state is "Enabled".
 *
 * Fields read from `event`:
 * - `triggers`: iterable of `{ function, topics, batchSize?, enabled? }`;
 *   duplicate topics within one trigger are mapped only once.
 * - `consumerGroupPrefix`: prepended to a random UUID to build each mapping's
 *   consumer group ID.
 * - `brokerString`: comma-separated list of Kafka bootstrap servers.
 * - `subnets`: every entry becomes a `VPC_SUBNET` source access configuration.
 * - `securityGroup`: used as `security_group:<value>`.
 * - `startingPosition`: optional, falls back to "TRIM_HORIZON".
 *
 * The outcome is always reported through `callback`: `(null, { statusCode: 200 })`
 * on success, `(error, { statusCode: 500 })` on any failure.
 *
 * NOTE(review): this is an `async` handler that also reports through
 * `callback`; confirm that combination is supported by the target Node.js
 * Lambda runtime.
 * NOTE(review): the enable-polling loop has no upper bound of its own, so a
 * mapping that never reaches "Enabled" is only stopped by the function timeout.
 */
export const handler: Handler = async (event, _, callback) => {
  console.log("request:", JSON.stringify(event, undefined, 2));
  const response = {
    statusCode: 200,
  };
  let errorResponse: Error | string | null = null;
  try {
    const lambdaClient = new LambdaClient({
      region: process.env.region,
    });
    // Delay between two state checks of a mapping that is not enabled yet.
    const pollIntervalMs = 10000;
    // UUIDs of the mappings that were created with Enabled=true.
    const uuidsToCheck: Array<string | undefined> = [];

    for (const trigger of event.triggers) {
      // De-duplicate topics so each topic gets exactly one mapping per trigger.
      for (const topic of new Set(trigger.topics)) {
        const consumerGroupId = `${event.consumerGroupPrefix}${randomUUID()}`;
        console.log(
          `Creating a mapping to trigger ${trigger.function} off ${topic} with consumer group ID ${consumerGroupId}`,
        );
        const createEventSourceMappingParams: CreateEventSourceMappingCommandInput =
          {
            // `||` on purpose: a batch size of 0 also falls back to the default.
            BatchSize: trigger.batchSize || 1000,
            Enabled: trigger.enabled ?? true,
            FunctionName: trigger.function, // assuming this ARN is provided in the event
            SelfManagedEventSource: {
              Endpoints: {
                KAFKA_BOOTSTRAP_SERVERS: event.brokerString.split(","),
              },
            },
            SelfManagedKafkaEventSourceConfig: {
              ConsumerGroupId: consumerGroupId,
            },
            SourceAccessConfigurations: [
              // One entry per provided subnet, instead of assuming exactly
              // three (which produced "undefined" URIs for shorter lists).
              ...event.subnets.map((subnet: unknown) => ({
                Type: "VPC_SUBNET",
                URI: `${subnet}`,
              })),
              {
                Type: "VPC_SECURITY_GROUP",
                URI: `security_group:${event.securityGroup}`,
              },
            ],
            StartingPosition: event.startingPosition || "TRIM_HORIZON",
            Topics: [topic],
          };
        console.log(JSON.stringify(createEventSourceMappingParams, null, 2));
        const command = new CreateEventSourceMappingCommand(
          createEventSourceMappingParams,
        );
        const result = await lambdaClient.send(command);
        console.log(result);
        // Mappings created disabled never reach "Enabled", so do not wait on them.
        if (createEventSourceMappingParams.Enabled) {
          uuidsToCheck.push(result.UUID);
        }
      }
    }

    for (const uuid of uuidsToCheck) {
      let isEnabled = false;
      while (!isEnabled) {
        const getCommand = new GetEventSourceMappingCommand({ UUID: uuid });
        const mappingResult = await lambdaClient.send(getCommand);
        if (mappingResult.State === "Enabled") {
          isEnabled = true;
        } else {
          console.log(`Waiting for mapping ${uuid} to be enabled...`);
          // Wait before checking again.
          await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
        }
      }
      console.log(`Mapping ${uuid} is now enabled.`);
    }
  } catch (error: unknown) {
    // Log the failure; otherwise it would only be visible to the invoker.
    console.error("Failed to create event source mappings:", error);
    response.statusCode = 500;
    // Errors and strings are passed through untouched; anything else is
    // wrapped so the callback always receives a supported error value.
    errorResponse =
      error instanceof Error || typeof error === "string"
        ? error
        : new Error(String(error));
  } finally {
    callback(errorResponse, response);
  }
};
|