티스토리 뷰

SpringBoot에서 RabbitMQ의 메시지를 처리하다 예외가 발생하게 되면 어떻게 될까?

 

`Consumer.receiveMessage(User user)` 메서드에서 일부러 `RuntimeException`을 발생시켜 보자.

public void receiveMessage(User user) {
    System.out.println("Received <" + user + ">");
    throw new RuntimeException("message processing failed");
}

 

저번에 메시지를 보내는 것과 동일한 테스트 코드를 실행시킨다.

@Test
public void obj_message() throws InterruptedException {
    User user = new User("name", "email@example.com", 10);
    producer.sendMessage(user);
    Thread.sleep(500);
}

 

 

실행 결과를 보면 같은 메시지를 반복해서 처리하고 있다는 것을 알 수 있다.

 

 

rabbitmq는 메시지에 대한 ACK를 관리해 해당 메시지가 ACK 처리가 되었다면 큐에서 메시지를 삭제한다. 메시지에 대한 ACK를 처리하는 방식에는 `AUTO`, `MANUAL`, `NONE` 3가지로 설정할 수 있으며 SpringBoot에서는 기본으로 ACK 모드가 `AUTO`로 설정되어 있다.

  • `AUTO` : 기본 설정값으로 메시지 처리 성공 시 `ack`를, 처리 중 에러 발생 시 `nack`를 자동으로 발행한다.
  • `MANUAL` : 개발자가 직접 `ack`와 `nack`를 처리해야 한다.
  • `NONE` : rabbitmq가 자동으로 `ack` 처리하는 모드로, 메시지의 처리 성공/실패 여부와 관계없이 `ack` 처리가 된다.

때문에 메시지를 처리하다 예외가 발생하면 해당 메시지에 nack를 발행하기 때문에 큐에서 메시지가 삭제되지 않아 Consumer가 계속해서 같은 메시지를 처리하기 위해 시도하게 된다.

 

`AUTO`를 사용하면 별도의 설정과 추가적인 로직없이 메시지 처리 실패 시 자동으로 메시지 처리를 재시도할 수 있다. 하지만 메시지가 처리될 때까지 이 과정을 무한반복하게 되기 때문에 비효율적이다. 이를 위해 처리에 실패한 메시지를 다른 큐로 이동시켜 임시로 저장하는 DLQ 방식을 사용한다.

 

DLQ(Dead Letter Queue)

`DLQ`는 처리에 실패한 메시지를 임시로 저장하는 특수한 목적의 큐이다. DLQ에 저장된 메시지는 추후에 다른 방식으로 재처리를 시도할 수 있다. 메시지 처리 실패 시 메시지가 DLQ로 저장되는 과정은 다음과 같다.

 

  1. Consumer가 메시지 처리를 시도한다.
  2. 메시지 처리 중 예외가 발생해 처리에 실패한다.
  3. 설정한 조건에 따라 해당 메시지를 DLX(Dead Letter Exchange)로 전송한다.
  4. DLX에서 Dead Letter Routing Key에 따라 DLQ(Dead Letter Queue)로 라우팅 한다.

 

처리에 실패한 메시지는 크게 2가지 기준으로 DLQ로 보내게 된다.

  • `TTL(Time-To-Live)` 방식 : 큐에서 메시지의 TTL 초과 시 DLQ로 전송
  • `retry` 방식 : 최대 재시도 횟수를 초과 시 DLQ로 전송

 

SpringBoot에서 메시지 실패 처리

메시지를 DLQ에 저장하기 위해서는 가장 먼저 DLX와 DLQ를 생성해야 한다.

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterConfig {

    public static final String dlxName = "spring-boot-dlx";
    public static final String dlqName = "spring-boot-dlq";
    public static final String dlRoutingKey = "dead-letter";

    @Bean
    public Queue dlq() {
        return new Queue(dlqName);
    }

    @Bean
    public DirectExchange dlx() {
        return new DirectExchange(dlxName);
    }

    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlq()).to(dlx()).with(dlRoutingKey);
    }
}

 

 

실제 작업에 사용되는 큐에서 실패한 메시지를 DLX로 보내기 위해서는 큐가 DLX에 대한 정보와 DLX에서 어떤 DLQ로 라우팅을 결정하기 위해 라우팅 키를 알아야 한다. 이를 위해 큐를 생성할 때 `arguments`로 이 값들을 설정할 수 있다.

