The Kafka messaging system is commonly used for communication between services.
Often, we need to get messages from Kafka into some materialized form, e.g. an SQL database, for analytical purposes. In this article, we discuss a simple and quick way to achieve this without deploying new frameworks.
If your use case:
- handles a low volume of data, let's say at most a hundred messages per second
- does not require the analytical SQL database to have the data in near real time, so a delay of a minute is not an issue
you may consider our approach a valid solution that avoids coding and deploying:
- Kafka Consumer/Kafka Connect
- Beam/Flink/Kafka Streams/Spark Structured Streaming
which saves a lot of effort otherwise spent maintaining these deployments.
Curious how? Continue reading.
Introduction
When it comes to transferring data from Kafka to a relational database system, such as MariaDB/MySQL or PostgreSQL, we have a couple of options. But before we commit to a solution, let's make a feasibility assessment together.
Let's imagine we are tasked with transferring data from a Kafka topic containing JSON messages, for example about purchases on our web page. Such a message could look like:
{
"order_id": "123456789",
"customer_id": "987654321",
"product_id": "p001",
"total": "39.98",
"order_date": "2023-12-08T12:30:00Z"
}
Capacity Planning
Let's do the capacity planning for a couple of scenarios, assuming each message is roughly 100 bytes.
| Messages per second | Bytes per second |
|---|---|
| 10 | 1 KB |
| 100 | 10 KB |
| 1 000 | 100 KB |
| 10 000 | 1 MB |
| 100 000 | 10 MB |
| 1 000 000 | 100 MB |
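The figures in the table follow from simple arithmetic. Here is a minimal sketch, assuming an average message size of about 100 bytes (the size implied by the table, not a measured value). The helper names `bytes_per_second` and `bytes_per_day` are made up for this illustration.

```python
# Assumed average size of one JSON order message, in bytes (implied by the table).
AVG_MESSAGE_BYTES = 100


def bytes_per_second(messages_per_second: int, message_bytes: int = AVG_MESSAGE_BYTES) -> int:
    """Return the sustained throughput in bytes per second."""
    return messages_per_second * message_bytes


def bytes_per_day(messages_per_second: int, message_bytes: int = AVG_MESSAGE_BYTES) -> int:
    """Return the volume accumulated over 24 hours, useful for sizing the table."""
    return bytes_per_second(messages_per_second, message_bytes) * 24 * 60 * 60


if __name__ == "__main__":
    for rate in (10, 100, 1_000, 10_000, 100_000, 1_000_000):
        print(f"{rate:>9} msg/s -> {bytes_per_second(rate):>11} B/s, {bytes_per_day(rate):>15} B/day")
```

Plugging in your own measured message size gives a quick first check of whether the use case still fits on one machine.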
We are getting to the limit of a single machine, so let's stop here. If our use case fits into one machine, we can avoid deploying cluster-based solutions.
Also, if we do not need real-time data and a lateness of, for example, 1 minute is fine, we can run a piece of code on a single machine every minute to load the last minute of data into the database.
We will design the ETL job as per the diagram:
Kafka Offsets
Let us examine how the ETL job could work. We need just three basic steps:
- Figure out the offset to start reading from in Kafka; this is usually handled by the consumer group protocol
- Obtain some events from Kafka, starting at the stored offset
- Publish the batch into the SQL database
And in the next run, we just pick up where we left off. The basic code of the ETL job is:
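# Imports, assuming the kafka-python client (its KafkaConsumer takes the arguments used in this article)
import mysql.connector
from kafka import KafkaConsumer as Consumer
# Get Kafka consumer
consumer = Consumer(group_id='orders_to_database')
consumer.subscribe(['orders'])
# Connect to the database
connection = mysql.connector.connect()
cursor = connection.cursor()
# Poll for messages in batches (max 10 messages per poll)
events = consumer.poll(max_records=10, timeout_ms=1000) # Step 1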
# Publish to database
# Use a prepared statement for INSERT
insert_query = """INSERT INTO orders (order_id, customer_id, product_id, total, order_date )
VALUES (%s, %s, %s, %s, %s);"""
# Execute the batch of INSERT statements
cursor.executemany(insert_query, events) # Step 2
# Commit the changes
connection.commit() # Step 3
# Disconnect nicely
connection.close() # Step 4
consumer.close()
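Let's describe how this works.
- In the first step, we get a batch of at most 10 events from Kafka (the limit is chosen arbitrarily and is not important at the moment).
- In the second step, we prepare and send INSERT statements to the database. In a real script, the polled records would first have to be parsed into rows; the listing skips this for brevity.
- In the third step, we commit the database transaction.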
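One detail the listing glosses over: assuming the kafka-python client, `poll()` returns a dictionary that maps each partition to a list of records, so the batch has to be flattened into row tuples before `executemany` can use it. Here is a minimal sketch of that step. `Record` is a hypothetical stand-in for the client's record type, so that the example runs without a broker, and `records_to_rows` is an illustrative helper name.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for the Kafka client's record type (only the field we need).
Record = namedtuple("Record", ["value"])

# Column order must match the placeholders in the INSERT statement.
COLUMNS = ("order_id", "customer_id", "product_id", "total", "order_date")


def records_to_rows(batches):
    """Flatten {partition: [records]} into a list of tuples for executemany()."""
    rows = []
    for records in batches.values():
        for record in records:
            order = json.loads(record.value)  # the message value is a JSON document
            rows.append(tuple(order[column] for column in COLUMNS))
    return rows


if __name__ == "__main__":
    message = json.dumps({
        "order_id": "123456789",
        "customer_id": "987654321",
        "product_id": "p001",
        "total": "39.98",
        "order_date": "2023-12-08T12:30:00Z",
    }).encode("utf-8")
    print(records_to_rows({("orders", 0): [Record(message)]}))
```

Picking the fields by name keeps the row order under the script's control even if the JSON keys arrive in a different order.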
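When we start the job for the first time, reading starts from offset 0 (the beginning), provided the consumer is configured to reset to the earliest offset when the group has no stored position. But the algorithm has one problem.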
Problems with auto-commit
The solution relies on auto commit in the Kafka consumer.
It can't be seen from the example, but the default value of the enable_auto_commit argument is True, so in fact, we do:
consumer = Consumer(group_id='orders_to_database', enable_auto_commit=True)
and offset commits are triggered in the background at the interval given by auto_commit_interval_ms, which defaults to 5000 (5 seconds).
This is a rather risky choice when it comes to reliability. What happens if the script crashes before the database commit, let's say in step 2 or 3, when the offsets are already committed to Kafka?
Let's model the example:
- In step 1, we take 10 events, offsets 0 to 9.
- Step 2, we create INSERT statements. Somewhere in the background, auto commit tells Kafka that the new offset position of consumer group orders_to_database is 10.
- Step 3 crashes.
The sequence of what happened is depicted in the figure:
When we restart the ETL job:
- In step 1, we take 10 events from Kafka, but since we use the orders_to_database consumer group, we read from offset 10 and get the events with offsets 10 to 19.
- Step 2, we create INSERT statements.
- Step 3, we commit the transaction on the SQL database.
In the database, we now have only events 10 to 19. Events 0 to 9 are lost. Our algorithm is causing data loss.
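The failure sequence above can be replayed in a few lines. This is only a toy simulation under stated assumptions: a Python list stands in for the Kafka partition, another list stands in for the SQL table, and `run_job` is a hypothetical function that models the offset being committed before the database commit.

```python
def run_job(topic, committed_offset, database, crash_before_db_commit=False, batch_size=10):
    """Simulate one ETL run in which the Kafka offset is committed before the database.

    `topic` is a list standing in for a Kafka partition and `database` is a list
    standing in for the SQL table. Returns the new committed offset.
    """
    batch = topic[committed_offset:committed_offset + batch_size]  # Step 1: poll
    committed_offset += len(batch)  # auto commit fires in the background
    if crash_before_db_commit:
        return committed_offset     # crash: the batch never reaches the database
    database.extend(batch)          # Steps 2 and 3: insert and commit
    return committed_offset


if __name__ == "__main__":
    topic = list(range(20))  # events with offsets 0 to 19
    database = []
    offset = run_job(topic, 0, database, crash_before_db_commit=True)
    offset = run_job(topic, offset, database)  # the restarted job
    print(database)  # events 0 to 9 never made it into the database
```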
Problems with manual offset commit
To fix the algorithm, we switch to manual offset commits and put the Kafka offset commit right next to the commit to the SQL database:
# Get Kafka consumer
consumer = Consumer(group_id='orders_to_database', enable_auto_commit=False) # We change to **not use auto commit**
consumer.subscribe(['orders'])
# Connect to the database
connection = mysql.connector.connect()
cursor = connection.cursor()
# Poll for messages in batches (max 10 messages per poll)
events = consumer.poll(max_records=10, timeout_ms=1000) # Step 1
# Publish to database
# Use a prepared statement for INSERT
insert_query = """INSERT INTO orders (order_id, customer_id, product_id, total, order_date )
VALUES (%s, %s, %s, %s, %s);"""
# Execute the batch of INSERT statements
cursor.executemany(insert_query, events) # Step 2
# Commit the changes
consumer.commit() # Step 3a - commit Kafka
connection.commit() # Step 3b - commit Database
# Disconnect nicely
connection.close() # Step 4
consumer.close()
Repeating our simulated crash, let's put a crash just after Step 3a, the commit to Kafka:
- In step 1, we take 10 events, offsets 0 to 9.
- Step 2, we create INSERT statements.
- Step 3a commits to Kafka; the new offset position of consumer group orders_to_database is 10.
- Crash, the commit to the Database never happens.
The situation is depicted in the figure:
Not exactly an improvement. And the ETL job does not even have to crash; it is sufficient that the Database rolls back our transaction, see:
Even if we put the whole action into a try-except-finally block, where we commit to both Kafka and the Database:
# Get Kafka consumer
consumer = Consumer(group_id='orders_to_database', enable_auto_commit=False) # We change to **not use auto commit**
consumer.subscribe(['orders'])
# Connect to the database
connection = mysql.connector.connect()
cursor = connection.cursor()
try:
    # Poll for messages in batches (max 10 messages per poll)
    events = consumer.poll(max_records=10, timeout_ms=1000) # Step 1
    # Publish to database
    # Use a prepared statement for INSERT
    insert_query = """INSERT INTO orders (order_id, customer_id, product_id, total, order_date )
    VALUES (%s, %s, %s, %s, %s);"""
    # Execute the batch of INSERT statements
    cursor.executemany(insert_query, events) # Step 2
    # Commit the changes
    consumer.commit() # Step 3a - commit Kafka
    connection.commit() # Step 3b - commit Database
except Exception:
    # Undo the uncommitted database work, then let the error surface
    connection.rollback()
    raise
finally:
    # Disconnect nicely
    connection.close() # Step 4
    consumer.close()
We can still crash between Step 3a and Step 3b, for example on an out-of-memory error.
OK, we could swap Step 3a and Step 3b:
# Commit the changes
connection.commit() # Step 3b - commit Database
consumer.commit() # Step 3a - commit Kafka
That way, we commit to the Database first and then to Kafka:
- In step 1, we take 10 events, offsets 0 to 9.
- Step 2, we create INSERT statements.
- Step 3b commits to the Database.
- Crash, the commit to Kafka never happens.
Now we end up with the data in the Database, but when we restart the ETL job, we consume the same 10 events again. Either we end up creating duplicates in the Database or, if there are UNIQUE constraints on some column combinations, we might not be able to proceed at all.
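The "not able to proceed" case is easy to reproduce. The sketch below uses the standard-library `sqlite3` module as a stand-in for MySQL and a simplified one-column `orders` table, both assumptions made so that the example is self-contained. `insert_batch` is a hypothetical helper.

```python
import sqlite3


def insert_batch(connection, order_ids):
    """Insert one batch in a single transaction; roll back if any row is rejected."""
    try:
        connection.executemany(
            "INSERT INTO orders (order_id) VALUES (?)",
            [(order_id,) for order_id in order_ids],
        )
        connection.commit()
        return True
    except sqlite3.IntegrityError:
        # The primary key rejects rows that were already stored by the previous run.
        connection.rollback()
        return False


if __name__ == "__main__":
    connection = sqlite3.connect(":memory:")
    connection.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY)")
    batch = [str(offset) for offset in range(10)]
    print(insert_batch(connection, batch))  # first run: stored, then crash before the Kafka commit
    print(insert_batch(connection, batch))  # restarted run replays the same batch
```

Because the Kafka offset never moved, every restart replays the same batch and hits the same constraint, so the job is stuck until someone intervenes.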
Offsets In Destination
So it all boils down to one problem: we can't maintain a distributed transaction across the Database and Kafka. If we had one, we could commit to both at the same time.
The fact that such a transaction system does not exist is probably a matter of performance. Implementing two-phase commit on top of many disparate systems would carry a huge performance penalty. And since we may choose any combination of database and messaging system, who would maintain such software? If you are interested in more thoughts on this problem, refer to the chapter Consistency and Consensus in the great book Designing Data-Intensive Applications by Martin Kleppmann.
But there is a solution for our ETL script. We can ignore the Kafka consumer group protocol and offset committing completely. We build the offset management ourselves, and to keep the data and the offset position consistent (either we have both the events and the updated offsets, or nothing), we use the Database. Yes, the database: it has transactions to rescue us.
Offsets With Data in One Table
To track the script's progress, we can just glue the offset to the data itself. In Kafka, the address of a message, or its primary key, is the triplet:
(topic, partition, offset)
With these 3 values, you can request any exact message from Kafka; if we look at Kafka as a database, this is its only primary key. When creating the destination table for our orders, we can include these 3 columns (note that partition and offset are reserved words in some SQL dialects, so they may need to be quoted or renamed):
CREATE TABLE orders (
    order_id VARCHAR(20) PRIMARY KEY,
    customer_id VARCHAR(20),
    product_id VARCHAR(20),
    total DECIMAL(10, 2),
    order_date TIMESTAMP,
    topic VARCHAR(20),
    partition INT,
    offset BIGINT -- Kafka offsets are 64-bit integers
)
When our script starts, we supply None as the consumer group name, which turns off the consumer group protocol in the Python library.
Apparently, in KafkaJS, this can't be turned off, but you can create a new unique consumer group name every time you start.
We commit our offsets to the SQL database, so how do we retrieve them when cron runs the ETL job again?
Fortunately, the offset is an ever-growing integer (per topic-partition), so we can just ask the SQL database for the maximum offset per partition and resume reading right after it:
SELECT topic, partition, MAX(offset) FROM orders GROUP BY topic, partition;
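A short sketch of how this query can be turned into resume positions. It uses the standard-library `sqlite3` module as a stand-in for MySQL and a reduced `orders` table, both assumptions made to keep the example self-contained. `next_offsets` is a hypothetical helper. Note that the stored maximum is the last offset already processed, so reading should resume at that value plus one.

```python
import sqlite3


def next_offsets(connection):
    """Return {(topic, partition): next offset to read} based on the stored rows."""
    # "partition" and "offset" are quoted because they are SQL keywords.
    cursor = connection.execute(
        'SELECT topic, "partition", MAX("offset") FROM orders GROUP BY topic, "partition"'
    )
    # The stored maximum was already processed, so reading resumes one past it.
    return {(topic, partition): max_offset + 1 for topic, partition, max_offset in cursor}


if __name__ == "__main__":
    connection = sqlite3.connect(":memory:")
    connection.execute(
        'CREATE TABLE orders (order_id TEXT PRIMARY KEY, topic TEXT, "partition" INTEGER, "offset" INTEGER)'
    )
    connection.executemany(
        "INSERT INTO orders VALUES (?, ?, ?, ?)",
        [("a", "orders", 0, 0), ("b", "orders", 0, 1), ("c", "orders", 1, 0)],
    )
    print(next_offsets(connection))
```

On an empty table the result is an empty dictionary, so the very first run needs another way to choose its starting partitions and offsets.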
Let's alter our script, putting the pieces together:
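# Imports, assuming the kafka-python client and JSON-encoded message values
import json
import mysql.connector
from kafka import KafkaConsumer as Consumer, TopicPartition
# Connect to the database
connection = mysql.connector.connect()
cursor = connection.cursor()
# Obtain offsets from the database
cursor.execute("""SELECT topic, partition, MAX(offset) FROM orders GROUP BY topic, partition""")
rows = cursor.fetchall()
# Transform rows into TopicPartition together with the next offset to read for Kafka library
# (on the very first run the table is empty, so the starting partitions must come from elsewhere)
offset_map = {
    TopicPartition(topic, partition): max_offset + 1  # resume right after the last stored offset
    for topic, partition, max_offset in rows
}
# Get Kafka consumer
consumer = Consumer(group_id=None, enable_auto_commit=False) # Do not use auto commit, do not use consumer group protocol
consumer.assign(list(offset_map.keys()))
for topic_partition, offset in offset_map.items():
    consumer.seek(topic_partition, offset)
try:
    # Poll for messages in batches (max 10 messages per poll)
    batches = consumer.poll(max_records=10, timeout_ms=1000) # Step 1
    # Flatten the polled batches into rows: parsed JSON fields plus the Kafka coordinates
    columns = ('order_id', 'customer_id', 'product_id', 'total', 'order_date')
    events = [
        tuple(json.loads(record.value)[column] for column in columns)
        + (record.topic, record.partition, record.offset)
        for records in batches.values() for record in records
    ]
    # Publish to database
    # Use a prepared statement for INSERT
    insert_query = """INSERT INTO orders (order_id, customer_id, product_id, total, order_date, topic, partition, offset )
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s);"""
    # Execute the batch of INSERT statements
    cursor.executemany(insert_query, events) # Step 2
    # Commit the changes
    connection.commit() # Step 3 - commit Database
except Exception:
    # Undo the uncommitted database work, then let the error surface
    connection.rollback()
    raise
finally:
    # Disconnect nicely
    connection.close() # Step 4
    consumer.close()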
Now there is just one commit and one transaction: the Database transaction. We either get everything, both data and offsets, or nothing.
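The all-or-nothing behaviour can be checked with a small experiment. The sketch below uses the standard-library `sqlite3` module as a stand-in for MySQL and a reduced `orders` table, both assumptions. `store_batch`, `stored_state` and the `fail_after` hook are hypothetical names that exist only to imitate a crash in the middle of a batch.

```python
import sqlite3


def store_batch(connection, rows, fail_after=None):
    """Insert rows of (order_id, topic, partition, offset) in one transaction.

    `fail_after` is a test hook that raises after that many rows to imitate
    a crash in the middle of the batch.
    """
    try:
        for count, row in enumerate(rows):
            if fail_after is not None and count == fail_after:
                raise RuntimeError("simulated crash")
            connection.execute("INSERT INTO orders VALUES (?, ?, ?, ?)", row)
        connection.commit()
    except Exception:
        connection.rollback()  # data and offsets disappear together
        raise


def stored_state(connection):
    """Return (row count, highest stored offset) for inspection."""
    return connection.execute('SELECT COUNT(*), MAX("offset") FROM orders').fetchone()


if __name__ == "__main__":
    connection = sqlite3.connect(":memory:")
    connection.execute(
        'CREATE TABLE orders (order_id TEXT PRIMARY KEY, topic TEXT, "partition" INTEGER, "offset" INTEGER)'
    )
    batch = [(f"order-{offset}", "orders", 0, offset) for offset in range(10)]
    try:
        store_batch(connection, batch, fail_after=5)
    except RuntimeError:
        pass
    print(stored_state(connection))  # state after the failed run
    store_batch(connection, batch)   # replaying the same batch
    print(stored_state(connection))  # state after the successful replay
```

Since the failed run leaves neither rows nor offsets behind, the replay starts from the same position and inserts the batch without conflicts.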
Separate Offsets Table
Another option for handling offsets in the destination database is to have a separate table:
CREATE TABLE offsets (
    topic VARCHAR(20),
    partition INT,
    offset BIGINT
)
and make sure we write the inserts for both the data and the offsets in one transaction.
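A minimal sketch of this variant, again with the standard-library `sqlite3` module standing in for MySQL. The reduced `orders` table, the primary key on `offsets`, the choice to store the next offset to read, and the helper names are all assumptions made for illustration. `INSERT OR REPLACE` is SQLite syntax, and other databases spell this upsert differently.

```python
import sqlite3


def create_tables(connection):
    connection.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, total TEXT)")
    # One row per topic-partition; "partition" and "offset" are quoted SQL keywords.
    connection.execute(
        'CREATE TABLE offsets (topic TEXT, "partition" INTEGER, "offset" INTEGER, '
        'PRIMARY KEY (topic, "partition"))'
    )


def store_batch(connection, topic, partition, records):
    """Write the orders and the next offset to read in a single transaction.

    `records` is a list of (offset, order_id, total) tuples from one partition.
    """
    if not records:
        return
    try:
        connection.executemany(
            "INSERT INTO orders (order_id, total) VALUES (?, ?)",
            [(order_id, total) for _, order_id, total in records],
        )
        next_offset = records[-1][0] + 1  # position right after the last stored record
        connection.execute(
            "INSERT OR REPLACE INTO offsets VALUES (?, ?, ?)", (topic, partition, next_offset)
        )
        connection.commit()
    except Exception:
        connection.rollback()  # neither the orders nor the offset are kept
        raise


def stored_offset(connection, topic, partition):
    """Return the offset to resume from, or 0 when nothing is stored yet."""
    row = connection.execute(
        'SELECT "offset" FROM offsets WHERE topic = ? AND "partition" = ?', (topic, partition)
    ).fetchone()
    return row[0] if row else 0


if __name__ == "__main__":
    connection = sqlite3.connect(":memory:")
    create_tables(connection)
    store_batch(connection, "orders", 0, [(0, "a", "1.00"), (1, "b", "2.00")])
    print(stored_offset(connection, "orders", 0))
```

Compared with keeping the offsets in the data table, looking up the start position here reads a single small row, no matter how large the `orders` table grows.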
Scheduling the Job
With the ETL job written, we just need to schedule it to run every, let's say, 1 minute. We must also ensure that exactly one instance is running at any time; otherwise, we would end up with duplicates anyway. As this can be achieved with modern cloud scheduling tools, we do not need to build our own locking mechanism to guarantee at most one running instance every minute.
Pros and cons
So far, we have only discussed use cases that are small enough to fit on a single machine, so that our script can fetch the data directly. And that is the most obvious con: you can't use this approach for larger datasets. Also, the script is quick and dirty; it is not a super robust solution. The offset fetch query can also become a bottleneck once the destination table grows large, although a WHERE clause may help.
On the other hand, in 100 lines, we can solve the whole ETL task. And it does not need to be Kafka only; this works for anything that has replayability on the source and transactions on the destination. We do not necessarily need to deploy any third-party framework. More or less, we do not need to learn anything new or maintain it.
Conclusion
Sometimes, avoiding the deployment of a new framework might be the better choice.