A few Java 8 examples that show how to execute streams in parallel.

1. BaseStream.parallel()

A simple parallel example to print 1 to 10.

ParallelExample1.java

package com.favtuts.java8.stream;

import java.util.stream.IntStream;

public class StreamParallelExamples {
    
    public static void main(String[] args) {
        
        demoSimpleStreamParallel();

    }

    static void demoSimpleStreamParallel() {

        System.out.println("Normal...");
        
        IntStream range = IntStream.rangeClosed(1, 10);

        range.forEach(System.out::println);

        System.out.println("Parallel...");

        IntStream range2 = IntStream.rangeClosed(1, 10);

        range2.parallel().forEach(System.out::println);
        
    }
}

Output

Normal...
1
2
3
4
5
6
7
8
9
10
Parallel...
7
6
9
8
10
1
3
4
5
2
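The parallel output above arrives in a different order on each run because `forEach` makes no ordering promise on a parallel stream. When the order matters, `forEachOrdered` can be used instead. Below is a minimal sketch; the class name `ForEachOrderedSketch` and the helper `orderedValues` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

class ForEachOrderedSketch {

    // Hypothetical helper: gathers 1..10 from a parallel stream.
    // forEachOrdered processes elements one at a time in encounter order,
    // so a plain ArrayList is enough here.
    static List<Integer> orderedValues() {
        List<Integer> seen = new ArrayList<>();
        IntStream.rangeClosed(1, 10)
                .parallel()
                .forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(orderedValues());
    }
}
```

Keeping the order limits how much of the final step can run concurrently, so this is a trade-off rather than a free fix.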

2. Collection.parallelStream()

Another simple parallel example that prints a to z. For a collection, we can call parallelStream() directly.

package com.favtuts.java8.stream;

import java.util.ArrayList;
import java.util.List;

public class StreamParallelExamples {
    
    public static void main(String[] args) {
        
        demoCollectionParallelStream();

    }

    static void demoCollectionParallelStream() {

        System.out.println("Normal...");

        List<String> alpha = getData();

        alpha.stream().forEach(System.out::println);

        System.out.println("Parallel...");

        List<String> alpha2 = getData();
        
        alpha2.parallelStream().forEach(System.out::println);
    }

    private static List<String> getData() {

        List<String> alpha = new ArrayList<>();

        int n = 97;  // 97 = a , 122 = z
        while (n <= 122) {
            char c = (char) n;
            alpha.add(String.valueOf(c));
            n++;
        }

        return alpha;

    }

}

Output

Normal...
a
b
c
d
e
f
g
h
i
j
k
l
m
n
o
p
q
r
s
t
u
v
w
x
y
z
Parallel...
q
s
r
o
x
h
l
p
d
i
g
t
u
n
z
v
j
k
w
f
m
c
a
e
b
y
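The shuffled letters come from `forEach`, not from the list itself. If the results are collected instead of printed, a parallel stream over a `List` still returns them in the list's order. A small sketch, assuming a hypothetical helper named `upperCased`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ParallelCollectSketch {

    // Hypothetical helper: upper-cases each letter in parallel.
    // Collectors.toList() respects the encounter order of the source list.
    static List<String> upperCased(List<String> letters) {
        return letters.parallelStream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [A, B, C, D, E]
        System.out.println(upperCased(Arrays.asList("a", "b", "c", "d", "e")));
    }
}
```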

3. Is Stream running in parallel mode?

3.1 We can test it with isParallel()

ParallelExample3a.java

package com.favtuts.java8.stream;

import java.util.stream.IntStream;

public class StreamParallelExamples {
    
    public static void main(String[] args) {

        checkStreamIsRunningParallelMode();

    }

    static void checkStreamIsRunningParallelMode() {

        System.out.println("Normal...");

        IntStream range = IntStream.rangeClosed(1, 10);
        System.out.println(range.isParallel());         // false
        range.forEach(System.out::println);

        System.out.println("Parallel...");

        IntStream range2 = IntStream.rangeClosed(1, 10);
        IntStream range2Parallel = range2.parallel();
        System.out.println(range2Parallel.isParallel()); // true
        range2Parallel.forEach(System.out::println);

    }

}
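`isParallel()` reports the mode of the whole pipeline, not of a single step. If both `parallel()` and `sequential()` are called on the same pipeline, the last call decides. A minimal sketch with made-up method names:

```java
import java.util.stream.IntStream;

class LastModeWinsSketch {

    // parallel() followed by sequential(): the pipeline ends up sequential.
    static boolean parallelThenSequential() {
        return IntStream.rangeClosed(1, 10).parallel().sequential().isParallel();
    }

    // sequential() followed by parallel(): the pipeline ends up parallel.
    static boolean sequentialThenParallel() {
        return IntStream.rangeClosed(1, 10).sequential().parallel().isParallel();
    }

    public static void main(String[] args) {
        System.out.println(parallelThenSequential()); // false
        System.out.println(sequentialThenParallel()); // true
    }
}
```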

3.2 Or print the current thread name like this:

ParallelExample3b.java

package com.favtuts.java8.stream;

import java.util.stream.IntStream;

public class StreamParallelExamples {
    
    public static void main(String[] args) {

        printStreamCurrentThreadName();

    }

    static void printStreamCurrentThreadName() {

        System.out.println("Normal...");

        IntStream range = IntStream.rangeClosed(1, 10);
        range.forEach(x -> {
            System.out.println("Thread : " + Thread.currentThread().getName() + ", value: " + x);
        });

        System.out.println("Parallel...");

        IntStream range2 = IntStream.rangeClosed(1, 10);
        range2.parallel().forEach(x -> {
            System.out.println("Thread : " + Thread.currentThread().getName() + ", value: " + x);
        });

    }

}

Output

Normal...
Thread : main, value: 1
Thread : main, value: 2
Thread : main, value: 3
Thread : main, value: 4
Thread : main, value: 5
Thread : main, value: 6
Thread : main, value: 7
Thread : main, value: 8
Thread : main, value: 9
Thread : main, value: 10
Parallel...
Thread : main, value: 7
Thread : main, value: 6
Thread : ForkJoinPool.commonPool-worker-5, value: 8
Thread : main, value: 9
Thread : ForkJoinPool.commonPool-worker-7, value: 10
Thread : ForkJoinPool.commonPool-worker-3, value: 3
Thread : ForkJoinPool.commonPool-worker-7, value: 2
Thread : ForkJoinPool.commonPool-worker-11, value: 4
Thread : ForkJoinPool.commonPool-worker-7, value: 1
Thread : ForkJoinPool.commonPool-worker-3, value: 5

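P.S. By default, parallel streams run on the common `ForkJoinPool` (`ForkJoinPool.commonPool()`). The calling thread, `main` in the output above, also takes part in the work.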
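To see which threads took part without reading through every line of output, the thread names can be collected into a set. This is only a sketch; `WorkerThreadNamesSketch` and `threadNames` are hypothetical names, and the number of workers you see depends on the machine.

```java
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class WorkerThreadNamesSketch {

    // Hypothetical helper: returns the distinct names of the threads
    // that processed the elements 1..1000.
    static Set<String> threadNames(boolean parallel) {
        IntStream range = IntStream.rangeClosed(1, 1_000);
        if (parallel) {
            range = range.parallel();
        }
        return range
                .mapToObj(i -> Thread.currentThread().getName())
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println("Sequential: " + threadNames(false));
        System.out.println("Parallel  : " + threadNames(true));
        System.out.println("Common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
    }
}
```

The size of the common pool can be adjusted with the `java.util.concurrent.ForkJoinPool.common.parallelism` system property, which must be set before the pool is first used.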

4. Calculation

4.1 Use Java 8 streams to print all prime numbers below 1 million:

ParallelExample4.java

package com.favtuts.java8.stream;

import java.util.stream.IntStream;
import java.util.stream.Stream;

public class StreamParallelExamples {
    
    public static void main(String[] args) {
        demoStreamParallelPrimeCalculation();
    }

    static void demoStreamParallelPrimeCalculation() {

        long count = Stream.iterate(0, n -> n + 1)
                .limit(1_000_000)
                //.parallel()   // uncommented: about 23s; commented out: about 1m 10s
                .filter(StreamParallelExamples::isPrime)
                .peek(x -> System.out.format("%s\t", x))
                .count();

        System.out.println("\nTotal: " + count);

    }

    public static boolean isPrime(int number) {
        if (number <= 1) return false;
        return IntStream.rangeClosed(2, number / 2).noneMatch(i -> number % i == 0);
    }

}

Result:

  • For normal streams, it takes 1 minute 10 seconds.
  • For parallel streams, it takes 23 seconds.

P.S. Tested with i7-7700, 16 GB RAM, Windows 10
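The example above builds its numbers with `Stream.iterate`, which produces elements one after another and is therefore hard to split across threads. A range source such as `IntStream.rangeClosed` usually splits more evenly, and checking divisors only up to the square root cuts the work per number. The following is a sketch under those assumptions, not a measured replacement; `PrimeCountSketch` and `countPrimes` are made-up names.

```java
import java.util.stream.IntStream;

class PrimeCountSketch {

    // Trial division up to the square root of the candidate.
    static boolean isPrime(int number) {
        if (number <= 1) return false;
        return IntStream.rangeClosed(2, (int) Math.sqrt(number))
                .noneMatch(i -> number % i == 0);
    }

    // Hypothetical helper: counts the primes in 2..limit,
    // sequentially or in parallel.
    static long countPrimes(int limit, boolean parallel) {
        IntStream candidates = IntStream.rangeClosed(2, limit);
        if (parallel) {
            candidates = candidates.parallel();
        }
        return candidates.filter(PrimeCountSketch::isPrime).count();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        long total = countPrimes(1_000_000, true);
        long millis = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Total: " + total + " in " + millis + " ms");
    }
}
```

Both modes must return the same count; only the elapsed time should differ, and by how much depends on the hardware.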

4.2 Yet another parallel stream example to find out the average age of a list of employees.

package com.favtuts.java8.stream;

import java.math.BigDecimal;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.favtuts.java8.Employee;

public class StreamParallelExamples {
    
    public static void main(String[] args) {
        demoStreamParallelAverageAgeEmployees();
    }


    static void demoStreamParallelAverageAgeEmployees() {

        IntStream range = IntStream.rangeClosed(1, 10000);
        List<Employee> employees = range.boxed()
            .map(x -> new Employee("Employee" + x, x, BigDecimal.valueOf(x * 100)))
            .collect(Collectors.toList());
        System.out.println(employees);

        double age = employees
            .parallelStream()
            .mapToInt(Employee::getAge)
            .average()
            .getAsDouble();

        System.out.println("Average age: " + age);

    }

}
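One detail in the example above: `average()` returns an `OptionalDouble`, and `getAsDouble()` throws `NoSuchElementException` when the list is empty. A minimal sketch that falls back to 0.0 instead, using plain `Integer` ages rather than `Employee` objects to stay self-contained (the names are hypothetical):

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class AverageAgeSketch {

    // Hypothetical helper: average of the given ages, or 0.0 for an empty list.
    static double averageAge(List<Integer> ages) {
        return ages.parallelStream()
                .mapToInt(Integer::intValue)
                .average()
                .orElse(0.0);
    }

    public static void main(String[] args) {
        List<Integer> ages = IntStream.rangeClosed(1, 10_000)
                .boxed()
                .collect(Collectors.toList());
        System.out.println("Average age: " + averageAge(ages));                       // 5000.5
        System.out.println("Empty list : " + averageAge(Collections.emptyList()));   // 0.0
    }
}
```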

5. Case Study

5.1 Parallel streams can speed up a time-consuming task such as saving many files.

This Java code generates 10,000 random employees and saves each employee into its own file.

  • For normal stream, it takes 27-29 seconds.
  • For parallel stream, it takes 7-8 seconds.

P.S. Tested with i7-7700, 16 GB RAM, Windows 10

ParallelExample5.java

package com.favtuts.java8.stream;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.favtuts.java8.stream.parallel.Employee;

public class StreamParallelExamples {

    private static final String DIR = System.getProperty("user.dir") + "/test/";
    
    public static void main(String[] args) throws IOException {
        
        Files.createDirectories(Paths.get(DIR));
        StreamParallelExamples obj = new StreamParallelExamples();
        List<Employee> employees = obj.generateEmployee(10000);

        // normal, sequential
        //employees.stream().forEach(StreamParallelExamples::save);         // 27s-29s

        // parallel
        employees.parallelStream().forEach(StreamParallelExamples::save);   // 7s-8s
    }

    private static void save(Employee input) {

        try (FileOutputStream fos = new FileOutputStream(new File(DIR + input.getName() + ".txt"));
             ObjectOutputStream obs = new ObjectOutputStream(fos)) {
            obs.writeObject(input);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    private List<Employee> generateEmployee(int num) {

        return Stream.iterate(0, n -> n + 1)
                .limit(num)
                .map(x -> {
                    return new Employee(
                            generateRandomName(4),
                            generateRandomAge(15, 100),
                            generateRandomSalary(900.00, 200_000.00)
                    );
                })
                .collect(Collectors.toList());

    }

    private String generateRandomName(int length) {

        return new Random()
                .ints(length, 97, 123) // 97 = a, 122 = z; the upper bound is exclusive
                .mapToObj(x -> String.valueOf((char) x))
                .collect(Collectors.joining());

    }

    private int generateRandomAge(int min, int max) {
        return new Random()
                .ints(1, min, max)
                .findFirst()
                .getAsInt();
    }

    private BigDecimal generateRandomSalary(double min, double max) {
        return new BigDecimal(new Random()
                .doubles(1, min, max)
                .findFirst()
                .getAsDouble()).setScale(2, RoundingMode.HALF_UP);
    }
}

Employee.java

package com.favtuts.java8.stream.parallel;

import java.io.Serializable;
import java.math.BigDecimal;

public class Employee implements Serializable {

    private static final long serialVersionUID = 1L;

    private String name;
    private int age;
    private BigDecimal salary;

    // constructor, toString, getters and setters

    public Employee(String name, int age, BigDecimal salary) {
        this.name = name;
        this.age = age;
        this.salary = salary;
    }

    @Override
    public String toString() {
        return "{" +
            " name='" + getName() + "'" +
            ", age='" + getAge() + "'" +
            ", salary='" + getSalary() + "'" +
            "}";
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return this.age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public BigDecimal getSalary() {
        return this.salary;
    }

    public void setSalary(BigDecimal salary) {
        this.salary = salary;
    }

}
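In the case study above, the file name comes from a random employee name, so two employees with the same name would overwrite each other's file, and `save` only prints I/O errors. The sketch below is one possible variation, not the original code: it writes into a temporary directory, uses the index as a unique file name, and rethrows failures. All names in it (`ParallelSaveSketch`, `writeFiles`, `writeToTempDir`) are assumptions for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class ParallelSaveSketch {

    // Writes `count` small files into `dir` in parallel and
    // returns how many files the directory holds afterwards.
    static long writeFiles(Path dir, int count) {
        IntStream.range(0, count).parallel().forEach(i -> {
            Path file = dir.resolve("employee-" + i + ".txt"); // unique per index
            try {
                Files.write(file, ("employee " + i).getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new UncheckedIOException(e); // surface the failure
            }
        });
        try (Stream<Path> files = Files.list(dir)) {
            return files.count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Convenience wrapper that creates a fresh temporary directory first.
    static long writeToTempDir(int count) {
        try {
            return writeFiles(Files.createTempDirectory("parallel-save-sketch"), count);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        long written = writeToTempDir(1_000);
        long millis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(written + " files written in " + millis + " ms");
    }
}
```

How much the parallel version gains depends on the disk and the operating system, so it is worth timing both modes on the target machine.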

Download Source Code

$ git clone https://github.com/favtuts/java-core-tutorials-examples

$ cd java-basic/java8