// RabbitMQConfig.java
@Bean
public Queue queue() {
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-dead-letter-exchange", DlqConfig.dlxName);
    arguments.put("x-dead-letter-routing-key", DlqConfig.dlRoutingKey);
    return new Queue(queueName, true, false, false, arguments);
}

 

new Queue(`name`, `durable`, `exclusive`, `autoDelete`, `arguments`)

  • String `name` : 큐 이름
  • boolean `durable` : 큐의 내구성 설정
    - true(default) : rabbitmq가 재시작되어도 큐는 유지
    - false : rabbitmq가 재시작 시 큐 삭제
  • boolean `exclusive` : 큐의 독점적 사용 여부
    - true : 큐가 현재 연결에서만 사용 가능하며, 다른 연결에서는 접근할 수 없음 또한 연결이 종료되면 큐 자동 삭제
    - false(default) : 다른 연결도 큐에 접근할 수 있음
  • boolean `autoDelete` : 큐의 자동 삭제 여부
    - true : 큐를 사용하는 마지막 Consumer가 연결을 종료하면 큐가 자동으로 삭제
    - false(default) : 큐에 연결된 사용자가 없어도 큐 유지
  • Map<String, Object> `arguments` : 큐의 추가적인 속성 설정
    - ex) `x-dead-letter-exchange`, `x-message-ttl`, `x-max-length` 등

 

1. TTL 방식으로 처리

TTL은 메시지가 큐에서 생존할 수 있는 최대 시간을 의미하며 밀리초 단위로 설정 가능하다. TTL 방식은 메시지가 큐에서 TTL을 초과하게 되면 작업 큐에서 DLQ로 옮겨 저장하는 방식이다.

 

TTL은 큐를 생성할 때 만들었던 arguments에 `x-message-ttl`을 추가해 설정할 수 있다.

// RabbitMQConfig.java
@Bean
public Queue queue() {
    Map<String, Object> arguments = new HashMap<>();
    arguments.put("x-message-ttl", 2000); // ttl 2초로 설정
    arguments.put("x-dead-letter-exchange", DlqConfig.dlxName);
    arguments.put("x-dead-letter-routing-key", DlqConfig.dlRoutingKey);
    return new Queue(queueName, true, false, false, arguments);
}

 

아까와 동일한 테스트에 Thread.sleep()만 3초로 수정해 실행해 보자.

@Test
public void obj_message() throws InterruptedException {
    User user = new User("name", "email@example.com", 10);
    producer.sendMessage(user);
    Thread.sleep(3000);
}

 

많은 에러가 출력됐지만 rabbitmq의 웹 UI 콘솔에 접속해 `spring-boot-dlq`에 메시지가 1개 존재하는 것을 볼 수 있다.

 

 

2. 재시도 횟수로 처리

재시도 횟수로 처리하는 방식은 Consumer에서 메시지 처리를 실패했을 때 최대 재시도 횟수까지 메시지 처리를 재시도하는 방식이다. 이 방식의 구현방법은 크게 2가지로 나눌 수 있다.

  • RetryTemplate
  • Message Header

2.0 준비

rabbitmq에서 메시지에 대해 `NACK` 응답으로 해당 메시지를 DLQ로 보내기 때문에 `NACK`를 하기 위해 ACK 모드를 기본 설정인 `AUTO`가 아닌 `MANUAL`로 설정하는 것이 권장된다. ACK 모드 설정은 `RabbitMQConfig.container()`에서 설정하면 된다.

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(queueName);
    container.setMessageListener(listenerAdapter);
    
    // ACK모드 - MANUAL로 설정
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    
    return container;
}

 

`ACK/NACK` 응답을 처리하는 함수는 Channel 객체에 포함되어 있고 메시지를 구분하기 위한 고유한 식별자는 Message 객체에 포함되어 있다. 따라서 Consumer에서 직접 `ACK`와 `NACK` 처리를 하기 위해서는 Channel 객체와 Message 객체가 추가로 필요하다. Consumer에서 Channel과 Message 객체를 사용하기 위해 `ChannelAwareMessageListener`를 구현해야 한다.

@Component
public class Consumer implements ChannelAwareMessageListener {

    public void receiveMessage(String message) {
        System.out.println("Received <" + message + ">");
    }
    
    public void receiveMessage(User user) {
        System.out.println("Received <" + user + ">");
        throw new RuntimeException("message processing failed");
    }

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // 메시지 처리 로직
    }
}

 

 

