Start with Kafka and .NET Core, Step-by-Step Guide, Part 2: (consumer)

Jasmin Sinanovic
2 min readJul 9, 2021

In this article I will continue with the previous Kafka blog where we created the Producer. In this part I will write a simple console app that will consume data from the Kafka topic “demo-topic” that is fed with data by our Producer.

1. Create a console project

dotnet new Kafka.Consumer

2. Add the nuget package

dotnet add package Confluent.Kafka

3. Content of Program.cs

namespace Kafka.Consumer
{
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;
using System.Threading.Tasks;
using System.Threading;
using System;
class Program
{
static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
private static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((context, collection) =>
{
collection.AddHostedService<KafkaConsumerHostedService>();
});
}
public class KafkaConsumerHostedService : IHostedService
{
public Task StartAsync(CancellationToken cancellationToken)
{
var conf = new ConsumerConfig
{
GroupId = "demo",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Latest,
};
using (var builder = new ConsumerBuilder<Ignore, string>(conf).Build())
{
try
{
builder.Subscribe("demo-topic");
var cancelToken = new CancellationTokenSource();
try
{
while (!cancellationToken.IsCancellationRequested)
{
var consumer = builder.Consume(cancelToken.Token);
Console.WriteLine($"Message Recived: {consumer.Message.Value} ");
}
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
builder.Close();
}
return Task.CompletedTask;
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
builder.Close();
return Task.CompletedTask;
}
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}
}

4. Run the Producer and Consumer

Enjoy!

dotnet run

Conclusion

Happy coding. GitHub link : https://github.com/JasminSinanovic/start-with-kafka-dotnet/tree/master/part-2

Next chapter : How connect Kafka topic with InfluxDB.

https://jsinanovic.medium.com/start-with-kafka-and-net-core-step-by-step-guide-part-3-kafka-connector-influxdb-eab181cd4783

--

--