プログラミング言語の比較 > 並列処理(スレッド)

別のスレッドで処理(リストを昇順にソートする)を行う

Java

  1. // import java.util.concurrent.*;
  2. class Sorter<E> implements Callable<E[]> {
  3. private final E[] original;
  4. public Sorter(E... original) {
  5. this.original = original;
  6. }
  7. @Override
  8. public E[] call() throws Exception {
  9. E[] sorted = Arrays.copyOf(original, original.length);
  10. Arrays.sort(sorted);
  11. return sorted;
  12. }
  13. }
処理が終わるのを待たない場合
  1. ExecutorService executor = Executors.newCachedThreadPool();
  2. executor.submit(new Sorter<Integer>(4, 1, 2, 5, 0));
  3. executor.shutdown();
処理が終わるまで何もせずに待つ場合
  1. ExecutorService executor = Executors.newCachedThreadPool();
  2. Future<Integer[]> future = executor.submit(new Sorter<Integer>(4, 1, 2, 5, 0));
  3. Integer[] result = future.get();
  4. executor.shutdown();
処理が終わるまで他のことをしながら待つ場合
  1. ExecutorService executor = Executors.newCachedThreadPool();
  2. Future<Integer[]> future = executor.submit(new Sorter<Integer>(4, 1, 2, 5, 0));
  3. while (! future.isDone()) {
  4. System.out.println("処理中...");
  5. }
  6. Integer[] result = future.get();
  7. executor.shutdown();

RxJava を使った場合

  1. // import java.util.concurrent.Future;
  2. // import java.util.concurrent.TimeUnit;
  3. // import java.util.stream.IntStream;
  4. // import io.reactivex.rxjava3.core.Single;
  5. IntStream stream = IntStream.of(4, 1, 2, 5, 0);
  6. Single<IntStream> single = Single.just(stream)
  7. .delay(0, TimeUnit.SECONDS)
  8. .map(s-> s.sorted());
処理が終わるのを待たない場合
  1. single.subscribe();
処理が終わるまで何もせずに待つ場合
  1. IntStream result = single.blockingGet();
処理が終わるまで他のことをしながら待つ場合
  1. Future<IntStream> future = single.toFuture();
  2. while (! future.isDone()) {
  3. System.out.println("処理中...");
  4. }
  5. IntStream result = future.get();

Groovy

  1. // import java.util.concurrent.*
  2. class Sorter implements Callable {
  3. private original
  4. Sorter(... original) {
  5. this.original = original
  6. }
  7. def call() {
  8. original.sort()
  9. }
  10. }
  11. def sorter = new Sorter(4, 1, 2, 5, 0)
  12. def executor = Executors.newSingleThreadExecutor()
処理が終わるのを待たない場合
  1. executor.submit(sorter)
  2. executor.shutdown()
処理が終わるまで何もせずに待つ場合
  1. def future = executor.submit(sorter)
  2. def result = future.get()
  3. executor.shutdown()
処理が終わるまで他のことをしながら待つ場合
  1. def future = executor.submit(sorter)
  2. while (! future.done) {
  3. println '処理中...'
  4. }
  5. def result = future.get()
  6. executor.shutdown()

RxJava を使った場合

  1. // import java.util.concurrent.TimeUnit
  2. // import io.reactivex.rxjava3.core.Single
  3. def list = [4, 1, 2, 5, 0]
  4. def single = Single.just(list)
  5. .delay(0, TimeUnit.SECONDS)
  6. .map { it.sort() }
処理が終わるのを待たない場合
  1. single.subscribe()
処理が終わるまで何もせずに待つ場合
  1. def result = single.blockingGet()
処理が終わるまで他のことをしながら待つ場合
  1. def future = single.toFuture()
  2. while (! future.done) {
  3. println '処理中...'
  4. }
  5. def result = future.get()

Kotlin

  1. val list = listOf(4, 1, 2, 5, 0)
処理が終わるのを待たない場合
  1. // import kotlinx.coroutines.*
  2. launch {
  3. list.sorted()
  4. }
処理が終わるまで何もせずに待つ場合
  1. // import kotlinx.coroutines.*
  2. // async starts the sort in a coroutine and returns a Deferred for its result.
  3. val deferred = async {
  4.     list.sorted()
  5. }
  6. // await() suspends until the sorted list is available.
  7. val result = deferred.await()
処理が終わるまで他のことをしながら待つ場合
  1. // import kotlinx.coroutines.*
  2. // async starts the sort in a coroutine and returns a Deferred for its result.
  3. val deferred = async {
  4.     list.sorted()
  5. }
  6. // Keep doing other work while the coroutine is still active.
  7. while (deferred.isActive) {
  8.     println("処理中...")
  9.     delay(1)
  10. }
  11. val result = deferred.await()

RxJava を使った場合

  1. // import java.util.concurrent.TimeUnit
  2. // import io.reactivex.rxjava3.core.Single
  3. val list = listOf(4, 1, 2, 5, 0)
  4. val single = Single.just(list)
  5. .delay(0, TimeUnit.SECONDS)
  6. .map { it.sorted() }
処理が終わるのを待たない場合
  1. single.subscribe()
処理が終わるまで何もせずに待つ場合
  1. val result = single.blockingGet()
処理が終わるまで他のことをしながら待つ場合
  1. val future = single.toFuture()
  2. while (! future.isDone()) {
  3. println("処理中...")
  4. }
  5. val result = future.get()

Scala

Future を使う

  1. // import scala.concurrent._
  2. // import scala.concurrent.ExecutionContext.Implicits.global
  3. // import scala.concurrent.duration.Duration
  4. val list = List(4, 1, 2, 5, 0)
処理が終わるのを待たない場合
  1. Future { list.sorted }
処理が終わるまで何もせずに待つ場合
  1. val future = Future { list.sorted } // future の型は scala.concurrent.Future[List[Int]]
  2. val result = Await.result(future, Duration.Inf) // result の型は List[Int]
処理が終わるまで他のことをしながら待つ場合
  1. val future = Future { list.sorted } // future の型は scala.concurrent.Future[List[Int]]
  2. while (!future.isCompleted) {
  3. println("処理中...")
  4. }
  5. val result = Await.result(future, Duration.Zero) // result の型は List[Int]

Akka を使った場合(無名のアクター)

  1. // import scala.concurrent.Await
  2. // import scala.concurrent.duration._
  3. // import akka.actor.ActorSystem
  4. // import akka.actor.ActorDSL._
  5. // import akka.pattern.ask
  6. // import akka.util.Timeout
  7. implicit val actorSystem = ActorSystem()
  8. val duration = 1.minute // dot notation; the postfix form `1 minutes` needs scala.language.postfixOps
  9. implicit val timeout = Timeout(duration)
  10. // Anonymous actor that replies to the sender with the sorted list.
  11. val s = actor(new Act {
  12.   become {
  13.     case list: List[Int] => sender() ! list.sorted
  14.   }
  15. })
  16. val list = List(4, 1, 2, 5, 0)
処理が終わるのを待たない場合
  1. s ! list
処理が終わるまで何もせずに待つ場合
  1. val result = Await.result((s ? list).mapTo[List[Int]], duration)
処理が終わるまで他のことをしながら待つ場合
  1. val future = (s ? list).mapTo[List[Int]]
  2. while (!future.isCompleted) {
  3. println("処理中...")
  4. }
  5. val result = Await.result(future, duration)

Akka を使った場合(アクタークラスを定義)

  1. // import scala.concurrent.Await
  2. // import scala.concurrent.duration._
  3. // import scala.math.Ordering
  4. // import akka.actor._
  5. // import akka.pattern.ask
  6. // import akka.util.Timeout
  7. /** Actor that replies to the sender with the received list sorted by the given Ordering. */
  8. class Sorter[A](implicit ord: Ordering[A]) extends Actor {
  9.   override def receive = {
  10.     case list: List[A] => sender() ! list.sorted
  11.   }
  12. }
  13. val actorSystem = ActorSystem()
  14. val duration = 1.minute // dot notation; the postfix form `1 minutes` needs scala.language.postfixOps
  15. implicit val timeout = Timeout(duration)
  16. val s = actorSystem.actorOf(Props(classOf[Sorter[_]], Ordering.Int))
  17. val list = List(4, 1, 2, 5, 0)
処理が終わるのを待たない場合
  1. s ! list
処理が終わるまで何もせずに待つ場合
  1. val result = Await.result((s ? list).mapTo[List[Int]], duration)
処理が終わるまで他のことをしながら待つ場合
  1. val future = (s ? list).mapTo[List[Int]]
  2. while (!future.isCompleted) {
  3. println("処理中...")
  4. }
  5. val result = Await.result(future, duration)

RxJava を使った場合

  1. // import java.util.concurrent.TimeUnit
  2. // import io.reactivex.rxjava3.core.Single
  3. val list = List(4, 1, 2, 5, 0)
  4. val single = Single.just(list)
  5. .delay(0, TimeUnit.SECONDS)
  6. .map { _.sorted }
処理が終わるのを待たない場合
  1. single.subscribe
処理が終わるまで何もせずに待つ場合
  1. val result = single.blockingGet
処理が終わるまで他のことをしながら待つ場合
  1. val future = single.toFuture
  2. while (! future.isDone) {
  3. println("処理中...")
  4. }
  5. val result = future.get

Erlang

  1. SorterPid = spawn(fun() ->
  2. receive
  3. {Pid, List} -> Pid ! {self(), lists:sort(List)}
  4. end
  5. end),
処理が終わるのを待たない場合
  1. SorterPid ! {self(), [4, 1, 2, 5, 0]}.
処理が終わるまで何もせずに待つ場合
  1. SorterPid ! {self(), [4, 1, 2, 5, 0]},
  2. receive
  3. {SorterPid, Result} -> void
  4. end.
