Start with Kafka and .NET Core, Step-by-Step Guide, Part 1 (producer)

Jasmin Sinanovic
3 min read · Jul 8, 2021

In this article I will show you how to get started with Kafka and write your first .NET console app to produce messages to your Kafka topic.

1. Create a console project

dotnet new console -n Kafka.Producer

2. Add the Confluent.Kafka NuGet package

dotnet add package Confluent.Kafka

3. Content of Program.cs

namespace Kafka.Producer
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
class Program
{
    // Entry point: build the generic host and run it, which starts every
    // registered hosted service (here, the Kafka producer) and blocks
    // until the host shuts down.
    static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().Run();
    }

    // Registers the producer as a hosted service so the host manages its
    // StartAsync/StopAsync lifecycle.
    private static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddHostedService<KafkaProducerHostedService>();
            });
}
/// <summary>
/// Hosted service that produces a random "orders per store" JSON message to
/// the Kafka topic "demo-topic" every 1.5 seconds until the host shuts down.
/// </summary>
public class KafkaProducerHostedService : IHostedService
{
    private readonly ILogger<KafkaProducerHostedService> _logger;
    private readonly IProducer<Null, string> _producer;

    public KafkaProducerHostedService(ILogger<KafkaProducerHostedService> logger)
    {
        _logger = logger;
        // BootstrapServers must match the broker's advertised PLAINTEXT_HOST
        // listener in docker-compose (localhost:9092).
        _producer = new ProducerBuilder<Null, string>(new ProducerConfig
        {
            BootstrapServers = "localhost:9092",
        }).Build();
    }

    /// <summary>
    /// Produce loop. Runs until <paramref name="cancellationToken"/> is
    /// signalled; the original `while (true)` could never exit, so host
    /// shutdown would hang.
    /// </summary>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var random = new Random();
        const int minOrders = 1;
        const int maxOrders = 205;
        List<string> stores = CreateRandomStore();

        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                // Task.Delay instead of Thread.Sleep: don't block a
                // thread-pool thread inside an async method, and observe
                // cancellation during the wait.
                await Task.Delay(1500, cancellationToken);

                // Random order count in [minOrders, maxOrders). The original
                // formula (minOrders + random.Next(50) * (maxOrders - minOrders))
                // produced values up to ~10000, far outside the intended range.
                int orders = random.Next(minOrders, maxOrders);
                string store = stores[random.Next(stores.Count)];

                // Valid JSON (double quotes, quoted string value) — the
                // original single-quoted payload was not parseable by strict
                // JSON consumers.
                string serializedMessage =
                    "{ \"data\": \"" + store + "\", \"orders\": " + orders + " }";

                await _producer.ProduceAsync("demo-topic", new Message<Null, string>
                {
                    Value = serializedMessage
                }, cancellationToken);

                // Structured logging instead of string concatenation.
                _logger.LogInformation("SENT: {Message}", serializedMessage);
            }
            catch (OperationCanceledException)
            {
                // Normal shutdown path — exit the loop quietly instead of
                // letting the broad catch swallow it and loop forever.
                break;
            }
            catch (Exception ex)
            {
                // Produce failures are errors, not informational noise.
                _logger.LogError(ex, "Failed to produce message");
            }
        }
    }

    // Returns the fixed set of demo store names to pick from.
    private List<string> CreateRandomStore()
    {
        return new List<string> { "Store-A", "Store-B", "Store-C", "Store-D", "Store-E", "Store-F" };
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        // Flush any messages still buffered in the client before disposing,
        // so in-flight sends are not silently dropped on shutdown.
        _producer?.Flush(TimeSpan.FromSeconds(5));
        _producer?.Dispose();
        return Task.CompletedTask;
    }
}
}

4. Start Docker with Kafka

4.1. Create a docker-compose.yml file with the following content:

---
# NOTE: the indentation below is significant — the article's original listing
# had it stripped, which makes the file invalid YAML.
version: '3.7'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper-demo
    container_name: zookeeper-demo
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-server:6.2.0
    hostname: kafka
    container_name: kafka
    depends_on:
      # depends_on must reference the SERVICE key ("zookeeper"), not the
      # container_name ("zookeeper-demo") — compose rejects unknown services.
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      # Zookeeper is reachable by its hostname on the compose network.
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-demo:2181'
      # Host clients connect via localhost:9092 (PLAINTEXT_HOST);
      # other containers use kafka:29092 (PLAINTEXT).
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      # Single-broker setup: all replication factors must be 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry-demo:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    hostname: schema-registry-demo
    container_name: schema-registry-demo
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry-demo
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

4.2. Run the following command:

docker-compose up

It will take a while but you’ll get a working Kafka installation.

4.3. After all services are up and running, create the topic ‘demo-topic’. (The compose file above does not include Confluent Control Center, so http://localhost:9021 will not be available; create the topic from the CLI instead:)

docker exec kafka kafka-topics --create --topic demo-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

5. Run the program

Run the program and enjoy!

dotnet run

Kafka topic messages

Conclusion

In the next chapter I will implement the Kafka consumer.

https://github.com/JasminSinanovic/start-with-kafka-dotnet/tree/master/part-1

Next chapter : Part 2: (consumer)

--

--