Few Java 8 examples to execute streams in parallel.
1. BaseStream.parallel()
A simple parallel example to print 1 to 10.
ParallelExample1.java
package com.favtuts.java8.stream; import java.util.stream.IntStream; public class StreamParallelExamples { public static void main(String[] args) { demoSimpleStreamParallel(); } static void demoSimpleStreamParallel() { System.out.println("Normal..."); IntStream range = IntStream.rangeClosed(1, 10); range.forEach(System.out::println); System.out.println("Parallel..."); IntStream range2 = IntStream.rangeClosed(1, 10); range2.parallel().forEach(System.out::println); } }
Output
Normal...
1
2
3
4
5
6
7
8
9
10
Parallel...
7
6
9
8
10
1
3
4
5
2
2. Collection.parallelStream()
Another simple parallel example to print a
to z
. For collection, we can use parallelStream()
.
package com.favtuts.java8.stream; import java.util.ArrayList; import java.util.List; public class StreamParallelExamples { public static void main(String[] args) { demoCollectionParallelStream(); } static void demoCollectionParallelStream() { System.out.println("Normal..."); List<String> alpha = getData(); alpha.stream().forEach(System.out::println); System.out.println("Parallel..."); List<String> alpha2 = getData(); alpha2.parallelStream().forEach(System.out::println); } private static List<String> getData() { List<String> alpha = new ArrayList<>(); int n = 97; // 97 = a , 122 = z while (n <= 122) { char c = (char) n; alpha.add(String.valueOf(c)); n++; } return alpha; } }
Output
Normal...
a
b
c
d
e
f
g
h
i
j
k
l
m
n
o
p
q
r
s
t
u
v
w
x
y
z
Parallel...
q
s
r
o
x
h
l
p
d
i
g
t
u
n
z
v
j
k
w
f
m
c
a
e
b
y
3. Is Stream running in parallel mode?
3.1 We can test it with isParallel()
ParallelExample3a.java
package com.favtuts.java8.stream; import java.util.stream.IntStream; public class StreamParallelExamples { public static void main(String[] args) { checkStreamIsRunningParallelMode(); } static void checkStreamIsRunningParallelMode() { System.out.println("Normal..."); IntStream range = IntStream.rangeClosed(1, 10); System.out.println(range.isParallel()); // false range.forEach(System.out::println); System.out.println("Parallel..."); IntStream range2 = IntStream.rangeClosed(1, 10); IntStream range2Parallel = range2.parallel(); System.out.println(range2Parallel.isParallel()); // true range2Parallel.forEach(System.out::println); } }
3.2 Or print the current thread name like this:
ParallelExample3b.java
package com.favtuts.java8.stream; import java.util.stream.IntStream; public class StreamParallelExamples { public static void main(String[] args) { printStreamCurrentThreadName(); } static void printStreamCurrentThreadName() { System.out.println("Normal..."); IntStream range = IntStream.rangeClosed(1, 10); range.forEach(x -> { System.out.println("Thread : " + Thread.currentThread().getName() + ", value: " + x); }); System.out.println("Parallel..."); IntStream range2 = IntStream.rangeClosed(1, 10); range2.parallel().forEach(x -> { System.out.println("Thread : " + Thread.currentThread().getName() + ", value: " + x); }); } }
Output
Normal...
Thread : main, value: 1
Thread : main, value: 2
Thread : main, value: 3
Thread : main, value: 4
Thread : main, value: 5
Thread : main, value: 6
Thread : main, value: 7
Thread : main, value: 8
Thread : main, value: 9
Thread : main, value: 10
Parallel...
Thread : main, value: 7
Thread : main, value: 6
Thread : ForkJoinPool.commonPool-worker-5, value: 8
Thread : main, value: 9
Thread : ForkJoinPool.commonPool-worker-7, value: 10
Thread : ForkJoinPool.commonPool-worker-3, value: 3
Thread : ForkJoinPool.commonPool-worker-7, value: 2
Thread : ForkJoinPool.commonPool-worker-11, value: 4
Thread : ForkJoinPool.commonPool-worker-7, value: 1
Thread : ForkJoinPool.commonPool-worker-3, value: 5
P.S By default, parallel streams use `ForkJoinPool`
4. Calculation
4.1 Java 8 streams to print all prime numbers up to 1 million:
ParallelExample4.java
package com.favtuts.java8.stream; import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; import com.favtuts.java8.Employee; public class StreamParallelExamples { public static void main(String[] args) { demoStreamParallelPrimeCalculation(); } static void demoStreamParallelPrimeCalculation() { long count = Stream.iterate(0, n -> n + 1) .limit(1_000_000) //.parallel() with this 23s, without this 1m 10s .filter(StreamParallelExamples::isPrime) .peek(x -> System.out.format("%s\t", x)) .count(); System.out.println("\nTotal: " + count); } public static boolean isPrime(int number) { if (number <= 1) return false; return !IntStream.rangeClosed(2, number / 2).anyMatch(i -> number % i == 0); } }
Result:
- For normal streams, it takes 1 minute 10 seconds.
- For parallel streams, it takes 23 seconds.
P.S Tested with i7-7700, 16G RAM, WIndows 10
4.2 Yet another parallel stream example to find out the average age of a list of employees.
package com.favtuts.java8.stream; import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; import com.favtuts.java8.Employee; public class StreamParallelExamples { public static void main(String[] args) { demoStreamParallelAverageAgeEmployees(); } static void demoStreamParallelAverageAgeEmployees() { IntStream range = IntStream.rangeClosed(1, 10000); List<Employee> employees = range.boxed() .map(x -> { Employee obj = new Employee("Employee" + x, x, BigDecimal.valueOf(x * 100)); return obj; }) .collect(Collectors.toList()); ; System.out.println(employees); double age = employees .parallelStream() .mapToInt(Employee::getAge) .average() .getAsDouble(); System.out.println("Average age: " + age); } }
5. Case Study
5.1 Parallel streams to increase the performance of a time-consuming save file tasks.
This Java code will generate 10,000 random employees and save into 10,000 files, each employee save into a file.
- For normal stream, it takes 27-29 seconds.
- For parallel stream, it takes 7-8 seconds.
P.S Tested with i7-7700, 16G RAM, WIndows 10
ParallelExample5.java
package com.favtuts.java8.stream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.math.BigDecimal; import java.math.RoundingMode; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; import java.util.Random; import java.util.stream.Collectors; import java.util.stream.Stream; import com.favtuts.java8.stream.parallel.Employee; public class StreamParallelExamples { private static final String DIR = System.getProperty("user.dir") + "/test/"; public static void main(String[] args) throws IOException { Files.createDirectories(Paths.get(DIR)); StreamParallelExamples obj = new StreamParallelExamples(); List<Employee> employees = obj.generateEmployee(10000); // normal, sequential //employees.stream().forEach(StreamParallelExamples2::save); // 27s-29s // parallel employees.parallelStream().forEach(StreamParallelExamples2::save); // 7s-8s } private static void save(Employee input) { try (FileOutputStream fos = new FileOutputStream(new File(DIR + input.getName() + ".txt")); ObjectOutputStream obs = new ObjectOutputStream(fos)) { obs.writeObject(input); } catch (IOException e) { e.printStackTrace(); } } private List<Employee> generateEmployee(int num) { return Stream.iterate(0, n -> n + 1) .limit(num) .map(x -> { return new Employee( generateRandomName(4), generateRandomAge(15, 100), generateRandomSalary(900.00, 200_000.00) ); }) .collect(Collectors.toList()); } private String generateRandomName(int length) { return new Random() .ints(5, 97, 122) // 97 = a , 122 = z .mapToObj(x -> String.valueOf((char) x)) .collect(Collectors.joining()); } private int generateRandomAge(int min, int max) { return new Random() .ints(1, min, max) .findFirst() .getAsInt(); } private BigDecimal generateRandomSalary(double min, double max) { return new BigDecimal(new Random() .doubles(1, min, max) .findFirst() .getAsDouble()).setScale(2, RoundingMode.HALF_UP); } }
Employee.java
package com.favtuts.java8.stream.parallel; import java.io.Serializable; import java.math.BigDecimal; public class Employee implements Serializable { private static final long serialVersionUID = 1L; private String name; private int age; private BigDecimal salary; //getters, setters n etc... public Employee(String name, int age, BigDecimal salary) { this.name = name; this.age = age; this.salary = salary; } @Override public String toString() { return "{" + " name='" + getName() + "'" + ", age='" + getAge() + "'" + ", salary='" + getSalary() + "'" + "}"; } public String getName() { return this.name; } public void setName(String name) { this.name = name; } public int getAge() { return this.age; } public void setAge(int age) { this.age = age; } public BigDecimal getSalary() { return this.salary; } public void setSalary(BigDecimal salary) { this.salary = salary; } }
Download Source Code
$ git clone https://github.com/favtuts/java-core-tutorials-examples
$ cd java-basic/java8