When learning a new technology, sometimes I just want to see it work. It gives me a baseline to extend my ideas, to see what is possible, and to imagine what it can become.
While reading documentation or following tutorials, I am often greeted with numerous options and configurations. After seeing all the dazzling capabilities, I always want to see the core fundamentals. Didn’t you ever just want to experience a simple working copy? Hence: I just want to see it work.
This series aims to minimize the chance of a missing link, and encourages you to build your next innovative solution on what you learn here.
What is Kafka Streams?
According to the Kafka documentation, Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters.
What do we want to achieve?
- Run Kafka with KRaft in Docker.
- Run a producer on your local machine (not in Docker) that publishes messages to a topic.
- Run a consumer on your local machine (not in Docker) that pulls a message whenever one is published to the topic.
Resources
- You can find the source code at my GitHub repository
- Docker image: Bitnami Kafka
Run Kafka in Docker
Navigate to where the docker-compose.yml is. Run the command below in a terminal to start a Kafka container in the background.
docker-compose up -d
docker-compose.yml
version: "3"
services:
  kafka:
    image: "bitnami/kafka:3.2.3"
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
- ALLOW_PLAINTEXT_LISTENER: Controls whether the Kafka broker accepts connections over plaintext listeners. A plaintext listener accepts connections over an unencrypted, unauthenticated channel, meaning the data exchanged between the client and the broker is sent in plain text.
- KAFKA_ENABLE_KRAFT: Enable KRaft mode.
- KAFKA_CFG_ADVERTISED_LISTENERS: The hostname a client should use to connect to the Kafka broker. You need this for both producer and consumer to communicate with Kafka.
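Before producing, the topic needs to exist. Many broker setups auto-create topics on first use, but you can also create input-topic explicitly with Confluent.Kafka’s admin client. A minimal sketch, assuming the broker from the compose file above is running on localhost:9092:

```csharp
using Confluent.Kafka;
using Confluent.Kafka.Admin;

var adminConfig = new AdminClientConfig { BootstrapServers = "localhost:9092" };

using (var admin = new AdminClientBuilder(adminConfig).Build())
{
    try
    {
        // Create "input-topic" with a single partition and a single replica,
        // which is enough for a one-broker local cluster.
        await admin.CreateTopicsAsync(new[]
        {
            new TopicSpecification
            {
                Name = "input-topic",
                NumPartitions = 1,
                ReplicationFactor = 1
            }
        });
        Console.WriteLine("Topic created");
    }
    catch (CreateTopicsException ex)
    {
        // An already-existing topic also surfaces here, so this is safe to re-run.
        Console.WriteLine($"Topic creation failed: {ex.Results[0].Error.Reason}");
    }
}
```

This requires a running broker, so treat it as a one-off setup step rather than part of the producer.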
Let’s publish a message to Kafka
Navigate to where KafkaStream.Producer.csproj is. Open PowerShell and run the following command.
dotnet run
When you see a message starting with “Message delivered to input-topic”, that means you did it!
What is in the Producer?
- First, set up the configuration for the producer.
- Build a message.
- Produce the message.
using Confluent.Kafka;

var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    ClientId = "producer-1"
};

using (var producer = new ProducerBuilder<string, string>(config).Build())
{
    var message = new Message<string, string>
    {
        Key = null, // Set the key if you want to partition the messages
        Value = "Hello, Kafka!"
    };

    try
    {
        var deliveryResult = await producer.ProduceAsync("input-topic", message);
        Console.WriteLine($"Message delivered to {deliveryResult.TopicPartitionOffset}");
    }
    catch (ProduceException<string, string> ex)
    {
        Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
    }
}
What is bootstrap-server?
It serves as an entry point for a client to connect to a Kafka broker.
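A client only needs one reachable broker to bootstrap; it discovers the rest of the cluster from that broker’s metadata. In production you would typically list several brokers, comma-separated, so the client can still connect if one is down. A small sketch (the hostnames here are made up for illustration):

```csharp
using Confluent.Kafka;

var config = new ProducerConfig
{
    // Any one of these brokers is enough to bootstrap;
    // the client discovers the full cluster from it.
    BootstrapServers = "broker-1:9092,broker-2:9092,broker-3:9092"
};
```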
Let’s set up the consumer
Navigate to where KafkaStream.Consumer.csproj is. Open PowerShell and run the following command.
dotnet run
The initialization will take a few seconds. Then you should see the following messages.
Consuming messages from topic: input-topic
Press any key to exit
Received message: Hello, Kafka! from partition [0] offset 0
There you go!
What is in the Consumer?
- The main thread sets up the Kafka consumer and starts a background thread to handle message consumption.
- In the background thread, the code runs in a continuous loop until cancellation is requested.
- A cancellation can be requested by pressing any key while the program is running.
using Confluent.Kafka;

const string Topic = "input-topic";

// Configure the consumer
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "consumer-group-1",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false // Disable auto commit to have more control over offsets
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    // Subscribe to the topic
    consumer.Subscribe(Topic);
    Console.WriteLine($"Consuming messages from topic: {Topic}");

    // Start consuming messages in a background thread
    var cancellationTokenSource = new CancellationTokenSource();
    var cancellationToken = cancellationTokenSource.Token;
    var consumerThread = new Thread(() =>
    {
        try
        {
            while (true)
            {
                try
                {
                    var consumeResult = consumer.Consume(cancellationToken);
                    Console.WriteLine($"Received message: {consumeResult.Message.Value} " +
                        $"from partition {consumeResult.Partition} offset {consumeResult.Offset}");

                    // Manually commit the offset to mark the message as consumed
                    consumer.Commit(consumeResult);
                }
                catch (ConsumeException ex)
                {
                    Console.WriteLine($"Error occurred: {ex.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // This exception will be thrown when cancellation is requested.
            Console.WriteLine("Cancellation requested");
        }
        finally
        {
            consumer.Close();
        }
    });

    // Start the consumer thread
    consumerThread.Start();

    // Wait for a key press to exit
    Console.WriteLine("Press any key to exit");
    Console.ReadKey();

    // Request cancellation and wait for the consumer thread to stop
    cancellationTokenSource.Cancel();
    consumerThread.Join();
    Console.WriteLine("End Consumer");
}
What does offset do?
An offset is the position of a message within a partition. In this context, the committed offset tells the consumer where it left off, so it can resume from there after a restart.
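You can see this by asking the broker for a consumer group’s committed offsets. A small sketch using Confluent.Kafka’s Committed API, assuming the consumer group from the previous section has already committed something on partition 0:

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "consumer-group-1" // Same group as the consumer above
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
    var partition = new TopicPartition("input-topic", 0);

    // Ask the broker where this group last committed on partition 0;
    // this is the next offset the group will read from.
    var committed = consumer.Committed(new[] { partition }, TimeSpan.FromSeconds(10));
    Console.WriteLine($"Next offset to read: {committed[0].Offset}");
}
```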
Gotcha
While developing your own producer/consumer, if you get an error message indicating the hostname cannot be resolved, check KAFKA_CFG_ADVERTISED_LISTENERS. If you still run into the same issue, consider updating your hosts file.
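For example, if KAFKA_CFG_ADVERTISED_LISTENERS advertised the hostname kafka instead of localhost, clients on your machine would need to resolve that name. One way is a hosts file entry (C:\Windows\System32\drivers\etc\hosts on Windows, /etc/hosts on Linux/macOS):

```
127.0.0.1 kafka
```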
Conclusion
In this article, we successfully ran Kafka in a Docker container, produced a message to a topic, and consumed that message from the topic. The applications of Kafka are limitless. Go ahead and play with the project, and build something interesting of your own!