処理が終わるまで他のことをしながら待つ場合
  1. wait_result(Pid) ->
  2. receive
  3. {Pid, Result} -> Result
  4. after 0 ->
  5. io:format("処理中...~n", []),
  6. wait_result(Pid)
  7. end.
  1. SorterPid ! {self(), [4, 1, 2, 5, 0]},
  2. Result = wait_result(SorterPid).

F#

非同期ワークフローを使った場合

処理が終わるのを待たない場合
  1. let list = [4; 1; 2; 5; 0]
  2. async {
  3. list |> List.sort |> ignore
  4. } |> Async.Start
処理が終わるまで何もせずに待つ場合
  1. let list = [4; 1; 2; 5; 0]
  2. let a = async {
  3. return list |> List.sort
  4. }
  5. let result = (Async.StartAsTask a).Result
  1. let list = [4; 1; 2; 5; 0]
  2. let a = async {
  3. return list |> List.sort
  4. }
  5. let result = Async.Parallel [a] |> Async.RunSynchronously |> Seq.head
  1. // open System
  2. let asyncFunc func arg =
  3. let _delegate = new Func<'T, 'TResult>(func)
  4. async { return! Async.FromBeginEnd(arg, _delegate.BeginInvoke, _delegate.EndInvoke) }
  5. let list = [4; 1; 2; 5; 0]
  6. let result = list |> asyncFunc List.sort |> Async.RunSynchronously
処理が終わるまで他のことをしながら待つ場合
  1. let list = [4; 1; 2; 5; 0]
  2. let a = async {
  3. return list |> List.sort
  4. }
  5. let task = Async.StartAsTask a
  6. while not task.IsCompleted do printfn "処理中..."
  7. let result = task.Result
  1. let array = [|4; 1; 2; 5; 0|]
  2. let result = ref null
  3. let a = async {
  4. result := Array.sort array
  5. }
  6. let b = async {
  7. while !result = null do printfn "処理中..."
  8. }
  9. Async.Parallel [a; b] |> Async.RunSynchronously |> ignore

タスク並列ライブラリ(TPL)を使った場合

処理が終わるのを待たない場合
  1. // open System.Threading.Tasks
  2. let list = [4; 1; 2; 5; 0]
  3. Task.Run(fun () -> List.sort list) |> ignore
処理が終わるまで何もせずに待つ場合
  1. // open System.Threading.Tasks
  2. let list = [4; 1; 2; 5; 0]
  3. let task = Task.Run(fun () -> List.sort list)
  4. let result = task.Result
  1. // open System
  2. // open System.Threading.Tasks
  3. let array = [|4; 1; 2; 5; 0|]
  4. let result = ref null
  5. let a() =
  6. result := Array.sort array
  7. Parallel.Invoke [| new Action(a) |]
処理が終わるまで他のことをしながら待つ場合
  1. // open System.Threading.Tasks
  2. let list = [4; 1; 2; 5; 0]
  3. let task = Task.Run(fun () -> List.sort list)
  4. while not task.IsCompleted do printfn "処理中..."
  5. let result = task.Result
  1. // open System
  2. // open System.Threading.Tasks
  3. let array = [|4; 1; 2; 5; 0|]
  4. let result = ref null
  5. let a() =
  6. result := Array.sort array
  7. let b() =
  8. while !result = null do printfn "処理中..."
  9. Parallel.Invoke [| new Action(a); new Action(b) |]

The Reactive Extensions for .NET を使った場合

  1. // open System.Reactive.Linq
  2. let array = [|4; 1; 2; 5; 0|]
処理が終わるのを待たない場合
  1. Observable.Start(fun () -> Array.sort array) |> ignore
処理が終わるまで何もせずに待つ場合
  1. let result = Observable.Start(fun () -> Array.sort array).Last()
処理が終わるまで他のことをしながら待つ場合
  1. let result = ref null
  2. Observable.Start(fun () -> Array.sort array).Subscribe(fun array -> result := array) |> ignore
  3. while !result = null do printfn "処理中..."

C#

Thread クラスを使った場合

  1. // using System.Threading;
  2. int[] array = { 4, 1, 2, 5, 0 };
  3. int[] result = null;
  4. Thread t = new Thread(() => {
  5. int[] sorted = (int[])array.Clone();
  6. Array.Sort(sorted);
  7. result = sorted;
  8. });
処理が終わるのを待たない場合
  1. t.Start();
処理が終わるまで何もせずに待つ場合
  1. t.Start();
  2. t.Join();
処理が終わるまで他のことをしながら待つ場合
  1. t.Start();
  2. while (t.IsAlive) {
  3. Console.WriteLine("処理中...");
  4. }

ThreadPool クラスを使った場合

  1. // using System.Threading;
  2. int[] array = { 4, 1, 2, 5, 0 };
  3. int[] result = null;
  4. WaitCallback callback = state => {
  5. int[] sorted = (int[])array.Clone();
  6. Array.Sort(sorted);
  7. result = sorted;
  8. };
処理が終わるのを待たない場合
  1. ThreadPool.QueueUserWorkItem(callback);
処理が終わるまで何もせずに待つ場合
  1. ThreadPool.QueueUserWorkItem(callback);
  2. TimeSpan sleepTime = TimeSpan.FromMilliseconds(100);
  3. // Poll until the callback has stored the sorted array, sleeping between checks.
  4. while (result == null) {
  5.     Thread.Sleep(sleepTime);
  6. }
処理が終わるまで他のことをしながら待つ場合
  1. ThreadPool.QueueUserWorkItem(callback);
  2. while (result == null) {
  3. Console.WriteLine("処理中...");
  4. }

デリゲートの BeginInvoke メソッドを使った場合

  1. // using System.Threading;
  2. int[] array = { 4, 1, 2, 5, 0 };
  3. Converter<int[], int[]> c = input => {
  4. int[] sorted = (int[])input.Clone();
  5. Array.Sort(sorted);
  6. return sorted;
  7. };
処理が終わるのを待たない場合
  1. c.BeginInvoke(array, null, null);
  1. AsyncCallback callback = ar => {
  2. int[] result = c.EndInvoke(ar);
  3. };
  4. c.BeginInvoke(array, callback, null);
処理が終わるまで何もせずに待つ場合
  1. IAsyncResult ar = c.BeginInvoke(array, null, null);
  2. int[] result = c.EndInvoke(ar);
  1. IAsyncResult ar = c.BeginInvoke(array, null, null);
  2. using (WaitHandle wh = ar.AsyncWaitHandle) {
  3. wh.WaitOne();
  4. int[] result = c.EndInvoke(ar);
  5. }
処理が終わるまで他のことをしながら待つ場合
  1. IAsyncResult ar = c.BeginInvoke(array, null, null);
  2. while (! ar.IsCompleted) {
  3. Console.WriteLine("処理中...");
  4. }
  5. int[] result = c.EndInvoke(ar);

タスク並列ライブラリ(TPL)を使った場合

  1. // using System.Threading.Tasks;
  2. int[] array = { 4, 1, 2, 5, 0 };
  3. Func<int[], int[]> sorter = input => {
  4. int[] sorted = (int[])input.Clone();
  5. Array.Sort(sorted);
  6. return sorted;
  7. };
処理が終わるのを待たない場合
  1. Task.Run(() => sorter(array));
処理が終わるまで何もせずに待つ場合
  1. var task = Task.Run(() => sorter(array));
  2. var result = task.Result;
  1. var task = Task.Run(() => sorter(array));
  2. var result = await task;
処理が終わるまで他のことをしながら待つ場合
  1. var task = Task.Run(() => sorter(array));
  2. while (! task.IsCompleted) {
  3. Console.WriteLine("処理中...");
  4. }
  5. var result = task.Result;

The Reactive Extensions for .NET を使った場合

  1. // using System.Reactive.Linq;
  2. int[] array = { 4, 1, 2, 5, 0 };
  3. Func<int[], int[]> sorter = input => {
  4. int[] sorted = (int[])input.Clone();
  5. Array.Sort(sorted);
  6. return sorted;
  7. };
処理が終わるのを待たない場合
  1. sorter.ToAsync()(array);
  1. Observable.Start(() => sorter(array));
処理が終わるまで何もせずに待つ場合
  1. var result = sorter.ToAsync()(array).Last();
  1. var result = Observable.Start(() => sorter(array)).Last();
処理が終わるまで他のことをしながら待つ場合
  1. int[] result = null;
  2. sorter.ToAsync()(array).Subscribe(list => result = list);
  3. while (result == null) {
  4. Console.WriteLine("処理中...");
  5. }

Go

  1. // import "sort"
  2. // doSort receives one slice from in, sorts a copy in ascending order, and sends the copy to out.
  3. func doSort(in <-chan []int, out chan<- []int) {
  4.     array := <-in
  5.     sorted := make([]int, len(array))
  6.     copy(sorted, array) // leave the caller's slice untouched
  7.     sort.Ints(sorted)
  8.     out <- sorted
  9. }
  1. in := make(chan []int)
  2. out := make(chan []int)
  3. go doSort(in, out)
処理が終わるのを待たない場合
  1. in <- []int{4, 1, 2, 5, 0}
処理が終わるまで何もせずに待つ場合
  1. in <- []int{4, 1, 2, 5, 0}
  2. result := <- out
処理が終わるまで他のことをしながら待つ場合
  1. // import "fmt"
  2. in <- []int{4, 1, 2, 5, 0}
  3. var result []int
  4. loop:
  5. for {
  6.     select {
  7.     case result = <-out: // the sorted slice has arrived
  8.         break loop
  9.     default: // nothing yet: do other work instead of blocking on the channel
  10.         fmt.Printf("処理中...")
  11.     }
  12. }

Rust

  1. // use std::thread;
  2. let vec = vec![4, 1, 2, 5, 0];
処理が終わるのを待たない場合
  1. thread::spawn(move || {
  2. let mut v = vec.clone();
  3. v.sort()
  4. });