`ChannelAwareMessageListener`는 `MessageListener`를 확장하고 있기 때문에 Consumer를 MessageListenerAdapter를 사용하지 않아도 직접 `MessageListenter`로 등록할 수 있다.

 

때문에 더이상 `RabbitMQConfig.listenerAdapter()`는 빈으로 등록되지 않아도 된다. Consumer를 직접 MessageListener로 등록하기 위해 `RabbitMQConfig.container()`를 수정한다.

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, Consumer consumer) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(queueName);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    
    container.setMessageListener(consumer); // MessageListener로 Consumer 등록
    return container;
}

 

`receiveMessage()`가 메시지 처리 함수가 될 수 있었던 것은 MessageListenerAdapter에 `receiveMessage()`가 메시지 처리 함수라는 것을 지정했기 때문이다.

MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(consumer, "receiveMessage");

 

하지만 어댑터를 사용하지 않고 Consumer를 직접 MessageListener로 등록했기 때문에 더이상 `receiveMessage()`가 메시지 처리 함수가 아니다. 메시지 처리 로직은 오버라이드한 `onMessage()` 함수에 구현하면 된다.

 

@Component
@RequiredArgsConstructor
public class Consumer implements ChannelAwareMessageListener {

    private final MessageConverter messageConverter; // MessageConverter 주입받음

    // receiveMessage..
    
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // 메시지 처리 로직
        User user = (User) messageConverter.fromMessage(message); // 메시지로부터 User 객체 뽑음
        System.out.println("Received <" + user + ">");

        throw new RuntimeException("message processing failed"); // 강제 실패
    }
}

 

기존 Consumer와 달라진 점은 크게 2가지이다.

  • `private final MessageConverter messageConverter;`
  • `User user = (User) messageConverter.fromMessage(message);`

`onMessage()`에는 User 객체를 인자로 받을 수 없다. 때문에 Message 객체로부터 User 객체를 뽑아야 한다. 이를 위해서는 MessageConverter가 필요한데, ListenerAdapter를 적용했을 때는 Adapter에 MessageConverter를 지정했지만 이제 Consumer를 직접 MessageListener로 지정했기 때문에 Consumer에는 MessageConverter가 존재하지 않아 RabbitMQConfig에서 빈으로 등록했던 `Jackson2JsonMessageConverter`를 주입받는다. 그리고 이 `messageConverter`를 이용해 Message 객체를 User 객체로 변환한다.

 

 

2.1 RetryTemplate

RetryTemplate은 다시 rabbitmq로부터 메시지를 받아와 재시도하는 방식이 아닌 애플리케이션 내부에서 재시도를 하는 방법이다.

 

RetryTemplate을 사용하기 위해서는 Retry에 대한 정책을 설정해서 RetryTemplate을 빈으로 등록해야 한다.

// RabbitMQConfig에 추가
@Bean
public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(3); // 최대 재시도 횟수 3회
    retryTemplate.setRetryPolicy(retryPolicy);
    return retryTemplate;
}

 

RetryTemplate이 재처리를 시도하기 위해서는 메시지 처리 로직을 RetryTemplate으로 감싸면 된다.

// Consumer.onMessage
@Override
public void onMessage(Message message, Channel channel) {
    retryTemplate.execute(context -> { // 1. 메시지 처리 콜백 함수
        try {
            User user = (User) messageConverter.fromMessage(message);
            System.out.println("Received <" + user + ">");

            throw new RuntimeException("message processing failed"); // 강제 실패

            // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 메시지 처리 성공시 ACK 응답
        } catch (Exception e) {
            throw e;
        }
    }, context -> { // 2. 재시도 횟수 초과시 콜백 함수
        System.out.println("retry count exceeded !");
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        System.out.println("Message NACKed and sent to DLQ");
        return null;
    });
}

 

`retryTemplate.execute(retryCallback, recoveryCallback)`

  • `retryCallback` : 메시지 처리 콜백 함수
  • `recoveryCallback` : 재시도 횟수 초과시 실행할 콜백 함수

RetryTemplate에 의해 최대 재시도 횟수가 초과하면 `recoveryCallback` 함수 내의 `channel.basicNack()` 메서드로 해당 메시지에 대한 `NACK` 응답을 한다. `channel.basicNack()` 메서드의 마지막 인자는 `boolean requeue`로 메시지를 다시 큐에 넣을 것인지에 대한 여부이다. 만약 큐가 DLX와 연결되어 있고 처리에 실패한 메시지를 requeue를 하지 않는다면 해당 메시지를 DLX로 보내 결국 DLQ로 보내지게 된다.

 

