Giter Club home page Giter Club logo

practical-kafka's Introduction

kafka

Nuget package

Confluent.Kafka

docker-compose

version: '3.8'

services:
  kafka:
    image: confluentinc/cp-kafka:6.0.14
    depends_on:
      - zookeeper
    ports:
      - '29092:29092'
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:9092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8085:8080
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      DYNAMIC_CONFIG_ENABLED: 'true'

  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.14
    ports:
      - '22181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

Producer (Publisher)

Message Service

public class MessageProducerService
{
    public async Task SendMessageAsync(string topic, CustomerDto customer)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = "localhost:29092",
            ClientId = "OrderClient",
            Acks = Acks.All
        };

        var message = new Message<string, string>
        {
            Key = customer.Id,
            Value = JsonSerializer.Serialize(customer)
        };

        using var producer = new ProducerBuilder<string, string>(config).Build();
        await producer.ProduceAsync(topic, message);
    }
}

Api Controller

[Route("api/[controller]")]
[ApiController]
public class CustomersController(CustomerDbContext context, MessageProducerService producerService) : ControllerBase
{

    [HttpGet]
    public async Task<IActionResult>GetCustomers()
    {
        return Ok(await context.Customers.ToListAsync());
    }


    [HttpGet("{customerId}")]
    public async Task<IActionResult>CreateCustomerOrder(string customerId)
    {
        var customer = await context.Customers.SingleOrDefaultAsync(_ => _.Id == customerId);
        if (customer is null) return NotFound();

        await producerService.SendMessageAsync(MessageTopic.CREATE_ORDER, new CustomerDto(customer.Id, customer.Name, customer.Surname, customer.Balance));
        return Ok();
    }
}

Consumer (Subscriber)

Worker Service

public class Worker : BackgroundService
    {
        private readonly ILogger<Worker> _logger;
        private readonly OrderDbContext _context;

        public Worker(ILogger<Worker> logger, OrderDbContext context)
        {
            _logger = logger;
            _context = context;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:29092",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                ClientId = "OrderClient",
                GroupId = "OrderGroup"
            };

            using var consumer = new ConsumerBuilder<string, string>(config).Build();
            consumer.Subscribe(MessageTopic.CREATE_ORDER);
            _logger.LogInformation("Connected kafka");

            while (!stoppingToken.IsCancellationRequested)
            {
                var data = consumer.Consume();

                if (data is not null)
                {
                    var customer = JsonSerializer.Deserialize<CustomerDto>(data.Message.Value);

                    if (customer!.Balance > 0)
                    {
                        _context.Orders.Add(new Order(customer.Id, DateTime.Now));
                        await _context.SaveChangesAsync(stoppingToken);
                        _logger.LogInformation($"{customer.Id} Customer's order has been created successfully.");
                    }
                    else
                    {
                        _logger.LogInformation($"{customer.Id} Customer's balance is less than 0!");

                    }
                }

                _logger.LogInformation($"There is no message");
            }
        }
    }

Kafka UI

Screenshot_1

practical-kafka's People

Contributors

oznakdn avatar

Stargazers

 avatar

Watchers

 avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.