Ir para o conteúdo

Apache Kafka em Kotlin

De Wikiversidade

Visão Geral

[editar | editar código]

Apache Kafka é uma plataforma de streaming distribuída desenvolvida originalmente pela LinkedIn e posteriormente doada à Apache Software Foundation. A integração com Kotlin fornece uma abordagem moderna e concisa para desenvolvimento de sistemas de mensageria, aproveitando as características de segurança de tipos e expressividade da linguagem.

Características Principais do Apache Kafka

[editar | editar código]
  • Alta Taxa de Transferência: Capacidade de processar milhões de mensagens por segundo
  • Escalabilidade Horizontal: Expansão através da adição de novos brokers ao cluster
  • Persistência Durável: Armazenamento de mensagens em disco com replicação automática
  • Tolerância a Falhas: Continuidade operacional mesmo com falhas de componentes individuais
  • Processamento em Tempo Real: Suporte nativo para streaming de dados
  • Baixo Acoplamento: Independência entre produtores e consumidores

Por que Kafka é Diferente

[editar | editar código]

Kafka se diferencia de outras soluções de mensageria tradicionais:

Característica Kafka Sistemas Tradicionais (RabbitMQ, ActiveMQ)
Persistência Todas as mensagens são persistidas Geralmente apenas em memória
Ordem Garantida por partição Limitada ou inexistente
Consumo Múltiplos consumers podem ler a mesma mensagem Mensagem consumida uma vez
Throughput Extremamente alto (milhões/seg) Moderado (milhares/seg)
Retenção Configurável (dias/semanas) Até o consumo
Escalabilidade Horizontal via partições Vertical limitada

Conceitos Fundamentais

[editar | editar código]

Terminologia

[editar | editar código]
Termo Definição
Producer Componente responsável pela publicação de mensagens em tópicos
Consumer Componente que consome mensagens de tópicos específicos
Topic Canal lógico onde mensagens são organizadas por categoria
Partition Subdivisão de um tópico para paralelização e escalabilidade
Broker Servidor individual que compõe o cluster Kafka
Cluster Conjunto de brokers trabalhando em conjunto
Consumer Group Grupo de consumidores que compartilham a carga de processamento
Offset Identificador sequencial único de cada mensagem dentro de uma partição

Comparação com Outros Sistemas

[editar | editar código]
Banco de Dados Tradicional
Kafka não substitui bancos de dados, mas complementa-os oferecendo capacidade de streaming e processamento em tempo real
Filas de Mensagens (RabbitMQ, SQS)
Kafka mantém mensagens por mais tempo e permite múltiplos consumidores, ideal para arquiteturas orientadas a eventos
Sistemas de Logs (ELK Stack)
Kafka pode alimentar sistemas de logs, mas também processa e transforma dados em tempo real
APIs REST
Kafka oferece comunicação assíncrona e desacoplada, complementando APIs síncronas

Casos de Uso e Exemplos Práticos

[editar | editar código]

Esta seção demonstra como os conceitos fundamentais do Kafka se aplicam em cenários reais, conectando teoria e prática.

Caso de Uso 1: Sistema de Notificações (Iniciante)

[editar | editar código]

Cenário: Uma aplicação web precisa enviar notificações quando usuários realizam ações.

Conceitos Aplicados:

  • Topic: user-notifications
  • Producer: Aplicação web que detecta ações dos usuários
  • Consumer: Serviço de email que envia notificações
  • Partições: Uma partição por tipo de notificação
Ação do Usuário → Producer → Topic: "user-notifications" → Consumer → Email Enviado

Implementação dos Conceitos:

Conceito Como se Manifesta Benefício
Producer Aplicação web publica evento "user_registered" Desacoplamento: app não precisa saber sobre email
Topic Canal dedicado para notificações Organização: separa notificações de outros eventos
Consumer Serviço dedicado processa notificações Especialização: foco apenas em envio de emails
Partição Partição 0: cadastros, Partição 1: compras Paralelização: processa tipos diferentes simultaneamente
Offset Controla quais notificações já foram enviadas Confiabilidade: não envia email duplicado

Exemplo de Mensagem:

{
  "tipo": "user_registered",
  "usuario_id": "12345", 
  "email": "user@example.com",
  "timestamp": "2024-01-15T10:30:00Z",
  "template": "welcome_email"
}

Por que Kafka aqui?

  • Se o serviço de email falhar, mensagens ficam persistidas no tópico
  • Podem adicionar SMS, push notifications como novos consumers
  • Aplicação principal não trava esperando envio de email

Caso de Uso 2: E-commerce - Processamento de Pedidos (Intermediário)

[editar | editar código]

Cenário: Quando um produto é vendido, múltiplos sistemas precisam ser atualizados simultaneamente.

Conceitos Aplicados:

  • Topic: order-events
  • Producer: Sistema de checkout
  • Multiple Consumers: Estoque, Pagamento, Entrega, Analytics
  • Consumer Groups: Cada sistema tem seu próprio grupo
  • Replication: Dados críticos replicados para tolerância a falhas
Evento: "order.created" (Producer: Checkout)
        ↓
Topic: "order-events" 
        ↓
┌─────────────────────────────────────────────────────┐
│ Consumer Group: "inventory"    → Reduz estoque      │
│ Consumer Group: "payments"     → Processa pagamento │  
│ Consumer Group: "shipping"     → Cria etiqueta      │
│ Consumer Group: "analytics"    → Atualiza métricas  │
│ Consumer Group: "crm"          → Histórico cliente  │
└─────────────────────────────────────────────────────┘

Aplicação Detalhada dos Conceitos:

Producer (Sistema de Checkout):

// Quando pedido é finalizado
val evento = OrderEvent(
    orderId = "ORD-12345",
    customerId = "CUST-789", 
    items = listOf("PROD-A", "PROD-B"),
    total = 299.99
)
producer.send("order-events", evento.customerId, evento.toJson())

Consumer Groups (Diferentes Serviços):

  • Grupo "inventory": Consome para atualizar estoque
  • Grupo "payments": Consome para processar pagamento
  • Grupo "shipping": Consome para preparar envio
  • Grupo "analytics": Consome para relatórios

Vantagem dos Consumer Groups: Cada grupo recebe a mesma mensagem independentemente, permitindo processamento paralelo sem interferência.

Particionamento Estratégico:

Chave: customerId → Garante que pedidos do mesmo cliente 
                    sejam processados em ordem
                    
Partição 0: Clientes cujo ID termina em 0,1,2
Partição 1: Clientes cujo ID termina em 3,4,5  
Partição 2: Clientes cujo ID termina em 6,7,8,9

Tolerância a Falhas na Prática:

  • Se inventory service falhar, outros continuam processando
  • Quando inventory service voltar, retoma do último offset
  • Nenhum evento de estoque é perdido

Caso de Uso 3: Análise Financeira em Tempo Real (Avançado)

[editar | editar código]

Cenário: Detecção de fraudes em transações bancárias com múltiplos algoritmos analisando simultaneamente.

Conceitos Aplicados:

  • High Throughput: Milhares de transações por segundo
  • Multiple Topics: Separação por tipo de análise
  • Stream Processing: Análise contínua sem armazenamento
  • Low Latency: Decisões em milissegundos
  • Event Sourcing: Histórico completo para auditoria
