import { Kafka } from "kafkajs";
import * as _ from "lodash";
/**
 * Desired state of a single Kafka topic, as consumed by `createTopics`.
 */
interface TopicConfig {
/** Topic name; compared against the names of the topics already on the cluster. */
topic: string;
/** Partition count the topic should have; existing topics with fewer partitions are grown to this count. */
numPartitions: number;
// Further topic properties can be declared here as they become necessary.
}
/**
 * Reconciles the topics on a Kafka cluster with the desired configuration.
 *
 * Connects as an admin client over TLS, creates every configured topic that
 * does not exist yet, and raises the partition count of existing topics whose
 * current count is lower than the configured one. Partition counts are never
 * lowered, because Kafka can only add partitions.
 *
 * @param brokerString - Comma-separated list of broker addresses.
 * @param topicsConfig - Desired topics and their partition counts.
 * @returns A promise that resolves once all changes have been applied and the
 *   admin connection has been closed.
 * @throws Propagates any error raised by the Kafka admin client. The admin
 *   connection is closed before the error reaches the caller.
 */
export async function createTopics(
  brokerString: string,
  topicsConfig: TopicConfig[],
): Promise<void> {
  const kafka = new Kafka({
    clientId: "admin",
    brokers: brokerString.split(","),
    ssl: true,
  });
  const admin = kafka.admin();

  await admin.connect();
  try {
    // Topics whose name starts with "_" (e.g. the internal "__" management
    // topics) are not managed by this function.
    const existingTopicList = (await admin.listTopics()).filter(
      (name: string) => !name.startsWith("_"),
    );
    console.log("Existing topics:", JSON.stringify(existingTopicList, null, 2));

    // Metadata is needed to learn the current partition count of each topic.
    const { topics: topicsMetadata } = await admin.fetchTopicMetadata({
      topics: existingTopicList,
    });
    console.log("Topics Metadata:", JSON.stringify(topicsMetadata, null, 2));

    // Configured topics that are not on the cluster yet.
    const topicsToCreate = _.differenceWith(
      topicsConfig,
      existingTopicList,
      (topicConfig, existingName) => topicConfig.topic === existingName,
    );

    // Configured topics that exist but have fewer partitions than requested.
    // Partitions can only be added, never removed.
    const topicsToUpdate = _.intersectionWith(
      topicsConfig,
      topicsMetadata,
      (topicConfig, topicMetadata) =>
        topicConfig.topic === topicMetadata.name &&
        topicConfig.numPartitions > (topicMetadata.partitions ?? []).length,
    );

    // Shape expected by admin.createPartitions: `count` is the new total.
    const partitionConfig = topicsToUpdate.map(({ topic, numPartitions }) => ({
      topic,
      count: numPartitions,
    }));

    console.log("Topics to Create:", JSON.stringify(topicsToCreate, null, 2));
    console.log("Topics to Update:", JSON.stringify(topicsToUpdate, null, 2));
    console.log("Partitions to Update:", JSON.stringify(partitionConfig, null, 2));

    // Skip the round-trips entirely when there is nothing to change.
    if (topicsToCreate.length > 0) {
      await admin.createTopics({ topics: topicsToCreate });
    }
    if (partitionConfig.length > 0) {
      await admin.createPartitions({ topicPartitions: partitionConfig });
    }
  } finally {
    // Always release the connection, even when one of the admin calls failed.
    await admin.disconnect();
  }
}
/**
 * Deletes the topics on a Kafka cluster that match the given names/patterns.
 *
 * As a safety net, every entry of `topicList` must contain at least three
 * `--` separators; otherwise the call is rejected before any connection is
 * attempted. Each entry is interpreted as a regular expression and matched
 * anywhere within the name of each topic currently on the cluster (the same
 * semantics as `String.prototype.match` with a string argument).
 *
 * @param brokerString - Comma-separated list of broker addresses.
 * @param topicList - Topic names/patterns to delete.
 * @returns A promise that resolves once the matching topics are deleted and
 *   the admin connection has been closed. Resolves immediately, without
 *   connecting, when `topicList` is empty.
 * @throws Error if any entry of `topicList` fails the naming check.
 * @throws SyntaxError if an entry is not a valid regular expression.
 * @throws Propagates any error raised by the Kafka admin client. The admin
 *   connection is closed before the error reaches the caller.
 */
export async function deleteTopics(brokerString: string, topicList: string[]): Promise<void> {
  // Check that each topic in the list is something we are allowed to delete.
  for (const topic of topicList) {
    if (!/.*--.*--.*--.*/.test(topic)) {
      throw new Error(
        "ERROR: The deleteTopics function only operates against topics that match /.*--.*--.*--.*/g",
      );
    }
  }

  // Nothing requested: no reason to open a connection.
  if (topicList.length === 0) {
    return;
  }

  // Compile each pattern once, before connecting, so an invalid pattern fails
  // fast instead of surfacing mid-operation with an open connection.
  const patterns = topicList.map((pattern) => new RegExp(pattern));

  const kafka = new Kafka({
    clientId: "admin",
    brokers: brokerString.split(","),
    // NOTE(review): certificate verification is disabled here, whereas
    // createTopics uses `ssl: true`. Confirm this is intentional.
    ssl: {
      rejectUnauthorized: false,
    },
    requestTimeout: 295000, // 5s short of the lambda function's timeout
  });
  const admin = kafka.admin();

  await admin.connect();
  try {
    const currentTopics = await admin.listTopics();
    const topicsToDelete = currentTopics.filter((currentTopic: string) =>
      patterns.some((pattern) => pattern.test(currentTopic)),
    );
    console.log(`Deleting topics: ${topicsToDelete}`);

    if (topicsToDelete.length > 0) {
      await admin.deleteTopics({
        topics: topicsToDelete,
        timeout: 295000,
      });
    }
  } finally {
    // Always release the connection, even when listing or deleting failed.
    await admin.disconnect();
  }
}
|