Java8で並列プログラミングをしてみました

Javaで並列処理を実装した経験があまりなかったので、
理解を深めるために調べてみました。
並列処理といっても色々な実装方法があったので、
それぞれの機能を使った簡単なプログラムを書いて動きを確認してみました。






スレッドとは



スレッドとはプログラムの中のひとまとまりの処理のことで、
このスレッドが1列ずつ順番に処理されていくことをシングルスレッドといいます。

これに対し、複数のスレッドが並列に同時進行で処理されていくことをマルチスレッドといいます。






今回調べた並列API










Thread



Threadクラスを使って並列処理をさせることもできるのですが、
Java8から便利な機能が出てきたこともあり、今は推奨されていないようです。



実装方法としては、Threadクラスを継承したサブクラスでrun()をオーバーライドし、そのメソッド内でマルチスレッドの処理を書きます。
その後、Threadクラスのオブジェクトからstart()を呼び出すことで、サブクラスのrun()がマルチスレッドで実行されます。
ちなみに、直接run()を呼び出すとシングルスレッドで処理が実行されてしまうので注意が必要です。




/**
* 並列処理
*/
public class Parallel {

public static void main(String[] args) {
ParallelThreadA parallelA = new ParallelThreadA();
ParallelThreadB parallelB = new ParallelThreadB();
parallelA.start(); #1秒毎に5回出力
parallelB.start(); #2秒毎に5回出力
}
}