Transação Bancária → Topic: "raw-transactions"
                     ↓
            ┌────────┼────────┐
            │        │        │
    Consumer A   Consumer B   Consumer C
    (Geo-Analysis) (ML-Model) (Pattern-Detection)
            │        │        │ 
            ↓        ↓        ↓
    Topic: "geo-   Topic:    Topic:
     alerts"  "ml-scores"  "patterns"
            │        │        │
            └────────┼────────┘
                     ↓
            Consumer: "fraud-aggregator"
                     ↓
            Decisão: APROVAR/BLOQUEAR

Implementação dos Conceitos Avançados:

Múltiplos Topics (Event Sourcing):

Topic Propósito Retenção Partições
raw-transactions Transações originais 30 dias 10 (por região)
geo-alerts Alertas geográficos 7 dias 3 (por criticidade)
ml-scores Pontuações ML 1 dia 5 (por modelo)
final-decisions Decisões finais 365 dias 1 (ordem cronológica)

Stream Processing em Ação:

// Consumer que analisa padrões geográficos
kafkaConsumer.subscribe("raw-transactions")
while (true) {
    val records = consumer.poll(100) // 100ms timeout
    records.forEach { transaction ->
        val geoRisk = analyzeLocation(transaction.location, transaction.userId)
        if (geoRisk > THRESHOLD) {
            producer.send("geo-alerts", createAlert(transaction, geoRisk))
        }
    }
}

Particionamento para Performance:

Topic: "raw-transactions" 
├── Partição 0: Transações Região Norte    → Consumer Norte
├── Partição 1: Transações Região Sul      → Consumer Sul  
├── Partição 2: Transações Região Sudeste  → Consumer Sudeste
└── Partição 3: Transações Internacionais  → Consumer Internacional

Benefício: Análise geográfica paralela + Ordem regional preservada

Alta Disponibilidade:

  • Replication Factor = 3: Cada partição replicada em 3 brokers
  • Acks = "all": Transação só confirmada após replicação completa
  • Retries = 5: Múltiplas tentativas em caso de falha temporária

Caso de Uso 4: Internet das Coisas (IoT) - Telemetria (Especializado)

[editar | editar código]

Cenário: Monitoramento de frota de 10.000 veículos com sensores enviando dados a cada 30 segundos.

Conceitos Aplicados:

  • Massive Throughput: 10.000 × 120 mensagens/hora = 1.2M mensagens/hora
  • Time-based Partitioning: Organização temporal dos dados
  • Retention Policies: Diferentes períodos de retenção por tipo de dado
  • Compression: Redução do volume de dados
  • Batch Processing: Agrupamento para eficiência
10.000 Veículos → Sensores (GPS, Motor, Combustível) 
                      ↓
    Topic: "vehicle-telemetry" (50 partições)
                      ↓
        ┌─────────────┼─────────────┐
        │             │             │
Consumer Group:   Consumer Group:  Consumer Group:
"real-time"       "analytics"     "maintenance"
   ↓                 ↓               ↓
Dashboards        Data Lake      Alertas Preventivos

Particionamento por Veículo:

Chave: vehicleId → hash(vehicleId) % 50 → Partição X

Exemplo:
vehicleId="TRUCK001" → Partição 15
vehicleId="TRUCK002" → Partição 33
vehicleId="TRUCK003" → Partição 7

Vantagem: Todos os dados de um veículo ficam ordenados cronologicamente

Configuração Otimizada para IoT:

# Producer otimizado para IoT
batch.size: 65536              # Agrupa mais mensagens por batch
linger.ms: 100                 # Espera 100ms para formar batches maiores  
compression.type: snappy       # Compressão para reduzir bandwidth
acks: 1                        # Acknowledgment básico (não "all")
retries: 3                     # Poucas tentativas (dados frequentes)

Diferentes Retenções por Necessidade:

Tipo de Dado Topic Retenção Justificativa
GPS (posição) vehicle-location 24 horas Apenas posição atual relevante
Motor (RPM, temp) vehicle-engine 30 dias Análise de padrões de uso
Manutenção vehicle-maintenance 2 anos Histórico para garantia
Combustível vehicle-fuel 90 dias Otimização de rotas

Consumer Especializado para Alertas:

// Consumer que monitora temperatura do motor
kafkaConsumer.subscribe("vehicle-engine")
while (true) {
    val records = consumer.poll(1000)
    records.forEach { engineData ->
        val temp = engineData.value.getDouble("engine_temp") 
        if (temp > 95.0) { // Temperatura crítica
            alertProducer.send("maintenance-alerts", 
                createMaintenanceAlert(engineData.key, temp))
        }
    }
}

Caso de Uso 5: Microserviços - Event-Driven Architecture (Arquitetural)

[editar | editar código]

Cenário: Aplicação de delivery com múltiplos microserviços comunicando-se via eventos.

Conceitos Aplicados:

  • Event Sourcing: Toda mudança de estado é um evento
  • CQRS: Separação entre comandos e consultas
  • Saga Pattern: Transações distribuídas
  • Schema Evolution: Evolução de formatos sem quebrar compatibilidade
  • Dead Letter Queues: Tratamento de falhas persistentes
Arquitetura Event-Driven com Kafka:

User Service → "user.registered" → Kafka → Notification Service
     ↓                                          
Order Service → "order.created" → Kafka → Inventory Service
     ↓                                   → Payment Service 
Payment Service → "payment.completed" → Kafka → Delivery Service
     ↓                                        → Order Service
Delivery Service → "delivery.dispatched" → Kafka → Tracking Service
                                              → Notification Service

Event Sourcing na Prática:

Em vez de armazenar estado atual:

Tabela: users
| id | name | email | status |
|----|------|-------|--------|  
| 1  | João | j@... | active |

Armazena sequência de eventos:

Topic: "user-events"
| offset | event_type      | user_id | data |
|--------|-----------------|---------|------|
| 1      | user.registered | 1       | {name: "João", email: "j@..."} |
| 2      | user.activated  | 1       | {} |
| 3      | email.changed   | 1       | {new_email: "joao@..."} |

Saga Pattern para Transações Distribuídas:

Exemplo: Processamento de Pedido (transação que envolve múltiplos serviços)

1. Order Service → "order.create.requested" 
2. Inventory Service → "inventory.reserved"    ✓ Sucesso
3. Payment Service → "payment.processed"      ✓ Sucesso  
4. Delivery Service → "delivery.scheduled"    ✗ Falha
                           ↓
        Compensação (rollback):
5. Payment Service → "payment.refunded"
6. Inventory Service → "inventory.released" 
7. Order Service → "order.cancelled"

Implementação do Saga:

// Orchestrator que coordena a saga
class OrderSagaOrchestrator {
    fun processOrder(orderId: String) {
        // Etapa 1: Reservar estoque
        producer.send("inventory-commands", ReserveInventory(orderId))
        
        // Aguarda confirmação via consumer
        consumer.subscribe("inventory-events")
        val result = waitForEvent("inventory.reserved", orderId, timeout = 30.seconds)
        
        if (result.success) {
            // Etapa 2: Processar pagamento
            producer.send("payment-commands", ProcessPayment(orderId))
            // ... continua saga
        } else {
            // Compensação: cancelar pedido
            producer.send("order-events", OrderCancelled(orderId, "inventory_unavailable"))
        }
    }
}

Schema Evolution:

Evolução do formato de evento sem quebrar consumers existentes:

// Versão 1 (inicial)
{
  "event_type": "user.registered",
  "user_id": "123",
  "email": "user@example.com"
}

