Blocking Queue 이해하기
[Blocking Queue 란?]
Blocking Queue는 Thread-safe(스레드 안전)한 큐를 구현하기 위한 인터페이스다. Java5에서 java.util.concurrent 패키지에 추가되었다. 큐의 기본 작업에 블로킹을 추가해 큐가 가득 찼을 때 항목을 추가하려는 스레드나, 큐가 비었을 때 항목을 제거하려는 스레드를 대기 상태로 만든다. 블록킹은 특정 조건이 충족될 때까지 쓰레드를 일시 중지시키는 것으로, 연산이 완료될 때까지 쓰레드를 대기 상태로 만든다.
- 큐가 비어있으면: 요소를 꺼내려는(Thread가 take()를 호출) 스레드는 큐에 요소가 추가될 때까지 대기한다.
- 큐가 가득 차면: 요소를 추가하려는(Thread가 put()을 호출) 스레드는 큐에 여유 공간이 생길 때까지 대기한다.
Blocking Queue 는 멀티쓰레드 환경에서 데이터를 생성해 큐에 추가하는 Producer 와 큐에서 데이터를 가져와 처리하는 Consumer 가 비동기적으로 실행되면서도, 동기화문제 없이 데이터를 공유하기위한 목적으로 주로 사용된다.
다른 java 클래스에서는,
쓰레드풀을 이용할 때 사용되는 Executors 클래스에서 고정된 쓰레드풀의 개수를 반환하는 newFixedThreadPool 이 ThreadPoolExecutor 인스턴스를 생성해 반환하는데, ThreadPoolExecutor 에서 BlockingQueue 를 파라미터로 받아 이용하고 있다. (기본적으로 LinckedBlockingQueue 가 사용된다.)
[Queue 와 BlockingQueue의 차이점]
1. Thread-Safe
일반적으로 Queue를 구현하는 LinkedList, PriorityQueue 등은 여러 개의 스레드에서 동시에 접근할 경우 데이터의 일관성이 깨질 수 있다. 즉, 한 스레드가 poll()을 호출하여 요소를 제거하는 도중에, 다른 스레드가 offer()로 요소를 추가하면 데이터 손실, 중복 추가, 동시 수정 문제가 발생할 수 있다. 반면, BlockingQueue는 내부적으로 동기화 메커니즘을 제공해 여러 개의 스레드가 동시에 put()과 take()를 호출해도 데이터가 안전하게 처리된다. 예를 들어, ArrayBlockingQueue와 LinkedBlockingQueue는 내부적으로 락(Lock)과 조건 변수(Condition Variable)를 사용하여 동기화를 보장한다.
2. 블로킹 기능 (put/take 대기)
일반 Queue는 큐가 비어 있거나 가득 차더라도 즉시 반환한다. 예를 들어, 큐가 비어있을 때 poll()을 호출하면 null을 반환하고, 큐가 가득 차면 offer()는 false를 반환한다. 반면, BlockingQueue는 큐의 상태에 따라 자동으로 블로킹(쓰레드를 대기상태 Blocking로 전환) 한다. 이러한 특징 때문에 producer-consumer 패턴을 구현할 때, consumer는 take()를 호출해 데이터가 들어올 때까지 대기하고, producer는 put()을 호출해 큐가 가득 찼을 때 대기하는 방식으로 사용된다.
3. 대기시간 설정 기능
일반 Queue는 블로킹 기능이 없으므로, 대기 시간을 설정하는 기능도 제공하지 않는다. 반면, BlockingQueue는 요소를 추가하거나 제거할 때 특정 시간 동안만 대기하도록 설정할 수 있다.
- offer(E e, long timeout, TimeUnit unit): 요소를 삽입할 때 큐가 가득 찼을 경우, 지정된 시간 동안 대기하고, 시간이 지나도 공간이 생기지 않으면 false 반환
- poll(long timeout, TimeUnit unit): 요소를 제거할 때 큐가 비어 있을 경우, 지정된 시간 동안 대기하고, 시간이 지나도 데이터가 들어오지 않으면 null 반환
즉, BlockingQueue는 "기다릴 수 있는 큐"이지만, 너무 오래 기다리지 않도록 제한할 수도 있다. 즉, "최대 몇 초 동안만 기다려보고, 시간이 지나도 안 되면 포기"하는 기능을 제공한다.
Blocking Queue 주요 메서드
Blocking Queue 인터페이스는 아래처럼 정의되어 있다.
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
boolean add(E e)
- 큐가 가득차지 않은 경우 즉시 요소를 추가한다.
- 큐가 가득 찬 경우 IllegalStateException을 던진다.
- 용량이 제한되어 있다면 offer() 를 사용하는 것이 권장된다.
boolean offer(E e)
- 큐가 가득 차지 않은 경우 즉시 요소를 추가하고 true를 반환한다.
- 큐가 가득 찬 경우 추가하지 않고 false를 반환한다.
void put(E e) throws InterruptedException
- 큐가 가득 찬 경우 공간이 생길 때까지 블로킹(대기) 한다.
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException
- 큐가 가득 차 있는 경우 지정된 시간 동안 대기하고, 시간이 지나면 false를 반환한다.
- 블로킹 큐에서 타임아웃을 적용한 데이터 추가가 필요할 때 사용된다.
E take() throws InterruptedException
- 큐의 첫번째 요소를 제거하고 반환한다.
- 큐가 비어 있을 경우 새로운 요소가 들어올 때까지 대기(Blocking) 한다.
E poll(long timeout, TimeUnit unit) throws InterruptedException
- 큐의 첫번째요소를 제거하고 반환한다.
- 큐가 비어 있을 경우 최대 지정된 시간만큼 대기하고, 요소를 가져오지 못하면 null을 반환한다.
- take()와 다르게 타임아웃이 설정된 블로킹 방식이다.
int remainingCapacity()
- 현재 큐가 추가로 저장할 수 있는 공간의 크기를 반환한다.
- 크기 제한이 없는 BlockingQueue의 경우 Integer.MAX_VALUE를 반환할 수 있다.
boolean remove(Object o)
- equals 를 이용해 큐에서 제공된 요소와 동일한 요소를 제거한다.
boolean contains(Object o)
- 큐 내에 특정 요소가 존재하는지 확인한다.
int drainTo(Collection<? super E> c)
- 큐의 모든 요소를 다른 컬렉션으로 옮기고 큐를 비운다.
- 이동한 요소 개수를 반환한다.
int drainTo(Collection<? super E> c, int maxElements)
- 최대 maxElements 개수만큼 큐에서 다른 컬렉션으로 이동시킨다.
- 이동한 요소 개수를 반환한다.
Blocking Queue 구현체
Blocking Queue 는 다양한 구현체가 존재한다.
잘 알려진 ArrayBlockingQueue, LinkedBlockingQueue 와 지연기능이 있는 DelayQueue 만 정리했다.
ArrayBlockingQueue
ArrayBlockingQueue는 크기가 고정된 배열 기반의 BlockingQueue다.
[ArrayBlockingQueue 의 특징은?]
- 크기가 고정된 배열(Array) 기반으로 구현된다.
- FIFO(First-In-First-Out) 방식으로 동작한다.
- Head는 가장 오래된 요소, Tail 에는 가장 최근에 들어온 요소가 존재한다.
- 생성 시 반드시 큐의 최대 크기를 지정해야 한다.
- put()과 take()가 하나의 ReentrantLock을 공유하여 동기화된다.
- 같은 락을 사용하므로 락 경합이 발생할 수 있다.
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
final Object[] items = this.items;
int i = 0;
try {
for (E e : c)
items[i++] = Objects.requireNonNull(e);
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
생성자에서 객체를 만들 때 무조건 초기용량(capacity)을 지정해야 하며, 설정한 용량을 변경할 수 없다.
[fair 옵션은 무엇이고 왜 존재하는가?]
fair 옵션은 starvation 방지를 위한 공정성(Fairness) 정책이다. ArrayBlockingQueue 는 내부적으로 단일 RetrantLock 을 사용해 put() 과 take() 가 같은 락을 공유하므로 락 경합이 발생할 수 있고, starvation 이 일어날 가능성이 있다. 따라서 생성자로 넘어온 fair 옵션을 통해 RetentionLock 의 fair 여부를 설정할 수 있다. 기본값은 false 이며 true 로 설정할 경우 가장 오래 기다린 쓰레드가 먼저 lock 을 얻는다.
- fair = false (기본값)
- lock이 JVM 스케줄러에 의해 랜덤하게 할당된다. (FIFO 보장 X)
- put()이나 take()를 기다리는 스레드가 있더라도 어떤 스레드가 먼저 lock을 얻을지는 예측할 수 없다.
- fair = true
- 오래 기다린 스레드가 lock을 먼저 획득한다 (FIFO 보장)
- 우선순위가 높은 작업이 계속 실행되어 낮은 우선순위 작업이 실행되지 않는 starvation 문제를 방지할 수 있다.
- FIFO 순서를 강제하기 위해 추가적인 스레드 관리 비용이 발생하므로, 전체적인 처리량(throughput)은 감소할 수 있다.
[ArrayBlockingQueue 에서 삽입, 삭제는 어떻게 이루어질까?]
일반적인 Queue 는 LinkedList 로 구현된다. 큐는 앞에서 삭제가 일어나는데, 배열의 경우 빈 공간을 메꾸기 위한 요소들이 이동하는 오버헤드가 발생하기 때문에 잘 사용하지 않는다. 하지만, ArrayBlockingQueue 는 배열로 구현되어 있음에도 이런 오버헤드가 발생하지 않는다. 그 이유는 고정된 크기 내에서 putIndex 와 takeIndex 를 이용해 삽입, 삭제를 관리하기 때문이다. 처음 생성시 크기를 고정해두고 어디에 넣고 뺄지 인덱스로 조절하므로 요소 이동이 없다.
구현 코드를 살펴보면,
삽입시 putIndex 를 이용해 어디에 요소를 넣을지 결정한다.
삭제시 삭제한 원소는 null 처리를 하고 takeIndex 로 배열을 관리한다. 그래서 중간에 요소가 삭제되어도 ArrayList 처럼 오버헤드가 발생하지 않는다.
LinkedBlockingQueue
LinkedBlockingQueue는 연결 리스트 기반의 BlockingQueue 다.
[LinkedBlockingQueue 의 특징은?]
- 연결 리스트(Linked List) 기반으로 구현된다.
- FIFO 방식으로 동작한다.
- 최대 크기를 지정할 수 있지만, 기본적으로 Integer.MAX_VALUE까지 확장이 가능하다.
- put() 과 take() 가 각각 별도의 ReentrantLock을 사용하여 동기화를 구현한다. (putLock, takeLock)
- put() 과 take() 가 하나의 ReentrantLock 을 사용하는 ArrayBlockingQueue보다 락 경합(Contention) 이 줄어들어 높은 동시성이 필요한 환경에서 처리량이 향상될 수 있다.
[언제 사용하는가?]
- 생산자와 소비자가 동시에 실행되면서 대량의 데이터를 처리해야 하는 경우
- 고정된 크기의 제한이 필요 없거나, 동적으로 크기를 확장해야 하는 경우
LinkedBlockingQueue 는 ExecutorService 의 기본 작업 큐로사용된다.
[LinkedBlockingQueue 는 어떻게 동시성을 보장하는가?]
삽입(offer())과 삭제(take()) 에서 각각 다른 락(ReentrantLock)을 사용하여 동시성을 제어한다.
- putLock.lockInterruptibly()을 사용해 동시성 보장
- 여러 개의 생산자가 동시에 put()을 호출하더라도 putLock을 통해 한 번에 하나의 스레드만 삽입이 가능하다.
- AtomicInteger count를 사용하여 경쟁 상태 방지
- count.get()를 먼저 확인한 후, 락을 잡은 후에도 다시 한 번 확인하여 다른 스레드가 선점한 경우 중복 삽입을 방지한다.
- count.getAndIncrement()를 통해 원자적으로 개수를 증가시킨다.
- signalNotEmpty()를 호출하여 take()(소비자)에게 알림
- c == 0이면 이전에 큐가 비어 있었으므로 소비자 스레드에게 알림을 보내 소비자 쓰레드를 깨운다.
- takeLock.lockInterruptibly()를 사용해 동시성 보장
- 여러 개의 소비자가 동시에 take()를 호출하더라도 takeLock을 통해 한 번에 하나의 스레드만 데이터를 가져올 수 있다.
- 큐가 비어 있으면 notEmpty.await()로 블로킹
- 큐에 요소가 없으면 소비자가 무한 대기하지 않도록 블로킹한다.
- 생산자가 offer()를 호출하면 signalNotEmpty()를 통해 깨운다.
- count.getAndDecrement()를 사용하여 원자적으로 개수 감소
- signalNotFull()을 호출하여 offer()(생산자)에게 알림
- c == capacity인 경우, 즉 이전에 큐가 가득 찬 상태였다면 생산자 스레드를 깨운다.
[LinkedBlockingQueue 에서 삽입, 삭제는 어떻게 이루어지는가?]
LinkedBlockingQueue 내부에서 Node 를 이용해 LinkedList 형태로 구현되어 있다.
따라서 기본적인 LinkedList 기반 Queue 와 마찬가지로, 삽입시 새로운 Node 를 큐 끝에 추가하고 기존 마지막 노드의 next 를 새로운 노드로 연결 후 last 포인터를 새 노드로 바꾼다. 삭제시 head 를 다음 노드로 이동시킨다.
DelayQueue
DelayQueue는 Delayed 요소를 저장하며, 설정된 지연 시간이 만료되어야 꺼낼 수 있는 블로킹 큐다.
[DelayQueue 의 특징은?]
- 요소마다 특정 시간 후에만 꺼낼 수 있도록 설계된 큐로 요소의 getDelay() 값이 0 이하가 되어야 take()가 가능하다.
- 큐에 저장될 요소는 Delayed 인터페이스를 구현해야 한다.
- 내부적으로 PriorityQueue를 사용하여 지연시간에 따라 요소들을 자동 정렬한다.
- 큐에서 즉시 가져오는 것이 불가능하며, 지정된 시간까지 대기해야 한다.
DelayQueue 에서 지연시간은 Delayed 인터페이스의 getDelay() 메서드를 통해 제공된다. 따라서 저장되는 요소는 Delayed 인터페이스를 구현해야 한다.
[언제 사용하는가?]
- 일정 시간이 지난 후에 실행해야 하는 작업을 관리할 때
- 예약된 작업의 실행을 지연해야 할 때
예시 코드
[Producer, Consumer 테스트]
class BlockingQueueTest {
@Test
void 큐가_가득_차면_생산자는_블로킹된다() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean isBlocked = new AtomicBoolean(false);
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
Thread thread = new Thread(() -> {
try {
queue.put(1);
queue.put(2);
queue.put(3);
latch.countDown(); // put(3)까지 완료됨을 알림
isBlocked.set(true); // put(4)에 도달하면 블로킹 예상
queue.put(4); // 큐 크기가 3이므로 이 시점에서 블로킹되어야 함
isBlocked.set(false); // 만약 블로킹되지 않았다면 false로 설정됨
} catch (InterruptedException e) {
}
});
thread.start();
latch.await(); // 생산자가 put(3)까지 실행될 때까지 기다림
Thread.sleep(500); // put(4)에서 블로킹될 시간을 줌
assertThat(queue).hasSize(3);
assertThat(queue).containsExactly(1, 2, 3);
// 생산자가 실제로 블로킹되었는지 확인
assertThat(isBlocked.get()).isTrue();
thread.interrupt();
thread.join();
}
@Test
void 큐가_비어있으면_소비자는_블로킹된다() throws InterruptedException {
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean isBlocked = new AtomicBoolean(false);
Thread thread = new Thread(() -> {
try {
latch.countDown(); // 소비자 스레드 시작 알림
isBlocked.set(true); // take() 호출 전에 블로킹될 가능성을 높이기 위해 설정
queue.take(); // 큐가 비어 있으므로 블로킹됨
isBlocked.set(false); // 만약 take()가 반환되면 블로킹되지 않았음을 의미
} catch (InterruptedException e) {
}
});
thread.start();
latch.await(); // 소비자가 실행될 때까지 대기
Thread.sleep(500); // 소비자가 블로킹될 시간을 줌
assertThat(isBlocked.get()).isTrue(); // 소비자가 여전히 블로킹 상태인지 확인
thread.interrupt();
thread.join();
}
@Test
void 여러_스레드가_동시에_요소를_추가하고_가져와도_손실이_없다() throws InterruptedException {
int threadCount = 100;
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
ExecutorService producerService = Executors.newFixedThreadPool(threadCount);
ExecutorService consumerService = Executors.newFixedThreadPool(threadCount);
CountDownLatch latch = new CountDownLatch(threadCount*2);
List<Integer> consumedValues = Collections.synchronizedList(new ArrayList<>());
IntStream.rangeClosed(1, threadCount)
.forEach(i ->
producerService.submit(() -> {
try {
queue.put(i);
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread()
.interrupt();
}
})
);
IntStream.range(0, threadCount)
.forEach(i ->
consumerService.submit(() -> {
try {
Integer value = queue.take();
consumedValues.add(value);
latch.countDown();
} catch (InterruptedException e) {
}
})
);
latch.await();
producerService.shutdownNow();
consumerService.shutdownNow();
assertThat(consumedValues).hasSize(threadCount);
assertThat(new HashSet<>(consumedValues)).containsExactlyInAnyOrderElementsOf(
IntStream.rangeClosed(1, threadCount)
.boxed()
.toList());
assertThat(queue).isEmpty();
}
}
[DelayQueue 사용 예시]
Delayed 인터페이스를 구현한 class 를 만든다.
class DelayedTask implements Delayed {
private final long startTime; // 실행 가능한 시간 (현재 시간 + 지연 시간)
private final String taskName;
public DelayedTask(String taskName, long delay, TimeUnit unit) {
this.taskName = taskName;
this.startTime = System.currentTimeMillis() + unit.toMillis(delay);
}
@Override
public long getDelay(TimeUnit unit) {
long remainingTime = startTime - System.currentTimeMillis();
return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
}
public String getTaskName() {
return taskName;
}
}
큐에 넣고 실행시켜보면
DelayQueue<DelayedTask> queue = new DelayQueue<>();
// 3개의 작업을 서로 다른 지연 시간으로 추가
long start = System.currentTimeMillis();
queue.put(new DelayedTask("작업 1", 3, TimeUnit.SECONDS));
queue.put(new DelayedTask("작업 2", 5, TimeUnit.SECONDS));
queue.put(new DelayedTask("작업 3", 1, TimeUnit.SECONDS));
while (!queue.isEmpty()) {
DelayedTask task = queue.take();
float end = (float) (System.currentTimeMillis() - start) / 1000L;
System.out.println(String.format("%s : %.3f초 후 실행", task.getTaskName(), end));
}
대기시간이 짧은 작업 3, 작업1, 작업2 순으로 지연시간에 따라 대기 후 실행된 것을 확인할 수 있다.
'Java' 카테고리의 다른 글
[Java] synchronized block 이해하고 사용하기 (0) | 2025.03.09 |
---|---|
[Java] 자바 객체의 Lock 과 Monitor 이해하기 (0) | 2025.03.09 |
[Java] Callable, Feature 이해 및 사용예시 (0) | 2025.02.26 |
[Java] 열거형(Enum) 이해하고 사용하기 (1) | 2025.02.22 |
[Java] Thread, Runnable 이해하고 사용하기 (1) | 2025.02.21 |