메시지에 대한 ACK/NACK 응답을 하기 위해 channel의 basicAck()와 basicNack()를 사용한다.
- channel.basicAck(long deliveryTag, boolean multiple)
- channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag : 메시지에 대한 고유 식별자
multiple : true일 경우 현재 메시지의 deliveryTag보다 작거나 같은 메시지에 대해 모두 ACK 또는 NACK / false 경우 현재 메시지에 대해서만 ACK 또는 NACK
requeue : 처리에 실패한 메시지에 대해 큐에 다시 넣을지에 대한 여부

 

 

동일한 테스트로 실행한 결과는 다음과 같다. 총 메시지를 3번 읽고 최대 재시도 횟수를 초과해 DQL로 보내지게 된다.

 

DLQ도 확인해보면 위에서 했던 TTL 테스트에서 추가된 1개의 메시지를 포함해 총 2개가 DLQ에 존재하는 것을 알 수 있다.

 

 

2.2 Message Header 사용

Message Header를 사용하는 방식은 재시도 횟수를 Message Header의 속성에 포함시켜 관리하는 방법이다. 애플리케이션 내부에서 자체적으로 재시도를 하는 RetryTemplate과 달리 rabbitmq에서 다시 메시지를 가져와 재시도하는 방식이 된다.

 

rabbitmq는 자체적으로 재시도 횟수를 카운팅하지 않기 때문에 재시도 횟수에 대한 정보를 어딘가에 저장해야한다. 여기서는 재시도 횟수를 메시지의 헤더를 사용해 카운팅한다.

// Consumer.onMessage
@Override
public void onMessage(Message message, Channel channel) throws IOException {
    MessageProperties properties = message.getMessageProperties();
    long deliveryTag = properties.getDeliveryTag(); // 메시지 고유 식별자
    Map<String, Object> headers = properties.getHeaders(); // 메시지 헤더들
    int retryCount = (int) headers.getOrDefault("x-retry-count", 0); // 헤더 중 재시도 횟수 속성 값

    System.out.println("try count : " + retryCount);

    try {
        // 메시지 처리 시도
        User user = (User) messageConverter.fromMessage(message);
        System.out.println("Received <" + user + ">");
        throw new RuntimeException("message processing failed"); // 강제 실패
        // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 메시지 처리 성공시 ACK
    } catch (Exception e) {
        // 메시지 처리 실패시
        if (retryCount >= 2) { // 최대 재시도 횟수 초과 시
            System.out.println("retry count exceeded !");
            channel.basicNack(deliveryTag, false, false);
            System.out.println("Message NACKed and sent to DLQ");
        } else { // 메시지 처리 재시도
            headers.put("x-retry-count", retryCount + 1);

            rabbitTemplate.convertAndSend(RabbitMQConfig.exchangeName, "foo.bar", message, msg -> {
                msg.getMessageProperties().getHeaders().putAll(headers); // 기존 헤더 복사
                return msg;
            });

            channel.basicAck(deliveryTag, false); // 기존 메시지 ACK
        }
    }
}

 

`channel.basicNack()`로 `requeue = false`면 메시지가 다시 큐에 들어가지만, 메시지 처리 재시도할 때 메시지를 재발행해 다시 큐에 넣는다. 그 이유는 메시지 재시도 횟수를 업데이트하기 위함이다. 메시지 처리 재시도에서 `header`의 `x-retry-count`를 업데이트하더라도 `channel.basicNack()`로 다시 큐에 넣게 되면 이 변경한 헤더 값은 업데이트되지 않지 않고 초기 헤더값 그대로 requeue된다. 때문에 `x-retry-count`에 1 증가한 값을 포함한 전체 헤더를 복사해 새로운 메시지에 적용해 발행해 재시도 횟수를 카운팅한다.

 

새로운 메시지를 발행한 후 기존의 메시지는 `ACK` 처리해 큐에서 삭제되도록 한다.

 

위의 테스트 코드와 동일하게 테스트를 진행한 결과는 다음과 같다.

 

'SpringBoot' 카테고리의 다른 글

@Transactional 이해하기  (0) 2025.04.24
SpringBoot + RabbitMQ  (0) 2024.11.13
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
TAG
more
«   2025/05   »
1 2 3
4 5 6 7 8 9 10
11 12 13 14 15 16 17
18 19 20 21 22 23 24
25 26 27 28 29 30 31
글 보관함