// Versão 2 (adicionado campo opcional)
{
  "event_type": "user.registered", 
  "user_id": "123",
  "email": "user@example.com",
  "phone": "+5511999999999"  // Campo novo, opcional
}

// Versão 3 (campo obrigatório com default)
{
  "event_type": "user.registered",
  "user_id": "123", 
  "email": "user@example.com",
  "phone": "+5511999999999",
  "registration_source": "web"  // Novo campo com valor padrão
}

Resumo: Conceitos × Casos de Uso

[editar | editar código]
Conceito Caso Simples Caso Intermediário Caso Avançado Caso IoT Caso Arquitetural
Producer App web Sistema checkout Transação bancária Sensores veículos Microserviços
Consumer Serviço email Múltiplos sistemas Algoritmos ML Dashboards Event handlers
Topic Notificações Pedidos Transações Telemetria Eventos domínio
Partition Por tipo Por cliente Por região Por veículo Por agregado
Consumer Group Um grupo Grupo por sistema Grupo por análise Grupo por função Grupo por serviço
Replication Básica Crítica Ultra-crítica Moderada Por importância
Retention Dias Semanas Anos Variável Event sourcing

Empresas que Utilizam Kafka

[editar | editar código]
Empresa Uso Principal Escala Conceitos Principais Aplicados
LinkedIn Activity streams, métricas +1 trilhão mensagens/dia High throughput, Stream processing
Netflix Monitoramento, logs, recomendações +8 trilhões eventos/dia Event sourcing, Real-time analytics
Spotify Event logging, user analytics +400 bilhões eventos/dia User behavior tracking, Personalization
Uber Surge pricing, localização em tempo real +1 trilhão mensagens/dia Geolocation, Dynamic pricing
Airbnb Analytics, ETL, event streaming +150 bilhões eventos/dia Data pipeline, Business intelligence

Quando NÃO Usar Kafka

[editar | editar código]
  • Aplicações simples com poucos sistemas integrados → Use REST APIs diretas
  • Baixo volume de dados (< 1000 mensagens/hora) → Use filas simples (Redis, SQS)
  • Necessidade de entrega exata uma vez sem configuração adicional → Use message brokers tradicionais
  • Equipe sem experiência em sistemas distribuídos → Comece com soluções mais simples
  • Orçamento limitado para infraestrutura e operação → Considere soluções managed (AWS Kinesis, Google Pub/Sub)

Arquitetura

[editar | editar código]

Antes de partir para a implementação, é importante entender como os componentes do Kafka se relacionam:

Visão Geral da Arquitetura

[editar | editar código]
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Producer  │───▶│    Kafka    │◀───│  Consumer   │
│  (Kotlin)   │    │   Cluster   │    │  (Kotlin)   │
└─────────────┘    └─────────────┘    └─────────────┘
                           │
                   ┌───────▼───────┐
                   │   Zookeeper   │
                   │  (Metadata)   │
                   └───────────────┘

Fluxo Básico de Funcionamento

[editar | editar código]
1. Producer envia mensagem para um Topic
   ↓
2. Kafka armazena a mensagem em uma Partição  
   ↓
3. Consumer lê a mensagem da Partição
   ↓
4. Consumer processa a mensagem
   ↓
5. Consumer confirma o processamento (Offset)

Tutorial Prático

[editar | editar código]

Este tutorial guiará você através da configuração completa do Kafka com Kotlin, desde a instalação até a implementação de producers e consumers funcionais.

Pré-requisitos

[editar | editar código]

Antes de começar, certifique-se de ter instalado:

  • Docker: Versão 20.10 ou superior
  • Docker Compose: Versão 2.0 ou superior
  • JDK: Versão 17 ou superior
  • Kotlin: Versão 1.9 ou superior (ou use via Gradle)

Verificação dos Pré-requisitos:

# Verificar Docker
docker --version
# Saída esperada: Docker version 20.10.x

# Verificar Docker Compose  
docker-compose --version
# Saída esperada: Docker Compose version 2.x.x

# Verificar Java
java --version
# Saída esperada: openjdk 17.x.x ou superior

Passo 1: Instalação do Apache Kafka

[editar | editar código]

Opção A: Instalação com Docker (Recomendada)

[editar | editar código]

A instalação via Docker é a forma mais simples e confiável de executar Kafka localmente.

1.1 Criar Estrutura do Projeto

# Criar diretório do projeto
mkdir kafka-kotlin-tutorial
cd kafka-kotlin-tutorial

# Criar estruturas necessárias
mkdir -p src/main/kotlin/com/exemplo/kafka/{producer,consumer,models}

1.2 Configurar Docker Compose

Criar arquivo `docker-compose.yml` na raiz do projeto:

