I Just Want to See It Work: Kafka Stream, C# Producer and Consumer

Written by jack.h.mocha | Published 2023/07/23
Tech Story Tags: kafka | .net | c-sharp | docker | containerization | microservices | kafka-streams | tutorial

TLDRWhen learning a new technology, sometimes I just want to see it work. It gives me a baseline to extend my ideas, to see what is possible, and to imagine what it can become. While reading through documentation or following tutorials, I am often greeted with numerous options and configurations. After seeing all the dazzling capabilities, I always want to see the core, the fundamental. Didn’t you ever just want to experience a simple working copy? Hence, I just want to see it work. This series aims at minimizing the possibility of having a missing link and encourages you to build your next innovative solution based on what you learned here.via the TL;DR App

When learning a new technology, sometimes I just want to see it work. It gives me a baseline to extend my ideas, to see what is possible, and to imagine what it can become.

While reading through documentation or following tutorials, I am often greeted with numerous options and configurations. After seeing all the dazzling capabilities, I always want to see the core, the fundamental. Didn’t you ever just want to experience a simple working copy? Hence, I just want to see it work.

This series aims at minimizing the possibility of having a missing link and encourages you to build your next innovative solution based on what you learned here.

What is Kafka Stream?

According to Kafka, Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters.

What do we want to achieve?

  • Run Kafka with KRaft in docker.
  • Run a producer on your local machine (not in docker) that publishes messages to a topic.
  • Run a consumer on your local machine (not in docker) that pulls a message whenever a message is published in a topic.]

Resources

Run Kafka in Docker

Navigate to where the docker-compose.yml is. Run the below command in cmd to start a docker container of Kafka in the background.

docker-compose up -d

Docker-compose.yml

version: "3"
services:
  kafka:
    image: "bitnami/kafka:3.2.3"
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092

  • ALLOW_PLAINTEXT_LISTENER: It controls whether the Kafka broker allows connections over plaintext listeners. A plaintext listener allows connections over an unencrypted and non-authenticated channel. It means the data exchanged between the client and the Kafka broker is sent in plain text.
  • KAFKA_ENABLE_KRAFT: Enable KRaft mode.
  • KAFKA_CFG_ADVERTISED_LISTENERS: The hostname a client should use to connect to the Kafka broker. You need this for both producer and consumer to communicate with Kafka.

Let’s publish a message to Kafka

Navigate to where KafkaStream.Producer.csproj is at. Start a Powershell and run the following command.

dotnet run

When you see the message: “Message delivered to input-topic”, that means you did it!

What is in the Producer?

  • We first setup the configuration for the producer.
  • Build a message
  • Produce the message

var config = new ProducerConfig
{
	BootstrapServers = "localhost:9092",
	ClientId = "producer-1"
};

using (var producer = new ProducerBuilder<string, string>(config).Build())
{
	var message = new Message<string, string>
	{
		Key = null, // Set the key if you want to partition the messages
		Value = "Hello, Kafka!"
	};

	try
	{
		var deliveryResult = await producer
                                    .ProduceAsync("input-topic", message);

		Console.WriteLine($"Message delivered to 
                            {deliveryResult.TopicPartitionOffset}");
	}
	catch (ProduceException<string, string> ex)
	{
		Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
	}
}

What is bootstrap-server?

It serves as an entry point for a client to connect to a Kafka broker.

Let’s setup the consumer

Navigate to where KafkaStream.Consumer.csproj is at. Start a Powershell and run the following command.

dotnet run

The initialization will take a few seconds. Then you should see the following messages.

Consuming messages from topic: input-topic
Press any key to exit
Received message: Hello, Kafka! from partition [0] offset 0

There you go!

What is in the Consumer?

  • The main thread sets up the Kafka consumer and starts a background thread to handle message consumption.
  • In the background thread, the code runs in a continuous loop, until a cancellation is requested.
  • A cancellation can be requested by pressing any key while the program is running.

const string Topic = "input-topic";

// Configure the consumer
var config = new ConsumerConfig
{
	BootstrapServers = "localhost:9092",
	GroupId = "consumer-group-1",
	AutoOffsetReset = AutoOffsetReset.Earliest,
	EnableAutoCommit = false // Disable auto commit to have more control over offsets
};

using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
	// Subscribe to the topic
	consumer.Subscribe(Topic);

	Console.WriteLine($"Consuming messages from topic: {Topic}");

	// Start consuming messages in a background thread
	var cancellationTokenSource = new CancellationTokenSource();
	var cancellationToken = cancellationTokenSource.Token;

	var consumerThread = new Thread(() =>
	{
		try
		{
			while (true)
			{
				try
				{
					var consumeResult = 
                          consumer.Consume(cancellationToken);

					Console.WriteLine($"Received message: 
                          {consumeResult.Message.Value} from partition 
                          {consumeResult.Partition} offset 
                          {consumeResult.Offset}";

					// Manually commit the offset to mark the message as consumed
					consumer.Commit(consumeResult);
				}
				catch (ConsumeException ex)
				{
					Console.WriteLine($"Error occurred: 
                        {ex.Error.Reason}");
				}
			}
		}
		catch (OperationCanceledException)
		{
			// This exception will be thrown when cancellation is requested.
			Console.WriteLine("Cancellation requested");
		}
		finally
		{
			consumer.Close();
		}
	});

	// Start the consumer thread
	consumerThread.Start();

	// Wait for a key press to exit
	Console.WriteLine("Press any key to exit");
	Console.ReadKey();

	// Request cancellation and wait for the consumer thread to stop
	cancellationTokenSource.Cancel();
	consumerThread.Join();

	Console.WriteLine("End Consumer");
}

What does offset do?

In this context, it tells the consumer where it left off.

Gotcha

While developing your own producer/consumer, if you get an error message indicating the hostname cannot be resolved, check KAFKA_CFG_ADVERTISED_LISTENERS. If you still run into the same issue, consider updating the host file.

Conclusion

In this article, we successfully run Kafka in a docker container, produce a message to a topic, and consume a message from the topic. There are limitless applications of using Kafka. Go ahead and play with the project, and build something interesting of your own!


Written by jack.h.mocha | Project Lead, Senior Software Engineer
Published by HackerNoon on 2023/07/23