Parallel streams in Java 8

dgomezg, Mar 05, 2015

About This Presentation

The slides from the talk I gave at Oracle III #JuevesTecnológicos in Madrid.

A review of how parallel streams work in Java 8, and some considerations to keep in mind in order to get the best performance from concurrent data processing in #Java8.


Slide Content

ParallelStreams
Concurrent data processing in Java 8
David Gómez G.
@dgomezg

Do you remember?
use parallelStream()
for (int i = 0; i < 100; i++) {
    long start = System.currentTimeMillis();
    List<Integer> even = numbers.parallelStream()
        .filter(n -> n % 2 == 0)
        .sorted()
        .collect(toList());
    System.out.printf(
        "%d elements computed in %5d msecs with %d threads\n",
        even.size(), System.currentTimeMillis() - start,
        Thread.activeCount());
}
4999299 elements computed in 225 msecs with 9 threads
4999299 elements computed in 230 msecs with 9 threads
4999299 elements computed in 250 msecs with 9 threads

Previously on…

Streams?
What’s that?

A Stream is…
A convenient way to iterate over
collections in a declarative way
List<Integer> numbers = new ArrayList<Integer>();
for (int i = 0; i < 100; i++) {
    numbers.add(i);
}
List<Integer> evenNumbers = numbers.stream()
    .filter(n -> n % 2 == 0)
    .collect(toList());

Anatomy of a Stream
Pipeline:
- Source
- Intermediate operations: filter, map, order, function
- Final operation

Iterating a Stream
List<Integer> evenNumbers = numbers.stream()
    .filter(n -> n % 2 == 0)
    .collect(toList());
Internal Iteration
- No manual Iterator handling
- Concise
- Fluent API: chain sequence processing
Elements computed only when needed
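
The "computed only when needed" point can be observed directly. A minimal sketch, assuming a counter inside the filter (the class and method names are illustrative, not from the talk): with a short-circuiting final operation such as findFirst(), the filter stops being called as soon as a match is found.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class LazyStreamDemo {
    // Returns how many elements the filter inspected before findFirst() was satisfied.
    static int inspectedUntilFirstEven(List<Integer> numbers) {
        AtomicInteger inspected = new AtomicInteger();
        numbers.stream()
                .filter(n -> {
                    inspected.incrementAndGet(); // side effect only to make laziness visible
                    return n % 2 == 0;
                })
                .findFirst(); // short-circuiting final operation
        return inspected.get();
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 100)
                .boxed()
                .collect(Collectors.toList());
        // Only 1 and 2 are inspected; the remaining 98 elements are never touched.
        System.out.println(inspectedUntilFirstEven(numbers)); // prints 2
    }
}
```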

Iterating a Stream
List<Integer> evenNumbers = numbers.parallelStream()
    .filter(n -> n % 2 == 0)
    .collect(toList());
Easy parallelism
- Concurrency is hard to get right!
- Uses ForkJoin
- Processing steps should be
  - stateless
  - independent
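
To illustrate "stateless and independent", here is a small sketch (class and method names are hypothetical). The first method keeps every step free of shared state; the second is a counter-example that mutates a shared, non-thread-safe list from the lambda and may lose elements or fail when run in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class StatelessStepsDemo {
    // Stateless and independent: each element is tested on its own, and the
    // collector merges per-thread partial lists, so no shared state is touched.
    static List<Integer> evens(List<Integer> numbers) {
        return numbers.parallelStream()
                .filter(n -> n % 2 == 0)
                .collect(Collectors.toList());
    }

    // Counter-example (do not use): ArrayList is not thread-safe, so concurrent
    // add() calls can drop elements or throw an exception.
    static List<Integer> evensWithSharedState(List<Integer> numbers) {
        List<Integer> shared = new ArrayList<>();
        numbers.parallelStream()
                .filter(n -> n % 2 == 0)
                .forEach(shared::add);
        return shared;
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.range(0, 100_000)
                .boxed()
                .collect(Collectors.toList());
        System.out.println("safe version found " + evens(numbers).size() + " even numbers");
    }
}
```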

Parallel Streams
use stream()
List<Integer> numbers = new ArrayList<>();
for (int i = 0; i < 10_000_000; i++) {
    numbers.add((int) Math.round(Math.random() * 100));
}
//This will use just a single thread
Stream<Integer> evenNumbers = numbers.stream();
or parallelStream()
//Uses the common ForkJoinPool, sized by default from the number of available cores
Stream<Integer> evenNumbers = numbers.parallelStream();
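
A quick way to check which mode a pipeline is in is isParallel(). The sketch below (class name is illustrative) also shows parallel() and sequential(), which switch the mode of the whole pipeline; the last call before the final operation wins.

```java
import java.util.Arrays;
import java.util.List;

class StreamModeDemo {
    // Reports the execution mode of four ways of obtaining a stream.
    static boolean[] modes(List<Integer> numbers) {
        return new boolean[] {
            numbers.stream().isParallel(),                      // sequential
            numbers.parallelStream().isParallel(),              // parallel for the standard collections
            numbers.stream().parallel().isParallel(),           // switched to parallel
            numbers.parallelStream().sequential().isParallel()  // switched back to sequential
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(modes(Arrays.asList(1, 2, 3))));
    }
}
```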

Let’s test it
use stream()
for (int i = 0; i < 100; i++) {
    long start = System.currentTimeMillis();
    List<Integer> even = numbers.stream()
        .filter(n -> n % 2 == 0)
        .sorted()
        .collect(toList());
    System.out.printf(
        "%d elements computed in %5d msecs with %d threads\n",
        even.size(), System.currentTimeMillis() - start,
        Thread.activeCount());
}
5001983 elements computed in 828 msecs with 2 threads
5001983 elements computed in 843 msecs with 2 threads
5001983 elements computed in 675 msecs with 2 threads
5001983 elements computed in 795 msecs with 2 threads

Going parallel
use parallelStream()
for (int i = 0; i < 100; i++) {
    long start = System.currentTimeMillis();
    List<Integer> even = numbers.parallelStream()
        .filter(n -> n % 2 == 0)
        .sorted()
        .collect(toList());
    System.out.printf(
        "%d elements computed in %5d msecs with %d threads\n",
        even.size(), System.currentTimeMillis() - start,
        Thread.activeCount());
}
4999299 elements computed in 225 msecs with 9 threads
4999299 elements computed in 230 msecs with 9 threads
4999299 elements computed in 250 msecs with 9 threads

Previously on…
http://www.slideshare.net/dgomezg/streams-en-java-8

Parallelism
Under the hood

Fork/Join Framework
Proposed by Doug Lea
"a style of parallel programming in
which problems are solved by
(recursively) splitting them into
subtasks that are solved in parallel."
Available since Java 7
Used by ParallelStreams

The F/J algorithm
Result solve(Problem problem)
{
    if (problem is small)
        directly solve problem
    else
    {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}
as proposed by Doug Lea

ForkJoinPool
ExecutorService implementation that
• has a defined number of Workers (threads)
• executes ForkJoinTasks
• submitted by execute(ForkJoinTask task),
• or by invoke(ForkJoinTask task)

ForkJoinTask
Abstract class that represents a task to be run
concurrently
Every ForkJoinTask can be split (if not small
enough) and solved recursively
Two concrete implementations
• RecursiveAction, if it does not return a value
• RecursiveTask, if it returns a value
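
As an illustration of the F/J algorithm with a RecursiveTask, here is a minimal sketch that sums an int array. The class name SumTask and the THRESHOLD value are assumptions chosen for the example, not values from the talk.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10_000; // assumed cut-off for "problem is small"
    private final int[] data;
    private final int from;
    private final int to; // exclusive

    SumTask(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: solve directly.
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        // Split into two independent halves.
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                     // run the left half asynchronously
        long rightSum = right.compute(); // solve the right half in this thread
        long leftSum = left.join();      // wait for the forked half
        return leftSum + rightSum;       // compose result from subresults
    }

    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[1_000_000];
        Arrays.fill(data, 1);
        System.out.println(sum(data)); // prints 1000000
    }
}
```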

ForkJoinWorkerThread
Any of the threads created by the ForkJoinPool
Executes ForkJoinTasks
Each one has a deque (double-ended queue) of tasks, which
allows task stealing

ForkJoinWorkerThread
Result solve(Problem problem)
{
    if (problem is small)
        directly solve problem
    else
    {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}
the F/J algorithm
plus Task Stealing.

Fork/Join. When to use?
For computations that can be split into smaller
tasks
aka ‘divide and conquer’ algorithms
Independent
Reduction with no contention.
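
A sketch of what "reduction with no contention" can look like (names are illustrative). Both methods return the same total, but the first funnels every element through one shared AtomicLong, while the second lets each worker reduce its own chunk and only merges partial results.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;

class ReductionDemo {
    // Contended: every thread updates the same AtomicLong for every element.
    static long sumWithSharedCounter(long n) {
        AtomicLong total = new AtomicLong();
        LongStream.rangeClosed(1, n).parallel().forEach(total::addAndGet);
        return total.get();
    }

    // No contention: sum() is a reduction, so chunks are summed independently
    // and the partial sums are combined at the end.
    static long sumWithReduction(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        System.out.println(sumWithSharedCounter(1_000_000));
        System.out.println(sumWithReduction(1_000_000));
    }
}
```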

ParallelStreams
in action!

ParallelStreams
for (int i = 0; i < 100; i++) {
    long start = System.currentTimeMillis();
    List<Integer> even = numbers.parallelStream()
        .filter(n -> n % 2 == 0)
        .sorted()
        .collect(toList());
    System.out.printf(
        "%d elements computed in %5d msecs with %d threads\n",
        even.size(), System.currentTimeMillis() - start,
        Thread.activeCount());
}
4999299 elements computed in 225 msecs with 9 threads
4999299 elements computed in 230 msecs with 9 threads
4999299 elements computed in 250 msecs with 9 threads

Thread.activeCount not accurate
for (int i = 0; i < 100; i++) {
    long start = System.currentTimeMillis();
    List<Integer> even = numbers.parallelStream()
        .filter(n -> n % 2 == 0)
        .sorted()
        .collect(toList());
    System.out.printf(
        "%d elements computed in %5d msecs with %d threads\n",
        even.size(), System.currentTimeMillis() - start,
        Thread.activeCount());
}
Thread.activeCount() does not show the effective
number of threads processing the stream

Better count threads involved
Set<String> workerThreadNames = ConcurrentHashMap.newKeySet();
for (int i = 0; i < 100; i++) {
    long start = System.currentTimeMillis();
    List<Integer> even = numbers.parallelStream()
        .filter(n -> n % 2 == 0)
        .peek(n -> workerThreadNames.add(
            Thread.currentThread().getName()))
        .sorted()
        .collect(toList());
    System.out.printf(
        "%d elements computed in %5d msecs with %d threads\n",
        even.size(), System.currentTimeMillis() - start,
        workerThreadNames.size());
}

Threads usage
ParallelStreams use the common ForkJoinPool
Number of worker threads configured with
-Djava.util.concurrent.ForkJoinPool.common.parallelism=n
Useful to keep CPU parallelism under control…
…but …
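
To see what the common pool is actually configured with, something like the following sketch can help (the class name is illustrative). It only reads values; the system property has to be set before the common pool is first initialized, typically on the command line.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {
    // Target parallelism of the common ForkJoinPool.
    static int parallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("available cores: "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + parallelism());
        // null unless -Djava.util.concurrent.ForkJoinPool.common.parallelism=n was given
        System.out.println("property value: "
                + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));
    }
}
```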

Limiting parallelism
for (int i = 0; i < 100; i++) {
    long start = System.currentTimeMillis();
    List<Integer> even = numbers.parallelStream()
        .filter(n -> n % 2 == 0)
        .peek(n -> workerThreadNames.add(
            Thread.currentThread().getName()))
        .sorted()
        .collect(toList());
    System.out.printf(
        "%d elements computed in %5d msecs with %d threads\n",
        even.size(), System.currentTimeMillis() - start,
        workerThreadNames.size());
}
-Djava.util.concurrent.ForkJoinPool.common.parallelism=4
5001069 elements computed in 269 msecs with 5 threads
WTF

Limiting parallelism
for (int i = 0; i < 100; i++) {
    long start = System.currentTimeMillis();
    List<Integer> even = numbers.parallelStream()
        .filter(n -> n % 2 == 0)
        .peek(n -> workerThreadNames.add(
            Thread.currentThread().getName()))
        .sorted()
        .collect(toList());
    System.out.printf(
        "%d elements computed in %5d msecs with %d threads\n",
        even.size(), System.currentTimeMillis() - start,
        workerThreadNames.size());
}
System.out.println("credits to threads: "
    + workerThreadNames);
5001069 elements computed in 269 msecs with 5 threads
credits to threads:
ForkJoinPool.commonPool-worker-0,
ForkJoinPool.commonPool-worker-1,
ForkJoinPool.commonPool-worker-2,
ForkJoinPool.commonPool-worker-3, main
WTF

Threads Involved in ParallelStream
ParallelStreams use the common ForkJoinPool
Thread invoking ParallelStream also used as
Worker
Caveats:
• ParallelStream processing is synchronous for the
invoking thread
• Other threads using the common ForkJoinPool
could be affected

ParallelStream Hack
ParallelStream can be forced to use a custom
ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool(4);

long start = System.currentTimeMillis();
numbers.parallelStream()
    .filter(n -> n % 2 == 0)
    .sorted()
    .collect(toList());


ParallelStream Hack
ParallelStream can be forced to use a custom
ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool(4);

long start = System.currentTimeMillis();
ForkJoinTask<List<Integer>> task =
    forkJoinPool.submit(() -> {
        return numbers.parallelStream()
            .filter(n -> n % 2 == 0)
            .sorted()
            .collect(toList());
    });
List<Integer> even = task.get();

ParallelStream Hack
ParallelStream can be forced to use a custom
ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool(4);

ForkJoinTask<List<Integer>> task =
    forkJoinPool.submit(() -> {
        return numbers.parallelStream()
            .filter(n -> n % 2 == 0)
            .sorted()
            .collect(toList());
    });
List<Integer> even = task.get();
Task submitted in 1 msecs
5000805 elements computed in 328 msecs with 4 threads

ParallelStream Hack benefits
A custom ExecutorService
• Does not affect other ParallelStreams
• Does not affect Common ForkJoinPool users
• Reduces unpredictable latency due to other load on the
common ForkJoinPool
• Invoking thread not used as worker (async
parallel process)
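
A self-contained sketch of the hack, wrapped in a helper that also shuts the pool down (the class and method names are hypothetical). Note the assumption it rests on: a parallel stream started from inside a ForkJoinPool task runs in that pool. This is observed implementation behavior rather than something the Stream API documentation promises, so treat it as a hack. The sketch uses join() instead of get() only to avoid checked exceptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

class CustomPoolDemo {
    // Runs the pipeline inside a dedicated pool instead of the common one.
    static List<Integer> sortedEvens(List<Integer> numbers, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.submit(() -> {
                return numbers.parallelStream()
                        .filter(n -> n % 2 == 0)
                        .sorted()
                        .collect(Collectors.toList());
            }).join(); // blocks the caller until the result is ready
        } finally {
            pool.shutdown(); // a custom pool is not cleaned up automatically
        }
    }

    public static void main(String[] args) {
        System.out.println(sortedEvens(Arrays.asList(9, 8, 7, 6, 5, 4), 4)); // prints [4, 6, 8]
    }
}
```

Creating a pool per call is only for brevity; a long-lived application would more likely keep one dedicated pool and reuse it.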

Problems derived from
Common ForkJoinPool

Blocking for IO
If the first URLs get stuck on a connection timeout, overall
performance could be affected
Stream<String> urls =
    Files.lines(Paths.get("urlsToCheck.txt"));

List<String> errors = urls.parallel().filter(url -> {
    //Connect to URL and wait for 200 response or timeout
    return true;
}).collect(toList());


Nested parallelStreams
Outer parallelStream could exhaust ForkJoin
Workers:
long start = System.currentTimeMillis();
IntStream.range(0, 10_000).parallel()
    .forEach(i -> {
        results[i][0] = (int) Math.round(Math.random() * 100);

        IntStream.range(1, 9_999)
            .parallel().forEach((int j) ->
                results[i][j] =
                    (int) Math.round(Math.random() * 1000));
    });

Process finalized in 22974 msecs
Process finalized in 22575 msecs
Process finalized in 22606 msecs

Nested parallelStreams
Outer parallelStream could exhaust ForkJoin
Workers:
long start = System.currentTimeMillis();
IntStream.range(0, 10_000).parallel()
    .forEach(i -> {
        results[i][0] = (int) Math.round(Math.random() * 100);

        IntStream.range(1, 9_999)
            .sequential().forEach((int j) ->
                results[i][j] =
                    (int) Math.round(Math.random() * 1000));
    });

Process finalized in 12491 msecs
Process finalized in 12589 msecs
Process finalized in 12798 msecs
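
A variation on the sequential-inner version, offered as a sketch rather than a measured result (class and method names are hypothetical). It swaps two things relative to the slide: the inner stream becomes a plain loop, and Math.random() is replaced by ThreadLocalRandom, since the Math.random() documentation itself notes that a per-thread generator may reduce contention when many threads generate numbers at a high rate.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;

class NestedStreamsDemo {
    // Only the outer level is parallel; each row is filled by a single thread.
    static int[][] fill(int rows, int cols) {
        int[][] results = new int[rows][cols];
        IntStream.range(0, rows).parallel().forEach(i -> {
            // Obtained inside the lambda so it belongs to the worker thread.
            ThreadLocalRandom random = ThreadLocalRandom.current();
            results[i][0] = random.nextInt(101);      // 0..100
            for (int j = 1; j < cols; j++) {
                results[i][j] = random.nextInt(1001); // 0..1000
            }
        });
        return results;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        int[][] results = fill(10_000, 10_000);
        System.out.printf("Filled %d rows in %d msecs%n",
                results.length, System.currentTimeMillis() - start);
    }
}
```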

Other performance
problems

Too much Auto(un)boxing
Unboxing and boxing of Integers in every filter call
List<Integer> even = numbers.parallelStream()
    .filter(n -> n % 2 == 0)
    .sorted()
    .collect(toList());

4999464 elements computed in 290 msecs with 8 threads
4999464 elements computed in 276 msecs with 8 threads
4999464 elements computed in 257 msecs with 8 threads
4999464 elements computed in 265 msecs with 8 threads

Less Auto(un)boxing
Unboxing done once with mapToInt(), boxing once with boxed()
List<Integer> even = numbers.parallelStream()
    .mapToInt(n -> n)
    .filter(n -> n % 2 == 0)
    .sorted()
    .boxed()
    .collect(toList());
4999460 elements computed in 160 msecs with 8 threads
4999460 elements computed in 243 msecs with 8 threads
4999460 elements computed in 144 msecs with 8 threads
4999460 elements computed in 140 msecs with 8 threads
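
If the data can live in an int[] from the start, boxing can be avoided altogether. A minimal sketch under that assumption (the class name is illustrative, and no timings are claimed here):

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

class PrimitiveStreamDemo {
    // Works on an IntStream end to end, so no Integer objects are created.
    static int[] sortedEvens(int[] numbers) {
        return Arrays.stream(numbers)
                .parallel()
                .filter(n -> n % 2 == 0)
                .sorted()
                .toArray();
    }

    public static void main(String[] args) {
        // One million random values in the range 0..100.
        int[] numbers = ThreadLocalRandom.current().ints(1_000_000, 0, 101).toArray();
        System.out.println(sortedEvens(numbers).length + " even elements");
    }
}
```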

Conclusions

Conclusions
ParallelStreams ease concurrent processing, but:
• Understand how they work
• Don’t abuse the default common ForkJoinPool
• Don’t use them when blocking on IO
• Or use a custom ForkJoinPool
• Avoid unnecessary autoboxing
• Don’t add contention or synchronisation
• Be careful with nested parallel streams
• Use method references when sorting

Thank You.
@dgomezg