version: '3.8'
services:
  # Zookeeper - Coordenação do cluster Kafka
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    hostname: zookeeper
    container_name: kafka-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: ["CMD", "nc", "-z", "localhost", "2181"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Kafka Broker - Servidor principal
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    hostname: kafka
    container_name: kafka-broker
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      # Configuração do Broker
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      
      # Configuração de Listeners (conexões)
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      
      # Configurações de Replicação (ambiente desenvolvimento)
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      
      # Monitoramento JMX
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      
      # Auto-criação de tópicos (apenas para desenvolvimento)
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    healthcheck:
      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Interface Web para Monitoramento
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      kafka:
        condition: service_healthy
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local-cluster
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
    healthcheck:
      test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:8080"]
      interval: 30s
      timeout: 10s
      retries: 3

1.3 Iniciar o Cluster Kafka

# Iniciar todos os serviços
docker-compose up -d

# Verificar se todos os containers estão rodando
docker-compose ps

# Aguardar inicialização completa (cerca de 30-60 segundos)
# Verificar logs se necessário
docker-compose logs kafka

Saída Esperada:

NAME           IMAGE                            STATUS
kafka-zookeeper    confluentinc/cp-zookeeper:7.4.0   Up (healthy)
kafka-broker       confluentinc/cp-kafka:7.4.0        Up (healthy)  
kafka-ui           provectuslabs/kafka-ui:latest       Up (healthy)

Opção B: Instalação Manual (Alternativa)

[editar | editar código]

Para quem prefere instalação direta no sistema operacional:

Ubuntu/Debian:

# Instalar Java (se não tiver)
sudo apt update
sudo apt install openjdk-17-jdk

# Baixar Kafka
cd /opt
sudo wget https://downloads.apache.org/kafka/2.13-3.5.1/kafka_2.13-3.5.1.tgz
sudo tar -xzf kafka_2.13-3.5.1.tgz
sudo mv kafka_2.13-3.5.1 kafka
sudo chown -R $USER:$USER /opt/kafka

# Iniciar Zookeeper
cd /opt/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties &

# Iniciar Kafka (em outro terminal)
bin/kafka-server-start.sh config/server.properties &

macOS (com Homebrew):

# Instalar Kafka
brew install kafka

# Iniciar Zookeeper
brew services start zookeeper

# Iniciar Kafka
brew services start kafka

Passo 2: Configuração do Projeto Kotlin

[editar | editar código]

2.1 Configurar Gradle

[editar | editar código]

Criar arquivo `build.gradle.kts`:

plugins {
    kotlin("jvm") version "1.9.10"
    application
}

group = "com.exemplo.kafka"
version = "1.0.0"

repositories {
    mavenCentral()
}

dependencies {
    // Kafka Client
    implementation("org.apache.kafka:kafka-clients:3.5.1")
    
    // JSON Processing
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.15.2")
    implementation("com.fasterxml.jackson.core:jackson-databind:2.15.2")
    
    // Logging
    implementation("org.slf4j:slf4j-simple:2.0.7")
    
    // Kotlin Standard Library
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
}

application {
    mainClass.set("com.exemplo.kafka.MainKt")
}

kotlin {
    jvmToolchain(17)
}

tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile> {
    kotlinOptions {
        jvmTarget = "17"
        freeCompilerArgs = listOf("-Xjsr305=strict")
    }
}

2.2 Configurar Gradle Wrapper

[editar | editar código]
# Gerar wrapper do Gradle (se não existir)
gradle wrapper --gradle-version 8.4

# Tornar executável (Linux/macOS)
chmod +x gradlew

Passo 3: Implementação Simples em Kotlin

[editar | editar código]

3.1 Visão Geral da Implementação

[editar | editar código]

Antes de implementar o código, vamos entender o que vamos construir e como funcionará nosso sistema Kafka com Kotlin.

Cenário do Tutorial

[editar | editar código]

Vamos criar um sistema de mensagens distribuído que simula aplicações reais. O cenário é:

  • Aplicação Producer: Serviço que envia mensagens (ex: API de pedidos)
  • Aplicação Consumer: Serviço que processa mensagens (ex: serviço de notificações)
  • Tópico Kafka: `mensagens-tutorial`
  • Tipo de Mensagem: Mensagens de texto com remetente e timestamp
  • Arquitetura: Aplicações separadas comunicando-se via Kafka

Fluxo de Funcionamento

[editar | editar código]
1. APLICAÇÃO PRODUCER (Terminal 1) inicia e envia mensagens
   ↓
2. KAFKA armazena mensagens no tópico
   ↓  
3. APLICAÇÃO CONSUMER (Terminal 2) recebe e processa mensagens
   ↓
4. Ambas aplicações rodam independentemente
   ↓
5. Producer pode ser parado sem afetar Consumer (e vice-versa)

Estrutura do Tópico

[editar | editar código]
Elemento Valor Descrição
Nome do Tópico `mensagens-tutorial` Canal onde mensagens são enviadas/recebidas
Partições 3 (auto-criação) Kafka divide automaticamente para paralelização
Chave da Mensagem ID da mensagem Usado para garantir ordem e particionamento
Valor da Mensagem JSON da mensagem Conteúdo serializado da mensagem
Consumer Group `grupo-tutorial` Grupo que processa as mensagens

Exemplo de Mensagem

[editar | editar código]

Cada mensagem terá este formato JSON:

{
  "id": "msg-001",
  "conteudo": "Primeira mensagem do tutorial Kafka + Kotlin!",
  "timestamp": "2024-01-15T10:30:45.123",
  "remetente": "tutorial"
}

Classes que Implementaremos

[editar | editar código]
Classe Arquivo Responsabilidade Métodos Principais
Mensagem `models/Mensagem.kt` Modelo de dados `toJson()`, `fromJson()`
ProducerSimples `producer/ProducerSimples.kt` Enviar mensagens para Kafka `enviarMensagem()`, `enviarMensagens()`, `fechar()`
ConsumerSimples `consumer/ConsumerSimples.kt` Receber mensagens do Kafka `iniciarConsumo()`, `processarMensagem()`, `fechar()`
AplicacaoProducer `AplicacaoProducer.kt` Aplicação independente para enviar mensagens `main()`, `criarMensagensExemplo()`
AplicacaoConsumer `AplicacaoConsumer.kt` Aplicação independente para processar mensagens `main()`

Detalhamento dos Métodos

[editar | editar código]

Classe Mensagem:

  • `toJson()`: Converte objeto Mensagem para string JSON
  • `fromJson(json: String)`: Cria objeto Mensagem a partir de JSON

Classe ProducerSimples:

  • `enviarMensagem(topico, mensagem)`: Envia uma mensagem individual
  • `enviarMensagens(topico, lista)`: Envia múltiplas mensagens com intervalo
  • `fechar()`: Fecha conexão e libera recursos

Classe ConsumerSimples:

  • `iniciarConsumo(topico)`: Inicia loop de consumo de mensagens
  • `processarMensagem(mensagem, particao, offset)`: Processa mensagem individual
  • `pararConsumo()`: Para o loop de consumo
  • `fechar()`: Fecha conexão e libera recursos

AplicacaoProducer:

  • `main()`: Aplicação independente que envia mensagens para Kafka
  • `criarMensagensExemplo()`: Gera mensagens para demonstração

AplicacaoConsumer:

  • `main()`: Aplicação independente que consome mensagens do Kafka

Configurações Kafka Utilizadas

[editar | editar código]

Producer:

  • Bootstrap Servers: `localhost:9092` (conexão com Kafka)
  • Serializers: `StringSerializer` (converte para bytes)
  • Acks: `"1"` (aguarda confirmação do líder da partição)
  • Retries: `3` (tentativas em caso de falha)

Consumer:

  • Bootstrap Servers: `localhost:9092` (conexão com Kafka)
  • Group ID: `grupo-tutorial` (identificação do grupo)
  • Deserializers: `StringDeserializer` (converte bytes para string)
  • Auto Offset Reset: `"earliest"` (lê desde o início do tópico)
  • Auto Commit: `false` (commit manual para controle)

3.2 Modelo de Dados

[editar | editar código]

Criar `src/main/kotlin/com/exemplo/kafka/models/Mensagem.kt`:

package com.exemplo.kafka.models

import com.fasterxml.jackson.annotation.JsonProperty
import java.time.LocalDateTime

/**
 * Modelo simples para demonstrar Kafka com Kotlin
 * 
 * Esta classe representa uma mensagem que será enviada através do Kafka.
 * Contém informações básicas como ID, conteúdo, timestamp e remetente.
 * 
 * @property id Identificador único da mensagem (usado como chave no Kafka)
 * @property conteudo Texto da mensagem
 * @property timestamp Momento de criação da mensagem
 * @property remetente Quem enviou a mensagem
 */
data class Mensagem(
    @JsonProperty("id")
    val id: String,
    
    @JsonProperty("conteudo") 
    val conteudo: String,
    
    @JsonProperty("timestamp")
    val timestamp: String = LocalDateTime.now().toString(),
    
    @JsonProperty("remetente")
    val remetente: String = "sistema"
) {
    /**
     * Converte a mensagem para formato JSON
     * Este método será usado pelo Producer para serializar a mensagem
     * antes de enviar para o Kafka
     */
    fun toJson(): String {
        val mapper = com.fasterxml.jackson.module.kotlin.jacksonObjectMapper()
        return mapper.writeValueAsString(this)
    }
    
    companion object {
        /**
         * Cria uma Mensagem a partir de uma string JSON
         * Este método será usado pelo Consumer para deserializar
         * mensagens recebidas do Kafka
         */
        fun fromJson(json: String): Mensagem {
            val mapper = com.fasterxml.jackson.module.kotlin.jacksonObjectMapper()
            return mapper.readValue(json, Mensagem::class.java)
        }
    }
}

Por que esta estrutura?

  • `id`: Usado como chave no Kafka para particionamento e ordenação
  • `conteudo`: O texto principal da mensagem
  • `timestamp`: Para rastreamento temporal das mensagens
  • `remetente`: Identificação de quem enviou (útil para filtros/logs)
  • `@JsonProperty`: Garante nomes consistentes na serialização JSON
  • `toJson()` / `fromJson()`: Facilita conversão entre objeto e JSON

3.3 Producer Simples

[editar | editar código]

Criar `src/main/kotlin/com/exemplo/kafka/producer/ProducerSimples.kt`:

package com.exemplo.kafka.producer

import com.exemplo.kafka.models.Mensagem
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.*

/**
 * Producer Kafka simples para demonstração
 * 
 * Esta classe é responsável por ENVIAR mensagens para o Kafka.
 * Ela conecta com o broker Kafka e publica mensagens no tópico especificado.
 * 
 * Funcionalidades principais:
 * - Conectar com o cluster Kafka
 * - Serializar mensagens para JSON
 * - Enviar mensagens de forma assíncrona
 * - Tratar erros e confirmações
 */
class ProducerSimples {
    private val producer: KafkaProducer<String, String>
    
    init {
        // Configuração do Producer Kafka
        val props = Properties().apply {
            // CONEXÃO: Onde encontrar o Kafka
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
            
            // SERIALIZAÇÃO: Como converter objetos Kotlin em bytes
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
            
            // CONFIABILIDADE: Garantias de entrega
            put(ProducerConfig.ACKS_CONFIG, "1") // Aguarda confirmação do líder
            put(ProducerConfig.RETRIES_CONFIG, 3) // Tentativas em caso de falha
            put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000) // Intervalo entre tentativas
            
            // PERFORMANCE: Otimizações para throughput
            put(ProducerConfig.BATCH_SIZE_CONFIG, 16384) // Agrupa mensagens em batches
            put(ProducerConfig.LINGER_MS_CONFIG, 5) // Tempo de espera para formar batch
        }
        
        producer = KafkaProducer(props)
        println("🔗 Producer Kafka conectado e pronto para enviar mensagens")
    }
    
    /**
     * Envia uma mensagem individual para o tópico especificado
     * 
     * @param topico Nome do tópico Kafka onde enviar a mensagem
     * @param mensagem Objeto Mensagem a ser enviado
     * @return true se enviado com sucesso, false caso contrário
     */
    fun enviarMensagem(topico: String, mensagem: Mensagem): Boolean {
        return try {
            // Criar registro Kafka
            val record = ProducerRecord(
                topico,           // Tópico de destino
                mensagem.id,      // Chave (garante ordem por ID)
                mensagem.toJson() // Valor (mensagem serializada)
            )
            
            // Enviar de forma assíncrona com callback
            val future = producer.send(record) { metadata, exception ->
                if (exception != null) {
                    println("❌ Erro ao enviar mensagem ${mensagem.id}: ${exception.message}")
                } else {
                    println("✅ Mensagem ${mensagem.id} enviada com sucesso!")
                    println("   ├─ Tópico: ${metadata.topic()}")
                    println("   ├─ Partição: ${metadata.partition()}")
                    println("   ├─ Offset: ${metadata.offset()}")
                    println("   └─ Timestamp: ${metadata.timestamp()}")
                }
            }
            
            // Aguardar confirmação (torna síncrono para este exemplo)
            future.get()
            true
            
        } catch (e: Exception) {
            println("❌ Erro inesperado ao enviar mensagem: ${e.message}")
            false
        }
    }
    
    /**
     * Envia múltiplas mensagens com intervalo entre elas
     * 
     * @param topico Nome do tópico Kafka
     * @param mensagens Lista de mensagens a serem enviadas
     */
    fun enviarMensagens(topico: String, mensagens: List<Mensagem>) {
        println("📤 Iniciando envio de ${mensagens.size} mensagens para o tópico '$topico'")
        
        mensagens.forEachIndexed { index, mensagem ->
            println("\n📝 Enviando mensagem ${index + 1}/${mensagens.size}")
            val sucesso = enviarMensagem(topico, mensagem)
            
            if (sucesso) {
                println("   ✅ Mensagem enviada: \"${mensagem.conteudo}\"")
            } else {
                println("   ❌ Falha no envio da mensagem ${mensagem.id}")
            }
            
            // Pausa entre mensagens para visualizar o processo
            Thread.sleep(1000)
        }
        
        println("\n🎉 Envio de mensagens concluído!")
    }
    
    /**
     * Fecha o producer e libera recursos
     * IMPORTANTE: Sempre chamar este método ao finalizar
     */
    fun fechar() {
        println("🔒 Fechando producer e liberando recursos...")
        producer.close()
        println("✅ Producer fechado com sucesso")
    }
}

