티스토리 뷰

SpringBoot

SpringBoot + RabbitMQ

eello 2024. 11. 13. 12:05
이 글에서 RabbitMQ는 도커 컨테이너로 실행할 것이기 때문에 도커가 설치되어 있다고 가정한다.

 

1. RabbitMQ 실행

`docker run`으로 이미지를 실행시켜도 좋지만 매번 여러 옵션을 입력해 주기는 불편하기 때문에 `docker-compose`를 작성해서 실행시킬 것이다.

# docker-compose.yml
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3.13.7-management-alpine
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    volumes:
      - ./.docker/rabbitmq/etc/:/etc/rabbitmq/
      - ./.docker/rabbitmq/data/:/var/lib/rabbitmq/
      - ./.docker/rabbitmq/logs/:/var/log/rabbitmq/
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin

 

image

이미지는 rabbitmq 이미지의 태그 중 `management`가 붙은 걸 선택한다. `management`가 붙은 이미지는 rabbitmq를 관리할 수 있는 웹 UI를 제공하기 위한 `rabbitmq-management` 플러그인을 포함한다.

 

`alpine`은 해당 도커 이미지가 Alpine Linux 기반으로 만들어졌다는 것이다. Alpine Linux는 경량화된 리눅스 배포판으로 alpine 이미지를 사용한다는 것은 경량화된 이미지를 사용하겠다는 의미이다. 자세한 설명은 Docker Alpine images를 참고하면 된다.

 

ports

도커 컨테이너는 기본적으로 독립적인 네트워크 환경을 갖는다. 때문에 호스트 포트와 컨테이너 내부의 포트를 연결하지 않으면 컨테이너 내부의 서비스를 호스트에서 접근할 수 없다. `ports`는 호스트의 포트와 컨테이너 내부의 포트를 연결하는 역할을 한다.

 

`5672:5672`라면 `:`을 기준으로 왼쪽이 호스트의 포트이며 오른쪽이 컨테이너 내부의 포트이다. `5672`는 rabbitmq의 기본 포트로 호스트의 5672와 컨테이너 내부의 5672를 연결했기 때문에 호스트에서 컨테이너 내부에서 실행되는 rabbitmq에 접속하려면 `localhost:5672`로 접근하면 된다.

 

`15672` 포트는 rabbitmq를 관리할 수 있는 웹 UI 관리 콘솔에 접근할 수 있는 포트이다. 이 또한 호스트의 15672와 연결했기 때문에 이미지를 컨테이너로 실행했을 때 브라우저에서 `localhost:15672`로 접속하면 웹 UI 관리 콘솔에 접속할 수 있다.

 

volumes

기본적으로 컨테이너 내부의 데이터는 일회성이다. 컨테이너를 중지하고 삭제한 후 다시 시작하면, 내부 파일 시스템의 모든 데이터는 초기화되고 이전에 생성된 데이터는 사라진다. 때문에 메시지 큐와 설정 데이터를 저장하기 위해 호스트의 볼륨과 컨테이너 내부의 볼륨을 연결한다. 

 

포트 연결과 마찬가지로 `:`을 기준으로 왼쪽이 호스트의 경로, 오른쪽이 컨테이너 내부의 경로가 된다. `./.docker/rabbitmq/etc/:/etc/rabbitmq/`라면 컨테이너 내부의 `/etc/rabbitmq/` 경로의 볼륨과 docker-compose.yml 파일을 기준으로 `./.docker/rabbitmq/etc/`의 호스트 볼륨을 연결한다. 

 

environment

`environment`는 컨테이너 내부에서 사용할 환경 변수(environment variables)를 설정하는 옵션이다. 이 docker-compose에서 설정한 두 변수는 웹 UI 콘솔에 로그인하기 위한 `username`과 `password`가 된다.

 

이제 작성한 docker-compose.yml을 실행시키면 된다.

docker compose up -d

`-d` 옵션은 컨테이너를 `detached` 모드로 실행하는 옵션으로 백그라운드에서 컨테이너를 실행하게 된다.

만약 도커 컴포즈 파일명을 docker-compose가 아닌 rabbitmq-docker-compose로 했다면
`docker compose -f rabbitmq-docker-compose.yml up -d`
를 입력하면 된다.

 

실행시킨 컨테이너를 종료시키려면 `up` 대신 `down`으로 입력하면 된다.

docker compose down

 

 

2. exchange, queue 생성

`exchange(교환기)`는 프로듀서가 보낸 메시지를 받아서 라우팅 규칙에 따라 적절한 큐로 메시지를 전달하는 역할을 담당한다. `queue`는 메시지가 저장되는 버퍼 역할로 교환기를 통해 전달된 메시지들이 큐에 쌓이고, 소비자가 이를 처리할 때까지 대기한다.

 

rabbitmq의 전체적인 메시지의 흐름을 간단하게 보면 다음과 같다.

  1. 프로듀서가 메시지를 생성해 rabbitmq의 exchange로 보낸다.
  2. exchange는 설정된 라우팅 규칙에 따라 바인딩되어 있는 queue에 메시지를 전달한다.
  3. queue에 도착한 메시지는 컨슈머가 가져가 처리한다.

 

