One of the most powerful features of the Java 8 Streams provides an easier path to parallel programming. However, in some cases, parallel stream may make programs run slower in comparison with the serial stream, including for loop to iterate over a collection. I put some common mistakes when using the Java 8 parallel stream to warn myself how to use it correctly and effectively.
1. When to use parallel streams
- A problem could be spitted smaller and independent tasks that are solved in parallel, follow Fork/Join Framework proposal.
- According to Doug Lea, computation is longer than typically 100 microseconds. In practice, we measure this by M * Q >= 10.000 (N: the number of elements and Q: cost per element).
For example: an element is a function like (x, y) -> x + y, so N >=10.000 elements for parallel execution is worthwhile. Otherwise it does not benefit. - Two efficient scenarios when using parallel stream: parallelizing waiting tasks, and parallelizing intensive computing tasks on an otherwise idling machine with more than one processor. (Refer to Pierre-Yves Saumont cited). So no need to use parallel stream if a machine has only one processor.
2. Parallel vs Sequence in the entire pipeline.
Sometime, we try to switch between parallel and sequence in the middle:
Collection.stream() .filter(…) .parallel() .map(…) .sequence (…); .collect(…);
The rule here is “last one wins” that means entire stream will run sequentially.
3. Be careful with nested parallel streams.
Nested parallel streams result in out of memory because outer parallel stream could exhaust ForkJoin workers. Following code below creates two dimensions array, which throws exception:
java.lang.OutOfMemoryError: Java heap space
public static void main (String[] args) { int[][] result = new int[10000][10000]; IntStream.range(0, 10_000).parallel() .forEach(i -> { result[i][0] = (int) Math.round(Math.random() * i * 1000); IntStream.range(0, 10000).parallel() .forEach((int j) -> result[i][j] = (int) Math.round(Math.random() * j * 1000)); }); }
4. Excess of auto (un) boxing.
Looking at the following code to see outboxing and boxing of Integer in every filter call so that it affects performance of a program.
List<Integer> even = numbers. parallelStream() .filter(n -> n % 2 == 0) .sorted() .collect(toList());
Hence, the code should avoid unnecessary autoboxing. Fixed it by convert iterator to primitive, and then boxing to return of collection.
List<Integer> even = numbers. parallelStream() .mapToInt(n -> n) // convert to primitive .filter(n ->n % 2 == 0) .sorted() .boxed() // boxing .collect(toList());
5. Distinguish between FindFirst and FindAny.
Consider parallelStream().findFirst()
- “First” means first in encounters order.
- Parallel can find a matching element quickly.
- But still have to search space to the left to ensure it is first.
Meanwhile, parallelStream().findAny()
- Parallel can find a matching element quickly.
- And it’s done. Other threads give up and we found the solution already.
Therefore, findFirst is not free because it have to guarantee the first element in order.
6. Reduction: Identity must be an immutable value.
Identity value start value of each partition of a parallel reduction and become the result if there are no value in stream. Don’t use mutable for identity value.
7. Reduction functions must be associative.
A functional is associative if different grouping of operands don’t affect the result, which means elementary arithmetic: (a + b) + c = a + (b + c)
static List<String> letters = Arrays.asList("a", "b","c","d","e", "f", "g", "h"); public static void main(String[] args) { /** Sequence stream and Reduction function: (x, y) -> x + y * Output: abcdefgh */ System.out.println(letters.stream() .parallel() .reduce("", (x, y) -> x + y) ); /** Sequence stream and Reduction function:(x, y) -> y + x + y * Output: hgfedcbaabcdefgh */ System.out.println(letters.stream() .reduce("", (x, y) -> y + x + y) ); /** Parallel stream and Reduction function: (x, y) -> y + x + y) * Output: hhgghhffeeffhhgghhddccddbbaabbddccddhhgghhffeeffhhgghh */ System.out.println(letters.stream() .parallel() .reduce("", (x, y) -> y + x + y) ); }
Surely, reduction function (x, y) -> y + x + y is not associative.
8. Blocking for IO resource.
Using parallel stream for I/O based resource is not perform well because a source is hardly splittable, data read sequential, and blocking I/O. For example, as can be seen the code below, we use parallel stream to check URLs. Java stream invoked Fork/Join Framework to create a limited number of threads (workers) in Common Pool. If the firsts URLs get stuck on a ConnectionTimeOut, you have to wait CPU cycle in Common Pool. As result, performance could be affected.
import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; import java.util.stream.Stream; import static java.util.stream.Collectors.toList; public class IOParallelCheck { public static void main (String[] args) { try (Stream<String> urls = Files.lines(Paths.get("urls.txt"))) { List<String> errors = urls.parallel() .filter(url-> checkURL(url)).collect(toList()); }catch (IOException e) { e.printStackTrace(); } } /** * Connect to URL and wait for 200 response or timeout. * Returning false or true */ private static boolean checkURL(String url) { // TODO particular implementation here. return true; }
Summary.
Be careful once using Java 8 Parallel Stream.
- Elements processing is independent, source is well splittable
- Avoid mutation, interference, order dependency, side effects.
- Don’t add contention or synchronization.
References:
- Stuart Marks, he is a Principal Member of Technical Staff in the Java Platform Group at Oracle. He is currently working on enhancing the core libraries of the JDK. Java One
- David Gomez G., Parallel Stream in Java 8. Voxxed day tutorial
- Erik Bamberg, Parallel stream processing in Java 8 – performance of sequential vs. parallel stream processing
- Doug Lea’s, Doug Lea’s Home Page