Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 | 1x 6x 6x 6x 6x 6x 6x 6x 6x 6x 6x 1x 5x 6x 5x 1x 4x 1x 3x 3x 1x 2x 2x 2x 2x 2x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 5x 5x 6x | import { Handler } from "aws-lambda";
import { Kafka } from "kafkajs";
import { LambdaClient, ListEventSourceMappingsCommand } from "@aws-sdk/client-lambda";
/**
 * Lambda handler that reports whether the Kafka consumer groups backing a set
 * of Lambda triggers are stable and have consumed up to the latest offsets.
 *
 * Fields read from `event`:
 *  - `triggers`: iterable of `{ function, topics }`; `function` is passed as
 *    `FunctionName` to ListEventSourceMappings and `topics` are matched against
 *    each mapping's `Topics`.
 *  - `brokerString`: comma-separated broker list handed to KafkaJS.
 *
 * The result is delivered through `callback(error, response)` where `response`
 * is `{ statusCode, stable, current, ready }`:
 *  - `stable`  – every described consumer group is in the "Stable" state.
 *  - `current` – for every (group, topic) pair, every partition's committed
 *                offset equals the topic's latest offset.
 *  - `ready`   – `stable && current`.
 * On any failure `statusCode` is set to 500 and the thrown value is passed as
 * the callback's first argument.
 *
 * NOTE(review): this is an async function that also reports through
 * `callback`; confirm against the Lambda Node.js handler documentation that
 * the runtime uses the callback result rather than the resolved `undefined`.
 */
export const handler: Handler = async (event, _, callback) => {
  const response = {
    statusCode: 200,
    stable: false,
    current: false,
    ready: false,
  };
  let errorResponse: Error | string | null = null;

  try {
    // One entry per (consumer group, topic) pair that must be checked.
    const triggerInfo: { groupId: string; topic: string }[] = [];
    const lambdaClient = new LambdaClient({
      region: process.env.region,
    });

    for (const trigger of event.triggers) {
      const uniqueTopics = [...new Set<string>(trigger.topics)];
      if (uniqueTopics.length === 0) {
        // Nothing to resolve for this trigger; skip the Lambda API call.
        continue;
      }

      // The mapping list depends only on the function, so fetch it once per
      // trigger (following NextMarker so later pages are not missed).
      const firstPage = await lambdaClient.send(
        new ListEventSourceMappingsCommand({
          FunctionName: trigger.function,
        }),
      );
      const mappings = [...(firstPage.EventSourceMappings ?? [])];
      let marker = firstPage.NextMarker;
      while (marker) {
        const page = await lambdaClient.send(
          new ListEventSourceMappingsCommand({
            FunctionName: trigger.function,
            Marker: marker,
          }),
        );
        mappings.push(...(page.EventSourceMappings ?? []));
        marker = page.NextMarker;
      }

      for (const topic of uniqueTopics) {
        console.log(`Getting consumer groups for function: ${trigger.function} and topic ${topic}`);

        const mappingForCurrentTopic = mappings.filter(
          (mapping) => mapping.Topics?.includes(topic) ?? false,
        );
        // Covers both "function has no mappings" and "no mapping for this topic".
        if (mappingForCurrentTopic.length === 0) {
          throw new Error(
            `ERROR: No event source mapping found for function ${trigger.function} and topic ${topic}`,
          );
        }
        if (mappingForCurrentTopic.length > 1) {
          throw new Error(
            `ERROR: Multiple event source mappings found for function ${trigger.function} and topic ${topic}`,
          );
        }

        const groupId =
          mappingForCurrentTopic[0]?.SelfManagedKafkaEventSourceConfig?.ConsumerGroupId;
        if (!groupId) {
          throw new Error(
            `ERROR: No ConsumerGroupId found for function ${trigger.function} and topic ${topic}`,
          );
        }

        triggerInfo.push({ groupId, topic });
      }
    }

    const kafka = new Kafka({
      clientId: "consumerGroupResetter",
      brokers: event.brokerString?.split(",") || [],
      ssl: true,
    });
    const admin = kafka.admin();
    await admin.connect();

    let stable = false;
    let current = false;
    try {
      // Get status for each consumer group (each group described once).
      const groupIds = [...new Set(triggerInfo.map((entry) => entry.groupId))];
      const info = await admin.describeGroups(groupIds);
      stable = info.groups.every((group) => group.state.toString() === "Stable");

      // Compare latest vs. committed offset for every partition of every
      // (group, topic) pair, so that one group consuming several topics or a
      // topic with several partitions cannot hide lag.
      const partitionIsCurrent: boolean[] = [];
      for (const { groupId, topic } of triggerInfo) {
        const topicOffsets = await admin.fetchTopicOffsets(topic);
        if (topicOffsets.length === 0) {
          throw new Error(`ERROR: No partition offsets returned for topic ${topic}`);
        }
        const groupOffsets = await admin.fetchOffsets({
          groupId,
          topics: [topic],
        });

        // Index committed offsets by partition id; array positions in the two
        // responses are not assumed to line up.
        const committedByPartition = new Map<number, string>();
        for (const entry of groupOffsets) {
          for (const partitionOffset of entry.partitions) {
            committedByPartition.set(partitionOffset.partition, partitionOffset.offset);
          }
        }

        for (const { partition, offset: latestOffset } of topicOffsets) {
          const currentOffset = committedByPartition.get(partition);
          console.log(
            `Topic: ${topic}, Partition: ${partition}, Group: ${groupId}, Latest Offset: ${latestOffset}, Current Offset: ${currentOffset}`,
          );
          partitionIsCurrent.push(latestOffset === currentOffset);
        }
      }
      current = partitionIsCurrent.every(Boolean);
    } finally {
      // Always release the admin connection, including when a lookup above throws.
      await admin.disconnect();
    }

    response.stable = stable;
    response.current = current;
    response.ready = stable && current;
  } catch (error: any) {
    // The thrown value is forwarded to the callback unchanged.
    response.statusCode = 500;
    errorResponse = error;
  } finally {
    callback(errorResponse, response);
  }
};
|