비동기에 대해
지난번 예시 정정
@Service
public class PostService {
private final PostRepository postRepository;
private final SummaryService summaryService;
@Transactional
public void createPostAsync(CreatePostCommand cmd) {
// 1) 게시글 저장
Post post = new Post(cmd.getTitle(), cmd.getContent());
postRepository.save(post);
// 2) 요약 생성을 비동기로 위임
summaryService.generateSummaryAsync(post.getId());
// 3) 여기서 트랜잭션 커밋 (게시글은 DB에 확정)
// 요약 생성 성공/실패는 이 시점에 알 수 없음
}
}
// 요약 서비스 – @Async 때문에 완전히 다른 트랜잭션이 됨
@Service
public class SummaryService {
private final SummaryRepository summaryRepository;
private final AiClient aiClient;
@Async
@Transactional
public void generateSummaryAsync(Long postId) {
try {
// 별도 스레드 + 별도 트랜잭션에서 AI 호출
String summary = aiClient.summarizePost(postId); // 실패 시 이 트랜잭션만 롤백
summaryRepository.save(new Summary(postId, summary));
} catch (Exception e) {
// 로그는 남지만, 호출자(PostService)까지 예외는 올라가지 않음
log.error("비동기 요약 생성 실패. postId={}", postId, e);
throw e; // 던져도 @Async 밖의 트랜잭션(PostService)에는 영향 없음
}
}
}
시나리오
요약 생성이 실패했을 때, 게시글은 실패처리되었을까?
- createPostAsync() 트랜잭션:
- posts 테이블에 게시글 생성됨.
- generateSummaryAsync() 트랜잭션:
- 외부 API 호출 실패시 요약만 롤백되고, 게시글은 살아있음
시나리오2
(어디까지나 이해를 돕기 위한 시나리오임…)
매일 GET /movies/export 의 외부 API를 통해 수십GB 분량의 데이터를 업데이트 해야 함.
단, 원자적 작업이어야 함 (업데이트된 카탈로그를 완전하게 보여주거나, 그냥 전날의 카탈로그를 보여줘야 함)
지난 게시글의 내용을 완벽하게 까먹었다고 가정한다면 이렇게 짤 수 있습니다.
@Transactional
public void updateMovies(UpdateMoviesCommand cmd) {
List<Movie> movies = movieClient.get(); // 시간이 많이 걸리는 API
movieRepository.saveMovieBatch(movies);
}
조금 더 머리를 쓴다면 데이터베이스 대량 업데이트를 의식하여 페이징 단위로 데이터 업데이트를 할 수도 있습니다.
하지만 결국 지난 게시글에서 설명한 바와 같이
이런 패턴은 트랜잭션의 수명을 불필요하게 길게 만들고, 스레드 풀을 고갈시켜 데이터베이스 장애로 이어질 수 있습니다.
그렇다면 조금 생각을 바꿔서..
비동기의 유혹
가장 먼저 떠오르는 해법은:
“느린 외부 API 호출은 트랜잭션에서 분리하고, 결과를 받은 뒤에 별도의 트랜잭션에서 DB에 저장하면 되지 않을까?”
예를 들어, 이런 식으로 이벤트리스너를 써볼 수 있습니다.
public void updateMovies(UpdateMoviesCommand cmd) {
// 1) 트랜잭션 없음
List<Movie> movies = movieClient.get(); // 매우 큰 응답
// 2) 응답을 통째로 이벤트로 날림
eventPublisher.publishEvent(new MoviesFetchedEvent(movies));
}
@EventListener
@Transactional
public void handleMoviesFetched(MoviesFetchedEvent event) {
movieRepository.saveMovieBatch(event.movies()); // 여기서 트랜잭션
}
하지만 근본적인 문제가 여전히 존재합니다.
List<Movie> movies 라는 수십GB의 데이터를 한번에 메모리에 올린다는 점입니다.
트랜잭션의 수명을 줄였을 뿐, OOM의 문제에서 자유롭지 않습니다.
이처럼 큰 데이터의 트랜잭션을 어떻게 해결할 수 있을까요?
스트리밍/페이징 + 스테이징 + 커밋 패턴
DB 트랜잭션을 짧게 가져가고, 외부 API는 스트리밍/페이징으로 받아서 메모리를 보호하면서
사용자 입장에서 원자적으로 데이터가 전환되도록 하려면 몇가지 조건이 필요합니다.
1. 데이터는 청크단위로
일단 데이터는 청크 단위로 받아야 합니다.
예시를 위해 외부 API가 페이징을 지원한다고 가정하겠습니다.
2. 테이블 단위 추가
1) 작업 상태 테이블 – movie_import_job 한 번의 전체 카탈로그 업데이트를 “작업(job)” 단위로 관리
movie_import_job
-------------------------
id BIGINT (PK)
status VARCHAR -- PENDING / RUNNING / FAILED / DONE
started_at TIMESTAMP
finished_at TIMESTAMP
total_pages INT -- (옵션) 예상 페이지 수
processed_pages INT -- (옵션) 실제 처리한 페이지 수
error_message VARCHAR -- (옵션) 실패 이유
2) 스테이징 테이블 – movie_staging “이번 작업에서 받은 영화 데이터” 를 임시로 쌓아두는 테이블 job_id 로 여러 작업의 데이터를 구분
movie_staging
-------------------------
job_id BIGINT -- FK -> movie_import_job.id
movie_id VARCHAR
title VARCHAR
released_at DATE
... ...
PRIMARY KEY(job_id, movie_id)
해결
1. 짧은 트랜잭션의 작업 시작
@Service
public class MovieImportJobService {
@Transactional
public Long startJob() {
MovieImportJob job = new MovieImportJob();
job.setStatus(Status.PENDING);
job.setStartedAt(Instant.now());
jobRepository.save(job);
return job.getId();
}
}
2. 외부 API 호출하면서 페이지 단위로 스테이징 저장 (짧은 트랜잭션으로)
@Service
public class MovieImportRunner {
private final MovieImportJobService jobService;
private final MoviePagingClient movieClient;
private final MovieStagingService stagingService;
@Async // 또는 배치 잡에서 호출
public void runImport() {
Long jobId = jobService.startJob();
try {
jobService.markRunning(jobId);
int page = 0;
int size = 1000;
while (true) {
// 1) 외부 API 호출 (트랜잭션 없음)
List<MovieDto> movies = movieClient.getPage(page, size);
if (movies.isEmpty()) {
break; // 더 이상 데이터 없음
}
// 2) 페이지 단위로 스테이징 테이블에 저장 (짧은 트랜잭션)
stagingService.savePage(jobId, page, movies);
jobService.incrementProcessedPages(jobId);
page++;
}
// 3) 검증 및 최종 스위치 (짧은 트랜잭션)
jobService.validateAndSwitch(jobId);
} catch (Exception ex) {
jobService.markFailed(jobId, ex);
}
}
}
@Service
public class MovieStagingService {
private final MovieStagingRepository stagingRepository;
@Transactional
public void savePage(Long jobId, int page, List<MovieDto> movies) {
// page 정보를 별도 컬럼으로 저장하고 싶다면 추가
// 예: movie_staging(job_id, page_no, movie_id, ...)
List<MovieStagingEntity> entities = movies.stream()
.map(dto -> MovieStagingEntity.from(jobId, dto))
.toList();
stagingRepository.saveAll(entities); // INSERT batch
}
}
@Service
public class MovieImportJobService {
// ...
@Transactional
public void validateAndSwitch(Long jobId) {
// 1) 검증: 개수, 중복 등
stagingRepository.validateForJob(jobId); // 문제가 있으면 예외 발생 → 롤백
// 2) 메인 테이블 교체 방식 선택
// (a) 전체 삭제 후 INSERT
// (b) 버전 테이블 + current_version 포인터 스위치
// 여기서는 간단히 (a) 예시
movieRepository.deleteAll(); // 기존 영화 삭제
movieRepository.insertFromStaging(jobId); // 이번 job의 영화들로 교체
// 3) 작업 상태 변경
MovieImportJob job = jobRepository.findById(jobId).orElseThrow();
job.setStatus(Status.DONE);
job.setFinishedAt(Instant.now());
}
@Transactional
public void markRunning(Long jobId) {
jobRepository.findById(jobId).ifPresent(job -> job.setStatus(Status.RUNNING));
}
@Transactional
public void incrementProcessedPages(Long jobId) {
jobRepository.incrementProcessedPages(jobId);
}
@Transactional
public void markFailed(Long jobId, Exception ex) {
jobRepository.findById(jobId).ifPresent(job -> {
job.setStatus(Status.FAILED);
job.setErrorMessage(ex.getMessage());
job.setFinishedAt(Instant.now());
});
}
}
생각해보자
- 만약 중간에 실패한다면?
- 3페이지까지 저장했는데, 4페이지에서 오류가 발생했다!
- 재시도를 한다면? 재시도를 한다면 어디서부터 해야할까?
- 실제 프레임워크에서 해결하는 방식은?
- Spring Batch
- ItemReader / ItemWriter
댓글남기기