処理が終わるまで何もせずに待つ場合
  1. let t = thread::spawn(move || {
  2.     let mut v = vec.clone();
  3.     v.sort();
  4.     v
  5. });
  6. // join() blocks the current thread until the worker finishes and yields its return value.
  7. let result = t.join().unwrap();
処理が終わるまで他のことをしながら待つ場合
  1. let t = thread::spawn(move || {
  2.     let mut v = vec.clone();
  3.     v.sort();
  4.     v
  5. });
  6. // Poll is_finished() so the current thread can keep working in the meantime.
  7. while ! t.is_finished() {
  8.     println!("処理中...");
  9. }
  10. let result = t.join().unwrap();

Dart

  1. var list = [4, 1, 2, 5, 0];
処理が終わるのを待たない場合
  1. Future(() {
  2. var sorted = List.from(list);
  3. sorted.sort();
  4. return sorted;
  5. });
処理が終わるまで何もせずに待つ場合
  1. var future = Future(() {
  2. var sorted = List.from(list);
  3. sorted.sort();
  4. return sorted;
  5. });
  6. var result = await future;

Ruby

処理が終わるのを待たない場合
  1. array = [4, 1, 2, 5, 0]
  2. Thread.start { array.sort }
  1. array = [4, 1, 2, 5, 0]
  2. Thread.start(array) {|a| a.sort }
処理が終わるまで何もせずに待つ場合
  1. array = [4, 1, 2, 5, 0]
  2. result = Thread.start { array.sort }.value
  1. array = [4, 1, 2, 5, 0]
  2. result = Thread.start(array) {|a| a.sort }.value
処理が終わるまで他のことをしながら待つ場合
  1. array = [4, 1, 2, 5, 0]
  2. t = Thread.start { array.sort }
  3. while t.alive?
  4. puts '処理中...'
  5. end
  6. result = t.value
  1. array = [4, 1, 2, 5, 0]
  2. t = Thread.start(array) {|a| a.sort }
  3. while t.alive?
  4. puts '処理中...'
  5. end
  6. result = t.value

Python

専用のスレッドクラスを作る

  1. # import threading
  2. class Sorter(threading.Thread):
  3. def __init__(self, *original):
  4. threading.Thread.__init__(self)
  5. self.original = original
  6. def run(self):
  7. self.sorted = sorted(self.original)
  8. sorter = Sorter(4, 1, 2, 5, 0)
  9. sorter.name = 'Sorting'
処理が終わるのを待たない場合
  1. sorter.start()
処理が終わるまで何もせずに待つ場合
  1. sorter.start()
  2. sorter.join()
  3. result = sorter.sorted
処理が終わるまで他のことをしながら待つ場合
  1. sorter.start()
  2. while sorter.is_alive():
  3. print('処理中...')
  4. result = sorter.sorted

汎用的なスレッドクラスを作る

  1. # import threading
  2. class Thread(threading.Thread):
  3. def __init__(self, func, *args):
  4. threading.Thread.__init__(self)
  5. self.func = func
  6. self.args = args
  7. def run(self):
  8. self.result = self.func(*self.args)
  9. sorter = Thread(sorted, [4, 1, 2, 5, 0])
  10. sorter.name = 'Sorting'
処理が終わるのを待たない場合
  1. sorter.start()
処理が終わるまで何もせずに待つ場合
  1. sorter.start()
  2. sorter.join()
  3. result = sorter.result
処理が終わるまで他のことをしながら待つ場合
  1. sorter.start()
  2. while sorter.is_alive():
  3. print('処理中...')
  4. result = sorter.result

Perl

処理が終わるのを待たない場合
  1. # use threads;
  2. threads->create(sub { sort @_ }, 4, 1, 2, 5, 0)->detach;
  1. # use threads;
  2. async { sort 4, 1, 2, 5, 0 }->detach;
処理が終わるまで何もせずに待つ場合
  1. # use threads;
  2. my ($t) = threads->create(sub { sort @_ }, 4, 1, 2, 5, 0);
  3. my @result = $t->join;
処理が終わるまで他のことをしながら待つ場合
  1. # use threads;
  2. my ($t) = threads->create(sub { sort @_ }, 4, 1, 2, 5, 0);
  3. print "処理中...\n" while $t->is_running;
  4. my @result = $t->join;

リストの各要素を並列で処理して(処理の順序は問わない)全ての完了を待つ

処理内容は、(1) 開始(Start)のメッセージを出力する (2) スリープする (3) 終了(End)のメッセージを出力する。

Java

  1. // import java.text.MessageFormat;
  2. public static void write(int n, String s) {
  3. System.out.println(MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
  4. new Date(), n, s, Thread.currentThread().getId()));
  5. }
  6. public static void exec(int n) {
  7. write(n, "S ");
  8. try {
  9. Thread.sleep(3000);
  10. } catch (InterruptedException e) {
  11. }
  12. write(n, " E");
  13. }
  1. // import java.util.stream.IntStream;
  2. IntStream stream = IntStream.range(0, 20); // 0 to 19
  3. stream.parallel().forEach(n -> exec(n)); // forEach returns only after every element has been processed
RxJava を使った場合
  1. // import io.reactivex.rxjava3.core.Flowable;
  2. // import io.reactivex.rxjava3.schedulers.Schedulers;
  3. Flowable.range(0, 20)
  4. .parallel()
  5. .runOn(Schedulers.newThread())
  6. .doOnNext(n -> exec(n))
  7. .sequential()
  8. .blockingSubscribe();

Groovy

  1. // import java.text.MessageFormat
  2. def write(n, s) {
  3. println MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
  4. new Date(), n, s, Thread.currentThread().id)
  5. }
  6. def exec(n) {
  7. write n, "S "
  8. Thread.sleep 3000
  9. write n, " E"
  10. }
  1. def list = 1..20
  2. list.stream().parallel().forEach { exec it }
RxJava を使った場合
  1. // import io.reactivex.rxjava3.core.Flowable
  2. // import io.reactivex.rxjava3.schedulers.Schedulers
  3. def list = 1..20
  4. Flowable.fromIterable(list)
  5. .parallel()
  6. .runOn(Schedulers.newThread())
  7. .doOnNext { exec it }
  8. .sequential()
  9. .blockingSubscribe()

Kotlin

  1. // import java.text.MessageFormat
  2. // import java.util.Date
  3. // import kotlinx.coroutines.*
  4. fun write(n: Int, s: String) {
  5. println(MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}", Date(), n, s, Thread.currentThread().getId()))
  6. }
  7. suspend fun exec(n: Int) {
  8. write(n, "S ")
  9. delay(3000)
  10. write(n, " E")
  11. }
  1. val jobs = Array(20) { i ->
  2. launch { exec(i + 1) }
  3. }
  4. joinAll(*jobs)
RxJava を使った場合
  1. // import java.text.MessageFormat
  2. // import java.util.Date
  3. // import io.reactivex.rxjava3.core.Flowable
  4. // import io.reactivex.rxjava3.schedulers.Schedulers
  5. fun write(n: Int, s: String) {
  6. println(MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}", Date(), n, s, Thread.currentThread().getId()))
  7. }
  8. fun exec(n: Int) {
  9. write(n, "S ")
  10. Thread.sleep(3000)
  11. write(n, " E")
  12. }
  1. val range = 1..20
  2. Flowable.fromIterable(range)
  3. .parallel()
  4. .runOn(Schedulers.newThread())
  5. .doOnNext { exec(it) }
  6. .sequential()
  7. .blockingSubscribe()

Scala

  1. // import java.text.MessageFormat
  2. // import java.util.Date
  3. def write(n: Int, s: String) = println(
  4. MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
  5. new Date, Integer valueOf n, s, Thread.currentThread.getId.toString))
  6. def exec(n: Int) = {
  7. write(n, "S ")
  8. Thread sleep 3000
  9. write(n, " E")
  10. }
  1. // import scala.collection.parallel.CollectionConverters._ // Scala 2.13+: .par comes from the scala-parallel-collections module
  2. val range = 1 to 20
  3. range.par.foreach(exec)
RxJava を使った場合
  1. // import scala.jdk.CollectionConverters._
  2. // import io.reactivex.rxjava3.core.Flowable
  3. // import io.reactivex.rxjava3.schedulers.Schedulers
  4. val range = 1 to 20
  5. Flowable.fromIterable(range.asJava)
  6. .parallel
  7. .runOn(Schedulers.newThread)
  8. .doOnNext(exec)
  9. .sequential
  10. .blockingSubscribe

Erlang

  1. List = lists:seq(1, 20),
  2. Write = fun(N, S) ->
  3.     {Hour, Minute, Second} = time(),
  4.     %% os:timestamp/0 replaces the deprecated now/0; only the microsecond part is used
  5.     {_, _, MicroSecs} = os:timestamp(),
  6.     io:format("~2..0b:~2..0b:~2..0b.~3..0b ~2..0b: ~s Process~p~n",
  7.               [Hour, Minute, Second, MicroSecs div 1000, N, S, self()])
  8. end,
  9. Exec = fun(N, Caller) ->
  10.     Write(N, "S "),
  11.     timer:sleep(3000),
  12.     Write(N, " E"),
  13.     Caller ! self() %% notify the caller that this process is done
  14. end,
  15. Pids = lists:map(fun(N) ->
  16.     Caller = self(),
  17.     spawn(fun() -> Exec(N, Caller) end)
  18. end, List),
  19. lists:foreach(fun(Pid) ->
  20.     receive
  21.         Pid -> ok %% receive the completion notice from this process
  22.     end
  23. end, Pids).

F#

  1. // open System
  2. // open System.Linq
  3. // open System.Threading
  4. // open System.Threading.Tasks
  5. let list = [1..20]
  6. let write n s =
  7. let now = DateTime.Now.ToString "HH:mm:ss.fff"
  8. sprintf "%s %02d: %s Thread-%d" now n s Thread.CurrentThread.ManagedThreadId |> Console.WriteLine
  9. let exec n =
  10. write n "S "
  11. TimeSpan.FromSeconds 3.0 |> Thread.Sleep
  12. write n " E"
