ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Java 자바] 12. 멀티 스레드 ④ 스레드풀
    Programming/Java 2022. 9. 12. 20:08
    반응형

    12.9 스레드 풀

    - 병렬 작업 폭증으로 인한 스레드의 폭증을 막기 위해 사용

    - 작업 처리에 사용되는 스레드를 제한된 개수만큼 정해 놓고 작업 큐(Queue)에 들어오는 작업들을 하나씩 스레드가 처리

    - 작업 처리가 끝난 스레드는 다시 작업 큐에서 새로운 작업을 가져와 처리

    - 작업 처리 요청이 폭증하여도 스레드 전체 개수가 늘어나지 않아 성능의 급격한 저하 방지

    12-9-1. 스레드풀 생성 및 종료

    [ 스레드풀 생성 ]

    - ExecutorService 구현 객체(스레드풀)는 Executors 클래스의 두 가지 메소드 중 하나로 생성 가능

    메소드명 (매개 변수) 초기 스레드 수 코어 스레드 수 최대 스레드 수
    newCachedThreadPool() 0 0 Integer.MAX_VALUE
    newFixedThreadPool(int nThreads) 0 nThreads(매개값) nThreads(매개값)

    - 초기 스레드 수: ExecutorService 객체가 생성될 때 기본적으로 생성되는 스레드 수

    - 코어 스레드 수: 스레드 수가 증가된 후 사용되지 않는 스레드를 스레드풀에서 제거할 때 최소한 유지해야 할 스레드 수

    - 최대 스레드 수: 스레드풀에서 관리하는 최대 스레드 수

    // 1. newCachedThreadPool() 호출
    ExecutorService executorService = Executors.newCachedThreadPool();
    
    // 2. newFixedThreadPool(int nThreads) 호출
    ExecutorService executorService = Executors.newFixedThreadPool(
      Runtime.getRuntime().availableProcessors()
    };
    
    // 3. ThreadPoolExecutor 객체 생성: 직접 코어스레드와 최대스레드 개수를 설정하고 싶을 경우
    ExecutorService threadPool = new ThreadPoolExecutor(
      3,					// 코어 스레드 개수
      100,					// 최대 스레드 개수
      120L,					// 놀고 있는 시간
      TimeUnit.SECONDS,			// 놀고 있는 시간 단위
      new SynchronousQueue<Runnable>()	// 작업 큐
    );

    [ 스레드풀 종료 ]

    - 스레드풀은 main 스레드가 종료되더라도 작업을 처리하기 위해 계속 실행 상태로 남아 있음 => 프로세스가 종료되지 않음

    - 애플리케이션을 종료하려면 스레드풀을 종료시켜 스레드들이 종료 상태가 되도록 처리해줘야 함

    리턴 타입 메소드명(매개 변수) 설명
    void shutdown() 현재 처리 중인 작업뿐만 아니라 작업 큐에 대기하고 있는 모든 작업을 처리 후 스레드풀 종료
    List<Runnable> shutdownNow() 현재 작업 처리 중인 스레드를 interrupt해서 작업 중지 시도하고 스레드풀 종료, 리턴 값은 작업 큐에 있는 미처리된 작업(Runnable) 목록
    boolean awaitTermination
    (long timeout,
    TimeUnit unit)
    shutdown() 메소드 호출 이후, 모든 작업 처리를 timeout 시간 내에 완료하면 true 리턴, 완료하지 못하면 작업 처리 중인 스레드를 interrupt하고 false 리턴
    executorService.shutdown();
    executorService.shutdownNow();

    12-9-2. 작업 생성과 처리 요청

    [ 작업 생성 ]

    - 하나의 작업은 Runnable 또는 Callable 구현 클래스로 표현

    // 1. Runnable 구현 클래스
    Runnable task = new Runnable() {
      @Override
      public void run() {				// 리턴 값 없음
        // 스레드가 처리할 작업 내용
      }
    }
    
    // 2. Callable 구현 클래스
    Callable<T> task = new Callable<T>() {
      @Override
      public T call() throws Exception {		// implements Callable<T>에서 지정한 T타입 반환
      // 스레드가 처리할 작업 내용
        return T;
      }
    }

    [ 작업 처리 요청 ]

    - ExecutorService의 작업 큐에 Runnable 또는 Callable 객체를 넣는 행위

    리턴 타입 메소드명(매개 변수) 설명
    void execute(Runnable command) - Runnable을 작업 큐에 저장
    - 작업 처리 결과를 받지 못함
    - 예외 발생시 스레드가 종료되고 해당 스레드 스레드풀에서 제거
    Future<?>
    Future<V>
    Future<V>
    submit(Runnable task)
    submit(Runnable task, V result)
    submit(Callable<V> task)
    - Runnable 또는 Callable을 작업 큐에 저장
    - 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있음
    - 예외가 발생해도 스레드는 종료되지 않고 다음 작업을 위해 재사용
    (스레드의 생성 오버헤더를 줄이기 위해 submit() 사용이 좋음)

    12-9-3. 블로킹 방식의 작업 완료 통보

    - submit() 메소드는 매개값으로 준 Runnable 또는 Callable 작업을 스레드풀의 작업 큐에 저장하고 즉시 Future 객체를 리턴

    리턴 타입 메소드명(매개 변수) 설명
    Future<?>
    Future<V>
    Future<V>
    submit(Runnable task)
    submit(Runnable task, V result)
    submit(Callable<V> task)
    - Runnable 또는 Callable을 작업 큐에 저장
    - 리턴된 Future를 통해 작업 처리 결과를 얻을 수 있음
    - 예외가 발생해도 스레드는 종료되지 않고 다음 작업을 위해 재사용
    (스레드의 생성 오버헤더를 줄이기 위해 submit() 사용이 좋음)

    - Future 객체는 작업이 완료될 때까지 기다렸다가(블로킹되었다가) 최종 결과를 얻는데 사용

      => 지연 완료 (pending completion) 객체라고도 함

    - Future의 get() 메소드: 스레드가 작업을 완료할 때까지 블로킹되었다가 작업 완료시 처리 결과 리턴

    리턴 타입 메소드명(매개 변수) 설명
    V get() 작업이 완료될때까지 블로킹되었다가 처리결과 V를 리턴
    V get(long timeout, TimeUnit unit) timeout 시간 전에 작업이 완료되면 결과 V를 리턴,
    작업이 완료되지 않으면 TimeoutException 발생

    - 리턴 타입 V는 submit의 두번째 매개값인 V 타입이거나 Callable 타입 파라미터 V 타입

    메소드 작업 처리 완료 후 리턴 타입 작업 처리 도중 예외 발생
    submit(Runnable task) future.get() -> null future.get() -> 예외 발생
    submit(Runnable task, Integer result) future.get() -> int 타입 값  future.get() -> 예외 발생
    submit(Callable<String> task) future.get() -> String 타입 값 future.get() -> 예외 발생

    - Future를 이용한 블로킹 방식의 경우, 스레드가 작업을 완료하기 전까지는 get() 메소드가 블로킹되므로 다른 코드 실행 불가
    - get() 메소드를 호출하는 스레드는 새로운 스레드이거나 스레드풀의 또 다른 스레드여야 함

    // 1. 새로운 스레드 생성해서 호출
    new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          future.get();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }).start();
    
    // 2. 스레드풀의 스레드가 호출
    executorService.submit(new Runnable() {
      @Override
      public void run() {
        try {
          future.get();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    });

    - Future 객체의 다른 메소드

    리턴 타입 메소드명(매개 변수) 설명
    boolean cancel(boolean mayInterruptIfRunning) 작업 처리가 진행 중일 경우 취소 시킴
    boolean isCancelled() 작업이 취소되었는지 여부
    boolean isDone() 작업 처리가 완료되었는지 여부

    [ 리턴값이 없는 작업 완료 통보 ]

    - 리턴값이 없는 작업일 경우 Runnable 객체로 생성

    Runnable task = new Runnable() {
      @Override
      public void run() {
        // 스레드가 처리할 작업 내용
      }
    };
    
    // 결과값이 없는 작업 처리요청
    Future future = executorService.submit(task);
    
    // 작업처리가 정상적으로 완료되면 null 리턴, 그렇지 못하면 예외 발생
    try {
      future.get();
    } catch (InterruptedException e) {
      // 작업 처리 도중 스레드가 interrupt 될 경우 실행할 코드
    } catch (ExecutionException e) {
      // 작업 처리 도중 예외가 발생된 경우 실행할 코드
    }

    [ 리턴값이 있는 작업 완료 통보 ]

    - 작업 완료 후에 애플리케이션이 처리 결과를 얻어야 된다면 작업 객체를 Callable로 생성

    - 제네릭 타입 파라미터 T는 call() 메소드가 리턴하는 타입이 되도록 함

    Callable<T> task = new Callable<T>() {
      @Override
      public T call() throws Exception {
        // 스레드가 처리할 작업 내용
        return T;
      }
    };
    
    // 작업 처리 요청은 submit() 메소드 호출
    // submit() 메소드는 작업 큐에 Callable 객체를 저장하고 즉시 Future<T>를 리턴
    Future<T> future = executorService.submit(task);
    
    // 스레드풀의 스레드가 Callable 객체의 call() 메소드를 모두 실행하고 T타입 값을 리턴하면,
    // Future<T>의 get() 메소드는 블로킹이 해제되고 T타입 값을 리턴
    try {
      T result = future.get();
    } catch (InterruptedException e) {
      // 작업 처리 도중 스레드가 Interrupt 될 경우 실행할 코드
    } catch (ExecutionException e) {
      // 작업 처리 도중 예외가 발생된 경우 실행할 코드
    }

    [ 작업 처리 결과를 외부 객체에 저장 ]

    - 대개 외부 Result 객체에 두 개 이상의 스레드 작업을 취합할 목적으로 이용

    class Task implements Runnable {
      Result result;
      Task(Result result) { this.result = result; }
      @Override
      public void run() {
        // 작업 코드
        // 처리 결과를 result에 저장
      }
      
      Result result = new Result();
      Runnable task1 = new Task(result);
      Runnable task2 = new Task(result);
      Future<Result> future1 = executorService.submit(task1, result);
      Future<Result> future2 = executorService.submit(task2, result);
      
      try {
        result = future1.get();
        result = future2.get();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
    
    class Result {
      // 처리 결과를 저장할 Result 클래스
    }

    [ 작업 완료 순으로 통보 ]

    - 작업 양과 스레드 스케줄링에 따라 먼저 요청한 작업이 나중에 완료될 수 있음

    - CompletionService를 이용해 처리 완료된 작업만 통보받을 수 있음 

    리턴 타입 메소드명 (매개 변수) 설명
    Future<V> poll() 완료된 작업의 Future를 가져옴
    완료된 작업이 없다면 즉시 null 반환
    Future<V> poll(long timeout, TimeUnit unit) 완료된 작업의 Future를 가져옴
    완료된 작없이 없다면 timeout까지 블로킹됨
    Future<V> take() 완료된 작업의 Future를 가져옴
    완료된 작없이 없다면 있을 때까지 블로킹됨
    Future<V> submit(Callable<V> task) 스레드풀에 Callable 작업 처리 요청
    Future<V> submit(Runnable task, V result) 스레드풀에 Runnable 작업 처리 요청
    // CompletionService 구현: 생성자 매개값으로 ExecutorService제공
    ExecutorService executorService = Executors.newFixedThreadPool(
      Runtime.getRuntime().availableProcessors()
    );
    CompletionService<V> completionService = new ExecutorCompletionService<V> (
      executorService
    );
    
    // 완료된 작업의 Future를 얻는 법
    completionService.submit(Callable<V> task);
    completionService.submit(Runnable task, V result);
    
    // Callable 작업이 있을 때까지 블로킹되었다가 
    // 완료된 작업의 Future를 얻고, get()메소드로 결과값을 얻어내는 코드
    executorService.submit(new Runnable() {
      @Override
      public void run() {
        while(true) {
          try {
            Future<Integer> future = completionService.take();
            int value = future.get();
          } catch (Exception e) {
            break;
          }
        }
      }
    });

    12-9-4. 콜백 방식의 작업 완료 통보

    - 콜백: 애플리케이션이 스레드에게 작업 처리를 요청한 후, 스레드가 작업을 완료하면 특정 메소드를 자동 실행하는 기법

    - 블로킹 방식은 작업 처리를 요청한 후 작업이 완료될 때까지 블로킹 되지만, 

       콜백 방식은 작업 처리 요청 후 결과를 기다릴 필요 없이 다른 기능 수행 가능(콜백 메소드가 자동으로 실행되기 때문)

    - Runnable 구현 클래스 작성시 콜백 기능을 구현하거나, CompletionHandler를 이용해 콜백 객체 생성 가능

    // CompletionHandler 객체 생성 코드
    // V : 결과값의 타입, A : 첨부값의 타입(없다면 void)
    CompletionHandler<V, A> callback = new CompletionHandler<V, A>() {
      @Override
      public void completed(V result, A attachment) {
        // 작업을 정상 처리 완료했을 때 호출되는 콜백 메소드
      }
      @Override
      public void failed(Throwable exc, A attachment) {
        // 작업 처리 도중 예외가 발생했을 때 호출되는 콜백 메소드
      }
    };
    
    // 작업 처리 결과에 따라 콜백 메소드를 호출하는 Runnable 객체
    Runnable task = new Runnable() {
      @Override
      public void run() {
        try{
          // 작업 처리
          V result = ...;
          callback.completed(result, null);
        } catch(Exception e) {
          callback.failed(e, null);
        }
      }
    };
    반응형

    댓글

Designed by Tistory.