Creating Kafka Topics Asynchronously with aiokafka in Python: A Practical Guide

Introduction

In this article, I’ll share a pure asynchronous approach to creating Kafka topics using aiokafka. This solution is particularly valuable for applications that already use aiokafka for other operations and want to maintain a consistent asynchronous programming model without introducing additional dependencies. While not groundbreaking, this clean approach might save future-me (and perhaps you) some time searching for solutions.

While working on my Yubarta project, I needed to create Kafka topics programmatically within an asynchronous Python application. Most examples I found followed a hybrid approach: using the synchronous kafka-python library’s KafkaAdminClient for administrative operations, then switching to aiokafka for message processing.

This approach, while functional, breaks the asynchronous pattern and introduces an unnecessary dependency on two separate libraries. What’s particularly frustrating is that the official aiokafka documentation doesn’t clearly demonstrate how to create topics directly using its own asynchronous API, despite having this capability.

After some investigation, I discovered a more elegant solution: creating Kafka topics using purely asynchronous methods within aiokafka itself, maintaining a consistent programming model throughout the application.

Common Approach

If you follow the same approach as the synchronous library, you’d have something like this:

admin_client = AIOKafkaAdminClient(bootstrap_servers="kafka:9092")

topic_list = []
topic_list.append(NewTopic(name="alarms", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list)

However, you’ll encounter an IncompatibleBrokerVersion error:

2025-04-08 07:22:16   File "/usr/local/lib/python3.12/site-packages/aiokafka/admin/client.py", line 177, in _matching_api_version
2025-04-08 07:22:16     raise IncompatibleBrokerVersion(
2025-04-08 07:22:16 aiokafka.errors.IncompatibleBrokerVersion: IncompatibleBrokerVersion: Kafka broker does not support the 'CreateTopicsRequest_v0' Kafka protocol.

Why This Happens and the Solution

This approach doesn’t work with the asynchronous library because the synchronous library (kafka-python) assigns the Kafka API version when the class is instantiated.

With the asynchronous library, we need to assign the Kafka API version manually. I discovered that the AIOKafkaAdminClient class has a start method that assigns the API version. It essentially performs the same function as the synchronous version but in a separate call.

The solution is simple: we need to call start() after instantiating the client class. See line 14 in the following snippet:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import asyncio
from aiokafka.admin import AIOKafkaAdminClient, NewTopic
import logging
from contextlib import asynccontextmanager

logger = logging.getLogger(__name__)

@asynccontextmanager
async def kafka_admin_client():
    client = AIOKafkaAdminClient(
        bootstrap_servers="kafka:9092"
    )
    try:
        await client.start()  # This is the line that makes all the difference
        yield client
    finally:
        await client.close()


async def create_topics():
    async with kafka_admin_client() as client:
        try:
            # Create topics with proper configuration
            topics = [
                NewTopic(
                    name="alarms",
                    num_partitions=3,  # Start with 3 partitions for parallelism
                    replication_factor=1  # Single replica for dev, increase for prod
                )
            ]

            await client.create_topics(topics)
            logger.info(f"Successfully created topic: {settings.KAFKA_ALERT_TOPIC}")
        except Exception as e:
            if "already exists" in str(e):
                logger.info(f"Topic {settings.KAFKA_ALERT_TOPIC} already exists")
            else:
                logger.error(f"Failed to create Kafka topics: {str(e)}")
                raise
        finally:
            await client.close()

if __name__ == "__main__":
    asyncio.run(create_topics()) 

Conclusion

In this article, we’ve explored how to create Kafka topics programmatically using aiokafka’s asynchronous API. The key insight was understanding that we need to explicitly call the start() method on the AIOKafkaAdminClient to properly initialize the Kafka API version before creating topics.

This approach offers several benefits:

  • Maintains a consistent asynchronous programming model throughout your application
  • Eliminates the need for additional dependencies
  • Provides a clean and maintainable solution for managing Kafka topics

The example code demonstrates a practical implementation using Python’s asynccontextmanager to properly handle client lifecycle and error cases. You can adapt this pattern to your specific needs, adjusting the topic configuration parameters like partition count and replication factor based on your requirements.

Remember that while this solution works well for development and testing environments, you should carefully consider your production configuration, particularly the replication factor and partition count, based on your specific use case and performance requirements.