非同期ワークフローを使った場合
  1. let tasks = [ for n in list -> async { exec n } |> Async.StartAsTask ]
  2. for task in tasks do task.Wait()
  1. [ for n in list -> async { exec n } ] |> Async.Parallel |> Async.RunSynchronously |> ignore
タスク並列ライブラリ(TPL)を使った場合
  1. Parallel.ForEach(list, exec) |> ignore
  1. list.AsParallel().ForAll(new Action<_>(exec))
  1. list.AsParallel().ForAll(fun n -> exec n)
  1. Parallel.Invoke [| for n in list -> new Action(fun () -> exec n) |]
  1. Task.WaitAll [| for n in list -> Task.Run(fun () -> exec n) |]
F# PowerPack を使った場合
  1. // open Microsoft.FSharp.Collections
  2. PSeq.iter exec list
The Reactive Extensions for .NET を使った場合
  1. // open System.Reactive.Linq
  2. let execAsync n = Observable.Start(fun () -> exec n)
  3. list.Select(execAsync).Merge().Last() |> ignore
  1. // open System.Reactive.Linq
  2. let execAsync n = Observable.Start(fun () -> exec n)
  3. list.ToObservable().SelectMany(execAsync).Last() |> ignore

C#

  1. // using System.Linq;
  2. // using System.Threading;
  3. // using System.Threading.Tasks;
  4. var e = Enumerable.Range(1, 20);
  5. Action<int, string> write = (n, s) => Console.WriteLine(
  6. string.Format("{0:HH:mm:ss.fff} {1:00}: {2} Thread-{3}",
  7. DateTime.Now, n, s, Thread.CurrentThread.ManagedThreadId));
  8. Action<int> exec = n => {
  9. write(n, "S ");
  10. Thread.Sleep(TimeSpan.FromSeconds(3));
  11. write(n, " E");
  12. };
タスク並列ライブラリ(TPL)を使った場合
  1. Parallel.ForEach(e, exec);
  1. e.AsParallel().ForAll(exec);
  1. var actions = from n in e select new Action(() => exec(n));
  2. Parallel.Invoke(actions.ToArray());
  1. var tasks = from n in e select Task.Run(() => exec(n));
  2. Task.WaitAll(tasks.ToArray());
  1. var tasks = from n in e select Task.Run(() => exec(n));
  2. await Task.WhenAll(tasks);
The Reactive Extensions for .NET を使った場合
  1. // using System.Reactive.Linq;
  2. e.Select(exec.ToAsync()).Merge().Last();
  1. // using System.Reactive.Linq;
  2. e.ToObservable().SelectMany(exec.ToAsync()).Last();
  1. // using System.Reactive.Linq;
  2. (from n in e.ToObservable()
  3. from u in exec.ToAsync()(n)
  4. select u
  5. ).Last();

Go

  1. // import "fmt"
  2. // import "time"
  3. array := [...]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
  4. write := func(n int, s string) {
  5.     // ".000" in the layout appends the milliseconds
  6.     fmt.Printf("%s %02d: %s\n", time.Now().Format("15:04:05.000"), n, s)
  7. }
  8. replyCh := make(chan interface{}) // a single channel receives every completion notice
  9. exec := func(n int) {
  10.     write(n, "S ")
  11.     time.Sleep(3 * time.Second)
  12.     write(n, " E")
  13.     replyCh <- nil // signal completion
  14. }
  15. for _, n := range array {
  16.     go exec(n)
  17. }
  18. for range array { // wait for one completion notice per started goroutine
  19.     <-replyCh
  20. }

Rust

  1. // use std::thread;
  2. // use std::time;
  3. // use chrono::Local;
  4. fn write(n: i32, s: &str) {
  5. println!("{} {:>02}: {} {:?}", Local::now().format("%H:%M:%S%.3f"), n, s, thread::current().id());
  6. }
  7. fn exec(n: i32) {
  8. write(n, "S ");
  9. thread::sleep(time::Duration::from_secs(3));
  10. write(n, " E");
  11. }
  1. let handles: Vec<_> = (0..20).map(move |i| thread::spawn(move || exec(i))).collect();
  2. for handle in handles {
  3. handle.join().unwrap();
  4. }

Ruby

  1. range = 1..20
  2. def write n, s
  3. Thread.exclusive do
  4. puts sprintf('%s %02d: %s %s', Time.now.strftime('%H:%M:%S.%L'), n, s, Thread.current)
  5. end
  6. end
  7. def exec n
  8. write n, 'S '
  9. sleep 3
  10. write n, ' E'
  11. end
  12. threads = range.collect do |n|
  13. Thread.start { exec n }
  14. end
  15. threads.each &:join

Python

  1. # from datetime import datetime
  2. # import threading
  3. # import time
  4. print_lock = threading.RLock()
  5. class Thread(threading.Thread):
  6. def __init__(self, n):
  7. threading.Thread.__init__(self)
  8. self.n = n
  9. def run(self):
  10. self.write('S ')
  11. time.sleep(3)
  12. self.write(' E')
  13. def write(self, s):
  14. now = datetime.now().strftime('%H:%M:%S')
  15. threadId = threading.current_thread().ident
  16. with print_lock:
  17. print('%s %02d: %s %s' % (now, self.n, s, threadId))
  1. l = range(0, 20)
  2. for n in l:
  3. Thread(n).start()

Perl

  1. # use threads;
  2. # use Time::Piece;
  3. my @array = (1..20);
  4. my $write = sub {
  5. my $now = localtime->strftime('%H:%M:%S');
  6. printf "%s %02d: %s Thread-%s\n", $now, $_[0], $_[1], threads->tid;
  7. };
  8. my $exec = sub {
  9. my $n = shift;
  10. $write->($n, 'S ');
  11. sleep 3;
  12. $write->($n, ' E');
  13. };
  14. my @threads = map { threads->create($exec, $_) } @array;
  15. $_->join for @threads;

リストの各要素を並列で変換処理して、その結果のリストを出力する

変換処理の内容は、(1) 開始(Start)のメッセージを出力する (2) スリープする (3) 終了(End)のメッセージを出力する (4) 元の値を-1倍して返す。

Java

  1. // import java.text.MessageFormat;
  2. public static void write(int n, String s) {
  3. System.out.println(MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
  4. new Date(), n, s, Thread.currentThread().getId()));
  5. }
  6. public static int exec(int n) {
  7. write(n, "S ");
  8. try {
  9. Thread.sleep(3000);
  10. } catch (InterruptedException e) {
  11. }
  12. write(n, " E");
  13. return -n;
  14. }
  1. // import java.util.stream.Collectors;
  2. // import java.util.stream.IntStream;
  3. IntStream stream = IntStream.iterate(1, n -> n + 1).limit(20); // 1から20までの数列
  4. IntStream result = stream.parallel().map(n -> exec(n));
  5. System.out.println(result.boxed().collect(Collectors.toList()));
RxJava を使った場合
  1. // import io.reactivex.rxjava3.core.Flowable;
  2. // import io.reactivex.rxjava3.schedulers.Schedulers;
  3. // TreeMap keeps the results ordered by input value regardless of completion order.
  4. Map<Integer, Integer> map = Collections.synchronizedMap(new TreeMap<>());
  5. Flowable.range(1, 20) // range(start, count): 1 to 20, matching the stream version
  6.     .parallel()
  7.     .runOn(Schedulers.newThread())
  8.     .doOnNext(n -> map.put(n, exec(n)))
  9.     .sequential()
  10.     .blockingSubscribe();
  11. Collection<Integer> result = map.values();
  12. System.out.println(result);

Groovy

  1. // import java.text.MessageFormat
  2. def write(n, s) {
  3. println MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
  4. new Date(), n, s, Thread.currentThread().id)
  5. }
  6. def exec(n) {
  7. write n, "S "
  8. Thread.sleep 3000
  9. write n, " E"
  10. -n
  11. }
  1. def list = 1..20
  2. def result = list.stream().parallel().map { exec it }.toArray()
  3. println result
RxJava を使った場合
  1. // import io.reactivex.rxjava3.core.Flowable
  2. // import io.reactivex.rxjava3.schedulers.Schedulers
  3. def map = Collections.synchronizedMap(new TreeMap())
  4. def list = 1..20
  5. Flowable.fromIterable(list)
  6. .parallel()
  7. .runOn(Schedulers.newThread())
  8. .doOnNext { map[it] = exec it }
  9. .sequential()
  10. .blockingSubscribe()
  11. println map.values()

Kotlin

  1. // import java.text.MessageFormat
  2. // import java.util.Date
  3. // import kotlinx.coroutines.*
  4. fun write(n: Int, s: String) {
  5. println(MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}", Date(), n, s, Thread.currentThread().getId()))
  6. }
  7. suspend fun exec(n: Int): Int {
  8. write(n, "S ")
  9. delay(3000)
  10. write(n, " E")
  11. return -n
  12. }
  1. // Start one coroutine per element (1 to 20); each Deferred holds that element's converted value.
  2. val deferreds = Array(20) { i ->
  3.     async { exec(i + 1) }
  4. }
  5. // awaitAll suspends until all have completed and returns the results in argument order.
  6. val result = awaitAll(*deferreds)
  7. println(result)
