Apache Kafka em Kotlin
Visão Geral
[editar | editar código]Apache Kafka é uma plataforma de streaming distribuída desenvolvida originalmente pela LinkedIn e posteriormente doada à Apache Software Foundation. A integração com Kotlin fornece uma abordagem moderna e concisa para desenvolvimento de sistemas de mensageria, aproveitando as características de segurança de tipos e expressividade da linguagem.
Características Principais do Apache Kafka
[editar | editar código]- Alta Taxa de Transferência: Capacidade de processar milhões de mensagens por segundo
- Escalabilidade Horizontal: Expansão através da adição de novos brokers ao cluster
- Persistência Durável: Armazenamento de mensagens em disco com replicação automática
- Tolerância a Falhas: Continuidade operacional mesmo com falhas de componentes individuais
- Processamento em Tempo Real: Suporte nativo para streaming de dados
- Baixo Acoplamento: Independência entre produtores e consumidores
Por que Kafka é Diferente
[editar | editar código]Kafka se diferencia de outras soluções de mensageria tradicionais:
| Característica | Kafka | Sistemas Tradicionais (RabbitMQ, ActiveMQ) |
|---|---|---|
| Persistência | Todas as mensagens são persistidas | Geralmente apenas em memória |
| Ordem | Garantida por partição | Limitada ou inexistente |
| Consumo | Múltiplos consumers podem ler a mesma mensagem | Mensagem consumida uma vez |
| Throughput | Extremamente alto (milhões/seg) | Moderado (milhares/seg) |
| Retenção | Configurável (dias/semanas) | Até o consumo |
| Escalabilidade | Horizontal via partições | Vertical limitada |
Conceitos Fundamentais
[editar | editar código]Terminologia
[editar | editar código]| Termo | Definição |
|---|---|
| Producer | Componente responsável pela publicação de mensagens em tópicos |
| Consumer | Componente que consome mensagens de tópicos específicos |
| Topic | Canal lógico onde mensagens são organizadas por categoria |
| Partition | Subdivisão de um tópico para paralelização e escalabilidade |
| Broker | Servidor individual que compõe o cluster Kafka |
| Cluster | Conjunto de brokers trabalhando em conjunto |
| Consumer Group | Grupo de consumidores que compartilham a carga de processamento |
| Offset | Identificador sequencial único de cada mensagem dentro de uma partição |
Comparação com Outros Sistemas
[editar | editar código]- Banco de Dados Tradicional
- Kafka não substitui bancos de dados, mas complementa-os oferecendo capacidade de streaming e processamento em tempo real
- Filas de Mensagens (RabbitMQ, SQS)
- Kafka mantém mensagens por mais tempo e permite múltiplos consumidores, ideal para arquiteturas orientadas a eventos
- Sistemas de Logs (ELK Stack)
- Kafka pode alimentar sistemas de logs, mas também processa e transforma dados em tempo real
- APIs REST
- Kafka oferece comunicação assíncrona e desacoplada, complementando APIs síncronas
Casos de Uso e Exemplos Práticos
[editar | editar código]Esta seção demonstra como os conceitos fundamentais do Kafka se aplicam em cenários reais, conectando teoria e prática.
Caso de Uso 1: Sistema de Notificações (Iniciante)
[editar | editar código]Cenário: Uma aplicação web precisa enviar notificações quando usuários realizam ações.
Conceitos Aplicados:
- Topic:
user-notifications - Producer: Aplicação web que detecta ações dos usuários
- Consumer: Serviço de email que envia notificações
- Partições: Uma partição por tipo de notificação
Ação do Usuário → Producer → Topic: "user-notifications" → Consumer → Email Enviado
Implementação dos Conceitos:
| Conceito | Como se Manifesta | Benefício |
|---|---|---|
| Producer | Aplicação web publica evento "user_registered" | Desacoplamento: app não precisa saber sobre email |
| Topic | Canal dedicado para notificações | Organização: separa notificações de outros eventos |
| Consumer | Serviço dedicado processa notificações | Especialização: foco apenas em envio de emails |
| Partição | Partição 0: cadastros, Partição 1: compras | Paralelização: processa tipos diferentes simultaneamente |
| Offset | Controla quais notificações já foram enviadas | Confiabilidade: não envia email duplicado |
Exemplo de Mensagem:
{
"tipo": "user_registered",
"usuario_id": "12345",
"email": "user@example.com",
"timestamp": "2024-01-15T10:30:00Z",
"template": "welcome_email"
}
Por que Kafka aqui?
- Se o serviço de email falhar, mensagens ficam persistidas no tópico
- Podem adicionar SMS, push notifications como novos consumers
- Aplicação principal não trava esperando envio de email
Caso de Uso 2: E-commerce - Processamento de Pedidos (Intermediário)
[editar | editar código]Cenário: Quando um produto é vendido, múltiplos sistemas precisam ser atualizados simultaneamente.
Conceitos Aplicados:
- Topic:
order-events - Producer: Sistema de checkout
- Multiple Consumers: Estoque, Pagamento, Entrega, Analytics
- Consumer Groups: Cada sistema tem seu próprio grupo
- Replication: Dados críticos replicados para tolerância a falhas
Evento: "order.created" (Producer: Checkout)
↓
Topic: "order-events"
↓
┌─────────────────────────────────────────────────────┐
│ Consumer Group: "inventory" → Reduz estoque │
│ Consumer Group: "payments" → Processa pagamento │
│ Consumer Group: "shipping" → Cria etiqueta │
│ Consumer Group: "analytics" → Atualiza métricas │
│ Consumer Group: "crm" → Histórico cliente │
└─────────────────────────────────────────────────────┘
Aplicação Detalhada dos Conceitos:
Producer (Sistema de Checkout):
// Quando pedido é finalizado
val evento = OrderEvent(
orderId = "ORD-12345",
customerId = "CUST-789",
items = listOf("PROD-A", "PROD-B"),
total = 299.99
)
producer.send("order-events", evento.customerId, evento.toJson())
Consumer Groups (Diferentes Serviços):
- Grupo "inventory": Consome para atualizar estoque
- Grupo "payments": Consome para processar pagamento
- Grupo "shipping": Consome para preparar envio
- Grupo "analytics": Consome para relatórios
Vantagem dos Consumer Groups: Cada grupo recebe a mesma mensagem independentemente, permitindo processamento paralelo sem interferência.
Particionamento Estratégico:
Chave: customerId → Garante que pedidos do mesmo cliente
sejam processados em ordem
Partição 0: Clientes cujo ID termina em 0,1,2
Partição 1: Clientes cujo ID termina em 3,4,5
Partição 2: Clientes cujo ID termina em 6,7,8,9
Tolerância a Falhas na Prática:
- Se inventory service falhar, outros continuam processando
- Quando inventory service voltar, retoma do último offset
- Nenhum evento de estoque é perdido
Caso de Uso 3: Análise Financeira em Tempo Real (Avançado)
[editar | editar código]Cenário: Detecção de fraudes em transações bancárias com múltiplos algoritmos analisando simultaneamente.
Conceitos Aplicados:
- High Throughput: Milhares de transações por segundo
- Multiple Topics: Separação por tipo de análise
- Stream Processing: Análise contínua sem armazenamento
- Low Latency: Decisões em milissegundos
- Event Sourcing: Histórico completo para auditoria
Transação Bancária → Topic: "raw-transactions"
↓
┌────────┼────────┐
│ │ │
Consumer A Consumer B Consumer C
(Geo-Analysis) (ML-Model) (Pattern-Detection)
│ │ │
↓ ↓ ↓
Topic: "geo- Topic: Topic:
alerts" "ml-scores" "patterns"
│ │ │
└────────┼────────┘
↓
Consumer: "fraud-aggregator"
↓
Decisão: APROVAR/BLOQUEAR
Implementação dos Conceitos Avançados:
Múltiplos Topics (Event Sourcing):
| Topic | Propósito | Retenção | Partições |
|---|---|---|---|
raw-transactions |
Transações originais | 30 dias | 10 (por região) |
geo-alerts |
Alertas geográficos | 7 dias | 3 (por criticidade) |
ml-scores |
Pontuações ML | 1 dia | 5 (por modelo) |
final-decisions |
Decisões finais | 365 dias | 1 (ordem cronológica) |
Stream Processing em Ação:
// Consumer que analisa padrões geográficos
kafkaConsumer.subscribe("raw-transactions")
while (true) {
val records = consumer.poll(100) // 100ms timeout
records.forEach { transaction ->
val geoRisk = analyzeLocation(transaction.location, transaction.userId)
if (geoRisk > THRESHOLD) {
producer.send("geo-alerts", createAlert(transaction, geoRisk))
}
}
}
Particionamento para Performance:
Topic: "raw-transactions" ├── Partição 0: Transações Região Norte → Consumer Norte ├── Partição 1: Transações Região Sul → Consumer Sul ├── Partição 2: Transações Região Sudeste → Consumer Sudeste └── Partição 3: Transações Internacionais → Consumer Internacional Benefício: Análise geográfica paralela + Ordem regional preservada
Alta Disponibilidade:
- Replication Factor = 3: Cada partição replicada em 3 brokers
- Acks = "all": Transação só confirmada após replicação completa
- Retries = 5: Múltiplas tentativas em caso de falha temporária
Caso de Uso 4: Internet das Coisas (IoT) - Telemetria (Especializado)
[editar | editar código]Cenário: Monitoramento de frota de 10.000 veículos com sensores enviando dados a cada 30 segundos.
Conceitos Aplicados:
- Massive Throughput: 10.000 × 120 mensagens/hora = 1.2M mensagens/hora
- Time-based Partitioning: Organização temporal dos dados
- Retention Policies: Diferentes períodos de retenção por tipo de dado
- Compression: Redução do volume de dados
- Batch Processing: Agrupamento para eficiência
10.000 Veículos → Sensores (GPS, Motor, Combustível)
↓
Topic: "vehicle-telemetry" (50 partições)
↓
┌─────────────┼─────────────┐
│ │ │
Consumer Group: Consumer Group: Consumer Group:
"real-time" "analytics" "maintenance"
↓ ↓ ↓
Dashboards Data Lake Alertas Preventivos
Particionamento por Veículo:
Chave: vehicleId → hash(vehicleId) % 50 → Partição X Exemplo: vehicleId="TRUCK001" → Partição 15 vehicleId="TRUCK002" → Partição 33 vehicleId="TRUCK003" → Partição 7 Vantagem: Todos os dados de um veículo ficam ordenados cronologicamente
Configuração Otimizada para IoT:
# Producer otimizado para IoT
batch.size: 65536 # Agrupa mais mensagens por batch
linger.ms: 100 # Espera 100ms para formar batches maiores
compression.type: snappy # Compressão para reduzir bandwidth
acks: 1 # Acknowledgment básico (não "all")
retries: 3 # Poucas tentativas (dados frequentes)
Diferentes Retenções por Necessidade:
| Tipo de Dado | Topic | Retenção | Justificativa |
|---|---|---|---|
| GPS (posição) | vehicle-location |
24 horas | Apenas posição atual relevante |
| Motor (RPM, temp) | vehicle-engine |
30 dias | Análise de padrões de uso |
| Manutenção | vehicle-maintenance |
2 anos | Histórico para garantia |
| Combustível | vehicle-fuel |
90 dias | Otimização de rotas |
Consumer Especializado para Alertas:
// Consumer que monitora temperatura do motor
kafkaConsumer.subscribe("vehicle-engine")
while (true) {
val records = consumer.poll(1000)
records.forEach { engineData ->
val temp = engineData.value.getDouble("engine_temp")
if (temp > 95.0) { // Temperatura crítica
alertProducer.send("maintenance-alerts",
createMaintenanceAlert(engineData.key, temp))
}
}
}
Caso de Uso 5: Microserviços - Event-Driven Architecture (Arquitetural)
[editar | editar código]Cenário: Aplicação de delivery com múltiplos microserviços comunicando-se via eventos.
Conceitos Aplicados:
- Event Sourcing: Toda mudança de estado é um evento
- CQRS: Separação entre comandos e consultas
- Saga Pattern: Transações distribuídas
- Schema Evolution: Evolução de formatos sem quebrar compatibilidade
- Dead Letter Queues: Tratamento de falhas persistentes
Arquitetura Event-Driven com Kafka:
User Service → "user.registered" → Kafka → Notification Service
↓
Order Service → "order.created" → Kafka → Inventory Service
↓ → Payment Service
Payment Service → "payment.completed" → Kafka → Delivery Service
↓ → Order Service
Delivery Service → "delivery.dispatched" → Kafka → Tracking Service
→ Notification Service
Event Sourcing na Prática:
Em vez de armazenar estado atual:
Tabela: users | id | name | email | status | |----|------|-------|--------| | 1 | João | j@... | active |
Armazena sequência de eventos:
Topic: "user-events"
| offset | event_type | user_id | data |
|--------|-----------------|---------|------|
| 1 | user.registered | 1 | {name: "João", email: "j@..."} |
| 2 | user.activated | 1 | {} |
| 3 | email.changed | 1 | {new_email: "joao@..."} |
Saga Pattern para Transações Distribuídas:
Exemplo: Processamento de Pedido (transação que envolve múltiplos serviços)
1. Order Service → "order.create.requested"
2. Inventory Service → "inventory.reserved" ✓ Sucesso
3. Payment Service → "payment.processed" ✓ Sucesso
4. Delivery Service → "delivery.scheduled" ✗ Falha
↓
Compensação (rollback):
5. Payment Service → "payment.refunded"
6. Inventory Service → "inventory.released"
7. Order Service → "order.cancelled"
Implementação do Saga:
// Orchestrator que coordena a saga
class OrderSagaOrchestrator {
fun processOrder(orderId: String) {
// Etapa 1: Reservar estoque
producer.send("inventory-commands", ReserveInventory(orderId))
// Aguarda confirmação via consumer
consumer.subscribe("inventory-events")
val result = waitForEvent("inventory.reserved", orderId, timeout = 30.seconds)
if (result.success) {
// Etapa 2: Processar pagamento
producer.send("payment-commands", ProcessPayment(orderId))
// ... continua saga
} else {
// Compensação: cancelar pedido
producer.send("order-events", OrderCancelled(orderId, "inventory_unavailable"))
}
}
}
Schema Evolution:
Evolução do formato de evento sem quebrar consumers existentes:
// Versão 1 (inicial)
{
"event_type": "user.registered",
"user_id": "123",
"email": "user@example.com"
}
// Versão 2 (adicionado campo opcional)
{
"event_type": "user.registered",
"user_id": "123",
"email": "user@example.com",
"phone": "+5511999999999" // Campo novo, opcional
}
// Versão 3 (campo obrigatório com default)
{
"event_type": "user.registered",
"user_id": "123",
"email": "user@example.com",
"phone": "+5511999999999",
"registration_source": "web" // Novo campo com valor padrão
}
Resumo: Conceitos × Casos de Uso
[editar | editar código]| Conceito | Caso Simples | Caso Intermediário | Caso Avançado | Caso IoT | Caso Arquitetural |
|---|---|---|---|---|---|
| Producer | App web | Sistema checkout | Transação bancária | Sensores veículos | Microserviços |
| Consumer | Serviço email | Múltiplos sistemas | Algoritmos ML | Dashboards | Event handlers |
| Topic | Notificações | Pedidos | Transações | Telemetria | Eventos domínio |
| Partition | Por tipo | Por cliente | Por região | Por veículo | Por agregado |
| Consumer Group | Um grupo | Grupo por sistema | Grupo por análise | Grupo por função | Grupo por serviço |
| Replication | Básica | Crítica | Ultra-crítica | Moderada | Por importância |
| Retention | Dias | Semanas | Anos | Variável | Event sourcing |
Empresas que Utilizam Kafka
[editar | editar código]| Empresa | Uso Principal | Escala | Conceitos Principais Aplicados |
|---|---|---|---|
| Activity streams, métricas | +1 trilhão mensagens/dia | High throughput, Stream processing | |
| Netflix | Monitoramento, logs, recomendações | +8 trilhões eventos/dia | Event sourcing, Real-time analytics |
| Spotify | Event logging, user analytics | +400 bilhões eventos/dia | User behavior tracking, Personalization |
| Uber | Surge pricing, localização em tempo real | +1 trilhão mensagens/dia | Geolocation, Dynamic pricing |
| Airbnb | Analytics, ETL, event streaming | +150 bilhões eventos/dia | Data pipeline, Business intelligence |
Quando NÃO Usar Kafka
[editar | editar código]- Aplicações simples com poucos sistemas integrados → Use REST APIs diretas
- Baixo volume de dados (< 1000 mensagens/hora) → Use filas simples (Redis, SQS)
- Necessidade de entrega exata uma vez sem configuração adicional → Use message brokers tradicionais
- Equipe sem experiência em sistemas distribuídos → Comece com soluções mais simples
- Orçamento limitado para infraestrutura e operação → Considere soluções managed (AWS Kinesis, Google Pub/Sub)
Arquitetura
[editar | editar código]Antes de partir para a implementação, é importante entender como os componentes do Kafka se relacionam:
Visão Geral da Arquitetura
[editar | editar código]┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Producer │───▶│ Kafka │◀───│ Consumer │
│ (Kotlin) │ │ Cluster │ │ (Kotlin) │
└─────────────┘ └─────────────┘ └─────────────┘
│
┌───────▼───────┐
│ Zookeeper │
│ (Metadata) │
└───────────────┘
Fluxo Básico de Funcionamento
[editar | editar código]1. Producer envia mensagem para um Topic ↓ 2. Kafka armazena a mensagem em uma Partição ↓ 3. Consumer lê a mensagem da Partição ↓ 4. Consumer processa a mensagem ↓ 5. Consumer confirma o processamento (Offset)
Tutorial Prático
[editar | editar código]Este tutorial guiará você através da configuração completa do Kafka com Kotlin, desde a instalação até a implementação de producers e consumers funcionais.
Pré-requisitos
[editar | editar código]Antes de começar, certifique-se de ter instalado:
- Docker: Versão 20.10 ou superior
- Docker Compose: Versão 2.0 ou superior
- JDK: Versão 17 ou superior
- Kotlin: Versão 1.9 ou superior (ou use via Gradle)
Verificação dos Pré-requisitos:
# Verificar Docker
docker --version
# Saída esperada: Docker version 20.10.x
# Verificar Docker Compose
docker-compose --version
# Saída esperada: Docker Compose version 2.x.x
# Verificar Java
java --version
# Saída esperada: openjdk 17.x.x ou superior
Passo 1: Instalação do Apache Kafka
[editar | editar código]Opção A: Instalação com Docker (Recomendada)
[editar | editar código]A instalação via Docker é a forma mais simples e confiável de executar Kafka localmente.
1.1 Criar Estrutura do Projeto
# Criar diretório do projeto
mkdir kafka-kotlin-tutorial
cd kafka-kotlin-tutorial
# Criar estruturas necessárias
mkdir -p src/main/kotlin/com/exemplo/kafka/{producer,consumer,models}
1.2 Configurar Docker Compose
Criar arquivo `docker-compose.yml` na raiz do projeto:
version: '3.8'
services:
# Zookeeper - Coordenação do cluster Kafka
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
hostname: zookeeper
container_name: kafka-zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
healthcheck:
test: ["CMD", "nc", "-z", "localhost", "2181"]
interval: 30s
timeout: 10s
retries: 3
# Kafka Broker - Servidor principal
kafka:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka
container_name: kafka-broker
depends_on:
zookeeper:
condition: service_healthy
ports:
- "9092:9092"
- "9101:9101"
environment:
# Configuração do Broker
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
# Configuração de Listeners (conexões)
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
# Configurações de Replicação (ambiente desenvolvimento)
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
# Monitoramento JMX
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
# Auto-criação de tópicos (apenas para desenvolvimento)
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
healthcheck:
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
interval: 30s
timeout: 10s
retries: 3
# Interface Web para Monitoramento
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
depends_on:
kafka:
condition: service_healthy
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local-cluster
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
healthcheck:
test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost:8080"]
interval: 30s
timeout: 10s
retries: 3
1.3 Iniciar o Cluster Kafka
# Iniciar todos os serviços
docker-compose up -d
# Verificar se todos os containers estão rodando
docker-compose ps
# Aguardar inicialização completa (cerca de 30-60 segundos)
# Verificar logs se necessário
docker-compose logs kafka
Saída Esperada:
NAME IMAGE STATUS kafka-zookeeper confluentinc/cp-zookeeper:7.4.0 Up (healthy) kafka-broker confluentinc/cp-kafka:7.4.0 Up (healthy) kafka-ui provectuslabs/kafka-ui:latest Up (healthy)
Opção B: Instalação Manual (Alternativa)
[editar | editar código]Para quem prefere instalação direta no sistema operacional:
Ubuntu/Debian:
# Instalar Java (se não tiver)
sudo apt update
sudo apt install openjdk-17-jdk
# Baixar Kafka
cd /opt
sudo wget https://downloads.apache.org/kafka/2.13-3.5.1/kafka_2.13-3.5.1.tgz
sudo tar -xzf kafka_2.13-3.5.1.tgz
sudo mv kafka_2.13-3.5.1 kafka
sudo chown -R $USER:$USER /opt/kafka
# Iniciar Zookeeper
cd /opt/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties &
# Iniciar Kafka (em outro terminal)
bin/kafka-server-start.sh config/server.properties &
macOS (com Homebrew):
# Instalar Kafka
brew install kafka
# Iniciar Zookeeper
brew services start zookeeper
# Iniciar Kafka
brew services start kafka
Passo 2: Configuração do Projeto Kotlin
[editar | editar código]2.1 Configurar Gradle
[editar | editar código]Criar arquivo `build.gradle.kts`:
plugins {
kotlin("jvm") version "1.9.10"
application
}
group = "com.exemplo.kafka"
version = "1.0.0"
repositories {
mavenCentral()
}
dependencies {
// Kafka Client
implementation("org.apache.kafka:kafka-clients:3.5.1")
// JSON Processing
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.15.2")
implementation("com.fasterxml.jackson.core:jackson-databind:2.15.2")
// Logging
implementation("org.slf4j:slf4j-simple:2.0.7")
// Kotlin Standard Library
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
}
application {
mainClass.set("com.exemplo.kafka.MainKt")
}
kotlin {
jvmToolchain(17)
}
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile> {
kotlinOptions {
jvmTarget = "17"
freeCompilerArgs = listOf("-Xjsr305=strict")
}
}
2.2 Configurar Gradle Wrapper
[editar | editar código]# Gerar wrapper do Gradle (se não existir)
gradle wrapper --gradle-version 8.4
# Tornar executável (Linux/macOS)
chmod +x gradlew
Passo 3: Implementação Simples em Kotlin
[editar | editar código]3.1 Visão Geral da Implementação
[editar | editar código]Antes de implementar o código, vamos entender o que vamos construir e como funcionará nosso sistema Kafka com Kotlin.
Cenário do Tutorial
[editar | editar código]Vamos criar um sistema de mensagens distribuído que simula aplicações reais. O cenário é:
- Aplicação Producer: Serviço que envia mensagens (ex: API de pedidos)
- Aplicação Consumer: Serviço que processa mensagens (ex: serviço de notificações)
- Tópico Kafka: `mensagens-tutorial`
- Tipo de Mensagem: Mensagens de texto com remetente e timestamp
- Arquitetura: Aplicações separadas comunicando-se via Kafka
Fluxo de Funcionamento
[editar | editar código]1. APLICAÇÃO PRODUCER (Terminal 1) inicia e envia mensagens ↓ 2. KAFKA armazena mensagens no tópico ↓ 3. APLICAÇÃO CONSUMER (Terminal 2) recebe e processa mensagens ↓ 4. Ambas aplicações rodam independentemente ↓ 5. Producer pode ser parado sem afetar Consumer (e vice-versa)
Estrutura do Tópico
[editar | editar código]| Elemento | Valor | Descrição |
|---|---|---|
| Nome do Tópico | `mensagens-tutorial` | Canal onde mensagens são enviadas/recebidas |
| Partições | 3 (auto-criação) | Kafka divide automaticamente para paralelização |
| Chave da Mensagem | ID da mensagem | Usado para garantir ordem e particionamento |
| Valor da Mensagem | JSON da mensagem | Conteúdo serializado da mensagem |
| Consumer Group | `grupo-tutorial` | Grupo que processa as mensagens |
Exemplo de Mensagem
[editar | editar código]Cada mensagem terá este formato JSON:
{
"id": "msg-001",
"conteudo": "Primeira mensagem do tutorial Kafka + Kotlin!",
"timestamp": "2024-01-15T10:30:45.123",
"remetente": "tutorial"
}
Classes que Implementaremos
[editar | editar código]| Classe | Arquivo | Responsabilidade | Métodos Principais |
|---|---|---|---|
| Mensagem | `models/Mensagem.kt` | Modelo de dados | `toJson()`, `fromJson()` |
| ProducerSimples | `producer/ProducerSimples.kt` | Enviar mensagens para Kafka | `enviarMensagem()`, `enviarMensagens()`, `fechar()` |
| ConsumerSimples | `consumer/ConsumerSimples.kt` | Receber mensagens do Kafka | `iniciarConsumo()`, `processarMensagem()`, `fechar()` |
| AplicacaoProducer | `AplicacaoProducer.kt` | Aplicação independente para enviar mensagens | `main()`, `criarMensagensExemplo()` |
| AplicacaoConsumer | `AplicacaoConsumer.kt` | Aplicação independente para processar mensagens | `main()` |
Detalhamento dos Métodos
[editar | editar código]Classe Mensagem:
- `toJson()`: Converte objeto Mensagem para string JSON
- `fromJson(json: String)`: Cria objeto Mensagem a partir de JSON
Classe ProducerSimples:
- `enviarMensagem(topico, mensagem)`: Envia uma mensagem individual
- `enviarMensagens(topico, lista)`: Envia múltiplas mensagens com intervalo
- `fechar()`: Fecha conexão e libera recursos
Classe ConsumerSimples:
- `iniciarConsumo(topico)`: Inicia loop de consumo de mensagens
- `processarMensagem(mensagem, particao, offset)`: Processa mensagem individual
- `pararConsumo()`: Para o loop de consumo
- `fechar()`: Fecha conexão e libera recursos
AplicacaoProducer:
- `main()`: Aplicação independente que envia mensagens para Kafka
- `criarMensagensExemplo()`: Gera mensagens para demonstração
AplicacaoConsumer:
- `main()`: Aplicação independente que consome mensagens do Kafka
Configurações Kafka Utilizadas
[editar | editar código]Producer:
- Bootstrap Servers: `localhost:9092` (conexão com Kafka)
- Serializers: `StringSerializer` (converte para bytes)
- Acks: `"1"` (aguarda confirmação do líder da partição)
- Retries: `3` (tentativas em caso de falha)
Consumer:
- Bootstrap Servers: `localhost:9092` (conexão com Kafka)
- Group ID: `grupo-tutorial` (identificação do grupo)
- Deserializers: `StringDeserializer` (converte bytes para string)
- Auto Offset Reset: `"earliest"` (lê desde o início do tópico)
- Auto Commit: `false` (commit manual para controle)
3.2 Modelo de Dados
[editar | editar código]Criar `src/main/kotlin/com/exemplo/kafka/models/Mensagem.kt`:
package com.exemplo.kafka.models
import com.fasterxml.jackson.annotation.JsonProperty
import java.time.LocalDateTime
/**
* Modelo simples para demonstrar Kafka com Kotlin
*
* Esta classe representa uma mensagem que será enviada através do Kafka.
* Contém informações básicas como ID, conteúdo, timestamp e remetente.
*
* @property id Identificador único da mensagem (usado como chave no Kafka)
* @property conteudo Texto da mensagem
* @property timestamp Momento de criação da mensagem
* @property remetente Quem enviou a mensagem
*/
data class Mensagem(
@JsonProperty("id")
val id: String,
@JsonProperty("conteudo")
val conteudo: String,
@JsonProperty("timestamp")
val timestamp: String = LocalDateTime.now().toString(),
@JsonProperty("remetente")
val remetente: String = "sistema"
) {
/**
* Converte a mensagem para formato JSON
* Este método será usado pelo Producer para serializar a mensagem
* antes de enviar para o Kafka
*/
fun toJson(): String {
val mapper = com.fasterxml.jackson.module.kotlin.jacksonObjectMapper()
return mapper.writeValueAsString(this)
}
companion object {
/**
* Cria uma Mensagem a partir de uma string JSON
* Este método será usado pelo Consumer para deserializar
* mensagens recebidas do Kafka
*/
fun fromJson(json: String): Mensagem {
val mapper = com.fasterxml.jackson.module.kotlin.jacksonObjectMapper()
return mapper.readValue(json, Mensagem::class.java)
}
}
}
Por que esta estrutura?
- `id`: Usado como chave no Kafka para particionamento e ordenação
- `conteudo`: O texto principal da mensagem
- `timestamp`: Para rastreamento temporal das mensagens
- `remetente`: Identificação de quem enviou (útil para filtros/logs)
- `@JsonProperty`: Garante nomes consistentes na serialização JSON
- `toJson()` / `fromJson()`: Facilita conversão entre objeto e JSON
3.3 Producer Simples
[editar | editar código]Criar `src/main/kotlin/com/exemplo/kafka/producer/ProducerSimples.kt`:
package com.exemplo.kafka.producer
import com.exemplo.kafka.models.Mensagem
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.*
/**
* Producer Kafka simples para demonstração
*
* Esta classe é responsável por ENVIAR mensagens para o Kafka.
* Ela conecta com o broker Kafka e publica mensagens no tópico especificado.
*
* Funcionalidades principais:
* - Conectar com o cluster Kafka
* - Serializar mensagens para JSON
* - Enviar mensagens de forma assíncrona
* - Tratar erros e confirmações
*/
class ProducerSimples {
private val producer: KafkaProducer<String, String>
init {
// Configuração do Producer Kafka
val props = Properties().apply {
// CONEXÃO: Onde encontrar o Kafka
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// SERIALIZAÇÃO: Como converter objetos Kotlin em bytes
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
// CONFIABILIDADE: Garantias de entrega
put(ProducerConfig.ACKS_CONFIG, "1") // Aguarda confirmação do líder
put(ProducerConfig.RETRIES_CONFIG, 3) // Tentativas em caso de falha
put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000) // Intervalo entre tentativas
// PERFORMANCE: Otimizações para throughput
put(ProducerConfig.BATCH_SIZE_CONFIG, 16384) // Agrupa mensagens em batches
put(ProducerConfig.LINGER_MS_CONFIG, 5) // Tempo de espera para formar batch
}
producer = KafkaProducer(props)
println("🔗 Producer Kafka conectado e pronto para enviar mensagens")
}
/**
* Envia uma mensagem individual para o tópico especificado
*
* @param topico Nome do tópico Kafka onde enviar a mensagem
* @param mensagem Objeto Mensagem a ser enviado
* @return true se enviado com sucesso, false caso contrário
*/
fun enviarMensagem(topico: String, mensagem: Mensagem): Boolean {
return try {
// Criar registro Kafka
val record = ProducerRecord(
topico, // Tópico de destino
mensagem.id, // Chave (garante ordem por ID)
mensagem.toJson() // Valor (mensagem serializada)
)
// Enviar de forma assíncrona com callback
val future = producer.send(record) { metadata, exception ->
if (exception != null) {
println("❌ Erro ao enviar mensagem ${mensagem.id}: ${exception.message}")
} else {
println("✅ Mensagem ${mensagem.id} enviada com sucesso!")
println(" ├─ Tópico: ${metadata.topic()}")
println(" ├─ Partição: ${metadata.partition()}")
println(" ├─ Offset: ${metadata.offset()}")
println(" └─ Timestamp: ${metadata.timestamp()}")
}
}
// Aguardar confirmação (torna síncrono para este exemplo)
future.get()
true
} catch (e: Exception) {
println("❌ Erro inesperado ao enviar mensagem: ${e.message}")
false
}
}
/**
* Envia múltiplas mensagens com intervalo entre elas
*
* @param topico Nome do tópico Kafka
* @param mensagens Lista de mensagens a serem enviadas
*/
fun enviarMensagens(topico: String, mensagens: List<Mensagem>) {
println("📤 Iniciando envio de ${mensagens.size} mensagens para o tópico '$topico'")
mensagens.forEachIndexed { index, mensagem ->
println("\n📝 Enviando mensagem ${index + 1}/${mensagens.size}")
val sucesso = enviarMensagem(topico, mensagem)
if (sucesso) {
println(" ✅ Mensagem enviada: \"${mensagem.conteudo}\"")
} else {
println(" ❌ Falha no envio da mensagem ${mensagem.id}")
}
// Pausa entre mensagens para visualizar o processo
Thread.sleep(1000)
}
println("\n🎉 Envio de mensagens concluído!")
}
/**
* Fecha o producer e libera recursos
* IMPORTANTE: Sempre chamar este método ao finalizar
*/
fun fechar() {
println("🔒 Fechando producer e liberando recursos...")
producer.close()
println("✅ Producer fechado com sucesso")
}
}
Pontos importantes do Producer:
- Bootstrap Servers: Como o producer encontra o cluster Kafka
- Chave da Mensagem: Usamos `mensagem.id` para garantir ordem
- Callback Assíncrono: Recebe confirmação ou erro de cada envio
- Batch Processing: Kafka agrupa mensagens para eficiência
- Retry Logic: Tenta novamente em caso de falhas temporárias
3.4 Consumer Simples
[editar | editar código]Criar `src/main/kotlin/com/exemplo/kafka/consumer/ConsumerSimples.kt`:
package com.exemplo.kafka.consumer
import com.exemplo.kafka.models.Mensagem
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.time.Duration
import java.util.*
/**
* Consumer Kafka simples para demonstração
*
* Esta classe é responsável por RECEBER mensagens do Kafka.
* Ela se conecta ao broker, inscreve-se em um tópico e processa
* mensagens conforme elas chegam.
*
* Funcionalidades principais:
* - Conectar com o cluster Kafka
* - Inscrever-se em tópicos específicos
* - Fazer polling (buscar) por novas mensagens
* - Deserializar JSON para objetos Kotlin
* - Processar mensagens de forma controlada
* - Confirmar processamento (commit de offset)
*/
class ConsumerSimples(private val grupoConsumidor: String) {
private val consumer: KafkaConsumer<String, String>
private var continuarConsumindo = true
init {
// Configuração do Consumer Kafka
val props = Properties().apply {
// CONEXÃO: Onde encontrar o Kafka
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// IDENTIFICAÇÃO: Grupo de consumidores
put(ConsumerConfig.GROUP_ID_CONFIG, grupoConsumidor)
// DESERIALIZAÇÃO: Como converter bytes em objetos Kotlin
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
// POSICIONAMENTO: De onde começar a ler
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // Lê desde o início
// CONFIRMAÇÃO: Como confirmar processamento
put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") // Commit manual
put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
// SESSÃO: Configurações de conectividade
put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") // Timeout da sessão
put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000") // Heartbeat
}
consumer = KafkaConsumer(props)
println("🔗 Consumer Kafka criado para o grupo '$grupoConsumidor'")
}
/**
* Inicia o consumo de mensagens do tópico especificado
*
* Este método:
* 1. Inscreve o consumer no tópico
* 2. Entra em loop infinito fazendo polling
* 3. Processa cada mensagem recebida
* 4. Confirma o processamento (commit)
*
* @param topico Nome do tópico para consumir mensagens
*/
fun iniciarConsumo(topico: String) {
try {
// INSCRIÇÃO: Dizer ao Kafka que queremos mensagens deste tópico
consumer.subscribe(listOf(topico))
println("🎯 Consumer inscrito no tópico: $topico")
println("👥 Grupo de consumidores: $grupoConsumidor")
println("⏳ Aguardando mensagens...\n")
var contadorMensagens = 0
// LOOP PRINCIPAL: Buscar e processar mensagens continuamente
while (continuarConsumindo) {
// POLLING: Buscar novas mensagens (aguarda até 1 segundo)
val records = consumer.poll(Duration.ofMillis(1000))
// PROCESSAR: Cada mensagem recebida
records.forEach { record ->
try {
contadorMensagens++
// DESERIALIZAR: Converter JSON para objeto Mensagem
val mensagem = Mensagem.fromJson(record.value())
// PROCESSAR: Executar lógica de negócio
processarMensagem(mensagem, record.partition(), record.offset(), contadorMensagens)
// CONFIRMAR: Commit manual do offset
consumer.commitSync()
} catch (e: Exception) {
println("❌ Erro ao processar mensagem:")
println(" ├─ Partição: ${record.partition()}")
println(" ├─ Offset: ${record.offset()}")
println(" ├─ Valor: ${record.value()}")
println(" └─ Erro: ${e.message}")
}
}
// OTIMIZAÇÃO: Pequena pausa se não há mensagens
if (records.isEmpty) {
Thread.sleep(100)
}
}
} catch (e: Exception) {
println("❌ Erro crítico no consumer: ${e.message}")
} finally {
fechar()
}
}
/**
* Processa uma mensagem individual
*
* Aqui é onde colocamos a LÓGICA DE NEGÓCIO específica
* da nossa aplicação. No tutorial, apenas exibimos a mensagem,
* mas em aplicações reais aqui teríamos:
* - Salvamento em banco de dados
* - Envio de emails/notificações
* - Chamadas para APIs externas
* - Transformação de dados
*
* @param mensagem Objeto mensagem deserializado
* @param particao Partição de onde veio a mensagem
* @param offset Posição da mensagem na partição
* @param contador Número sequencial de mensagens processadas
*/
private fun processarMensagem(mensagem: Mensagem, particao: Int, offset: Long, contador: Int) {
println("📨 Mensagem #$contador recebida:")
println(" ├─ ID: ${mensagem.id}")
println(" ├─ Conteúdo: ${mensagem.conteudo}")
println(" ├─ Remetente: ${mensagem.remetente}")
println(" ├─ Timestamp: ${mensagem.timestamp}")
println(" ├─ Partição: $particao")
println(" └─ Offset: $offset")
println()
// SIMULAR PROCESSAMENTO: Na vida real seria bem mais complexo
Thread.sleep(500)
// Exemplos do que poderia ser feito aqui:
// - salvarNoBancoDeDados(mensagem)
// - enviarNotificacao(mensagem.remetente, mensagem.conteudo)
// - atualizarCache(mensagem.id, mensagem)
// - chamarApiExterna(mensagem)
}
/**
* Para o loop de consumo
* Útil para shutdown graceful da aplicação
*/
fun pararConsumo() {
println("🛑 Solicitação de parada do consumer recebida")
continuarConsumindo = false
}
/**
* Fecha o consumer e libera recursos
* IMPORTANTE: Sempre chamar este método ao finalizar
*/
fun fechar() {
println("🔒 Fechando consumer e liberando recursos...")
consumer.close()
println("✅ Consumer fechado com sucesso")
}
}
Pontos importantes do Consumer:
- Consumer Group: Permite balanceamento de carga entre múltiplos consumers
- Offset Management: Controla qual mensagem foi processada por último
- Polling Model: Consumer "puxa" mensagens (não são "empurradas")
- Manual Commit: Confirma processamento apenas após sucesso
- Error Handling: Trata erros sem parar o consumer
3.5 Aplicações Separadas (Arquitetura Real)
[editar | editar código]⚠️ IMPORTANTE: Na prática, producer e consumer rodam em aplicações diferentes. Isso reflete melhor a realidade de sistemas distribuídos onde:
- Producer: Pode ser uma API REST, um serviço de pedidos, etc.
- Consumer: Pode ser um serviço de notificações, processamento de dados, etc.
3.5.1 Aplicação Producer
[editar | editar código]Criar `src/main/kotlin/com/exemplo/kafka/AplicacaoProducer.kt`:
package com.exemplo.kafka
import com.exemplo.kafka.models.Mensagem
import com.exemplo.kafka.producer.ProducerSimples
/**
* Aplicação Producer - Envia mensagens para Kafka
*
* Esta aplicação simula um serviço que gera eventos/mensagens.
* Exemplos reais: API de e-commerce enviando eventos de pedidos,
* sensor IoT enviando telemetria, sistema de logs, etc.
*
* EXECUÇÃO: ./gradlew run -PmainClass=com.exemplo.kafka.AplicacaoProducer
*/
fun main() {
println("📤 APLICAÇÃO PRODUCER - Kafka + Kotlin")
println("======================================\n")
val topico = "mensagens-tutorial"
// Aguardar Kafka estar disponível
println("⏳ Conectando com Kafka...")
Thread.sleep(2000)
// Criar producer
val producer = ProducerSimples()
try {
// Cenário 1: Enviar mensagens de exemplo (simulando lote inicial)
println("📝 Cenário 1: Enviando lote de mensagens de exemplo\n")
val mensagensExemplo = criarMensagensExemplo()
producer.enviarMensagens(topico, mensagensExemplo)
Thread.sleep(3000)
// Cenário 2: Simular chegada de novas mensagens (como uma API recebendo requests)
println("\n📨 Cenário 2: Simulando chegada de novas mensagens...\n")
val novasMensagens = simularNovasMensagens()
novasMensagens.forEach { mensagem ->
producer.enviarMensagem(topico, mensagem)
Thread.sleep(2000) // Simula intervalo entre requests
}
println("\n✅ Producer finalizou o envio de todas as mensagens")
println("🔄 Consumer pode continuar rodando em outra aplicação")
} catch (e: Exception) {
println("❌ Erro na aplicação producer: ${e.message}")
} finally {
producer.fechar()
println("👋 Aplicação Producer finalizada")
}
}
/**
* Cria mensagens de exemplo iniciais
*/
fun criarMensagensExemplo(): List<Mensagem> {
return listOf(
Mensagem(
id = "msg-001",
conteudo = "Sistema iniciado - Primeira mensagem",
remetente = "sistema"
),
Mensagem(
id = "msg-002",
conteudo = "Tutorial Kafka + Kotlin em execução",
remetente = "tutorial"
),
Mensagem(
id = "msg-003",
conteudo = "Mensagem de teste de conectividade",
remetente = "health-check"
)
)
}
/**
* Simula chegada de mensagens (como requests de uma API)
*/
fun simularNovasMensagens(): List<Mensagem> {
return listOf(
Mensagem(
id = "pedido-001",
conteudo = "Novo pedido recebido: Smartphone XYZ",
remetente = "api-pedidos"
),
Mensagem(
id = "user-005",
conteudo = "Usuário realizou login no sistema",
remetente = "api-auth"
),
Mensagem(
id = "notif-001",
conteudo = "Enviar email de confirmação para cliente",
remetente = "sistema-notificacoes"
)
)
}
3.5.2 Aplicação Consumer
[editar | editar código]Criar `src/main/kotlin/com/exemplo/kafka/AplicacaoConsumer.kt`:
package com.exemplo.kafka
import com.exemplo.kafka.consumer.ConsumerSimples
/**
* Aplicação Consumer - Processa mensagens do Kafka
*
* Esta aplicação simula um serviço que processa eventos/mensagens.
* Exemplos reais: serviço de notificações, processamento de pedidos,
* analytics, indexação para busca, backup de dados, etc.
*
* EXECUÇÃO: ./gradlew run -PmainClass=com.exemplo.kafka.AplicacaoConsumer
*/
fun main() {
println("📨 APLICAÇÃO CONSUMER - Kafka + Kotlin")
println("======================================\n")
val topico = "mensagens-tutorial"
val grupoConsumidor = "processador-mensagens"
println("🎯 Configuração do Consumer:")
println(" ├─ Tópico: $topico")
println(" ├─ Grupo: $grupoConsumidor")
println(" └─ Broker: localhost:9092\n")
// Aguardar Kafka estar disponível
println("⏳ Conectando com Kafka...")
Thread.sleep(1000)
// Criar e iniciar consumer
val consumer = ConsumerSimples(grupoConsumidor)
try {
println("🔄 Iniciando consumo contínuo...")
println("📨 Aguardando mensagens (Ctrl+C para parar)...\n")
// IMPORTANTE: Este loop roda indefinidamente
// Na prática, seria um serviço que roda 24/7
consumer.iniciarConsumo(topico)
} catch (e: InterruptedException) {
println("\n🛑 Consumer interrompido pelo usuário")
} catch (e: Exception) {
println("❌ Erro na aplicação consumer: ${e.message}")
} finally {
consumer.fechar()
println("👋 Aplicação Consumer finalizada")
}
}
3.5.3 Configuração do Gradle para Múltiplas Aplicações
[editar | editar código]Atualizar `build.gradle.kts` para suportar execução de aplicações específicas:
plugins {
kotlin("jvm") version "1.9.10"
application
}
group = "com.exemplo.kafka"
version = "1.0.0"
repositories {
mavenCentral()
}
dependencies {
// Kafka Client
implementation("org.apache.kafka:kafka-clients:3.5.1")
// JSON Processing
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.15.2")
implementation("com.fasterxml.jackson.core:jackson-databind:2.15.2")
// Logging
implementation("org.slf4j:slf4j-simple:2.0.7")
// Kotlin Standard Library
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
}
// Configuração para aplicação padrão (Consumer)
application {
mainClass.set("com.exemplo.kafka.AplicacaoConsumerKt")
}
kotlin {
jvmToolchain(17)
}
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile> {
kotlinOptions {
jvmTarget = "17"
freeCompilerArgs = listOf("-Xjsr305=strict")
}
}
// Task específica para executar o Producer
tasks.register<JavaExec>("runProducer") {
group = "application"
description = "Executa a aplicação Producer"
classpath = sourceSets["main"].runtimeClasspath
mainClass.set("com.exemplo.kafka.AplicacaoProducerKt")
}
// Task específica para executar o Consumer
tasks.register<JavaExec>("runConsumer") {
group = "application"
description = "Executa a aplicação Consumer"
classpath = sourceSets["main"].runtimeClasspath
mainClass.set("com.exemplo.kafka.AplicacaoConsumerKt")
}
Vantagens desta Arquitetura:
- 🔄 Independência: Producer e Consumer podem ser reiniciados independentemente
- 📈 Escalabilidade: Pode executar múltiplos consumers para balanceamento de carga
- 🛠️ Manutenção: Atualizações em um serviço não afetam o outro
- 🎯 Especialização: Cada aplicação foca em sua responsabilidade específica
- 🌐 Distribuição: Podem rodar em servidores diferentes
- ⚡ Performance: Não há interferência entre producer e consumer
Resumo da Implementação
[editar | editar código]| Componente | Responsabilidade | Execução | Comportamento |
|---|---|---|---|
| Mensagem | Estrutura de dados | - | Objeto serializable JSON |
| ProducerSimples | Enviar mensagens | - | Classe utilitária |
| ConsumerSimples | Receber mensagens | - | Classe utilitária |
| AplicacaoProducer | Aplicação independente | `./gradlew runProducer` | Envia mensagens e finaliza |
| AplicacaoConsumer | Aplicação independente | `./gradlew runConsumer` | Processa mensagens continuamente |
Agora temos um sistema distribuído real onde: 1. AplicacaoProducer envia mensagens e pode ser finalizada 2. AplicacaoConsumer fica rodando continuamente processando mensagens 3. Ambas aplicações são independentes e podem rodar em momentos diferentes 4. Reflete a arquitetura real de sistemas distribuídos com Kafka
Passo 4: Testando a Implementação
[editar | editar código]4.1 Verificar se Kafka está Rodando
[editar | editar código]# Verificar containers
docker-compose ps
# Verificar logs do Kafka
docker-compose logs kafka | tail -20
# Testar conectividade
docker exec kafka-broker kafka-topics --list --bootstrap-server localhost:9092
4.2 Compilar o Projeto
[editar | editar código]# Compilar o projeto
./gradlew build
# Verificar se compilou sem erros
./gradlew compileKotlin
4.3 Executar as Aplicações Separadas
[editar | editar código]⭐ NOVA ABORDAGEM: Aplicações Independentes
Agora temos duas aplicações separadas que simulam um ambiente distribuído real:
4.3.1 Executar Consumer (Terminal 1)
[editar | editar código]# Executar aplicação Consumer (fica rodando continuamente)
./gradlew runConsumer
Saída do Consumer:
📨 APLICAÇÃO CONSUMER - Kafka + Kotlin ====================================== 🎯 Configuração do Consumer: ├─ Tópico: mensagens-tutorial ├─ Grupo: processador-mensagens └─ Broker: localhost:9092 ⏳ Conectando com Kafka... 🔗 Consumer Kafka criado para o grupo 'processador-mensagens' 🔄 Iniciando consumo contínuo... 📨 Aguardando mensagens (Ctrl+C para parar)... 🎯 Consumer inscrito no tópico: mensagens-tutorial 👥 Grupo de consumidores: processador-mensagens ⏳ Aguardando mensagens...
4.3.2 Executar Producer (Terminal 2)
[editar | editar código]# Em outro terminal, executar aplicação Producer
./gradlew runProducer
Saída do Producer:
📤 APLICAÇÃO PRODUCER - Kafka + Kotlin ====================================== ⏳ Conectando com Kafka... 🔗 Producer Kafka conectado e pronto para enviar mensagens 📝 Cenário 1: Enviando lote de mensagens de exemplo 📤 Iniciando envio de 3 mensagens para o tópico 'mensagens-tutorial' 📝 Enviando mensagem 1/3 ✅ Mensagem msg-001 enviada com sucesso! ├─ Tópico: mensagens-tutorial ├─ Partição: 0 ├─ Offset: 0 └─ Timestamp: 1641234567890 ✅ Mensagem enviada: "Sistema iniciado - Primeira mensagem" ... 📨 Cenário 2: Simulando chegada de novas mensagens... 📝 Enviando mensagem 1/1 ✅ Mensagem pedido-001 enviada com sucesso! ├─ Tópico: mensagens-tutorial ├─ Partição: 1 ├─ Offset: 0 └─ Timestamp: 1641234571890 ✅ Mensagem enviada: "Novo pedido recebido: Smartphone XYZ" ✅ Producer finalizou o envio de todas as mensagens 🔄 Consumer pode continuar rodando em outra aplicação 🔒 Fechando producer e liberando recursos... ✅ Producer fechado com sucesso 👋 Aplicação Producer finalizada
4.3.3 Saída no Consumer (Terminal 1)
[editar | editar código]Enquanto isso, no Terminal 1 (Consumer), você verá:
📨 Mensagem #1 recebida: ├─ ID: msg-001 ├─ Conteúdo: Sistema iniciado - Primeira mensagem ├─ Remetente: sistema ├─ Timestamp: 2024-01-15T10:30:45.123 ├─ Partição: 0 └─ Offset: 0 📨 Mensagem #2 recebida: ├─ ID: msg-002 ├─ Conteúdo: Tutorial Kafka + Kotlin em execução ├─ Remetente: tutorial ├─ Timestamp: 2024-01-15T10:30:46.456 ├─ Partição: 2 └─ Offset: 0 📨 Mensagem #3 recebida: ├─ ID: pedido-001 ├─ Conteúdo: Novo pedido recebido: Smartphone XYZ ├─ Remetente: api-pedidos ├─ Timestamp: 2024-01-15T10:30:51.789 ├─ Partição: 1 └─ Offset: 0 # Consumer continua rodando, aguardando novas mensagens...
4.3.4 Comandos Alternativos
[editar | editar código]# Executar Consumer como aplicação padrão
./gradlew run
# Executar Producer usando classe específica
./gradlew run -PmainClass=com.exemplo.kafka.AplicacaoProducerKt
# Executar Consumer usando classe específica
./gradlew run -PmainClass=com.exemplo.kafka.AplicacaoConsumerKt
🎯 Vantagens desta Abordagem:
- Realismo: Simula ambiente de produção onde serviços rodam independentemente
- Flexibilidade: Producer pode ser executado múltiplas vezes sem reiniciar Consumer
- Escalabilidade: Pode executar múltiplos Consumers em paralelo
- Manutenção: Atualizações em um serviço não afetam o outro
- Teste: Facilita teste de cenários como falhas e recuperação
Passo 5: Monitoramento e Verificação
[editar | editar código]5.1 Interface Web Kafka UI
[editar | editar código]Acesse localhost:8080 no navegador para:
- Visualizar tópicos criados automaticamente
- Monitorar mensagens em tempo real
- Verificar consumer groups e seu status
- Analisar performance do cluster
5.2 Comandos de Linha Kafka
[editar | editar código]# Listar tópicos
docker exec kafka-broker kafka-topics --list --bootstrap-server localhost:9092
# Detalhar tópico específico
docker exec kafka-broker kafka-topics --describe \
--topic mensagens-tutorial --bootstrap-server localhost:9092
# Consumir mensagens via linha de comando
docker exec kafka-broker kafka-console-consumer \
--topic mensagens-tutorial --bootstrap-server localhost:9092 \
--from-beginning
# Produzir mensagens via linha de comando
docker exec -it kafka-broker kafka-console-producer \
--topic mensagens-tutorial --bootstrap-server localhost:9092
5.3 Verificar Consumer Groups
[editar | editar código]# Listar grupos de consumidores
docker exec kafka-broker kafka-consumer-groups \
--bootstrap-server localhost:9092 --list
# Detalhar grupo específico
docker exec kafka-broker kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group grupo-tutorial --describe
Passo 6: Experimentações Adicionais
[editar | editar código]6.1 Teste com Múltiplos Consumers
[editar | editar código]Execute a aplicação em múltiplos terminais para ver o balanceamento de carga:
# Terminal 1
./gradlew run
# Terminal 2 (em outro terminal)
./gradlew run
6.2 Teste de Tolerância a Falhas
[editar | editar código]# 1. Execute a aplicação
./gradlew run
# 2. Em outro terminal, pare o Kafka
docker-compose stop kafka
# 3. Reinicie o Kafka
docker-compose start kafka
# Observe como a aplicação se comporta
6.3 Criação Manual de Tópicos
[editar | editar código]# Criar tópico com configurações específicas
docker exec kafka-broker kafka-topics --create \
--topic meu-topico-personalizado \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1
# Configurar retenção de mensagens
docker exec kafka-broker kafka-configs --alter \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name meu-topico-personalizado \
--add-config retention.ms=86400000
Solução de Problemas
[editar | editar código]Problemas Comuns
[editar | editar código]| Problema | Sintoma | Solução |
|---|---|---|
| Kafka não inicia | Container em estado "Exited" | Verificar logs: docker-compose logs kafka
|
| Conexão recusada | Connection refused |
Aguardar inicialização ou verificar porta 9092 |
| Mensagens não aparecem | Consumer não recebe mensagens | Verificar nome do tópico e grupo de consumidores |
| Erro de serialização | JsonParseException |
Verificar formato JSON das mensagens |
| OutOfMemoryError | JVM fica sem memória | Ajustar heap: -Xmx512m
|
Comandos de Diagnóstico
[editar | editar código]# Verificar se portas estão abertas
netstat -tuln | grep -E "(9092|2181|8080)"
# Verificar logs em tempo real
docker-compose logs -f kafka
# Resetar ambiente (CUIDADO: apaga dados)
docker-compose down -v
docker-compose up -d
# Verificar conectividade interna
docker exec kafka-broker kafka-broker-api-versions \
--bootstrap-server localhost:9092
Logs Úteis
[editar | editar código]# Logs detalhados do Kafka
docker-compose logs kafka | grep -E "(ERROR|WARN)"
# Logs do consumer
docker-compose logs kafka | grep "consumer"
# Monitorar criação de tópicos
docker-compose logs kafka | grep "Created topic"
Finalização
[editar | editar código]Para parar todo o ambiente:
# Parar aplicação Kotlin (Ctrl+C no terminal)
# Parar containers Docker
docker-compose down
# Parar e remover volumes (dados são perdidos)
docker-compose down -v
Próximos Passos
[editar | editar código]Após concluir este tutorial básico, você pode explorar:
- Kafka Streams: Processamento de streams em tempo real
- Schema Registry: Gerenciamento de esquemas de mensagens
- Configurações de Produção: Replicação, particionamento otimizado
- Monitoramento Avançado: Métricas e alertas
- Segurança: SSL/TLS e autenticação SASL
- Performance Tuning: Otimização para alto throughput
Glossário
[editar | editar código]- Bootstrap Servers
- Lista de brokers Kafka para conexão inicial
- Consumer Group
- Conjunto de consumers que dividem o processamento de um tópico
- Idempotência
- Garantia de que operações duplicadas não causam efeitos colaterais
- Lag
- Diferença entre mensagens produzidas e consumidas
- Offset
- Posição sequencial de uma mensagem dentro de uma partição
- Partition
- Divisão lógica de um tópico para paralelização
- Rebalancing
- Redistribuição de partições entre consumers de um grupo
- Replication Factor
- Número de cópias de cada partição no cluster