class ParallelThreadA extends Thread {
public void run() {

for (int i = 0; i < 5; i++) {
try {
# 1秒停止
Thread.sleep(1000);
System.out.println("スレッドA:" + (i + 1) + "番目");

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class ParallelThreadB extends Thread {
public void run() {

for (int i = 0; i < 5; i++) {
try {
# 2秒停止
Thread.sleep(2000);
System.out.println("スレッドB:" + (i + 1) + "番目");

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}





2つのスレッド処理が同時に行われていることを確認できます。



<実行結果>




スレッドA:1番目
スレッドB:1番目
スレッドA:2番目
スレッドA:3番目
スレッドB:2番目
スレッドA:4番目
スレッドA:5番目
スレッドB:3番目
スレッドB:4番目
スレッドB:5番目

Process finished with exit code 0





Threadクラスを継承する代わりに、Runnableインターフェースをimplementsすることでも並列処理を実現することができます。

ちなみにRunnableオブジェクトのrun()は値を返却しませんが、
Callableインターフェースを使うことでスレッド処理の値を返却することができます。
ただし、CallableインターフェースはExecutorServiceでしか使うことができません。







Executor



スレッドプールを使ったシンプルな並列処理を実装するためのフレームワークです。

Threadクラスをそのまま使うのではなく、プログラムの処理を小さなタスクに分割して、
そのタスクをExecutorによって実行することで、より安全なマルチスレッド処理を実行することができます。

ExecutorServiceを使うことで簡単にスレッドプールを使った並列処理を行うことができます。




  • スレッドプールとは




複数のスレッドをあらかじめ作成して待機させておき、タスクがきたら待っていたスレッドにタスクを割り当てて、
処理を開始させる仕組みのことをいいます。このスレッドプールを使うと、生成コストを減らして性能を上げることができます。





ExecutorService は Executor のサブインターフェースで、
タスクの終了を管理するメソッドや戻り値のあるメソッドを提供しています。(Futureなど)

ここで使ってはいませんが、ExecutorServiceのサブインターフェースにScheduledExecutorServiceがあり、
これを使うことでタスクの繰り返し実行や、一定時間待機のタスク実行をすることができます。





Executor の newFixedThreadPool() を使って並列処理の確認をしてみました。

newFixedThreadPool() は固定数のスレッドを再利用するスレッドプールを利用する ExecutorService を返却します。
ExecutorService の execute() を呼ぶことで、タスクがサブミット(実行というよりも送信に近い)状態になります。

newFixedThreadPool() は、CPU集約などの処理に適しています。




import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Executor#newFixedThreadPool()
* を使った並列処理
*/
public class ExecutorFixedParallel {

public static void main(String[] args) throws Exception {
# 指定した数のスレッド使い回す
ExecutorService esFixed = Executors.newFixedThreadPool(2);

try {
esFixed.execute(() -> System.out.println("Fixed:" + Thread.currentThread().getId()));
esFixed.execute(() -> System.out.println("Fixed:" + Thread.currentThread().getId()));
esFixed.execute(() -> System.out.println("Fixed:" + Thread.currentThread().getId()));
esFixed.execute(() -> System.out.println("Fixed:" + Thread.currentThread().getId()));
esFixed.execute(() -> System.out.println("Fixed:" + Thread.currentThread().getId()));

} finally {
esFixed.shutdown();
}
}
}





newFixedThreadPool() で指定した数ぶんの、2つのスレッドが使い回されています。

<実行結果>




Fixed:10
Fixed:11
Fixed:10
Fixed:11
Fixed:10

Process finished with exit code 0





他メソッドについては下記サイトに詳しく記載されています。



Executors (Java Platform SE 8)



zeroturnaround.com





Executor が提供している別メソッドの newWorkStealingPool() を使って動作を確認してみました。

newWorkStealingPool() はCPUのコア数の最大値または指定された並列数を保つスレッドプールを作成し、
各スレッドにタスクを割り当てて実行します。

タスクに空きが出ると他のスレッドからタスクを横取りするなど、
スレッドを動的に作成およびシャットダウンしてタスクキューの競合を減らそうとするので、
負荷の高い環境で効果を発揮します。




import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Executor#newWorkStealingPool()
* を使った並列処理
*/
public class ExecutorStealingParallel {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newWorkStealingPool();

try {
executor.execute(() -> System.out.println("Stealing:" + Thread.currentThread().getName()));
executor.execute(() -> System.out.println("Stealing:" + Thread.currentThread().getName()));
executor.execute(() -> System.out.println("Stealing:" + Thread.currentThread().getName()));
executor.execute(() -> System.out.println("Stealing:" + Thread.currentThread().getName()));
executor.execute(() -> System.out.println("Stealing:" + Thread.currentThread().getName()));
Thread.sleep(500);

} finally {
executor.shutdown();
}
}
}





<実行結果>




Stealing:ForkJoinPool-1-worker-1
Stealing:ForkJoinPool-1-worker-1
Stealing:ForkJoinPool-1-worker-2
Stealing:ForkJoinPool-1-worker-2
Stealing:ForkJoinPool-1-worker-2

Process finished with exit code 0







ParallelStream



Java8からStreamを使った並列処理ができるようになりました。

parallel()を付けるだけでマルチスレッドにすることができますが、
mapやfilterなどの中間操作や、forEachなどの終端操作がスレッドセーフでない場合に
問題が発生するので注意が必要です。

下記実装で使われているboxed()は、Streamの各要素をIntegerに変換する中間操作です。(ここでは IntStream を Stream に変換しています)




import java.util.stream.IntStream;

/**
* Streamを使った並列処理
*/
public class StreamParallel {
public static void main(String[] args) throws Exception {
IntStream.range(1, 10).boxed()
.parallel()
.map(x -> "task" + x)
.forEach(System.out::println);
}
}





マルチスレッドで順不同で出力されます。

<実行結果>




task6
task2
task3
task8
task4
task1
task5
task7
task9

Process finished with exit code 0





また、別の実装方法としてparallelStreamを使うこともできます。




import java.util.Arrays;
import java.util.List;

/**
* parallelStreamを使った並列処理
*/
public class ParallelStream {
public static void main(String[] args) throws Exception {
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
list.parallelStream()
.map(x -> "task" + x)
.forEach(System.out::println);
}
}





<実行結果>




task3
task5
task4
task2
task1

Process finished with exit code 0







CompletableFuture



Java8から利用可能になった、並列プログラミングパターンを実装するためのクラスです。
結果の取得後にその結果に対して処理を実行したり、
複数の完了を待って処理を行ったりすることができます。

runAsync() か supplyAsync() を呼び出すことで、非同期実行させることができます。




import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* CompletableFutureを使った並列処理
*/
public class CompletableFutureParallel {
public static void main(String[] args) throws Exception {

ExecutorService executor = Executors.newWorkStealingPool();

List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final int num = i;
futures.add(CompletableFuture.runAsync(() -> System.out.println("task" + num), executor));
}

CompletableFuture<Void> cf = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[futures.size()]));

cf.whenComplete((ret, ex) -> {
if (ex == null) {
System.out.println("OK!");

} else {
System.out.println("NG!");
}
});
}
}





<実行結果>




task0
task3
task2
task1
task6
task5
task4
task9
task8
task7
OK!

Process finished with exit code 0





runAsync()でラムダ式を引数にとって非同期で実行しています。

CompletableFuture の allOf() は CompletableFuture の実行が完了することでVoid型の CompletableFuture を返却しています。

CompletableFuture の結果はVoid型なので値は取得できませんが、
whenComplete() を使うことで実行結果を確認することができます。







ForkJoinPool



タスクが完了または待ちになったらすぐに別のタスクが実行され、
常に論理CPU分のスレッドがアクティブになるといった、work-stealingアルゴリズムを使用した並列処理です。

スレッドプールに小さいタスクが数多く送信される場合に効率的になります。

RecursiveTaskクラスを継承させて処理を実装するようなのですが、
使い方がよくわからなかったり、並列処理になっている確認ができなかったので、またの機会に試してみたいと思います。






やってみた所感



並列処理の実装はあまりやったことがなかったのですが、
Java8から便利な機能が出ていたので簡単に実装することができました。

Executor、ParallelStream、CompletableFutureを使った実装が便利だと感じたので、
ここで調べた以外の機能をもっと掘り下げて調べてみたいと思います。





また、更新をするような機能では、複数のスレッドが同時に更新してしまわないようロックさせる必要があると思うので、
そのあたりもよく調べて安全な並列処理を実装できるようにしたいと思います。




github.com






Previous
Next Post »

1 コメント:

Write コメント
Unknown
AUTHOR
2021年4月18日 11:57 delete

コードのことなのですが、黒い背景に灰色の文字はとても見づらいので、白色の背景か、白色の文字にしていただけるとありがたいです。

Reply
avatar

人気の投稿