Pontos importantes do Producer:

  • Bootstrap Servers: Como o producer encontra o cluster Kafka
  • Chave da Mensagem: Usamos `mensagem.id` para garantir ordem
  • Callback Assíncrono: Recebe confirmação ou erro de cada envio
  • Batch Processing: Kafka agrupa mensagens para eficiência
  • Retry Logic: Tenta novamente em caso de falhas temporárias

3.4 Consumer Simples

[editar | editar código]

Criar `src/main/kotlin/com/exemplo/kafka/consumer/ConsumerSimples.kt`:

package com.exemplo.kafka.consumer

import com.exemplo.kafka.models.Mensagem
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.*

/**
 * Consumer Kafka simples para demonstração
 * 
 * Esta classe é responsável por RECEBER mensagens do Kafka.
 * Ela se conecta ao broker, inscreve-se em um tópico e processa
 * mensagens conforme elas chegam.
 * 
 * Funcionalidades principais:
 * - Conectar com o cluster Kafka
 * - Inscrever-se em tópicos específicos
 * - Fazer polling (buscar) por novas mensagens
 * - Deserializar JSON para objetos Kotlin
 * - Processar mensagens de forma controlada
 * - Confirmar processamento (commit de offset)
 */
class ConsumerSimples(private val grupoConsumidor: String) {
    private val consumer: KafkaConsumer<String, String>
    private var continuarConsumindo = true
    
    init {
        // Configuração do Consumer Kafka
        val props = Properties().apply {
            // CONEXÃO: Onde encontrar o Kafka
            put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
            
            // IDENTIFICAÇÃO: Grupo de consumidores
            put(ConsumerConfig.GROUP_ID_CONFIG, grupoConsumidor)
            
            // DESERIALIZAÇÃO: Como converter bytes em objetos Kotlin
            put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
            put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
            
            // POSICIONAMENTO: De onde começar a ler
            put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // Lê desde o início
            
            // CONFIRMAÇÃO: Como confirmar processamento
            put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") // Commit manual
            put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
            
            // SESSÃO: Configurações de conectividade
            put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") // Timeout da sessão
            put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000") // Heartbeat
        }
        
        consumer = KafkaConsumer(props)
        println("🔗 Consumer Kafka criado para o grupo '$grupoConsumidor'")
    }
    
