[Java 자바] 12. 멀티 스레드 ④ 스레드풀
12.9 스레드 풀
- 병렬 작업 폭증으로 인한 스레드의 폭증을 막기 위해 사용
- 작업 처리에 사용되는 스레드를 제한된 개수만큼 정해 놓고 작업 큐(Queue)에 들어오는 작업들을 하나씩 스레드가 처리
- 작업 처리가 끝난 스레드는 다시 작업 큐에서 새로운 작업을 가져와 처리
- 작업 처리 요청이 폭증하여도 스레드 전체 개수가 늘어나지 않아 성능의 급격한 저하 방지
12-9-1. 스레드풀 생성 및 종료
[ 스레드풀 생성 ]
- ExecutorService 구현 객체(스레드풀)는 Executors 클래스의 두 가지 메소드 중 하나로 생성 가능
메소드명 (매개 변수) | 초기 스레드 수 | 코어 스레드 수 | 최대 스레드 수 |
newCachedThreadPool() | 0 | 0 | Integer.MAX_VALUE |
newFixedThreadPool(int nThreads) | 0 | nThreads(매개값) | nThreads(매개값) |
- 초기 스레드 수: ExecutorService 객체가 생성될 때 기본적으로 생성되는 스레드 수
- 코어 스레드 수: 스레드 수가 증가된 후 사용되지 않는 스레드를 스레드풀에서 제거할 때 최소한 유지해야 할 스레드 수
- 최대 스레드 수: 스레드풀에서 관리하는 최대 스레드 수
// 1. newCachedThreadPool() 호출
ExecutorService executorService = Executors.newCachedThreadPool();
// 2. newFixedThreadPool(int nThreads) 호출
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
};
// 3. ThreadPoolExecutor 객체 생성: 직접 코어스레드와 최대스레드 개수를 설정하고 싶을 경우
ExecutorService threadPool = new ThreadPoolExecutor(
3, // 코어 스레드 개수
100, // 최대 스레드 개수
120L, // 놀고 있는 시간
TimeUnit.SECONDS, // 놀고 있는 시간 단위
new SynchronousQueue<Runnable>() // 작업 큐
);
[ 스레드풀 종료 ]
- 스레드풀은 main 스레드가 종료되더라도 작업을 처리하기 위해 계속 실행 상태로 남아 있음 => 프로세스가 종료되지 않음
- 애플리케이션을 종료하려면 스레드풀을 종료시켜 스레드들이 종료 상태가 되도록 처리해줘야 함
리턴 타입 | 메소드명(매개 변수) | 설명 |
void | shutdown() | 현재 처리 중인 작업뿐만 아니라 작업 큐에 대기하고 있는 모든 작업을 처리 후 스레드풀 종료 |
List<Runnable> | shutdownNow() | 현재 작업 처리 중인 스레드를 interrupt해서 작업 중지 시도하고 스레드풀 종료, 리턴 값은 작업 큐에 있는 미처리된 작업(Runnable) 목록 |
boolean | awaitTermination (long timeout, TimeUnit unit) |
shutdown() 메소드 호출 이후, 모든 작업 처리를 timeout 시간 내에 완료하면 true 리턴, 완료하지 못하면 작업 처리 중인 스레드를 interrupt하고 false 리턴 |
executorService.shutdown();
executorService.shutdownNow();
12-9-2. 작업 생성과 처리 요청
[ 작업 생성 ]
- 하나의 작업은 Runnable 또는 Callable 구현 클래스로 표현
// 1. Runnable 구현 클래스
Runnable task = new Runnable() {
@Override
public void run() { // 리턴 값 없음
// 스레드가 처리할 작업 내용
}
}
// 2. Callable 구현 클래스
Callable<T> task = new Callable<T>() {
@Override
public T call() throws Exception { // implements Callable<T>에서 지정한 T타입 반환
// 스레드가 처리할 작업 내용
return T;
}
}
[ 작업 처리 요청 ]
- ExecutorService의 작업 큐에 Runnable 또는 Callable 객체를 넣는 행위
리턴 타입 | 메소드명(매개 변수) | 설명 |
void | execute(Runnable command) | - Runnable을 작업 큐에 저장 - 작업 처리 결과를 받지 못함 - 예외 발생시 스레드가 종료되고 해당 스레드 스레드풀에서 제거 |
Future<?> Future<V> Future<V> |
submit(Runnable task) submit(Runnable task, V result) submit(Callable<V> task) |
- Runnable 또는 Callable을 작업 큐에 저장 - 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있음 - 예외가 발생해도 스레드는 종료되지 않고 다음 작업을 위해 재사용 (스레드의 생성 오버헤더를 줄이기 위해 submit() 사용이 좋음) |
12-9-3. 블로킹 방식의 작업 완료 통보
- submit() 메소드는 매개값으로 준 Runnable 또는 Callable 작업을 스레드풀의 작업 큐에 저장하고 즉시 Future 객체를 리턴
리턴 타입 | 메소드명(매개 변수) | 설명 |
Future<?> Future<V> Future<V> |
submit(Runnable task) submit(Runnable task, V result) submit(Callable<V> task) |
- Runnable 또는 Callable을 작업 큐에 저장 - 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있음 - 예외가 발생해도 스레드는 종료되지 않고 다음 작업을 위해 재사용 (스레드의 생성 오버헤더를 줄이기 위해 submit() 사용이 좋음) |
- Future 객체는 작업이 완료될 때까지 기다렸다가(블로킹되었다가) 최종 결과를 얻는데 사용
=> 지연 완료 (pending completion) 객체라고도 함
- Future의 get() 메소드: 스레드가 작업을 완료할 때까지 블로킹되었다가 작업 완료시 처리 결과 리턴
리턴 타입 | 메소드명(매개 변수) | 설명 |
V | get() | 작업이 완료될때까지 블로킹되었다가 처리결과 V를 리턴 |
V | get(long timeout, TimeUnit unit) | timeout 시간 전에 작업이 완료되면 결과 V를 리턴, 작업이 완료되지 않으면 TimeoutException 발생 |
- 리턴 타입 V는 submit의 두번째 매개값인 V 타입이거나 Callable 타입 파라미터 V 타입
메소드 | 작업 처리 완료 후 리턴 타입 | 작업 처리 도중 예외 발생 |
submit(Runnable task) | future.get() -> null | future.get() -> 예외 발생 |
submit(Runnable task, Integer result) | future.get() -> int 타입 값 | future.get() -> 예외 발생 |
submit(Callable<String> task) | future.get() -> String 타입 값 | future.get() -> 예외 발생 |
- Future를 이용한 블로킹 방식의 경우, 스레드가 작업을 완료하기 전까지는 get() 메소드가 블로킹되므로 다른 코드 실행 불가
- get() 메소드를 호출하는 스레드는 새로운 스레드이거나 스레드풀의 또 다른 스레드여야 함
// 1. 새로운 스레드 생성해서 호출
new Thread(new Runnable() {
@Override
public void run() {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
// 2. 스레드풀의 스레드가 호출
executorService.submit(new Runnable() {
@Override
public void run() {
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
});
- Future 객체의 다른 메소드
리턴 타입 | 메소드명(매개 변수) | 설명 |
boolean | cancel(boolean mayInterruptIfRunning) | 작업 처리가 진행 중일 경우 취소 시킴 |
boolean | isCancelled() | 작업이 취소되었는지 여부 |
boolean | isDone() | 작업 처리가 완료되었는지 여부 |
[ 리턴값이 없는 작업 완료 통보 ]
- 리턴값이 없는 작업일 경우 Runnable 객체로 생성
Runnable task = new Runnable() {
@Override
public void run() {
// 스레드가 처리할 작업 내용
}
};
// 결과값이 없는 작업 처리요청
Future future = executorService.submit(task);
// 작업처리가 정상적으로 완료되면 null 리턴, 그렇지 못하면 예외 발생
try {
future.get();
} catch (InterruptedException e) {
// 작업 처리 도중 스레드가 interrupt 될 경우 실행할 코드
} catch (ExecutionException e) {
// 작업 처리 도중 예외가 발생된 경우 실행할 코드
}
[ 리턴값이 있는 작업 완료 통보 ]
- 작업 완료 후에 애플리케이션이 처리 결과를 얻어야 된다면 작업 객체를 Callable로 생성
- 제네릭 타입 파라미터 T는 call() 메소드가 리턴하는 타입이 되도록 함
Callable<T> task = new Callable<T>() {
@Override
public T call() throws Exception {
// 스레드가 처리할 작업 내용
return T;
}
};
// 작업 처리 요청은 submit() 메소드 호출
// submit() 메소드는 작업 큐에 Callable 객체를 저장하고 즉시 Future<T>를 리턴
Future<T> future = executorService.submit(task);
// 스레드풀의 스레드가 Callable 객체의 call() 메소드를 모두 실행하고 T타입 값을 리턴하면,
// Future<T>의 get() 메소드는 블로킹이 해제되고 T타입 값을 리턴
try {
T result = future.get();
} catch (InterruptedException e) {
// 작업 처리 도중 스레드가 Interrupt 될 경우 실행할 코드
} catch (ExecutionException e) {
// 작업 처리 도중 예외가 발생된 경우 실행할 코드
}
[ 작업 처리 결과를 외부 객체에 저장 ]
- 대개 외부 Result 객체에 두 개 이상의 스레드 작업을 취합할 목적으로 이용
class Task implements Runnable {
Result result;
Task(Result result) { this.result = result; }
@Override
public void run() {
// 작업 코드
// 처리 결과를 result에 저장
}
Result result = new Result();
Runnable task1 = new Task(result);
Runnable task2 = new Task(result);
Future<Result> future1 = executorService.submit(task1, result);
Future<Result> future2 = executorService.submit(task2, result);
try {
result = future1.get();
result = future2.get();
} catch (Exception e) {
e.printStackTrace();
}
}
class Result {
// 처리 결과를 저장할 Result 클래스
}
[ 작업 완료 순으로 통보 ]
- 작업 양과 스레드 스케줄링에 따라 먼저 요청한 작업이 나중에 완료될 수 있음
- CompletionService를 이용해 처리 완료된 작업만 통보받을 수 있음
리턴 타입 | 메소드명 (매개 변수) | 설명 |
Future<V> | poll() | 완료된 작업의 Future를 가져옴 완료된 작업이 없다면 즉시 null 반환 |
Future<V> | poll(long timeout, TimeUnit unit) | 완료된 작업의 Future를 가져옴 완료된 작없이 없다면 timeout까지 블로킹됨 |
Future<V> | take() | 완료된 작업의 Future를 가져옴 완료된 작없이 없다면 있을 때까지 블로킹됨 |
Future<V> | submit(Callable<V> task) | 스레드풀에 Callable 작업 처리 요청 |
Future<V> | submit(Runnable task, V result) | 스레드풀에 Runnable 작업 처리 요청 |
// CompletionService 구현: 생성자 매개값으로 ExecutorService제공
ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors()
);
CompletionService<V> completionService = new ExecutorCompletionService<V> (
executorService
);
// 완료된 작업의 Future를 얻는 법
completionService.submit(Callable<V> task);
completionService.submit(Runnable task, V result);
// Callable 작업이 있을 때까지 블로킹되었다가
// 완료된 작업의 Future를 얻고, get()메소드로 결과값을 얻어내는 코드
executorService.submit(new Runnable() {
@Override
public void run() {
while(true) {
try {
Future<Integer> future = completionService.take();
int value = future.get();
} catch (Exception e) {
break;
}
}
}
});
12-9-4. 콜백 방식의 작업 완료 통보
- 콜백: 애플리케이션이 스레드에게 작업 처리를 요청한 후, 스레드가 작업을 완료하면 특정 메소드를 자동 실행하는 기법
- 블로킹 방식은 작업 처리를 요청한 후 작업이 완료될 때까지 블로킹 되지만,
콜백 방식은 작업 처리 요청 후 결과를 기다릴 필요 없이 다른 기능 수행 가능(콜백 메소드가 자동으로 실행되기 때문)
- Runnable 구현 클래스 작성시 콜백 기능을 구현하거나, CompletionHandler를 이용해 콜백 객체 생성 가능
// CompletionHandler 객체 생성 코드
// V : 결과값의 타입, A : 첨부값의 타입(없다면 void)
CompletionHandler<V, A> callback = new CompletionHandler<V, A>() {
@Override
public void completed(V result, A attachment) {
// 작업을 정상 처리 완료했을 때 호출되는 콜백 메소드
}
@Override
public void failed(Throwable exc, A attachment) {
// 작업 처리 도중 예외가 발생했을 때 호출되는 콜백 메소드
}
};
// 작업 처리 결과에 따라 콜백 메소드를 호출하는 Runnable 객체
Runnable task = new Runnable() {
@Override
public void run() {
try{
// 작업 처리
V result = ...;
callback.completed(result, null);
} catch(Exception e) {
callback.failed(e, null);
}
}
};