RxJava を使った場合
  1. // import java.text.MessageFormat
  2. // import java.util.*
  3. // import io.reactivex.rxjava3.core.Flowable
  4. // import io.reactivex.rxjava3.schedulers.Schedulers
  5. fun write(n: Int, s: String) {
  6. println(MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}", Date(), n, s, Thread.currentThread().getId()))
  7. }
  8. fun exec(n: Int): Int {
  9. write(n, "S ")
  10. Thread.sleep(3000)
  11. write(n, " E")
  12. return -n
  13. }
  1. // TreeMap keeps the results ordered by input value regardless of completion order.
  2. val map = Collections.synchronizedMap(TreeMap<Int, Int>())
  3. val range = 1..20
  4. Flowable.fromIterable(range)
  5.     .parallel()
  6.     .runOn(Schedulers.newThread())
  7.     .doOnNext { map += it to exec(it) }
  8.     .sequential()
  9.     .blockingSubscribe()
  10. println(map.values)

Scala

  1. // import java.text.MessageFormat
  2. // import java.util.Date
  3. def write(n: Int, s: String) = println(
  4. MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
  5. new Date, Integer valueOf n, s, Thread.currentThread.getId.toString))
  6. def exec(n: Int) = {
  7. write(n, "S ")
  8. Thread sleep 3000
  9. write(n, " E")
  10. -n
  11. }
  1. // import scala.collection.parallel.CollectionConverters._ // Scala 2.13+: .par comes from the scala-parallel-collections module
  2. val range = 1 to 20
  3. val result = range.par.map(exec)
  4. println(result)
RxJava を使った場合
  1. // import java.util.{Collections, TreeMap}
  2. // import scala.jdk.CollectionConverters._
  3. // import io.reactivex.rxjava3.core.Flowable
  4. // import io.reactivex.rxjava3.schedulers.Schedulers
  5. // TreeMap keeps the results ordered by input value regardless of completion order.
  6. val map = Collections.synchronizedMap(new TreeMap[Int, Int])
  7. val range = 1 to 20
  8. Flowable.fromIterable(range.asJava)
  9.   .parallel
  10.   .runOn(Schedulers.newThread)
  11.   .doOnNext { n => map.put(n, exec(n)) }
  12.   .sequential
  13.   .blockingSubscribe
  14. println(map.values)

Erlang

  1. List = lists:seq(1, 20),
  2. Write = fun(N, S) ->
  3.     {Hour, Minute, Second} = time(),
  4.     %% os:timestamp/0 replaces the deprecated now/0; only the microsecond part is used
  5.     {_, _, MicroSecs} = os:timestamp(),
  6.     io:format("~2..0b:~2..0b:~2..0b.~3..0b ~2..0b: ~s Process~p~n",
  7.               [Hour, Minute, Second, MicroSecs div 1000, N, S, self()])
  8. end,
  9. Exec = fun(N, Caller) ->
  10.     Write(N, "S "),
  11.     timer:sleep(3000),
  12.     Write(N, " E"),
  13.     Caller ! {self(), -N} %% send the converted value back, tagged with this pid
  14. end,
  15. Pids = lists:map(fun(N) ->
  16.     Caller = self(),
  17.     spawn(fun() -> Exec(N, Caller) end)
  18. end, List),
  19. %% Receiving in Pids order keeps the results in the same order as the input list.
  20. Result = lists:map(fun(Pid) ->
  21.     receive
  22.         {Pid, N} -> N
  23.     end
  24. end, Pids),
  25. io:format("~p~n", [Result]).

F#

  1. // open System
  2. // open System.Linq
  3. // open System.Threading
  4. // open System.Threading.Tasks
  5. let list = [1..20]
  6. let write n s =
  7. let now = DateTime.Now.ToString "HH:mm:ss.fff"
  8. sprintf "%s %02d: %s Thread-%d" now n s Thread.CurrentThread.ManagedThreadId |> Console.WriteLine
  9. let exec n =
  10. write n "S "
  11. TimeSpan.FromSeconds 3.0 |> Thread.Sleep
  12. write n " E"
  13. -n
非同期ワークフローを使った場合
  1. // Start one task per element immediately, then read the results in list order.
  2. let tasks = list |> List.map (fun n -> async { return exec n } |> Async.StartAsTask)
  3. let result = tasks |> List.map (fun task -> task.Result)
  4. printfn "%A" result
  1. // Async.Parallel runs all workflows and returns their results as an array.
  2. let result =
  3.     list
  4.     |> List.map (fun n -> async { return exec n })
  5.     |> Async.Parallel
  6.     |> Async.RunSynchronously
  7. printfn "%A" (Array.toList result)
タスク並列ライブラリ(TPL)を使った場合
  1. // AsOrdered() keeps the results in input order; without it PLINQ may return them in any order.
  2. let result = list.AsParallel().AsOrdered().Select exec |> Seq.toList
  3. printfn "%A" result
F# PowerPack を使った場合
  1. // open Microsoft.FSharp.Collections
  2. // Pair every result with its index, sort the pairs, then drop the index again.
  3. let result =
  4.     list
  5.     |> PSeq.mapi (fun i n -> i, exec n)
  6.     |> PSeq.sort
  7.     |> PSeq.map snd
  8.     |> PSeq.toList
  9. printfn "%A" result
The Reactive Extensions for .NET を使った場合
  1. // open System.Reactive.Linq
  2. // Observable.Start runs exec asynchronously and publishes its single result.
  3. let execAsync n = Observable.Start(fun () -> exec n)
  4. let observables = list.Select(execAsync)
  5. // Zip combines the observables into one that emits a list of all results.
  6. let result = observables.Zip().Last()
  7. printfn "%A" result
  1. // open System.Reactive.Linq
  2. // Same as above, combining the observables with CombineLatest instead of Zip.
  3. let execAsync n = Observable.Start(fun () -> exec n)
  4. let observables = list.Select(execAsync)
  5. let result = observables.CombineLatest().Last()
  6. printfn "%A" result

C#

  1. // using System.Linq;
  2. // using System.Threading;
  3. var e = Enumerable.Range(1, 20);
  4. // Logs "HH:mm:ss.fff NN: <marker> Thread-<id>" for element n.
  5. Action<int, string> write = (n, s) => {
  6.     var threadId = Thread.CurrentThread.ManagedThreadId;
  7.     var line = string.Format("{0:HH:mm:ss.fff} {1:00}: {2} Thread-{3}", DateTime.Now, n, s, threadId);
  8.     Console.WriteLine(line);
  9. };
  10. // Simulates a 3-second job on n and returns -n.
  11. Func<int, int> exec = n => {
  12.     write(n, "S ");
  13.     Thread.Sleep(3000);
  14.     write(n, " E");
  15.     return -n;
  16. };
タスク並列ライブラリ(TPL)を使った場合
  1. // Query syntax. Without AsOrdered() the results would come back in arbitrary order.
  2. var ordered = e.AsParallel().AsOrdered();
  3. var result = from n in ordered
  4.              select exec(n);
  5. Console.WriteLine(string.Join(" ", result));
  1. // Method syntax. Without AsOrdered() the results would come back in arbitrary order.
  2. var ordered = e.AsParallel().AsOrdered();
  3. var result = ordered.Select(exec);
  4. Console.WriteLine(string.Join(" ", result));
The Reactive Extensions for .NET を使った場合
  1. // using System.Reactive.Linq;
  2. // ToAsync() turns exec into a function returning an observable of its result.
  3. var execAsync = exec.ToAsync();
  4. var result = e.Select(execAsync).Zip().Last();
  5. Console.WriteLine(string.Join(" ", result));
  1. // using System.Reactive.Linq;
  2. // Same as above, combining the observables with CombineLatest instead of Zip.
  3. var execAsync = exec.ToAsync();
  4. var result = e.Select(execAsync).CombineLatest().Last();
  5. Console.WriteLine(string.Join(" ", result));
  1. // using System.Reactive.Linq;
  2. var execAsync = exec.ToAsync();
  3. // Pair every input n with its result m so the pairs can be sorted by input afterwards.
  4. var o = from n in e.ToObservable()
  5.         from m in execAsync(n)
  6.         select new { n, m };
  7. var result = o.ToEnumerable()
  8.               .OrderBy(r => r.n)
  9.               .Select(r => r.m);
  10. Console.WriteLine(string.Join(" ", result));

Go

  1. // import "fmt"
  2. // import "time"
  3. array := [...]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
  4. // write logs "HH:MM:SS.mmm NN: <marker>" for element n.
  5. write := func(n int, s string) {
  6.     // time.LocalTime and time.Nanoseconds no longer exist in Go 1;
  7.     // the ".000" in the layout prints the milliseconds.
  8.     now := time.Now().Format("15:04:05.000")
  9.     fmt.Printf("%s %02d: %s\n", now, n, s)
  10. }
  11. replyChs := make([]chan int, len(array)) // one reply channel per element
  12. // exec simulates a 3-second job on n and sends -n on replyCh.
  13. exec := func(n int, replyCh chan<- int) {
  14.     write(n, "S ")
  15.     time.Sleep(3 * time.Second)
  16.     write(n, " E")
  17.     replyCh <- -n
  18. }
  19. for i, n := range array {
  20.     replyChs[i] = make(chan int)
  21.     go exec(n, replyChs[i])
  22. }
  23. result := make([]int, len(array))
  24. for i, replyCh := range replyChs {
  25.     result[i] = <-replyCh // receiving in index order keeps the input order
  26. }
  27. fmt.Printf("%v\n", result)

Rust

  1. // use std::thread;
  2. // use std::time;
  3. // use chrono::Local;
  4. /// Prints one log line: local time, element number, marker and thread id.
  5. fn write(n: i32, s: &str) {
  6.     let stamp = Local::now().format("%H:%M:%S%.3f");
  7.     let thread_id = thread::current().id();
  8.     println!("{} {:>02}: {} {:?}", stamp, n, s, thread_id);
  9. }
  10. /// Simulates a 3-second job on `n` and returns `-n`.
  11. fn exec(n: i32) -> i32 {
  12.     let pause = time::Duration::from_secs(3);
  13.     write(n, "S ");
  14.     thread::sleep(pause);
  15.     write(n, " E");
  16.     -n
  17. }
  1. // One OS thread per element, for the elements 1 through 20.
  2. let handles: Vec<_> = (1..=20).map(|i| thread::spawn(move || exec(i))).collect();
  3. // Joining in spawn order yields the results in input order.
  4. let result: Vec<_> = handles.into_iter().map(|handle| handle.join().unwrap()).collect();
  5. println!("{:?}", result);

