1. Pipeline
- 일련의 데이터 처리 연산들의 연속입니다.
source
- collection, array, generator function, I/O channel
2. Stream
- 데이터의 일련의 연속
- 데이터를 직접 저장하지 않고, 파이프라인을 통해 소스로부터 요소들을 나릅니다.
double average = roster
.stream()
.filter(p -> p.getGender() == Person.Sex.MALE)
.mapToInt(Person::getAge)
.average()
.getAsDouble();
- mapToInt() : IntStream 타입의 새로운 Stream을 리턴하는 연산입니다.
- average() : IntStream의 요소들에 대한 평균값을 계산하는 연산입니다. (OptionalDouble 타입으로 리턴됩니다)
Parallelism
- 주어진 문제를 여러 하위 문제로 분할한 뒤, 여러 하위 문제들을 동시에 해결하고 그 결과를 합치는 방법
- 각 하위 문제는 별도의 스레드에서 실행됩니다.
- 충분한 데이터와 프로세서 코어가 있을 때 성능 향상을 기대할 수 있습니다.
Parallel Stream
double average = roster
.parallelStream()
.filter(p -> p.getGender() == Person.Sex.MALE)
.mapToInt(Person::getAge)
.average()
.getAsDouble();
- 다중 스레드 환경에서 스레드간의 충돌이나 메모리 일관성 오류가 발생할 수 있습니다.
- 원본 컬렉션을 변경하지 않는 연산들을 사용할 때, 스레드 안전하지 않은 컬렉션에서도 안전하게 병렬 처리를 구현할 수 있습니다.
Concurrent Reduction
ConcurrentMap<Person.Sex, List<Person>> byGender =
roster
.parallelStream()
.collect(
Collectors.groupingByConcurrent(Person::getGender));
- ConcurrentReduction을 사용하면 여러 스레드에서 동시에 리덕션 결과를 수정할 수 있습니다.
- 데이터의 순서를 고려하지 않고 결과를 도출합니다.
Ordering
listOfIntegers
.parallelStream()
.forEachOrdered(e -> System.out.print(e + " "));
- 원래의 데이터 순서를 보장하면서 연산을 수행할 수 있습니다.
Side Effects
List<String> listOfStrings =
new ArrayList<>(Arrays.asList("one", "two"));
String concatenatedString = listOfStrings
.stream()
// Don't do this! Interference occurs here.
.peek(s -> listOfStrings.add("three"))
.reduce((a, b) -> a + " " + b)
.get();
- Interference
- 파이프라인 실행 중에 스트림 원본을 수정하려 시도할 경우 ConcurrentModificationException이 발생합니다.
Stateful Lambda Expressions
- 파이프라인의 실행 도중, 변할 수 있는 상태에 의존하는 결과를 생성하는 람다표현식을 의미합니다.
- 스트림 연산에 stateful lambda expression의 사용은 피해야 합니다.
intermediate operation
- 연산을 수행하면서 새로운 스트림을 반환합니다.
- lazy하게 동작하므로 최종 연산을 호출할 때만 실행됩니다.
- filter, map, mapToInt, flatMap
terminal operation
- 스트림 파이프라인의 마지막 연산으로, 스트림의 처리를 완료하고 요소들을 조합하여 결과를 리턴합니다.
Reduction
- 여러 스트림의 요소들을 결합하여 하나의 값을 반환하는 최종 연산입니다.
Integer totalAgeReduce = roster
.stream()
.map(Person::getAge)
.reduce(0, (a, b) -> a + b);
Integer totalAge = roster
.stream()
.mapToInt(Person::getAge)
.sum();
- reduce
- 스트림의 요소를 단일 값으로 줄이는 연산입니다.
- 인자
- identity : reduction의 초기값입니다.
- accumulator : 각 스트림 요소를 처리하여 누적된 결과를 만들기 위한 함수입니다.
- 기본제공 함수
- average, sum, min, max, count, forEach
collect
Averager averageCollect = roster.stream()
.filter(p -> p.getGender() == Person.Sex.MALE)
.map(Person::getAge)
.collect(Averager::new, Averager::accept, Averager::combine);
Map<Person.Sex, List<Person>> byGender =
roster.stream().collect(
Collectors.groupingBy(Person::getGender)
);
- 스트림 요소들을 특정한 방식으로 처리하여 목적지 컨테이너로 수집
- 인자
- supplier: 결과 컨테이너를 생성하는 함수
- accumulator: 스트림의 각 요소를 결과 컨테이너에 추가하는 함수
- combiner: 부분적으로 생성된 결과 컨테이너를 병합하는 함수
- 그룹화
Collectors.groupingBy(
Person::getGender,
Collectors.mapping(Person::getName, Collectors.toList())));
Collectors.groupingBy(
Person::getGender,
Collectors.reducing(0, Person::getAge, Integer::sum)));
Collectors.groupingBy(
Person::getGender,
Collectors.averagingInt(Person::getAge)));
- 스트림의 요소들을 특정 기준에 따라 그룹화하는 API
- downstream operation
- 각 그룹에 대한 추가연산을 수행
- multi-level operation
- 각 그룹에 대해 추가적으로 연산을 수행하여 최종 결과를 도출
- mapping(), reducing(), average()
2. Differences Between Aggregate Operations and Iterators
내부반복 사용
- 반복을 컬렉션 내부에 위임하므로 손쉽게 병렬연산이 가능합니다.
요소 가공
- 직접 컬렉션 대신 스트림의 요소로 데이터를 처리합니다.
파라미터에 메서드를 전달하여 집계연산
- 집계 연산의 커스터마이징이 가능합니다.
'Java' 카테고리의 다른 글
[Java의 정석] 13-1. 스레드: 프로세스와 스레드 (0) | 2023.11.27 |
---|---|
[Java] Stream (2) | 2023.10.22 |
[Java][Tutorial] 3-2. Collections: Queue, Deque, Map (0) | 2023.10.15 |
[Java][Tutorial] 3-1. Collections: Collection, Set, List (2) | 2023.10.15 |
[Java][Tutorial] 2-1. Essential Java Classes: Exceptions (0) | 2023.10.15 |