    /**
     * Inicia o consumo de mensagens do tópico especificado
     * 
     * Este método:
     * 1. Inscreve o consumer no tópico
     * 2. Entra em loop infinito fazendo polling
     * 3. Processa cada mensagem recebida
     * 4. Confirma o processamento (commit)
     * 
     * @param topico Nome do tópico para consumir mensagens
     */
    fun iniciarConsumo(topico: String) {
        try {
            // INSCRIÇÃO: Dizer ao Kafka que queremos mensagens deste tópico
            consumer.subscribe(listOf(topico))
            println("🎯 Consumer inscrito no tópico: $topico")
            println("👥 Grupo de consumidores: $grupoConsumidor")
            println("⏳ Aguardando mensagens...\n")
            
            var contadorMensagens = 0
            
            // LOOP PRINCIPAL: Buscar e processar mensagens continuamente
            while (continuarConsumindo) {
                // POLLING: Buscar novas mensagens (aguarda até 1 segundo)
                val records = consumer.poll(Duration.ofMillis(1000))
                
                // PROCESSAR: Cada mensagem recebida
                records.forEach { record ->
                    try {
                        contadorMensagens++
                        
                        // DESERIALIZAR: Converter JSON para objeto Mensagem
                        val mensagem = Mensagem.fromJson(record.value())
                        
                        // PROCESSAR: Executar lógica de negócio
                        processarMensagem(mensagem, record.partition(), record.offset(), contadorMensagens)
                        
                        // CONFIRMAR: Commit manual do offset
                        consumer.commitSync()
                        
                    } catch (e: Exception) {
                        println("❌ Erro ao processar mensagem:")
                        println("   ├─ Partição: ${record.partition()}")
                        println("   ├─ Offset: ${record.offset()}")
                        println("   ├─ Valor: ${record.value()}")
                        println("   └─ Erro: ${e.message}")
                    }
                }
                
                // OTIMIZAÇÃO: Pequena pausa se não há mensagens
                if (records.isEmpty) {
                    Thread.sleep(100)
                }
            }
            
        } catch (e: Exception) {
            println("❌ Erro crítico no consumer: ${e.message}")
        } finally {
            fechar()
        }
    }
    
    /**
     * Processa uma mensagem individual
     * 
     * Aqui é onde colocamos a LÓGICA DE NEGÓCIO específica
     * da nossa aplicação. No tutorial, apenas exibimos a mensagem,
     * mas em aplicações reais aqui teríamos:
     * - Salvamento em banco de dados
     * - Envio de emails/notificações  
     * - Chamadas para APIs externas
     * - Transformação de dados
     * 
     * @param mensagem Objeto mensagem deserializado
     * @param particao Partição de onde veio a mensagem
     * @param offset Posição da mensagem na partição
     * @param contador Número sequencial de mensagens processadas
     */
    private fun processarMensagem(mensagem: Mensagem, particao: Int, offset: Long, contador: Int) {
        println("📨 Mensagem #$contador recebida:")
        println("   ├─ ID: ${mensagem.id}")
        println("   ├─ Conteúdo: ${mensagem.conteudo}")
        println("   ├─ Remetente: ${mensagem.remetente}")
        println("   ├─ Timestamp: ${mensagem.timestamp}")
        println("   ├─ Partição: $particao")
        println("   └─ Offset: $offset")
        println()
        
        // SIMULAR PROCESSAMENTO: Na vida real seria bem mais complexo
        Thread.sleep(500)
        
        // Exemplos do que poderia ser feito aqui:
        // - salvarNoBancoDeDados(mensagem)
        // - enviarNotificacao(mensagem.remetente, mensagem.conteudo)
        // - atualizarCache(mensagem.id, mensagem)
        // - chamarApiExterna(mensagem)
    }
    
    /**
     * Para o loop de consumo
     * Útil para shutdown graceful da aplicação
     */
    fun pararConsumo() {
        println("🛑 Solicitação de parada do consumer recebida")
        continuarConsumindo = false
    }
    
    /**
     * Fecha o consumer e libera recursos
     * IMPORTANTE: Sempre chamar este método ao finalizar
     */
    fun fechar() {
        println("🔒 Fechando consumer e liberando recursos...")
        consumer.close()
        println("✅ Consumer fechado com sucesso")
    }
}

Pontos importantes do Consumer:

  • Consumer Group: Permite balanceamento de carga entre múltiplos consumers
  • Offset Management: Controla qual mensagem foi processada por último
  • Polling Model: Consumer "puxa" mensagens (não são "empurradas")
  • Manual Commit: Confirma processamento apenas após sucesso
  • Error Handling: Trata erros sem parar o consumer

3.5 Aplicações Separadas (Arquitetura Real)

[editar | editar código]

⚠️ IMPORTANTE: Na prática, producer e consumer rodam em aplicações diferentes. Isso reflete melhor a realidade de sistemas distribuídos onde:

  • Producer: Pode ser uma API REST, um serviço de pedidos, etc.
  • Consumer: Pode ser um serviço de notificações, processamento de dados, etc.

3.5.1 Aplicação Producer

[editar | editar código]

Criar `src/main/kotlin/com/exemplo/kafka/AplicacaoProducer.kt`:

package com.exemplo.kafka

import com.exemplo.kafka.models.Mensagem
import com.exemplo.kafka.producer.ProducerSimples

/**
 * Aplicação Producer - Envia mensagens para Kafka
 * 
 * Esta aplicação simula um serviço que gera eventos/mensagens.
 * Exemplos reais: API de e-commerce enviando eventos de pedidos,
 * sensor IoT enviando telemetria, sistema de logs, etc.
 * 
 * EXECUÇÃO: ./gradlew run -PmainClass=com.exemplo.kafka.AplicacaoProducer
 */
fun main() {
    println("📤 APLICAÇÃO PRODUCER - Kafka + Kotlin")
    println("======================================\n")
    
    val topico = "mensagens-tutorial"
    
    // Aguardar Kafka estar disponível
    println("⏳ Conectando com Kafka...")
    Thread.sleep(2000)
    
    // Criar producer
    val producer = ProducerSimples()
    
    try {
        // Cenário 1: Enviar mensagens de exemplo (simulando lote inicial)
        println("📝 Cenário 1: Enviando lote de mensagens de exemplo\n")
        val mensagensExemplo = criarMensagensExemplo()
        producer.enviarMensagens(topico, mensagensExemplo)
        
        Thread.sleep(3000)
        
        // Cenário 2: Simular chegada de novas mensagens (como uma API recebendo requests)
        println("\n📨 Cenário 2: Simulando chegada de novas mensagens...\n")
        val novasMensagens = simularNovasMensagens()
        
        novasMensagens.forEach { mensagem ->
            producer.enviarMensagem(topico, mensagem)
            Thread.sleep(2000) // Simula intervalo entre requests
        }
        
        println("\n✅ Producer finalizou o envio de todas as mensagens")
        println("🔄 Consumer pode continuar rodando em outra aplicação")
        
    } catch (e: Exception) {
        println("❌ Erro na aplicação producer: ${e.message}")
    } finally {
        producer.fechar()
        println("👋 Aplicação Producer finalizada")
    }
}

/**
 * Cria mensagens de exemplo iniciais
 */
fun criarMensagensExemplo(): List<Mensagem> {
    return listOf(
        Mensagem(
            id = "msg-001",
            conteudo = "Sistema iniciado - Primeira mensagem",
            remetente = "sistema"
        ),
        Mensagem(
            id = "msg-002", 
            conteudo = "Tutorial Kafka + Kotlin em execução",
            remetente = "tutorial"
        ),
        Mensagem(
            id = "msg-003",
            conteudo = "Mensagem de teste de conectividade",
            remetente = "health-check"
        )
    )
}

/**
 * Simula chegada de mensagens (como requests de uma API)
 */
fun simularNovasMensagens(): List<Mensagem> {
    return listOf(
        Mensagem(
            id = "pedido-001",
            conteudo = "Novo pedido recebido: Smartphone XYZ",
            remetente = "api-pedidos"
        ),
        Mensagem(
            id = "user-005",
            conteudo = "Usuário realizou login no sistema",
            remetente = "api-auth"
        ),
        Mensagem(
            id = "notif-001",
            conteudo = "Enviar email de confirmação para cliente",
            remetente = "sistema-notificacoes"
        )
    )
}