Ruby

  1. range = 1..20
  2. def write n, s
  3. Thread.exclusive do
  4. puts sprintf('%s %02d: %s %s', Time.now.strftime('%H:%M:%S.%L'), n, s, Thread.current)
  5. end
  6. end
  7. def exec n
  8. write n, 'S '
  9. sleep 3
  10. write n, ' E'
  11. -n
  12. end
  13. threads = range.collect do |n|
  14. Thread.start { exec n }
  15. end
  16. result = threads.collect &:value
  17. print result

Python

  1. # from datetime import datetime
  2. # import threading
  3. # import time
  4. print_lock = threading.RLock()
  5. class Thread(threading.Thread):
  6. def __init__(self, n):
  7. threading.Thread.__init__(self)
  8. self.n = n
  9. def run(self):
  10. self.write('S ')
  11. time.sleep(3)
  12. self.write(' E')
  13. self.result = - self.n
  14. def write(self, s):
  15. now = datetime.now().strftime('%H:%M:%S')
  16. threadId = threading.current_thread().ident
  17. with print_lock:
  18. print('%s %02d: %s %d' % (now, self.n, s, threadId))
  1. l = range(0, 20)
  2. threads = [Thread(n) for n in l]
  3. for t in threads:
  4. t.start()
  5. for t in threads:
  6. t.join()
  7. result = [t.result for t in threads]
  8. print(result)

Perl

  1. # use threads;
  2. # use Time::Piece;
  3. my @array = (1..20);
  4. # Logs "HH:MM:SS NN: <marker> Thread-<tid>" for the given element.
  5. my $write = sub {
  6.     my ($n, $marker) = @_;
  7.     printf "%s %02d: %s Thread-%s\n", localtime->strftime('%H:%M:%S'), $n, $marker, threads->tid;
  8. };
  9. # Simulates a 3-second job on its argument and returns the negated value.
  10. my $exec = sub {
  11.     my ($n) = @_;
  12.     $write->($n, 'S ');
  13.     sleep 3;
  14.     $write->($n, ' E');
  15.     -$n;
  16. };
  17. my @threads = map { threads->create($exec, $_) } @array;
  18. my @result = map { $_->join } @threads;  # join returns each thread's result
  19. print join(' ', @result), "\n";

メッセージ(処理のリクエスト)を受け取る度に新しいスレッドを起動して処理を行うオブジェクト

Groovy

  1. // import java.util.concurrent.*
  2. class Server {
  3.     private executor = Executors.newCachedThreadPool()
  4.     // Handles one request by sorting the given array.
  5.     private def processRequest(array) {
  6.         array.sort()
  7.     }
  8.     // Submits the request to the thread pool and returns the Future for its result.
  9.     def sendRequest(array) {
  10.         executor.submit({ processRequest(array) } as Callable)
  11.     }
  12.     // Stops the server by shutting the thread pool down.
  13.     def quit() {
  14.         executor.shutdown()
  15.     }
  16. }
処理が終わるのを待たない場合
  1. def server = new Server()
  2. server.sendRequest(1..20 as int[])
  3. // shutdown() still runs the task already submitted, then releases the pool threads.
  4. server.quit()
処理が終わるまで何もせずに待つ場合
  1. def server = new Server()
  2. def future = server.sendRequest(1..20 as int[])
  3. def result = future.get() // blocks until the result is available
  4. server.quit() // release the pool threads
処理が終わるまで他のことをしながら待つ場合
  1. def server = new Server()
  2. def future = server.sendRequest(1..20 as int[])
  3. // Poll the Future and do other work until it completes.
  4. while (! future.done) {
  5.     println '処理中...'
  6. }
  7. def result = future.get()
  8. server.quit() // release the pool threads

Scala

Akka を使った場合
  1. // import scala.concurrent._
  2. // import scala.concurrent.duration._
  3. // import akka.actor._
  4. // import akka.pattern.ask
  5. // import akka.util.Timeout
  6. /** Hands every incoming list to a freshly created Worker actor. */
  7. class Server extends Actor {
  8.   override def receive = {
  9.     // forward keeps the original sender, so the worker replies to the requester directly.
  10.     case list: List[Int] => context.actorOf(Props(classOf[Worker], this)).forward(list)
  11.   }
  12.   /** Sorts one list, replies to the sender and then stops itself. */
  13.   class Worker extends Actor {
  14.     override def receive = {
  15.       case list: List[Int] =>
  16.         sender() ! list.sorted
  17.         // One worker is created per request; stop it so finished workers do not pile up.
  18.         context.stop(self)
  19.     }
  20.   }
  21. }
  22. object Server {
  23.   /** Starts a Server actor and wraps its reference. */
  24.   def start(implicit system: ActorSystem) = new Ref(system.actorOf(Props[Server]))
  25.   class Ref(actor: ActorRef) {
  26.     /** Asks the server to sort `list`; the Future completes with the sorted list. */
  27.     def sendRequest(list: List[Int])(implicit timeout: Timeout) = (actor ? list).mapTo[List[Int]]
  28.   }
  29. }
  30. implicit val actorSystem: ActorSystem = ActorSystem()
  31. val duration = 1.minute
  32. implicit val timeout: Timeout = Timeout(duration)
  33. val server = Server.start
処理が終わるのを待たない場合
  1. server sendRequest (1 to 20).toList // fire and forget: the returned Future is discarded
処理が終わるまで何もせずに待つ場合
  1. val future = server.sendRequest((1 to 20).toList)
  2. val result = Await.result(future, duration) // block until the reply arrives
処理が終わるまで他のことをしながら待つ場合
  1. val future = server.sendRequest((1 to 20).toList)
  2. // Poll the Future and do other work until it completes.
  3. while (!future.isCompleted) println("処理中...")
  4. val result = Await.result(future, duration)

Erlang

  1. %% Worker: handles a single request on behalf of the server.
  2. process_request() ->
  3.     receive
  4.         {ServerPid, ClientPid, Request} ->
  5.             Sorted = lists:sort(Request),
  6.             %% Tag the reply with the server's pid so the client can match on it.
  7.             ClientPid ! {ServerPid, Sorted}
  8.     end.
  9. %% Server: keeps accepting requests until the atom exit arrives.
  10. service_loop() ->
  11.     receive
  12.         {ClientPid, Request} ->
  13.             %% Delegate the request to a new process, then wait for the next one.
  14.             Worker = spawn(fun process_request/0),
  15.             Worker ! {self(), ClientPid, Request},
  16.             service_loop();
  17.         exit -> void
  18.     end.
  19. %% Starts the server process and returns its pid.
  20. start_server() -> spawn(fun service_loop/0).
処理が終わるのを待たない場合
  1. ServerPid = start_server(),
  2. ServerPid ! {self(), lists:seq(1, 20)}. %% send the request; the reply is never read
処理が終わるまで何もせずに待つ場合
  1. ServerPid = start_server(),
  2. ServerPid ! {self(), lists:seq(1, 20)},
  3. receive %% blocks until a reply tagged with ServerPid arrives
  4. {ServerPid, Result} -> void %% Result is bound to the sorted list
  5. end.
処理が終わるまで他のことをしながら待つ場合
  1. %% Polls the mailbox for a reply tagged with Pid; prints a progress line
  2. %% and tries again whenever no such message is waiting.
  3. wait_result(Pid) ->
  4.     receive
  5.         {Pid, Result} ->
  6.             Result
  7.     after 0 ->
  8.         io:format("処理中...~n"),
  9.         wait_result(Pid)
  10.     end.
  1. ServerPid = start_server(),
  2. Request = lists:seq(1, 20),
  3. ServerPid ! {self(), Request},
  4. Result = wait_result(ServerPid).

F#

  1. // open System.Threading.Tasks
  2. /// Messages understood by the server's mailbox.
  3. type Message<'a> =
  4.     | Request of 'a * AsyncReplyChannel<Task<'a>>
  5.     | Exit
  6. type Server() =
  7.     // Handles one request: sorts the list inside a Task.
  8.     let processRequest list = Task.Run(fun () -> List.sort list)
  9.     // Mailbox loop: answer every Request with its Task until Exit arrives.
  10.     let agent =
  11.         MailboxProcessor.Start(fun inbox ->
  12.             let rec loop () =
  13.                 async {
  14.                     let! msg = inbox.Receive()
  15.                     match msg with
  16.                     | Exit -> return ()
  17.                     | Request(list, replyChannel) ->
  18.                         replyChannel.Reply(processRequest list)
  19.                         return! loop ()
  20.                 }
  21.             loop ())
  22.     /// Sends a sort request and returns the Task that computes the sorted list.
  23.     member this.SendRequest list = agent.PostAndReply(fun replyChannel -> Request(list, replyChannel))
  24.     /// Tells the mailbox loop to stop.
  25.     member this.Quit() = agent.Post Exit
処理が終わるのを待たない場合
  1. let server = Server()
  2. server.SendRequest [1..20] |> ignore // fire and forget: the returned Task is discarded
処理が終わるまで何もせずに待つ場合
  1. let server = Server()
  2. let task = server.SendRequest [1..20]
  3. let result = task.Result // blocks until the task has finished
処理が終わるまで他のことをしながら待つ場合
  1. let server = Server()
  2. let task = server.SendRequest [1..20]
  3. // Poll the task and do other work until it completes.
  4. while not task.IsCompleted do
  5.     printfn "処理中..."
  6. let result = task.Result