이제 exchange와 queue를 생성해 보자. 생성하기 위해 웹 UI 관리 콘솔에 접속한다. docker-compose에서 `localhost:15672`에 연결했으니 해당 주소로 접속하면 아래와 같은 화면이 나오게 된다.

 

username과 password 또한 docker-compose의 environment에서 설정한 대로 `admin/admin`으로 로그인하면 된다.

 

이제 exchanges 탭으로 들어가 보면 기본적으로 생성되어 있는 교환기들을 볼 수 있다. 밑에 Add a new exchange에 다음과 같이 설정하고 exchange를 생성한다.

 

여기서 exchange의 타입은 `direct`, `fanout`, `headers`, `topic` 4가지가 존재한다. 이러한 타입들은 프로듀서가 생성한 메시지의 라우팅 키를 처리하는 방식에 따른 타입이다.

 

exchange의 타입

더보기

direct

direct 방식은 라우팅 키가 정확히 일치하는 큐에만 메시지를 전달한다.

 

 

 

fanout

fanout 방식은 exchange에 바인딩된 모든 큐에 메시지를 전달한다. 생성된 메시지의 라우팅 키는 무시된다.

 

 

topic

topic 방식은 라우팅 키의 패턴에 따라 일치하는 큐에만 메시지를 전달한다. 패턴에는 두 가지의 와일드카드 문자를 지원한다.

  • `*` : 단어 하나를 대체
  • `#` : 0개 이상의 단어를 대채한다.

여기서 단어는 라우팅 키에서 `.`으로 구분된다.

 

 

 

headers

마지막으로 headers는 라우팅 키 대신 헤더 값에 따라 메시지를 전달한다. 이때, 헤더 값의 여러 속성에 대해 조건을 설정할 수 있으며 헤더 속성에 `x-match`를 추가해 이 조건들이 모두 만족하는지(`all`) 또는 하나라도 만족하는지(`any`)로 설정할 수 있으며 별도로 설정하지 않았다면 `all`이 기본값이다.

 

 

메시지 헤더 속성으로 `{"lang" : "py", "fw" : "django"}`로 전송했다면 이 속성과 완전히 일치하는 세 번째 큐와 `fw` 속성은 일치하지 않지만 `"x-match" : "any"`로 설정되어 `lang` 속성이 일치한 첫 번째 큐에 메시지가 전달된다.

 

 

 

 

다음으로 Queues and Streams 탭으로 이동해 Add a new queue에 다음과 같이 설정하고 queue를 생성한다.

 

 

 

메시지 전송을 위한 기본적인 설정은 끝났다. 이제 스프링부트에서 설정하고 메시지를 생성하고 처리하면 된다.

 

3. SpringBoot RabbitMQ 설정

예제는 Spring RabbitMQ의 Getting Started를 참고했으며 사용한 스프링부트는 `3.3.5` 버전이다.

 

3.1 의존성 추가

`build.gradle`에 다음 의존성을 추가한다.

버전은 mvnrepository를 참고
implementation 'org.springframework.boot:spring-boot-starter-amqp:3.3.5'

 

3.2 Consumer

@Component
public class Consumer {

    public void receiveMessage(String message) {
        System.out.println("Received <" + message + ">");
    }
}

 

Consumer는 메시지 큐에서 메시지를 가져와 처리하는 `소비자` 역할을 한다.

 

3.3 RabbitMQConfig

import eello.rabbitmq.consumer.Consumer;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitMQConfig {

    public static final String exchangeName = "spring-boot-exchange";
    public static final String queueName = "spring-boot-queue";
    public static final String routingKey = "foo.bar";

    @Bean
    public Queue queue() {
        return new Queue(queueName);
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(exchangeName);
    }

    @Bean
    public Binding binding(Queue queue, DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(routingKey);
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Consumer consumer) {
        return new MessageListenerAdapter(consumer, "receiveMessage");
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }
}

 

 

directExchange()

위에서 exchange를 생성할 때 타입을 direct로 했기 때문에 DirectExchange를 빈으로 등록한다. 만약 topic, fanout, headers와 같이 다른 exchange를 생성했다면 그에 맞는 빈을 등록해야 한다.

 

binding()

위에서 exchange와 queue를 생성을 했지만 이 둘 사이를 연결시켜주지는 않았다. binding()에서 라우팅 키 `foo.bar`를 사용해 `spring-boot-queue`와 `spring-boot-exchange`를 바인딩시켜 준다.

 

listenerAdapter()

`MessageListenerAdapter`는 메시지를 소비할 `consumer`와 해당 `consumer`의 어떤 메서드에서 메시지를 처리할 건지 정의한다. 이 예제에서는 Receiver 클래스의` receiveMessage` 메서드에서 메시지를 처리한다.

 

3.4 application.yml