3.5.2 Aplicação Consumer

[editar | editar código]

Criar `src/main/kotlin/com/exemplo/kafka/AplicacaoConsumer.kt`:

package com.exemplo.kafka

import com.exemplo.kafka.consumer.ConsumerSimples

/**
 * Aplicação Consumer - Processa mensagens do Kafka
 * 
 * Esta aplicação simula um serviço que processa eventos/mensagens.
 * Exemplos reais: serviço de notificações, processamento de pedidos,
 * analytics, indexação para busca, backup de dados, etc.
 * 
 * EXECUÇÃO: ./gradlew run -PmainClass=com.exemplo.kafka.AplicacaoConsumer
 */
fun main() {
    println("📨 APLICAÇÃO CONSUMER - Kafka + Kotlin")
    println("======================================\n")
    
    val topico = "mensagens-tutorial"
    val grupoConsumidor = "processador-mensagens"
    
    println("🎯 Configuração do Consumer:")
    println("   ├─ Tópico: $topico")
    println("   ├─ Grupo: $grupoConsumidor")
    println("   └─ Broker: localhost:9092\n")
    
    // Aguardar Kafka estar disponível
    println("⏳ Conectando com Kafka...")
    Thread.sleep(1000)
    
    // Criar e iniciar consumer
    val consumer = ConsumerSimples(grupoConsumidor)
    
    try {
        println("🔄 Iniciando consumo contínuo...")
        println("📨 Aguardando mensagens (Ctrl+C para parar)...\n")
        
        // IMPORTANTE: Este loop roda indefinidamente
        // Na prática, seria um serviço que roda 24/7
        consumer.iniciarConsumo(topico)
        
    } catch (e: InterruptedException) {
        println("\n🛑 Consumer interrompido pelo usuário")
    } catch (e: Exception) {
        println("❌ Erro na aplicação consumer: ${e.message}")
    } finally {
        consumer.fechar()
        println("👋 Aplicação Consumer finalizada")
    }
}

3.5.3 Configuração do Gradle para Múltiplas Aplicações

[editar | editar código]

Atualizar `build.gradle.kts` para suportar execução de aplicações específicas:

plugins {
    kotlin("jvm") version "1.9.10"
    application
}

group = "com.exemplo.kafka"
version = "1.0.0"

repositories {
    mavenCentral()
}

dependencies {
    // Kafka Client
    implementation("org.apache.kafka:kafka-clients:3.5.1")
    
    // JSON Processing
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.15.2")
    implementation("com.fasterxml.jackson.core:jackson-databind:2.15.2")
    
    // Logging
    implementation("org.slf4j:slf4j-simple:2.0.7")
    
    // Kotlin Standard Library
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
}

// Configuração para aplicação padrão (Consumer)
application {
    mainClass.set("com.exemplo.kafka.AplicacaoConsumerKt")
}

kotlin {
    jvmToolchain(17)
}

tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile> {
    kotlinOptions {
        jvmTarget = "17"
        freeCompilerArgs = listOf("-Xjsr305=strict")
    }
}

// Task específica para executar o Producer
tasks.register<JavaExec>("runProducer") {
    group = "application"
    description = "Executa a aplicação Producer"
    classpath = sourceSets["main"].runtimeClasspath
    mainClass.set("com.exemplo.kafka.AplicacaoProducerKt")
}

// Task específica para executar o Consumer
tasks.register<JavaExec>("runConsumer") {
    group = "application"
    description = "Executa a aplicação Consumer"
    classpath = sourceSets["main"].runtimeClasspath
    mainClass.set("com.exemplo.kafka.AplicacaoConsumerKt")
}

Vantagens desta Arquitetura:

  • 🔄 Independência: Producer e Consumer podem ser reiniciados independentemente
  • 📈 Escalabilidade: Pode executar múltiplos consumers para balanceamento de carga
  • 🛠️ Manutenção: Atualizações em um serviço não afetam o outro
  • 🎯 Especialização: Cada aplicação foca em sua responsabilidade específica
  • 🌐 Distribuição: Podem rodar em servidores diferentes
  • ⚡ Performance: Não há interferência entre producer e consumer

Resumo da Implementação

[editar | editar código]
Componente Responsabilidade Execução Comportamento
Mensagem Estrutura de dados - Objeto serializable JSON
ProducerSimples Enviar mensagens - Classe utilitária
ConsumerSimples Receber mensagens - Classe utilitária
AplicacaoProducer Aplicação independente `./gradlew runProducer` Envia mensagens e finaliza
AplicacaoConsumer Aplicação independente `./gradlew runConsumer` Processa mensagens continuamente

Agora temos um sistema distribuído real onde: 1. AplicacaoProducer envia mensagens e pode ser finalizada 2. AplicacaoConsumer fica rodando continuamente processando mensagens 3. Ambas aplicações são independentes e podem rodar em momentos diferentes 4. Reflete a arquitetura real de sistemas distribuídos com Kafka

Passo 4: Testando a Implementação

[editar | editar código]

4.1 Verificar se Kafka está Rodando

[editar | editar código]
# Verificar containers
docker-compose ps

# Verificar logs do Kafka
docker-compose logs kafka | tail -20

# Testar conectividade
docker exec kafka-broker kafka-topics --list --bootstrap-server localhost:9092

4.2 Compilar o Projeto

[editar | editar código]
# Compilar o projeto
./gradlew build

# Verificar se compilou sem erros
./gradlew compileKotlin

4.3 Executar as Aplicações Separadas

[editar | editar código]

⭐ NOVA ABORDAGEM: Aplicações Independentes

Agora temos duas aplicações separadas que simulam um ambiente distribuído real:

4.3.1 Executar Consumer (Terminal 1)

[editar | editar código]
# Executar aplicação Consumer (fica rodando continuamente)
./gradlew runConsumer

Saída do Consumer:

📨 APLICAÇÃO CONSUMER - Kafka + Kotlin
======================================

🎯 Configuração do Consumer:
   ├─ Tópico: mensagens-tutorial
   ├─ Grupo: processador-mensagens
   └─ Broker: localhost:9092

⏳ Conectando com Kafka...
🔗 Consumer Kafka criado para o grupo 'processador-mensagens'
🔄 Iniciando consumo contínuo...
📨 Aguardando mensagens (Ctrl+C para parar)...

🎯 Consumer inscrito no tópico: mensagens-tutorial
👥 Grupo de consumidores: processador-mensagens
⏳ Aguardando mensagens...

4.3.2 Executar Producer (Terminal 2)

[editar | editar código]
# Em outro terminal, executar aplicação Producer
./gradlew runProducer

Saída do Producer:

📤 APLICAÇÃO PRODUCER - Kafka + Kotlin
======================================

⏳ Conectando com Kafka...
🔗 Producer Kafka conectado e pronto para enviar mensagens

📝 Cenário 1: Enviando lote de mensagens de exemplo

📤 Iniciando envio de 3 mensagens para o tópico 'mensagens-tutorial'

📝 Enviando mensagem 1/3
✅ Mensagem msg-001 enviada com sucesso!
   ├─ Tópico: mensagens-tutorial
   ├─ Partição: 0
   ├─ Offset: 0
   └─ Timestamp: 1641234567890
   ✅ Mensagem enviada: "Sistema iniciado - Primeira mensagem"

...

