RabbitMQ
RabbitMQ[1] é um broker de mensagens open-source desenvolvido pela VMware, escrito em Erlang. Ele funciona originalmente com o AMQP, e eventualmente foi extendido por plugins para outros protocolos (como STOMP e MQTT).
O programa age como um gerenciador de produtores e consumidores de mensagens, facilitando a interação por meio do método de comunicação assíncrona (onde não se espera por uma resposta imediata ao enviar uma mensagem), e providenciando diferentes maneiras de realizar a comunicação entre eles.
O RabbitMQ é comumente utilizado para chamadas de procedimento remoto, streaming, e minimizar a dependência entre serviços interligados. Suas principais características são a interoperabilidade (diversas bibliotecas para clientes disponíveis e pode ser utilizado com a linguagem de programação de sua preferência), flexibilidade (você definir como suas mensagens são publicadas e consumidas, gerenciando vínculos, filtros de conversas, dentre outras opções de definição) e confiabilidade (reconhecimento de entrega e recebimento de mensagens, além do uso de filas modernas[2])
AMQP
[editar | editar código-fonte]AMQP[3] (Advanced Message Queueing Protocol) é um protocolo open-source que funciona como padrão de envio de mensagens encriptadas e interoperáveis. Sua estrutura possui filas, conversas e vínculos, que são chamadas de "Entidades AMQP". O protocolo é também programável, ou seja, o funcionamento das entidades e rotas são definidos pela própria aplicação. Tendo isto em mente, os desenvolvedores devem tomar cuidado com potenciais conflitos de definição (embora raramente aconteçam). As principais funcionalidades do protocolo são orientação de mensagem, enfileiramento, roteamento, confiabilidade e segurança.
Filas
[editar | editar código-fonte]São buffers que armazenam mensagens até que sejam consumidas por um consumidor, garantindo que sejam entregues em ordem de chegada. Por padrão, o RabbitMQ utiliza o Round-Robin como método de distribuição. As filas possuem algumas propriedades compartilhadas com as conversas mas também possuem algumas próprias como nome, durabilidade (se ela só existe em tempo de execução ou não), exclusividade (a fila é utilizada por apenas uma conexão, sendo removida quando esta conexão fecha), auto-remoção (Se a fila já teve consumidores, ela é removida quando o último consumidor para de consumir ela) e argumentos (utilizados por plugins e funcionalidades específicas do broker). Elas devem ser declaradas para serem utilizadas. Caso uma fila já exista com os mesmos atributos, nada acontece, mas caso possua atributos diferentes, ocorrerá um ERRO 406 (PRECONDITION_FAILED). Os nomes podem ser escolhidos pelas aplicações/serviços ou pelo próprio broker (caso uma string vazia seja passada como nome). Nomes que comecem com "amq." acarretam ERRO 403 (ACESS_REFUSED) pois são reservados para uso interno. Para mais informações sobre filas AMQP, visite Queues.
Conversas
[editar | editar código-fonte]Conversas[4] são as entidades que recebem as mensagens e as roteiam para suas respectivas filas, baseando-se no tipo e no vínculo dela. O RabbitMQ possui quatro tipos de conversas: Conversa Direta, Conversa Dispersa, Conversa Tópica e Conversa por Cabeçalho. Cada uma delas possui atributos como, por exemplo: nome, durabilidade, auto-remoção (A conversa é removida quando a última fila é desvinculada dela) e argumentos.
- A Conversa Padrão é uma Conversa Direta sem nome e previamente declarada pelo broker. Assim, todas as filas criadas são automaticamente vinculadas à ela, fazendo parecer que todas as mensagem enviadas são enviadas diretamente para cada fila, mesmo tecnicamente não sendo isso o que acontece. O RabbitMQ NÃO PERMITE operações de vínculo com a conversa padrão, qualquer tentativa resultará em erro.
- A Conversa Direta envia mensagens baseadas na chave de roteamento. Primeiro uma fila é vinculada à conversa com a chave de roteamento C, depois, quando uma mensagem chega com a chave K, a Conversa roteia ela para a fila onde K = C. Se várias filas cumprirem esta condição, todas recebem a mensagem.
- A Conversa Dispersa faz com que uma mensagem publicada pelo produtor seja enviada para todas as filas ligadas à conversa dispersa, e nesse caso, a chave de roteamento é ignorada. Isso é útil para envio de mensagens em broadcast, como num site de placar de partidas de futebol e tabelas de pontuação máxima num videogame.
- A Conversa Tópica direciona mensagens baseado na chave de roteamento, e no padrão utilizado na hora de vincular a fila a uma conversa. Isso pode ser utilizado a fim de implementar um roteamento para diferentes variações nos vínculos de publicação e inscrição nas filas. Assim, quando uma mensagem com um certo padrão de vínculo for publicada, todas as filas que escolheram esse padrão receberão essa mensagem.
- A Conversa por Cabeçalho direciona mensagens baseado nos headers da mensagem ao invés de utilizar uma chave de roteamento. Com isso, podemos especificar um número variado de headers para uma mensagem, e um conjunto de headers que uma conversa vai receber. É possível escolher se uma conversa vai receber uma mensagem que inclui obrigatoriamente todos os headers escolhidos, ou apenas algum deles com o parâmetro
x-match
.
Vínculos
[editar | editar código-fonte]Funcionam como regras utilizadas pelas conversas para rotear mensagens para as filas. Assim, para uma conversa T rotear uma mensagem para a fila Q, Q deve estar vinculada à T. Vínculos podem ter uma chave de rota que age como um filtro determinando para quais filas uma determinada mensagem vai. Caso uma mensagem não seja enviada para nenhuma fila, ela ou é perdida ou é retornada ao produtor, dependendo dos atributos dela (Isso pode acontecer caso a conversa no qual foi publicada não possua nenhum vínculo).
Consumidores
[editar | editar código-fonte]Aplicações ou serviços que consomem as mensagens armazenadas nas filas. Existem duas maneiras que as mensagens podem ser consumidas:
- Push API - A opção recomendada. O consumidor se inscreve em uma fila e recebe as mensagens conforme elas ficam disponíveis.
- Pull API (Polling) - Ineficiente e deve ser evitada. O consumidor pede as mensagens que a fila possui no momento.
Cada consumidor possui uma consumer-tag que é uma string que o identifica. É possível ter mais de um consumidor por fila, mas também é possível ter exclusivos, ou seja, clientes que ao consumir de uma fila impedem que todos os outros consumam dela.
Quando uma mensagem é consumida, o broker deve retirá-la da fila. Existem duas maneiras de reconhecer que uma mensagem foi consumida:
- Reconhecimento Automático - Remover a mensagem após o broker enviá-la para a aplicação (Utilizando-se dos métodos
basic.deliver
oubasic.get-ok
). Deve ser considerado pouco seguro, pois caso haja um erro no envio ou um erro de conexão a mensagem será perdida. - Reconhecimento Explícito ("Manual") - Remover a mensagem após o consumidor enviar um "OK" (Utilizando-se do método
basic.ack
).
No segundo modo, caso o broker não receba uma confirmação do cliente, ele irá reenviar a mensagem para o próximo consumidor, e caso não hajam consumidores, o broker espera um consumidor para tentar o reenvio. Se o cliente receber a mensagem mas tiver um erro de processamento ou não puder processá-la no momento, ele poderá enviar um basic.reject
informando ao broker que a mensagem foi rejeitada. Pode também ser útil especificar quantas mensagens cada consumidor pode receber para assim enviar o reconhecimento quando todas as mensagens forem enviadas de uma vez. No RabbitMQ pode-se rejeitar múltiplas mensagens utilizando da flag multiple = true
do método basic.nack
.
Produtores
[editar | editar código-fonte]São aplicações que publicam mensagens em conversas para serem geridas pelo broker. Caso uma mensagem seja publicada em uma conversa inexistente, ocorrerá um ERRO 404 (NOT_FOUND), e o canal de conexão do produtor será fechado. Todas as mensagens possuem propriedades que podem ser acessadas pelos clientes, algumas propriedades são configuradas pelos produtores na hora do envio ou pelo RabbitMQ durante o roteamento. Propriedades importantes de uma mensagem (O RabbitMQ não interfere nessas propriedade, é total responsabilidade dos clientes gerenciarem e validarem estes campos):
- Type - Definida pelo produtor para ajudar as aplicações à entenderem que tipo de mensagem ela é. Naturalmente as mensagens acabam sendo divididas em grupos, por isso é comum (mas não obrigatório) a nomenclatura onde os tipos são nomes separados por ponto, como
profile.username.changed
ourequest.created
. É recomendado que o consumidor saiba lidar com uma mensagem com tipo desconhecido para facilitar solução de problemas. - Content Type - Informa o consumidor o tipo de mensagem que foi enviada. Por exemplo, caso a mensagem seja um JSON o Content Type especificado deve ser
application/json
. - Content Encoding - Diz o tipo de codificação da mensagem, por exemplo, se a mensagem foi comprimida por um algoritimo GZip, o Content Encoding deve ser
gzip
. É possível especificar vários tipos de codificação separando-os com uma vírgula.
Para confirmar o envio de uma mensagem, são também utilizados meios iguais aos reconhecimentos dos consumidores apresentados anteriormente, com os métodos basic.ack
e basic.nack
. Para isso, o produtor deve primeiro configurar o canal com o broker para modo de confirmação utilizando o método confirm.select
que deverá ser respondido por um confirm.select-ok
. Existem algumas estratégias de como utilizar este reconhecimento, que podem ser vistas em Strategies For Publishing Confirm. Caso uma mensagem publicada não pôde ser enviada para nenhuma fila, se o atributo mandatory
for true
, a mensagem será devolvida para o produtor, que deverá saber lidar com isso, se o atributo for false
, a mensagem é descartada ou enviada para uma conversa alternativa se existir.
Métodos
[editar | editar código-fonte]Métodos são operações do protocolo e não devem ser confundidos com métodos de programação orientada a objetos. Todos os métodos são agrupados em Classes. A classe queue
por exemplo, possui os métodos queue.declare
e queue.declare-ok
, ambos estão relacionados. Quando um cliente declara uma nova fila, ele envia um pacote com a operação queue.declare
e, caso a criação seja um sucesso, o broker devolve um pacote com o queue.declare-ok
para o cliente. As operações enviadas pelos clientes são chamadas de "Requests" (pedidos) e as enviadas pelo broker são chamadas de "Respondes" (respostas). Nem todos os pedidos possuem respostas e alguns pedidos podem ter múltiplas respostas possíveis.
Canais
[editar | editar código-fonte]O AMQP utiliza uma conexão TCP subjacente para garantir suas entregas e ao invés de fechar a conexão TCP quando não for mais necessária a comunicação, deveria ser primeiro encerrada a conexão AMQP para então encerrar a TCP. Algumas aplicações demandam diversas conexões com o broker e para não consumir tanto recurso essas conexões são multiplexadas em canais. Um canal serve para compartilhar as diversas conexões para uma só conexão TCP. Todas as operações AMQP ocorrem em um canal e toda comunicação existente em um canal é separada dos outros. Um canal só existe se há conexão, então se uma conexão for fechada, todos os canais também serão.
Hospedagem virtual
[editar | editar código-fonte]O RabbitMQ é um sistema com arquitetura de multilocação, ou seja, uma única instância dele serve múltiplos clientes. Desta forma, conexões, conversas, filas, permissões, dentre outros, pertencem à uma hospedagem virtual[5]. No RabbitMQ a hospedagem é criada ou apagada pela linha de comando utilizando a ferramenta rabbitmqctl
. A hospedagem virtual permite um agrupamento lógico e separação de recursos para os usuários. Cada hospedagem possui um nome que é especificado quando um cliente AMQP conecta no RabbitMQ, se a autenticação obter êxito e o usuário tiver as devidas permissões a conexão é estabelecida. As conexões em uma hospedagem só podem realizar operação na mesma, para realizar operações entre hospedagens é necessário que o cliente esteja conectado nas duas, assim, podendo por exemplo consumir em uma e publicar na outra.
Para criar uma hospedagem com a ferramenta rabbitmqctl
, basta utilizar o comando add_vhost
e informar o nome da hospedagem.
rabbitmqctl add_vhost exemplo
O comando acima cria uma hospedagem virtual de nome "exemplo", para deletá-la podemos utilizar o comando delete_vhost
:
rabbitmqctl delete_vhost exemplo
Instalação
[editar | editar código-fonte]Para experimentar com o RabbitMQ em sua máquina, basta utilizar a imagem do Docker[6] feito pela comunidade (Versão mais recente: RabbitMQ 3.13):
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
Para mais informações acesse o guia completo de instalação na documentação oficial[7].
Exemplos
[editar | editar código-fonte]Para entender o funcionamento do RabbitMQ na prática, alguns exemplos de uso em diferentes contextos serão aqui demonstrados. Para isso, utilizaremos códigos em python com o cliente Pika, feito para esse propósito. Esse cliente não é obrigatório, e nem é o foco do tutorial, até porque existem várias outras linguagens com clientes já feitos[8] (é possível usar comandos até num shell), mas é útil para realizar as demonstrações de exemplo.
Inicialmente, vamos instalar o cliente Pika com pip install pika
(devemos incluir isso em nosso código com import pika
).
Exemplo de fila
[editar | editar código-fonte]Com isso, vamos criar um código de produtor e um de consumidor para uma fila[9]:
Começando com o produtor, criaremos o arquivo "produtor.py", e iniciaremos estabelecendo uma conexão com o RabbitMQ, que irá gerenciar o broker:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
Aqui, estamos utilizando 'localhost'. Para se conectar a alguma outra máquina, bastariaria colocar o endereço de IP dela. Em seguida, atribuimos o canal à variável channel
Em seguida, declaramos a fila que será utilizada:
fila = input("Digite o nome da fila: ")
channel.queue_declare(queue=fila)
Agora, basta enviar a mensagem, com o nome da fila em routing_key
, a mensagem em body
, e sem especificações de conversa em exchange
:
mensagem = input("Digite a mensagem: ")
channel.basic_publish(exchange='', routing_key=fila, body=mensagem)
print("Mensagem enviada")
No final, fechamos a conexão:
connection.close()
Note que a mensagem já foi enviada e está no broker agora, por isso não é preciso que o produtor fique conectado. Com isso, a mensagem será entregue para algum consumidor que esteja esperando, ou se não houver nenhum consumidor, fica esperando no broker para ser entregue ao próximo consumidor que entrar.
Para a parte do consumidor criaremos o arquivo " consumidor.py" e também iniciaremos estabelecendo uma conexão com o RabbitMQ:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
O próximo passo é declarar a fila novamente. É bom que tanto o produtor e o consumidor declarem a fila antes de tentar entrar nela, pois qualquer um dos dois pode ser executado primeiro. Assim evitamos o erro de entrar numa fila que não existe (declarar uma fila que não existe não dá erro, apenas não faz nada). É possível ainda ver as filas criadas com rabbitmqctl list_queues
, ou então remover uma com rabbitmqctl delete_queue <nome da fila>
.
fila = input("Digite o nome da fila: ")
channel.queue_declare(queue=fila)
Agora, definimos a função callback
, que será ativada quando uma mensagem for recebida, printando a mensagem e enviando um ack de volta:
def callback(ch, method, properties, body):
print("Recebido: " + body.decode())
ch.basic_ack(delivery_tag=method.delivery_tag)
E ligamos essa função à função de consumo que irá de fato ouvir uma fila:
channel.basic_consume(queue='tutorial', auto_ack=True, on_message_callback=callback)
Com isso, falta só começar a consumir:
channel.start_consuming()
E fechar a conexão:
connection.close()
Agora tudo está feito,e podemos rodar os dois códigos em terminais diferentes, com diferentes produtores e consumidores, para testar o funcionamento. Verifique que os comportamentos condizem com o que o RabbitMQ promete em questão de enfileiramento, roteamento e confiabilidade.
Exemplo de conversa
[editar | editar código-fonte]Tendo feito filas, podemos fazer também conversas[10] e aqui usar as funcionalidades dos diferentes tipos. Neste exemplo, vamos utilizar o modo de conversa dispersa, mas mudar o modo é intuitivo. Começando com o código do produtor:
Novamante, estabelecemos a conexão:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
De modo semelhante à fila, declaramos a conversa, com exchange_type='fanout'
:
conversa = input("Digite o nome da conversa: ")
channel.exchange_declare(exchange=conversa, exchange_type='fanout')
Publicando uma mensagem em broadcast (lembrando que nesse formato, não temos routing_key
), e fechando a conexão:
mensagem = input("Digite a mensagem: ")
channel.basic_publish(exchange=conversa, routing_key='', body=mensagem)
print("Mensagem enviada")
connection.close()
Agora, para o código do consumidor, começamos do mesmo modo:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
Declaramos uma fila nova sem nome, já que o tipo de transmissão não se importa com isso, e com o parâmetro exclusive=True
que fará a fila ser apagada após o uso. Então a conectamos à conversa:
fila_nova = channel.queue_declare(queue='', exclusive=True)
nome_fila = fila_nova.method.queue
channel.queue_bind(exchange=conversa, queue=nome_fila)
Do mesmo modo que fizemos no consumidor da fila, o consumidor escuta por mensagens:
def callback(ch, method, properties, body):
print("Recebido: " + body.decode())
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=nome_fila, on_message_callback=callback)
print("Aguardando mensagens")
channel.start_consuming()
Com isso, é possível testar a funcionalidade de conversa dispersa do RabbitMQ, e ter uma boa ideia de como utilizar o recurso de conversa.
Referências
[editar | editar código-fonte]- ↑ https://www.rabbitmq.com
- ↑ https://www.rabbitmq.com/docs/quorum-queues
- ↑ https://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol
- ↑ https://www.rabbitmq.com/tutorials/amqp-concepts#exchanges
- ↑ https://www.rabbitmq.com/docs/vhosts
- ↑ https://pt.wikiversity.org/wiki/Otimizando_Docker
- ↑ https://www.rabbitmq.com/docs/download#installation-guides
- ↑ https://www.rabbitmq.com/tutorials
- ↑ https://www.rabbitmq.com/tutorials/tutorial-one-python
- ↑ https://www.rabbitmq.com/tutorials/tutorial-three-python