Go

  1. // import "sort"
  2. // Request carries the ints to sort and the channel on which the reply is delivered.
  3. type Request struct {
  4.     Array   []int
  5.     ReplyCh chan []int
  6. }
  7. // Server owns one channel for incoming requests and one for the stop signal.
  8. type Server struct {
  9.     requestCh chan *Request
  10.     quitCh    chan bool
  11. }
  12. // SendRequest queues array for sorting and returns the Request whose ReplyCh
  13. // will carry the sorted copy.
  14. func (this *Server) SendRequest(array []int) *Request {
  15.     // Capacity 1 lets the worker deliver its reply and finish even when the
  16.     // caller never reads it (the fire-and-forget case).
  17.     request := &Request{array, make(chan []int, 1)}
  18.     this.requestCh <- request
  19.     return request
  20. }
  21. // Quit tells the service loop to stop.
  22. func (this *Server) Quit() {
  23.     this.quitCh <- true
  24. }
  25. // processRequest sorts a copy of the request's slice and sends it back to the client.
  26. func (this *Server) processRequest(request *Request) {
  27.     sorted := make([]int, len(request.Array))
  28.     copy(sorted, request.Array)
  29.     sort.Ints(sorted) // sort.SortInts is the pre-Go 1 name of this function
  30.     request.ReplyCh <- sorted
  31. }
  32. // serviceLoop accepts requests until the stop signal arrives.
  33. func (this *Server) serviceLoop() {
  34.     for {
  35.         select {
  36.         case request := <-this.requestCh:
  37.             // Handle each request in its own goroutine.
  38.             go this.processRequest(request)
  39.         case <-this.quitCh:
  40.             // Stop signal: close both channels and leave the loop.
  41.             close(this.requestCh)
  42.             close(this.quitCh)
  43.             return
  44.         }
  45.     }
  46. }
  47. // StartServer creates a server and starts its service loop in a goroutine.
  48. func StartServer() *Server {
  49.     server := &Server{make(chan *Request, 1000), make(chan bool)}
  50.     go server.serviceLoop()
  51.     return server
  52. }
処理が終わるのを待たない場合
  1. server := StartServer()
  2. // Fire and forget: the returned *Request is not bound, because an unused
  3. // variable does not compile in Go.
  4. server.SendRequest([]int{4, 1, 2, 5, 0})
処理が終わるまで何もせずに待つ場合
  1. server := StartServer()
  2. request := server.SendRequest([]int{4, 1, 2, 5, 0})
  3. reply := <- request.ReplyCh // blocks until the reply arrives; reply has type []int
処理が終わるまで他のことをしながら待つ場合
  1. server := StartServer()
  2. request := server.SendRequest([]int{4, 1, 2, 5, 0})
  3. var reply []int // receives the sorted slice
  4. // In Go 1 a plain "v, ok = <-ch" blocks until a value arrives, so the
  5. // polling has to be written as a select with a default branch.
  6. waiting:
  7. for {
  8.     select {
  9.     case reply = <-request.ReplyCh:
  10.         break waiting
  11.     default:
  12.         fmt.Printf("処理中...")
  13.     }
  14. }

MapReduce

MapReduce により、テキストファイル内の単語をカウントする。

Java

  1. // import java.util.Map.Entry;
  2. // import java.util.concurrent.ConcurrentMap;
  3. // import java.util.function.*;
  4. // import java.util.stream.*;
  5. public final class MapReduce {
  6. public static <K1, V1, K2, V2, K3, V3> void execute(
  7. Stream<Entry<K1, V1>> read,
  8. Function<Entry<K1, V1>, Stream<Entry<K2, V2>>> map,
  9. Function<Entry<K2, Stream<V2>>, Stream<Entry<K3, V3>>> reduce,
  10. Consumer<Entry<K3, V3>> write) {
  11. Collector<Entry<K2, V2>, ?, ConcurrentMap<K2, Stream<V2>>> multiMapCollector =
  12. Collectors.toConcurrentMap(e -> e.getKey(), e -> Stream.of(e.getValue()), Stream::concat);
  13. Stream<Entry<K2, V2>> mapped = processParallel("map", read, map);
  14. Stream<Entry<K2, Stream<V2>>> shuffled = mapped.collect(multiMapCollector).entrySet().stream();
  15. System.out.println("shuffled");
  16. processParallel("reduce", shuffled, reduce).forEachOrdered(write);
  17. System.out.println("completed");
  18. }
  19. private static <K1, V1, K2, V2> Stream<Entry<K2, V2>> processParallel(
  20. String processName,
  21. Stream<Entry<K1, V1>> input,
  22. Function<Entry<K1, V1>, Stream<Entry<K2, V2>>> process) {
  23. return input.parallel().flatMap(e -> {
  24. try {
  25. System.out.printf("%s start key=[%s] value=[%s]%n", processName, e.getKey(), e.getValue());
  26. return process.apply(e);
  27. } finally {
  28. System.out.printf("%s end%n", processName);
  29. }
  30. });
  31. }
  32. }
  1. // import java.util.HashMap;
  2. // import java.util.Map;
  3. // import java.util.Map.Entry;
  4. // import java.util.function.*;
  5. // import java.util.stream.Stream;
  6. /** MapReduce variant that collects the map output into a plain HashMap. */
  7. public final class MapReduce {
  8.     /**
  9.      * Runs map in parallel, groups the mapped values by key, runs reduce in
  10.      * parallel over the groups and passes every reduced entry to write.
  11.      */
  12.     public static <K1, V1, K2, V2, K3, V3> void execute(
  13.             Stream<Entry<K1, V1>> read,
  14.             Function<Entry<K1, V1>, Stream<Entry<K2, V2>>> map,
  15.             Function<Entry<K2, Stream<V2>>, Stream<Entry<K3, V3>>> reduce,
  16.             Consumer<Entry<K3, V3>> write) {
  17.         // A non-concurrent HashMap is sufficient only because processParallel
  18.         // delivers its output through forEachOrdered, which runs the consumer
  19.         // for one element at a time.
  20.         Map<K2, Stream<V2>> m = new HashMap<>();
  21.         processParallel("map", read, map,
  22.                 e -> m.merge(e.getKey(), Stream.of(e.getValue()), Stream::concat));
  23.         Stream<Entry<K2, Stream<V2>>> shuffled = m.entrySet().stream();
  24.         System.out.println("shuffled");
  25.         processParallel("reduce", shuffled, reduce, write);
  26.         System.out.println("completed");
  27.     }
  28.     /** Applies process to every input entry in parallel and feeds each result to output. */
  29.     private static <K1, V1, K2, V2> void processParallel(
  30.             String processName,
  31.             Stream<Entry<K1, V1>> input,
  32.             Function<Entry<K1, V1>, Stream<Entry<K2, V2>>> process,
  33.             Consumer<Entry<K2, V2>> output) {
  34.         input.parallel().flatMap(e -> {
  35.             try {
  36.                 System.out.printf("%s start key=[%s] value=[%s]%n", processName, e.getKey(), e.getValue());
  37.                 return process.apply(e);
  38.             } finally {
  39.                 System.out.printf("%s end%n", processName);
  40.             }
  41.         }).forEachOrdered(output);
  42.     }
  43. }
  1. // import java.util.AbstractMap.SimpleEntry;
  2. // import java.util.regex.Pattern;
  3. // import java.nio.file.*;
  4. // import java.util.stream.Stream;
  5. try (Stream<String> lines = Files.lines(Path.of("in.txt"))) {
  6.     // Compiled once and reused for every line.
  7.     Pattern nonWord = Pattern.compile("\\W+");
  8.     MapReduce.<Void, String, String, Integer, String, Long>execute(
  9.         // read : Stream<Entry<Void, String>>
  10.         lines.map(line -> new SimpleEntry<>(null, line)),
  11.         // map : Entry<Void, String> -> Stream<Entry<String, Integer>>
  12.         // Splitting yields an empty token for blank lines and for lines that
  13.         // start with a separator; drop those so "" is not counted as a word.
  14.         e -> nonWord.splitAsStream(e.getValue())
  15.                 .filter(s -> !s.isEmpty())
  16.                 .map(s -> new SimpleEntry<>(s, 1)),
  17.         // reduce : Entry<String, Stream<Integer>> -> Stream<Entry<String, Long>>
  18.         e -> Stream.of(new SimpleEntry<>(e.getKey(), e.getValue().count())),
  19.         // write : Entry<String, Long> -> void
  20.         e -> System.out.printf("%5d\t%s%n", e.getValue(), e.getKey())
  21.     );
  22. }
