Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 | 1x 1x 1x 1x 1x 1x 3x 1x 1x 1x 1x 7x 1x 9x 2x 1x 1x 1x 1x 1x 1x 1x 1x 2x 2x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x | import { Kafka } from "kafkajs"; import * as _ from "lodash"; interface TopicConfig { topic: string; numPartitions: number; // Add other properties as needed } export async function createTopics(brokerString: string, topicsConfig: TopicConfig[]) { const topics = topicsConfig; const brokers = brokerString.split(","); const kafka = new Kafka({ clientId: "admin", brokers: brokers, ssl: true, }); const admin = kafka.admin(); const create = async () => { await admin.connect(); // Fetch topics from MSK and filter out __ internal management topic const existingTopicList = _.filter(await admin.listTopics(), (n) => !n.startsWith("_")); console.log("Existing topics:", JSON.stringify(existingTopicList, null, 2)); // Fetch the metadata for the topics in MSK const topicsMetadata = _.get( await admin.fetchTopicMetadata({ topics: existingTopicList }), "topics", ); console.log("Topics Metadata:", JSON.stringify(topicsMetadata, null, 2)); // Diff the existing topics array with the topic configuration collection const topicsToCreate = _.differenceWith( topics, existingTopicList, (topicConfig, topic) => _.get(topicConfig, "topic") === topic, ); // Find intersection of topics metadata collection with topic configuration collection // where partition count of topic in Kafka is less than what is specified in the topic configuration collection // ...can't remove partitions, only add them const topicsToUpdate = _.intersectionWith( topics, topicsMetadata, (topicConfig, topicMetadata) => _.get(topicConfig, "topic") === _.get(topicMetadata, "name") && _.get(topicConfig, "numPartitions") > _.get(topicMetadata, "partitions", []).length, ); // Create a collection to update topic partitioning const partitionConfig = _.map(topicsToUpdate, (topic) => ({ topic: _.get(topic, "topic"), count: _.get(topic, "numPartitions"), })); console.log("Topics to Create:", JSON.stringify(topicsToCreate, null, 2)); console.log("Topics to Update:", JSON.stringify(topicsToUpdate, null, 2)); console.log("Partitions to Update:", JSON.stringify(partitionConfig, null, 2)); // Create topics that don't exist in MSK await admin.createTopics({ topics: topicsToCreate }); // If any topics have fewer partitions in MSK than in the configuration, add those partitions Eif (partitionConfig.length > 0) { await admin.createPartitions({ topicPartitions: partitionConfig }); } await admin.disconnect(); }; await create(); } export async function deleteTopics(brokerString: string, topicList: string[]) { // Check that each topic in the list is something we can delete for (const topic of topicList) { if (!topic.match(/.*--.*--.*--.*/g)) { throw new Error( "ERROR: The deleteTopics function only operates against topics that match /.*--.*--.*--.*/g", ); } } const brokers = brokerString.split(","); const kafka = new Kafka({ clientId: "admin", brokers: brokers, ssl: { rejectUnauthorized: false, }, requestTimeout: 295000, // 5s short of the lambda function's timeout }); const admin = kafka.admin(); await admin.connect(); const currentTopics = await admin.listTopics(); const topicsToDelete = _.filter(currentTopics, (currentTopic) => topicList.some((pattern) => !!currentTopic.match(pattern)), ); console.log(`Deleting topics: ${topicsToDelete}`); await admin.deleteTopics({ topics: topicsToDelete, timeout: 295000, }); await admin.disconnect(); } |