rabbitmq에 접속할 수 있는 사용자 정보를 입력해야 한다. docker-compose에서 환경변수로 `admin/admin`으로 설정했기 때문에 `application.yml`에 다음과 같이 설정을 추가한다.

spring:
  rabbitmq:
    username: admin
    password: admin

 

 

3.5 Producer

이제 메시지를 보내는 Producer를 추가하면 마무리가 된다.

@Component
public class Producer {

    private final RabbitTemplate rabbitTemplate;

    public Producer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMessage(String message) {
        System.out.println("Sending message...");
        rabbitTemplate.convertAndSend(RabbitMQConfig.exchangeName, "foo.bar", message);
    }
}

 

RabbitTemplate은 메시지를 지정된 exchange에 전송한다. 위의 경우에서는

  • RabbitMQConfig.exchangeName(`"spring-boot-exchange"`)에 메시지를 전송하며
  • 메시지의 라우팅 키 값은 `foo.bar`이고
  • 메시지 본문은 함수의 인자로 주어진 `message`가 된다.

RabbitTemplate은 별도로 빈으로 등록하지 않더라도 기본 설정에 따라 기본 빈이 등록된다.

 

3.6 테스트

간단하게 메시지를 생성하는 테스트 코드를 만들어서 메시지를 보내보자.

@SpringBootTest
class RabbitmqApplicationTests {

    @Autowired
    private Producer producer;

    @Test
    public void string_message() throws InterruptedException {
        producer.sendMessage("Hello from RabbitMQ!");
        Thread.sleep(500);
    }

}

 

Consumer가 메시지를 받아 처리하는 과정은 비동기로 이루어지기 때문에 Consumer가 처리하기 전에 테스트가 종료될 수 있다. 때문에 결과를 확인하기 위해 `Thread.sleep(500)`으로 테스트가 조금 더 유지되도록 한다.

 

실행한 결과는 다음과 같이 producer로 rabbitmq에 메시지를 보내고 consumer에서 메시지를 받아 출력하게 된다.

 

 

4. 메시지 본문으로 Object를 보내보자.

보통 이러한 메시지 큐를 사용하면서 단순 스트링만 보내기보단 여러 정보를 보내는 경우가 많을 것이다. 예를 들어 유저 정보를 보낸다고 가정하고 User 클래스를 다음과 같이 정의해 보자.

public class User {

    private String username;
    private String email;
    private int age;

    public User() {}

    public User(String username, String email, int age) {
        this.username = username;
        this.email = email;
        this.age = age;
    }

    // getter

    @Override
    public String toString() {
        return "User{" +
                "username='" + username + '\'' +
                ", email='" + email + '\'' +
                ", age=" + age +
                '}';
    }
}

 

 

 

일반적으로 객체 정보를 보낼 때는 해당 객체 정보를 json 문자열로 만들어 보내고 받는 쪽에서 json을 파싱해 사용하는 방법을 사용한다. 메시지를 전송할 때 json 문자열로 변환하기 위해 `RabbitMQConfig`에 `Jackson2JsonMessageConverter`를 `MessageConverter` 빈으로 등록한다.

// RabbitMQConfig
@Bean
public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
}

 

이제 RabbitTemplate이 위에서 등록된 MessageConverter를 사용해 json 문자열로 변환해 메시지를 생성할 수 있도록 직접 RabbitTemplate을 정의해 빈으로 등록한다.

// RabbitMQConfig
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(messageConverter);
    return rabbitTemplate;
}

 

 

객체 정보를 json으로 변환해서 메시지를 생성했으니 처리하는 소비자는 json 문자열을 다시 객체로 변환해야 한다. 이를 위해 기존의 `listenerAdapter`에도 MessageConverter를 등록한다.

// RabbitMQConfig
@Bean
public MessageListenerAdapter listenerAdapter(Consumer consumer, MessageConverter messageConverter) {
    MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(consumer, "receiveMessage");
    listenerAdapter.setMessageConverter(messageConverter);
    return listenerAdapter;
}

 

 

테스트하기 전에 User 정보를 메시지로 보내는 Producer의 메서드와 User 정보를 받는 Consumer 메서드를 각각 다음과 같이 정의한다.

// Producer
public void sendMessage(User user) {
    System.out.println("Sending User information...");
    rabbitTemplate.convertAndSend(RabbitMQConfig.exchangeName, "foo.bar", user);
}

// Consumer
public void receiveMessage(User user) {
    System.out.println("Received <" + user + ">");
}

 

마지막으로 테스트를 해보자. 테스트는 String 메시지를 보낼 때와 거의 동일하다.

@Test
public void obj_message() throws InterruptedException {
    User user = new User("name", "email@example.com", 10);
    producer.sendMessage(user);
    Thread.sleep(500);
}

 

 

Consumer에서 객체로 변환되어 수신한 User 정보를 볼 수 있다.

 

'SpringBoot' 카테고리의 다른 글

@Transactional 이해하기  (0) 2025.04.24
DLQ(SpringBoot + RabbitMQ 메시지 실패 처리)  (1) 2024.11.20
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
TAG
more
«   2025/10   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함