📨 Cenário 2: Simulando chegada de novas mensagens...

📝 Enviando mensagem 1/1
✅ Mensagem pedido-001 enviada com sucesso!
   ├─ Tópico: mensagens-tutorial
   ├─ Partição: 1
   ├─ Offset: 0
   └─ Timestamp: 1641234571890
   ✅ Mensagem enviada: "Novo pedido recebido: Smartphone XYZ"

✅ Producer finalizou o envio de todas as mensagens
🔄 Consumer pode continuar rodando em outra aplicação
🔒 Fechando producer e liberando recursos...
✅ Producer fechado com sucesso
👋 Aplicação Producer finalizada

4.3.3 Saída no Consumer (Terminal 1)

[editar | editar código]

Enquanto isso, no Terminal 1 (Consumer), você verá:

📨 Mensagem #1 recebida:
   ├─ ID: msg-001
   ├─ Conteúdo: Sistema iniciado - Primeira mensagem
   ├─ Remetente: sistema
   ├─ Timestamp: 2024-01-15T10:30:45.123
   ├─ Partição: 0
   └─ Offset: 0

📨 Mensagem #2 recebida:
   ├─ ID: msg-002
   ├─ Conteúdo: Tutorial Kafka + Kotlin em execução
   ├─ Remetente: tutorial
   ├─ Timestamp: 2024-01-15T10:30:46.456
   ├─ Partição: 2
   └─ Offset: 0

📨 Mensagem #3 recebida:
   ├─ ID: pedido-001
   ├─ Conteúdo: Novo pedido recebido: Smartphone XYZ
   ├─ Remetente: api-pedidos
   ├─ Timestamp: 2024-01-15T10:30:51.789
   ├─ Partição: 1
   └─ Offset: 0

# Consumer continua rodando, aguardando novas mensagens...

4.3.4 Comandos Alternativos

[editar | editar código]
# Executar Consumer como aplicação padrão
./gradlew run

# Executar Producer usando classe específica
./gradlew run -PmainClass=com.exemplo.kafka.AplicacaoProducerKt

# Executar Consumer usando classe específica  
./gradlew run -PmainClass=com.exemplo.kafka.AplicacaoConsumerKt

🎯 Vantagens desta Abordagem:

  • Realismo: Simula ambiente de produção onde serviços rodam independentemente
  • Flexibilidade: Producer pode ser executado múltiplas vezes sem reiniciar Consumer
  • Escalabilidade: Pode executar múltiplos Consumers em paralelo
  • Manutenção: Atualizações em um serviço não afetam o outro
  • Teste: Facilita teste de cenários como falhas e recuperação

Passo 5: Monitoramento e Verificação

[editar | editar código]

5.1 Interface Web Kafka UI

[editar | editar código]

Acesse localhost:8080 no navegador para:

  • Visualizar tópicos criados automaticamente
  • Monitorar mensagens em tempo real
  • Verificar consumer groups e seu status
  • Analisar performance do cluster

5.2 Comandos de Linha Kafka

[editar | editar código]
# Listar tópicos
docker exec kafka-broker kafka-topics --list --bootstrap-server localhost:9092

# Detalhar tópico específico
docker exec kafka-broker kafka-topics --describe \
  --topic mensagens-tutorial --bootstrap-server localhost:9092

# Consumir mensagens via linha de comando
docker exec kafka-broker kafka-console-consumer \
  --topic mensagens-tutorial --bootstrap-server localhost:9092 \
  --from-beginning

# Produzir mensagens via linha de comando
docker exec -it kafka-broker kafka-console-producer \
  --topic mensagens-tutorial --bootstrap-server localhost:9092

5.3 Verificar Consumer Groups

[editar | editar código]
# Listar grupos de consumidores
docker exec kafka-broker kafka-consumer-groups \
  --bootstrap-server localhost:9092 --list

# Detalhar grupo específico
docker exec kafka-broker kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --group grupo-tutorial --describe

Passo 6: Experimentações Adicionais

[editar | editar código]

6.1 Teste com Múltiplos Consumers

[editar | editar código]

Execute a aplicação em múltiplos terminais para ver o balanceamento de carga:

# Terminal 1
./gradlew run

# Terminal 2 (em outro terminal)
./gradlew run

6.2 Teste de Tolerância a Falhas

[editar | editar código]
# 1. Execute a aplicação
./gradlew run

# 2. Em outro terminal, pare o Kafka
docker-compose stop kafka

# 3. Reinicie o Kafka
docker-compose start kafka

# Observe como a aplicação se comporta

6.3 Criação Manual de Tópicos

[editar | editar código]
# Criar tópico com configurações específicas
docker exec kafka-broker kafka-topics --create \
  --topic meu-topico-personalizado \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1

# Configurar retenção de mensagens
docker exec kafka-broker kafka-configs --alter \
  --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name meu-topico-personalizado \
  --add-config retention.ms=86400000

Solução de Problemas

[editar | editar código]

Problemas Comuns

[editar | editar código]
Problema Sintoma Solução
Kafka não inicia Container em estado "Exited" Verificar logs: docker-compose logs kafka
Conexão recusada Connection refused Aguardar inicialização ou verificar porta 9092
Mensagens não aparecem Consumer não recebe mensagens Verificar nome do tópico e grupo de consumidores
Erro de serialização JsonParseException Verificar formato JSON das mensagens
OutOfMemoryError JVM fica sem memória Ajustar heap: -Xmx512m

Comandos de Diagnóstico

[editar | editar código]
# Verificar se portas estão abertas
netstat -tuln | grep -E "(9092|2181|8080)"

# Verificar logs em tempo real
docker-compose logs -f kafka

# Resetar ambiente (CUIDADO: apaga dados)
docker-compose down -v
docker-compose up -d

# Verificar conectividade interna
docker exec kafka-broker kafka-broker-api-versions \
  --bootstrap-server localhost:9092

Logs Úteis

[editar | editar código]
# Logs detalhados do Kafka
docker-compose logs kafka | grep -E "(ERROR|WARN)"

# Logs do consumer
docker-compose logs kafka | grep "consumer"

# Monitorar criação de tópicos
docker-compose logs kafka | grep "Created topic"

Finalização

[editar | editar código]

Para parar todo o ambiente:

# Parar aplicação Kotlin (Ctrl+C no terminal)

# Parar containers Docker
docker-compose down

# Parar e remover volumes (dados são perdidos)
docker-compose down -v

Próximos Passos

[editar | editar código]

Após concluir este tutorial básico, você pode explorar:

  • Kafka Streams: Processamento de streams em tempo real
  • Schema Registry: Gerenciamento de esquemas de mensagens
  • Configurações de Produção: Replicação, particionamento otimizado
  • Monitoramento Avançado: Métricas e alertas
  • Segurança: SSL/TLS e autenticação SASL
  • Performance Tuning: Otimização para alto throughput

Glossário

[editar | editar código]
Bootstrap Servers
Lista de brokers Kafka para conexão inicial
Consumer Group
Conjunto de consumers que dividem o processamento de um tópico
Idempotência
Garantia de que operações duplicadas não causam efeitos colaterais
Lag
Diferença entre mensagens produzidas e consumidas
Offset
Posição sequencial de uma mensagem dentro de uma partição
Partition
Divisão lógica de um tópico para paralelização
Rebalancing
Redistribuição de partições entre consumers de um grupo
Replication Factor
Número de cópias de cada partição no cluster

Ver Também

[editar | editar código]
[editar | editar código]