RxJava を使った場合
  1. // import java.util.AbstractMap.SimpleEntry;
  2. // import java.util.Map.Entry;
  3. // import io.reactivex.rxjava3.core.*;
  4. // import io.reactivex.rxjava3.functions.*;
  5. // import io.reactivex.rxjava3.parallel.ParallelFlowable;
  6. // import io.reactivex.rxjava3.schedulers.Schedulers;
  7. /** MapReduce built on RxJava parallel flows. */
  8. public final class MapReduce {
  9.     /**
  10.      * Builds the map -> shuffle -> reduce pipeline. Every reduced entry is passed
  11.      * to writer; the returned Single emits the number of reduced entries.
  12.      */
  13.     public static <K1, V1, K2, V2, K3, V3> Single<Long> execute(
  14.             Flowable<Entry<K1, V1>> reader,
  15.             Function<Entry<K1, V1>, Flowable<Entry<K2, V2>>> map,
  16.             Function<Entry<K2, Flowable<V2>>, Flowable<Entry<K3, V3>>> reduce,
  17.             Consumer<Entry<K3, V3>> writer) {
  18.         Flowable<Entry<K2, V2>> mapped = processParallel("map", reader, map).sequential();
  19.         // Shuffle step: gather all values per key, then emit one (key, values) entry per key.
  20.         Flowable<Entry<K2, Flowable<V2>>> shuffled = mapped
  21.             .toMultimap(e -> e.getKey(), e -> e.getValue())
  22.             .toFlowable()
  23.             .flatMap(m -> Flowable.fromIterable(m.entrySet()))
  24.             .map(e -> new SimpleEntry<>(e.getKey(), Flowable.fromIterable(e.getValue())));
  25.         Flowable<Entry<K3, V3>> reduced = processParallel("reduce", shuffled, reduce).sequential();
  26.         return reduced.doOnNext(writer).count();
  27.     }
  28.     /** Applies process to every input entry on parallel rails, logging each call. */
  29.     private static <K1, V1, K2, V2> ParallelFlowable<Entry<K2, V2>> processParallel(
  30.             String processName,
  31.             Flowable<Entry<K1, V1>> input,
  32.             Function<Entry<K1, V1>, Flowable<Entry<K2, V2>>> process) {
  33.         Function<Entry<K1, V1>, Flowable<Entry<K2, V2>>> traced = e -> {
  34.             try {
  35.                 System.out.printf("%s start key=[%s] value=[%s]%n", processName, e.getKey(), e.getValue());
  36.                 return process.apply(e);
  37.             } finally {
  38.                 System.out.printf("%s end%n", processName);
  39.             }
  40.         };
  41.         return input.parallel()
  42.             .runOn(Schedulers.newThread())
  43.             .flatMap(traced);
  44.     }
  45. }
  1. // import java.util.AbstractMap.SimpleEntry;
  2. // import java.nio.file.*;
  3. // import java.util.stream.Stream;
  4. // import io.reactivex.rxjava3.core.Flowable;
  5. try (Stream<String> lines = Files.lines(Path.of("in.txt"))) {
  6.     Long count = MapReduce.<Void, String, String, Integer, String, Long>execute(
  7.         // read : Flowable<Entry<Void, String>>
  8.         Flowable.fromStream(lines.map(line -> new SimpleEntry<>(null, line))),
  9.         // map : Entry<Void, String> -> Flowable<Entry<String, Integer>>
  10.         // (empty tokens produced by the split are dropped)
  11.         e -> Flowable.fromArray(e.getValue().split("\\W+"))
  12.                 .filter(s -> !s.isEmpty())
  13.                 .map(s -> new SimpleEntry<>(s, 1)),
  14.         // reduce : Entry<String, Flowable<Integer>> -> Flowable<Entry<String, Long>>
  15.         e -> e.getValue()
  16.                 .count()
  17.                 .toFlowable()
  18.                 .map(n -> new SimpleEntry<>(e.getKey(), n)),
  19.         // write : Entry<String, Long> -> void
  20.         e -> System.out.printf("%5d\t%s%n", e.getValue(), e.getKey())
  21.     ).blockingGet(); // wait for the whole pipeline to finish
  22.     System.out.printf("completed: %d%n", count);
  23. }

C#

タスク並列ライブラリ(TPL)を使った場合
  1. // using System.Linq;
  2. /// <summary>Minimal in-process MapReduce built on PLINQ.</summary>
  3. public sealed class MapReduce {
  4.     /// <summary>
  5.     /// Runs map in parallel, groups the mapped values by key, runs reduce in
  6.     /// parallel over the groups and passes every reduced pair to write.
  7.     /// </summary>
  8.     public static void Execute<K1, V1, K2, V2, K3, V3>(
  9.         IEnumerable<(K1, V1)> read,
  10.         Func<(K1, V1), IEnumerable<(K2, V2)>> map,
  11.         Func<(K2, IEnumerable<V2>), IEnumerable<(K3, V3)>> reduce,
  12.         Action<(K3, V3)> write) {
  13.         var mapped = ProcessParallel("map", read, map);
  14.         // Shuffle step: one (key, values) pair per distinct key.
  15.         var shuffled = mapped
  16.             .GroupBy(t => t.Item1, t => t.Item2)
  17.             .Select(grp => (grp.Key, grp.AsEnumerable()));
  18.         var reduced = ProcessParallel("reduce", shuffled, reduce);
  19.         foreach (var t in reduced) {
  20.             write(t);
  21.         }
  22.     }
  23.     /// <summary>Applies process to every input pair in parallel, logging each call.</summary>
  24.     private static ParallelQuery<(K2, V2)> ProcessParallel<K1, V1, K2, V2>(
  25.         string processName,
  26.         IEnumerable<(K1, V1)> input,
  27.         Func<(K1, V1), IEnumerable<(K2, V2)>> process) {
  28.         IEnumerable<(K2, V2)> Traced((K1, V1) t) {
  29.             Console.WriteLine($"{processName} start key=[{t.Item1}] value=[{t.Item2}]");
  30.             try {
  31.                 System.Threading.Thread.Sleep(200);
  32.                 return process(t);
  33.             } finally {
  34.                 Console.WriteLine($"{processName} end");
  35.             }
  36.         }
  37.         return input.AsParallel().SelectMany(t => Traced(t));
  38.     }
  39. }
  1. // using System.IO;
  2. // using System.Linq;
  3. // using System.Text.RegularExpressions;
  4. MapReduce.Execute(
  5.     // read : IEnumerable<(object, string)>
  6.     File.ReadLines("in.txt").Select(line => ((object)null, line)),
  7.     // map : (object, string) => IEnumerable<(string, int)>
  8.     t => Regex.Matches(t.Item2, @"\w+").Cast<Match>().Select(m => (m.Value, 1)),
  9.     // reduce : (string, IEnumerable<int>) => IEnumerable<(string, int)>
  10.     t => new[] { (t.Item1, t.Item2.Count()) },
  11.     // write : (string, int) => void
  12.     t => Console.WriteLine($"{t.Item2,5}\t{t.Item1}")
  13. );
The Reactive Extensions for .NET を使った場合
  1. // using System.Linq;
  2. // using System.Reactive.Linq;
  3. /// <summary>MapReduce built on Rx observables.</summary>
  4. public sealed class MapReduce {
  5.     /// <summary>
  6.     /// Runs map asynchronously, groups the mapped values by key, runs reduce
  7.     /// asynchronously over the groups and passes every reduced pair to write.
  8.     /// </summary>
  9.     public static void Execute<K1, V1, K2, V2, K3, V3>(
  10.         IObservable<(K1, V1)> read,
  11.         Func<(K1, V1), IObservable<(K2, V2)>> map,
  12.         Func<(K2, IObservable<V2>), IObservable<(K3, V3)>> reduce,
  13.         Action<(K3, V3)> write) {
  14.         var mapped = ProcessParallel("map", read, map);
  15.         // Shuffle step: build a lookup from the map output, then emit one (key, values) pair per key.
  16.         var shuffled = from lookup in mapped.ToLookup(t => t.Item1, t => t.Item2)
  17.                        from grp in lookup
  18.                        select (grp.Key, grp.ToObservable());
  19.         var reduced = ProcessParallel("reduce", shuffled, reduce).ToEnumerable();
  20.         foreach (var t in reduced) {
  21.             write(t);
  22.         }
  23.     }
  24.     /// <summary>Applies process to every input pair via ToAsync and merges the results.</summary>
  25.     private static IObservable<(K2, V2)> ProcessParallel<K1, V1, K2, V2>(
  26.         string processName,
  27.         IObservable<(K1, V1)> input,
  28.         Func<(K1, V1), IObservable<(K2, V2)>> process) {
  29.         IObservable<(K2, V2)> Traced((K1, V1) t) {
  30.             Console.WriteLine($"{processName} start key=[{t.Item1}] value=[{t.Item2}]");
  31.             try {
  32.                 System.Threading.Thread.Sleep(200);
  33.                 return process(t);
  34.             } finally {
  35.                 Console.WriteLine($"{processName} end");
  36.             }
  37.         }
  38.         Func<(K1, V1), IObservable<(K2, V2)>> f = Traced;
  39.         return input.SelectMany(f.ToAsync()).Merge();
  40.     }
  41. }
  1. // using System.IO;
  2. // using System.Linq;
  3. // using System.Reactive.Linq;
  4. // using System.Text.RegularExpressions;
  5. MapReduce.Execute(
  6.     // read : IObservable<(object, string)>
  7.     File.ReadLines("in.txt").ToObservable().Select(line => ((object)null, line)),
  8.     // map : (object, string) => IObservable<(string, int)>
  9.     t => Regex.Matches(t.Item2, @"\w+").ToObservable().Select(m => (m.Value, 1)),
  10.     // reduce : (string, IObservable<int>) => IObservable<(string, int)>
  11.     t => from n in t.Item2.Count() select (t.Item1, n),
  12.     // write : (string, int) => void
  13.     t => Console.WriteLine($"{t.Item2,5}\t{t.Item1}")
  14. );

戻る

目次

別のスレッドで処理(リストを昇順にソートする)を行う Java Groovy Kotlin Scala Erlang Haskell PowerShell F# C# C++ Go Rust Dart TypeScript JavaScript CoffeeScript Ruby Python PHP Perl
リストの各要素を並列で処理して(処理の順序は問わない)全ての完了を待つ Java Groovy Kotlin Scala Erlang Haskell PowerShell F# C# C++ Go Rust Dart TypeScript JavaScript CoffeeScript Ruby Python PHP Perl
リストの各要素を並列で変換処理して、その結果のリストを出力する Java Groovy Kotlin Scala Erlang Haskell PowerShell F# C# C++ Go Rust Dart TypeScript JavaScript CoffeeScript Ruby Python PHP Perl
メッセージ(処理のリクエスト)を受け取る度に新しいスレッドを起動して処理を行うオブジェクト Java Groovy Kotlin Scala Erlang Haskell PowerShell F# C# C++ Go Rust Dart TypeScript JavaScript CoffeeScript Ruby Python PHP Perl
MapReduce Java Groovy Kotlin Scala Erlang Haskell PowerShell F# C# C++ Go Rust Dart TypeScript JavaScript CoffeeScript Ruby Python PHP Perl