Java

[Java][Tutorial] 3-3. Collections: Aggregate Operations

noahkim_ 2023. 10. 20. 21:39

1. Pipeline

  • 일련의 데이터 처리 연산들의 연속입니다.

 

source

  • collection, array, generator function, I/O channel

 

2. Stream

  • 데이터의 일련의 연속
  • 데이터를 직접 저장하지 않고, 파이프라인을 통해 소스로부터 요소들을 나릅니다.

 

double average = roster
                    .stream()
                    .filter(p -> p.getGender() == Person.Sex.MALE)
                    .mapToInt(Person::getAge)
                    .average()
                    .getAsDouble();
  • mapToInt() : IntStream 타입의 새로운 Stream을 리턴하는 연산입니다.
  • average() : IntStream의 요소들에 대한 평균값을 계산하는 연산입니다. (OptionalDouble 타입으로 리턴됩니다)

 

Parallelism

  • 주어진 문제를 여러 하위 문제로 분할한 뒤, 여러 하위 문제들을 동시에 해결하고 그 결과를 합치는 방법
  • 각 하위 문제는 별도의 스레드에서 실행됩니다.
  • 충분한 데이터와 프로세서 코어가 있을 때 성능 향상을 기대할 수 있습니다.

 

Parallel Stream
double average = roster
    .parallelStream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .mapToInt(Person::getAge)
    .average()
    .getAsDouble();
  • 다중 스레드 환경에서 스레드간의 충돌이나 메모리 일관성 오류가 발생할 수 있습니다.
  • 원본 컬렉션을 변경하지 않는 연산들을 사용할 때, 스레드 안전하지 않은 컬렉션에서도 안전하게 병렬 처리를 구현할 수 있습니다.

 

Concurrent Reduction
ConcurrentMap<Person.Sex, List<Person>> byGender =
    roster
        .parallelStream()
        .collect(
            Collectors.groupingByConcurrent(Person::getGender));
  • ConcurrentReduction을 사용하면 여러 스레드에서 동시에 리덕션 결과를 수정할 수 있습니다.
  • 데이터의 순서를 고려하지 않고 결과를 도출합니다.

 

Ordering
listOfIntegers
    .parallelStream()
    .forEachOrdered(e -> System.out.print(e + " "));
  • 원래의 데이터 순서를 보장하면서 연산을 수행할 수 있습니다.

 

Side Effects
List<String> listOfStrings =
    new ArrayList<>(Arrays.asList("one", "two"));

String concatenatedString = listOfStrings
    .stream()

    // Don't do this! Interference occurs here.
    .peek(s -> listOfStrings.add("three"))

    .reduce((a, b) -> a + " " + b)
    .get();
  •  Interference
    • 파이프라인 실행 중에 스트림 원본을 수정하려 시도할 경우 ConcurrentModificationException이 발생합니다.

 

Stateful Lambda Expressions
  • 파이프라인의 실행 도중, 변할 수 있는 상태에 의존하는 결과를 생성하는 람다표현식을 의미합니다.
  • 스트림 연산에 stateful lambda expression의 사용은 피해야 합니다.

 

intermediate operation

  • 연산을 수행하면서 새로운 스트림을 반환합니다.
    • lazy하게 동작하므로 최종 연산을 호출할 때만 실행됩니다.
  • filter, map, mapToInt, flatMap

 

terminal operation

  • 스트림 파이프라인의 마지막 연산으로, 스트림의 처리를 완료하고 요소들을 조합하여 결과를 리턴합니다.

 

Reduction
  • 여러 스트림의 요소들을 결합하여 하나의 값을 반환하는 최종 연산입니다.

 

Integer totalAgeReduce = roster
    .stream()
    .map(Person::getAge)
    .reduce(0, (a, b) -> a + b);
   
Integer totalAge = roster
    .stream()
    .mapToInt(Person::getAge)
    .sum();
  • reduce
    • 스트림의 요소를 단일 값으로 줄이는 연산입니다.
    • 인자
      • identity : reduction의 초기값입니다.
      • accumulator : 각 스트림 요소를 처리하여 누적된 결과를 만들기 위한 함수입니다.
    • 기본제공 함수
      • average, sum, min, max, count, forEach

 

collect
Averager averageCollect = roster.stream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .map(Person::getAge)
    .collect(Averager::new, Averager::accept, Averager::combine);
    
Map<Person.Sex, List<Person>> byGender =
    roster.stream().collect(
    	Collectors.groupingBy(Person::getGender)
    );
  • 스트림 요소들을 특정한 방식으로 처리하여 목적지 컨테이너로 수집
  • 인자
    • supplier: 결과 컨테이너를 생성하는 함수
    • accumulator: 스트림의 각 요소를 결과 컨테이너에 추가하는 함수
    • combiner: 부분적으로 생성된 결과 컨테이너를 병합하는 함수
  • 그룹화
Collectors.groupingBy(
    Person::getGender,                      
    Collectors.mapping(Person::getName, Collectors.toList())));
        
Collectors.groupingBy(
    Person::getGender,                      
    Collectors.reducing(0, Person::getAge, Integer::sum)));

Collectors.groupingBy(
    Person::getGender,                      
    Collectors.averagingInt(Person::getAge)));
  • 스트림의 요소들을 특정 기준에 따라 그룹화하는 API
  • downstream operation
    • 각 그룹에 대한 추가연산을 수행
  • multi-level operation
    • 각 그룹에 대해 추가적으로 연산을 수행하여 최종 결과를 도출
    • mapping(), reducing(), average()

 

2. Differences Between Aggregate Operations and Iterators

내부반복 사용

  • 반복을 컬렉션 내부에 위임하므로 손쉽게 병렬연산이 가능합니다.

 

요소 가공

  • 직접 컬렉션 대신 스트림의 요소로 데이터를 처리합니다.

 

파라미터에 메서드를 전달하여 집계연산

  • 집계 연산의 커스터마이징이 가능합니다.