プログラミング言語の比較 > 並列処理(スレッド)

別のスレッドで処理(リストを昇順にソートする)を行う

Java

  1. // import java.util.concurrent.*;
  2. class Sorter<E> implements Callable<E[]> {
  3.     private final E[] original;
  4.     public Sorter(E... original) {
  5.         this.original = original;
  6.     }
  7.     @Override
  8.     public E[] call() throws Exception {
  9.         E[] sorted = Arrays.copyOf(original, original.length);
  10.         Arrays.sort(sorted);
  11.         return sorted;
  12.     }
  13. }
処理が終わるのを待たない場合
  1. ExecutorService executor = Executors.newCachedThreadPool();
  2. executor.submit(new Sorter<Integer>(41250));
  3. executor.shutdown();
処理が終わるまで何もせずに待つ場合
  1. ExecutorService executor = Executors.newCachedThreadPool();
  2. Future<Integer[]> future = executor.submit(new Sorter<Integer>(41250));
  3. Integer[] result = future.get();
  4. executor.shutdown();
処理が終わるまで他のことをしながら待つ場合
  1. ExecutorService executor = Executors.newCachedThreadPool();
  2. Future<Integer[]> future = executor.submit(new Sorter<Integer>(41250));
  3. while (! future.isDone()) {
  4.     System.out.println("処理中...");
  5. }
  6. Integer[] result = future.get();
  7. executor.shutdown();

Java 8以降でRxJava を使った場合

  1. // import java.util.concurrent.CountDownLatch;
  2. // import java.util.stream.IntStream;
  3. // import rx.Observable;
  4. IntStream stream = IntStream.of(41250);
  5. Observable<IntStream> ob = Observable.just(stream).parallel(o -> o.map(s -> s.sorted()));
処理が終わるのを待たない場合
  1. ob.subscribe();
処理が終わるまで何もせずに待つ場合
  1. List<Integer> result = new ArrayList<>();
  2. CountDownLatch latch = new CountDownLatch(1);
  3. ob.subscribe(s -> {
  4.     s.forEach(result::add);
  5.     latch.countDown();
  6. });
  7. latch.await();
処理が終わるまで他のことをしながら待つ場合
  1. List<Integer> result = new ArrayList<>();
  2. CountDownLatch latch = new CountDownLatch(1);
  3. ob.subscribe(s -> {
  4.     s.forEach(result::add);
  5.     latch.countDown();
  6. });
  7. while (latch.getCount() > 0) {
  8.     System.out.println("処理中...");
  9. }

Groovy

  1. // import java.util.concurrent.*
  2. class Sorter implements Callable {
  3.     private original
  4.     Sorter(... original) {
  5.         this.original = original
  6.     }
  7.     def call() {
  8.         original.sort()
  9.     }
  10. }
  11. def sorter = new Sorter(41250)
  12. def executor = Executors.newSingleThreadExecutor()
処理が終わるのを待たない場合
  1. executor.submit(sorter)
  2. executor.shutdown()
処理が終わるまで何もせずに待つ場合
  1. def future = executor.submit(sorter)
  2. def result = future.get()
  3. executor.shutdown()
処理が終わるまで他のことをしながら待つ場合
  1. def future = executor.submit(sorter)
  2. while (! future.done) {
  3.     println '処理中...'
  4. }
  5. def result = future.get()
  6. executor.shutdown()

Groovy 2.2以降で RxJava を使った場合

  1. // import java.util.concurrent.CountDownLatch
  2. // import rx.Observable
  3. def list = [41250]
  4. def o = Observable.just(list).parallel { it.map { it.sort() } }
処理が終わるのを待たない場合
  1. o.subscribe()
処理が終わるまで何もせずに待つ場合
  1. def result = null
  2. def latch = new CountDownLatch(1)
  3. o.subscribe {
  4.     result = it
  5.     latch.countDown()
  6. }
  7. latch.await()
処理が終わるまで他のことをしながら待つ場合
  1. def result = null
  2. o.subscribe { result = it }
  3. while (result == null) {
  4.     println '処理中...'
  5. }

Scala

Future を使う

  1. // import scala.concurrent._
  2. // import scala.concurrent.ExecutionContext.Implicits.global
  3. // import scala.concurrent.duration.Duration
  4. val list = List(41250)
処理が終わるのを待たない場合
  1. Future { list.sorted }
処理が終わるまで何もせずに待つ場合
  1. val future = Future { list.sorted }                         // future の型は scala.concurrent.Future[List[Int]]
  2. val result = Await.result(future, Duration.Inf)             // result の型は List[Int]
処理が終わるまで他のことをしながら待つ場合
  1. val future = Future { list.sorted }                         // future の型は scala.concurrent.Future[List[Int]]
  2. while (!future.isCompleted) {
  3.     println("処理中...")
  4. }
  5. val result = Await.result(future, Duration.Zero)            // result の型は List[Int]

akka を使った場合(無名のアクター)

  1. // import scala.concurrent.Await
  2. // import scala.concurrent.duration._
  3. // import akka.actor.ActorSystem
  4. // import akka.actor.ActorDSL._
  5. // import akka.pattern.ask
  6. // import akka.util.Timeout
  7. implicit val actorSystem = ActorSystem()
  8. val duration = 1 minutes
  9. implicit val timeout = Timeout(duration)
  10. val s = actor(new Act {
  11.     become {
  12.         case list: List[Int] => sender() ! list.sorted
  13.     }
  14. })
  15. val list = List(41250)
処理が終わるのを待たない場合
  1. s ! list
処理が終わるまで何もせずに待つ場合
  1. val result = Await.result((s ? list).mapTo[List[Int]], duration)
処理が終わるまで他のことをしながら待つ場合
  1. val future = (s ? list).mapTo[List[Int]]
  2. while (!future.isCompleted) {
  3.     println("処理中...")
  4. }
  5. val result = Await.result(future, duration)

akka を使った場合(アクタークラスを定義)

  1. // import scala.concurrent.Await
  2. // import scala.concurrent.duration._
  3. // import scala.math.Ordering
  4. // import akka.actor._
  5. // import akka.pattern.ask
  6. // import akka.util.Timeout
  7. class Sorter[A](implicit ord: Ordering[A]) extends Actor {
  8.     override def receive = {
  9.         case list: List[A] => sender() ! list.sorted
  10.     }
  11. }
  12. val actorSystem = ActorSystem()
  13. val duration = 1 minutes
  14. implicit val timeout = Timeout(duration)
  15. val s = actorSystem.actorOf(Props(classOf[Sorter[_]], Ordering.Int))
  16. val list = List(41250)
処理が終わるのを待たない場合
  1. s ! list
処理が終わるまで何もせずに待つ場合
  1. val result = Await.result((s ? list).mapTo[List[Int]], duration)
処理が終わるまで他のことをしながら待つ場合
  1. val future = (s ? list).mapTo[List[Int]]
  2. while (!future.isCompleted) {
  3.     println("処理中...")
  4. }
  5. val result = Await.result(future, duration)

RxJava を使った場合

  1. // import scala.concurrent.SyncVar
  2. // import rx.lang.scala.Observable
  3. val list = List(41250)
  4. val o = Observable.items(list).parallel { _ map { _.sorted } }
処理が終わるのを待たない場合
  1. o.subscribe
処理が終わるまで何もせずに待つ場合
  1. val sv = new SyncVar[List[Int]]
  2. o.subscribe(sv put _)
  3. val result = sv.get
処理が終わるまで他のことをしながら待つ場合
  1. val sv = new SyncVar[List[Int]]
  2. o.subscribe(sv put _)
  3. while (!sv.isSet) {
  4.     println("処理中...")
  5. }
  6. val result = sv.get

Erlang

  1. SorterPid = spawn(fun() ->
  2.                       receive
  3.                           {Pid, List} -> Pid ! {self(), lists:sort(List)}
  4.                       end
  5.                   end),
処理が終わるのを待たない場合
  1. SorterPid ! {self(), [41250]}.
処理が終わるまで何もせずに待つ場合
  1. SorterPid ! {self(), [41250]},
  2. receive
  3.     {SorterPid, Result} -> void
  4. end.
処理が終わるまで他のことをしながら待つ場合
  1. wait_result(Pid) ->
  2.     receive
  3.         {Pid, Result} -> Result
  4.     after 0 ->
  5.         io:format("処理中...~n", []),
  6.         wait_result(Pid)
  7.     end.
  1. SorterPid ! {self(), [41250]},
  2. Result = wait_result(SorterPid).

F#

非同期ワークフローを使った場合

処理が終わるのを待たない場合
  1. let list = [41250]
  2. async {
  3.     list |> List.sort |> ignore
  4. } |> Async.Start
処理が終わるまで何もせずに待つ場合
  1. let list = [41250]
  2. let a = async {
  3.     return list |> List.sort
  4. }
  5. let result = (Async.StartAsTask a).Result
  1. let list = [41250]
  2. let a = async {
  3.     return list |> List.sort
  4. }
  5. let result = Async.Parallel [a] |> Async.RunSynchronously |> Seq.head
  1. // open System
  2. let asyncFunc func arg =
  3.     let _delegate = new Func<'T, 'TResult>(func)
  4.     async { return! Async.FromBeginEnd(arg, _delegate.BeginInvoke, _delegate.EndInvoke) }
  5. let list = [41250]
  6. let result = list |> asyncFunc List.sort |> Async.RunSynchronously
処理が終わるまで他のことをしながら待つ場合
  1. let list = [41250]
  2. let a = async {
  3.     return list |> List.sort
  4. }
  5. let task = Async.StartAsTask a
  6. while not task.IsCompleted do printfn "処理中..."
  7. let result = task.Result
  1. let array = [|41250|]
  2. let result = ref null
  3. let a = async {
  4.     result := Array.sort array
  5. }
  6. let b = async {
  7.     while !result = null do printfn "処理中..."
  8. }
  9. Async.Parallel [a; b] |> Async.RunSynchronously |> ignore

タスク並列ライブラリ(TPL)を使った場合

処理が終わるのを待たない場合
  1. // open System.Threading.Tasks
  2. let list = [41250]
  3. Task.Factory.StartNew(fun () -> List.sort list) |> ignore
処理が終わるまで何もせずに待つ場合
  1. // open System.Threading.Tasks
  2. let list = [41250]
  3. let task = Task.Factory.StartNew(fun () -> List.sort list)
  4. let result = task.Result
  1. // open System
  2. // open System.Threading.Tasks
  3. let array = [|41250|]
  4. let result = ref null
  5. let a() =
  6.     result := Array.sort array
  7. Parallel.Invoke [| new Action(a) |]
処理が終わるまで他のことをしながら待つ場合
  1. // open System.Threading.Tasks
  2. let list = [41250]
  3. let task = Task.Factory.StartNew(fun () -> List.sort list)
  4. while not task.IsCompleted do printfn "処理中..."
  5. let result = task.Result
  1. // open System
  2. // open System.Threading.Tasks
  3. let array = [|41250|]
  4. let result = ref null
  5. let a() =
  6.     result := Array.sort array
  7. let b() =
  8.     while !result = null do printfn "処理中..."
  9. Parallel.Invoke [| new Action(a); new Action(b) |]

Reactive Extensions を使った場合

  1. // open System.Reactive.Linq
  2. let array = [|41250|]
処理が終わるのを待たない場合
  1. Observable.Start(fun () -> Array.sort array) |> ignore
処理が終わるまで何もせずに待つ場合
  1. let result = Observable.Start(fun () -> Array.sort array).Last()
処理が終わるまで他のことをしながら待つ場合
  1. let result = ref null
  2. Observable.Start(fun () -> Array.sort array).Subscribe(fun array -> result := array) |> ignore
  3. while !result = null do printfn "処理中..."

C#

Thread クラスを使った場合

  1. // using System.Threading;
  2. int[] array = { 41250 };
  3. int[] result = null;
  4. Thread t = new Thread(delegate () {
  5.     int[] sorted = (int[])array.Clone();
  6.     Array.Sort(sorted);
  7.     result = sorted;
  8. });
  9. t.Name = "Sorting";
処理が終わるのを待たない場合
  1. t.Start();
処理が終わるまで何もせずに待つ場合
  1. t.Start();
  2. t.Join();
処理が終わるまで他のことをしながら待つ場合
  1. t.Start();
  2. while (t.IsAlive) {
  3.     Console.WriteLine("処理中...");
  4. }

ThreadPool クラスを使った場合

  1. // using System.Threading;
  2. int[] array = { 41250 };
  3. int[] result = null;
  4. WaitCallback callback = delegate (object state) {
  5.     int[] sorted = (int[])array.Clone();
  6.     Array.Sort(sorted);
  7.     result = sorted;
  8. };
処理が終わるのを待たない場合
  1. ThreadPool.QueueUserWorkItem(callback);
処理が終わるまで何もせずに待つ場合
  1. ThreadPool.QueueUserWorkItem(callback);
  2. TimeSpan sleepTime = TimeSpan.FromMilliseconds(100);
  3. while (result == null) {
  4.     Thread.sleep(sleepTime);
  5. }
処理が終わるまで他のことをしながら待つ場合
  1. ThreadPool.QueueUserWorkItem(callback);
  2. while (result == null) {
  3.     Console.WriteLine("処理中...");
  4. }

デリゲートの BeginInvoke メソッドを使った場合

  1. // using System.Threading;
  2. int[] array = { 41250 };
  3. Converter<int[], int[]> c = delegate (int[] input) {
  4.     int[] sorted = (int[])input.Clone();
  5.     Array.Sort(sorted);
  6.     return sorted;
  7. };
処理が終わるのを待たない場合
  1. c.BeginInvoke(array, nullnull);
  1. AsyncCallback callback = delegate (IAsyncResult ar) {
  2.     int[] result = c.EndInvoke(ar);
  3. };
  4. c.BeginInvoke(array, callback, null);
処理が終わるまで何もせずに待つ場合
  1. IAsyncResult ar = c.BeginInvoke(array, nullnull);
  2. int[] result = c.EndInvoke(ar);
  1. IAsyncResult ar = c.BeginInvoke(array, nullnull);
  2. using (WaitHandle wh = ar.AsyncWaitHandle) {
  3.     wh.WaitOne();
  4.     int[] result = c.EndInvoke(ar);
  5. }
処理が終わるまで他のことをしながら待つ場合
  1. IAsyncResult ar = c.BeginInvoke(array, nullnull);
  2. while (! ar.IsCompleted) {
  3.     Console.WriteLine("処理中...");
  4. }
  5. int[] result = c.EndInvoke(ar);

C# 4.0以降でタスク並列ライブラリ(TPL)を使った場合

  1. // using System.Threading.Tasks;
  2. int[] array = { 41250 };
  3. Func<int[], int[]> sorter = input => {
  4.     int[] sorted = (int[])input.Clone();
  5.     Array.Sort(sorted);
  6.     return sorted;
  7. };
処理が終わるのを待たない場合
  1. Task.Factory.StartNew(() => sorter(array));
処理が終わるまで何もせずに待つ場合
  1. var task = Task.Factory.StartNew(() => sorter(array));
  2. var result = task.Result;
処理が終わるまで他のことをしながら待つ場合
  1. var task = Task.Factory.StartNew(() => sorter(array));
  2. while (! task.IsCompleted) {
  3.     Console.WriteLine("処理中...");
  4. }
  5. var result = task.Result;

Reactive Extensions を使った場合

  1. // using System.Reactive.Linq;
  2. int[] array = { 41250 };
  3. Func<int[], int[]> sorter = input => {
  4.     int[] sorted = (int[])input.Clone();
  5.     Array.Sort(sorted);
  6.     return sorted;
  7. };
処理が終わるのを待たない場合
  1. sorter.ToAsync()(array);
  1. Observable.Start(() => sorter(array));
処理が終わるまで何もせずに待つ場合
  1. var result = sorter.ToAsync()(array).Last();
  1. var result = Observable.Start(() => sorter(array)).Last();
処理が終わるまで他のことをしながら待つ場合
  1. int[] result = null;
  2. sorter.ToAsync()(array).Subscribe(list => result = list);
  3. while (result == null) {
  4.     Console.WriteLine("処理中...");
  5. }

Go

  1. // import "sort"
  2. func doSort(in <-chan []int, out chan<- []int) {
  3.     array := <- in
  4.     sorted := make([]intlen(array))
  5.     copy(sorted, array)
  6.     sort.SortInts(sorted)
  7.     out <- sorted
  8. }
  1. in := make(chan []int)
  2. out := make(chan []int)
  3. go doSort(in, out)
処理が終わるのを待たない場合
  1. in <- []int{41250}
処理が終わるまで何もせずに待つ場合
  1. in <- []int{41250}
  2. result := <- out
処理が終わるまで他のことをしながら待つ場合
  1. in <- []int{41250}
  2. var result []int
  3. for {
  4.     var ok bool
  5.     result, ok = <- out
  6.     if ok { break }
  7.     fmt.Printf("処理中...")
  8. }

Dart

  1. class Sorter extends Isolate {
  2.     main() {
  3.         port.receive((list, replyTo) {
  4.             var newList = new List<int>.from(list);
  5.             newList.sort((a, b) => a - b);
  6.             replyTo.send(newList);
  7.         });
  8.     }
  9. }
処理が終わるのを待たない場合
  1. List<int> list = [41250];
  2. new Sorter().spawn().then((port) {
  3.     port.send(list);
  4. });
処理が終わるまで何もせずに待つ場合
  1. List<int> list = [41250];
  2. new Sorter().spawn().then((port) {
  3.     port.call(list).receive((result, _) {
  4.         // result が返って来た後の処理
  5.     });
  6. });

Ruby

処理が終わるのを待たない場合
  1. array = [41250]
  2. Thread.start { array.sort }
  1. array = [41250]
  2. Thread.start(array) {|a| a.sort }
処理が終わるまで何もせずに待つ場合
  1. array = [41250]
  2. result = Thread.start { array.sort }.value
  1. array = [41250]
  2. result = Thread.start(array) {|a| a.sort }.value
処理が終わるまで他のことをしながら待つ場合
  1. array = [41250]
  2. t = Thread.start { array.sort }
  3. while t.alive?
  4.   puts '処理中...'
  5. end
  6. result = t.value
  1. array = [41250]
  2. t = Thread.start(array) {|a| a.sort }
  3. while t.alive?
  4.   puts '処理中...'
  5. end
  6. result = t.value

Python

専用のスレッドクラスを作る

  1. # import threading
  2. class Sorter(threading.Thread):
  3.     def __init__(self, *original):
  4.         threading.Thread.__init__(self)
  5.         self.original = original
  6.     def run(self):
  7.         self.sorted = sorted(self.original)
  8. sorter = Sorter(41250)
  9. sorter.name = 'Sorting'
処理が終わるのを待たない場合
  1. sorter.start()
処理が終わるまで何もせずに待つ場合
  1. sorter.start()
  2. sorter.join()
  3. result = sorter.sorted
処理が終わるまで他のことをしながら待つ場合
  1. sorter.start()
  2. while sorter.is_alive():
  3.     print('処理中...')
  4. result = sorter.sorted

汎用的なスレッドクラスを作る

  1. # import threading
  2. class Thread(threading.Thread):
  3.     def __init__(self, func, *args):
  4.         threading.Thread.__init__(self)
  5.         self.func = func
  6.         self.args = args
  7.     def run(self):
  8.         self.result = self.func(*self.args)
  9. sorter = Thread(sorted, [41250])
  10. sorter.name = 'Sorting'
処理が終わるのを待たない場合
  1. sorter.start()
処理が終わるまで何もせずに待つ場合
  1. sorter.start()
  2. sorter.join()
  3. result = sorter.result
処理が終わるまで他のことをしながら待つ場合
  1. sorter.start()
  2. while sorter.is_alive():
  3.     print('処理中...')
  4. result = sorter.result

Perl

処理が終わるのを待たない場合
  1. # use threads;
  2. threads->create(sub { sort @_ }, 41250)->detach;
  1. # use threads;
  2. async { sort 41250 }->detach;
処理が終わるまで何もせずに待つ場合
  1. # use threads;
  2. my ($t) = threads->create(sub { sort @_ }, 41250);
  3. my @result = $t->join;
処理が終わるまで他のことをしながら待つ場合
  1. # use threads;
  2. my ($t) = threads->create(sub { sort @_ }, 41250);
  3. print "処理中...\n" while $t->is_running;
  4. my @result = $t->join;

リストの各要素を並列で処理して(処理の順序は問わない)全ての完了を待つ

処理内容は、(1) 開始(Start)のメッセージを出力する (2) スリープする (3) 終了(End)のメッセージを出力する。

Java

Java 8以降
  1. public static void write(int n, String s) {
  2.     System.out.println(MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
  3.         new Date(), n, s, Thread.currentThread().getId()));
  4. }
  5. public static void exec(int n) {
  6.     write(n, "S ");
  7.     try {
  8.         Thread.sleep(3000);
  9.     } catch (InterruptedException e) {
  10.     }
  11.     write(n, " E");
  12. }
  1. // import java.util.stream.IntStream;
  2. IntStream stream = IntStream.iterate(1, n -> n + 1).limit(20);          // 1から20までの数列
  3. stream.parallel().forEach(n -> exec(n));
Java 8以降で RxJava を使った場合
  1. // import java.util.concurrent.CountDownLatch;
  2. // import rx.Observable;
  3. // import rx.exceptions.OnErrorNotImplementedException;
  4. // import rx.functions.Action1;
  5. Observable<Integer> range = Observable.range(120);                    // 1から20までの数列
  6. Action1<Throwable> rethrow = e -> { throw new OnErrorNotImplementedException(e); };
  7. CountDownLatch latch = new CountDownLatch(1);
  8. range.parallel(o -> o.map(n -> {
  9.     exec(n);
  10.     return null;
  11. })).subscribe(n -> {}, rethrow, () -> latch.countDown());
  12. latch.await();                                                          // countDown されるまで待つ

Groovy

Java 8以降で Groovy 2.2以降
  1. // import java.text.MessageFormat
  2. def write(n, s) {
  3.     println MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
  4.         new Date(), n, s, Thread.currentThread().id)
  5. }
  6. def exec(n) {
  7.     write n, "S "
  8.     Thread.sleep 3000
  9.     write n, " E"
  10. }
  1. def list = 1..20
  2. list.stream().parallel().forEach { exec it }
Groovy 2.2以降で RxJava を使った場合
  1. // import java.util.concurrent.CountDownLatch
  2. // import rx.Observable
  3. // import rx.exceptions.OnErrorNotImplementedException
  4. def list = 1..20
  5. def latch = new CountDownLatch(1)
  6. Observable.from(list)
  7.     .parallel { it.map { exec it } }
  8.     .subscribe({}, { throw new OnErrorNotImplementedException(it) }, { latch.countDown() })
  9. latch.await()                                                           // countDown されるまで待つ

Scala

Scala 2.9以降
  1. // import java.text.MessageFormat
  2. // import java.util.Date
  3. def write(n: Int, s: String) = println(
  4.     MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
  5.         new Date, Integer valueOf n, s, Thread.currentThread.getId.toString))
  6. def exec(n: Int) {
  7.     write(n, "S ")
  8.     Thread sleep 3000
  9.     write(n, " E")
  10. }
  1. val range = 1 to 20
  2. range.par.foreach(exec)
RxJava を使った場合
  1. // import scala.concurrent.SyncVar
  2. // import rx.lang.scala.Observable
  3. val range = 1 to 20
  4. val sv = new SyncVar[Unit]
  5. Observable.from(range).parallel(_ map exec).last.subscribe(sv.put _)
  6. sv.get

Erlang

  1. List = lists:seq(120),
  2. Write = fun(N, S) ->
  3.             {Hour, Minute, Second} = time(),
  4.             {_, _, MicroSecs} = now(),
  5.             io:format("~2..0b:~2..0b:~2..0b.~3..0b ~2..0b: ~s Process~p~n",
  6.                 [Hour, Minute, Second, MicroSecs div 1000, N, S, self()])
  7.         end,
  8. Exec = fun(N, Caller) ->
  9.            Write(N, "S "),
  10.            timer:sleep(3000),
  11.            Write(N, " E"),
  12.            Caller ! self()                                          %% 完了したことを通知する
  13.        end,
  14. Pids = lists:map(fun(N) ->
  15.                      Caller = self(),
  16.                      spawn(fun() -> Exec(N, Caller) end)
  17.                  end, List),
  18. lists:foreach(fun(Pid) ->
  19.                   receive
  20.                       Pid -> ok                                     %% 完了の通知を受け取る
  21.                   end
  22.               end, Pids).

F#

  1. // open System
  2. // open System.Linq
  3. // open System.Threading
  4. // open System.Threading.Tasks
  5. let list = [1..20]
  6. let write n s =
  7.     let now = DateTime.Now.ToString "HH:mm:ss.fff"
  8.     sprintf "%s %02d: %s Thread-%d" now n s Thread.CurrentThread.ManagedThreadId |> Console.WriteLine
  9. let exec n =
  10.     write n "S "
  11.     TimeSpan.FromSeconds 3.0 |> Thread.Sleep
  12.     write n " E"
非同期ワークフローを使った場合
  1. let tasks = [ for n in list -> async { exec n } |> Async.StartAsTask ]
  2. for task in tasks do task.Wait()
  1. for n in list -> async { exec n } ] |> Async.Parallel |> Async.RunSynchronously |> ignore
タスク並列ライブラリ(TPL)を使った場合
  1. Parallel.ForEach(list, exec) |> ignore
  1. list.AsParallel().ForAll(new Action<_>(exec))
  1. list.AsParallel().ForAll(fun n -> exec n)
  1. Parallel.Invoke [| for n in list -> new Action(fun () -> exec n) |]
  1. Task.WaitAll [| for n in list -> Task.Factory.StartNew(fun () -> exec n) |]
F# PowerPack を使った場合
  1. // open Microsoft.FSharp.Collections
  2. PSeq.iter exec list
Reactive Extensions を使った場合
  1. // open System.Reactive.Linq
  2. let execAsync n = Observable.Start(fun () -> exec n)
  3. list.Select(execAsync).Merge().Last() |> ignore
  1. // open System.Reactive.Linq
  2. let execAsync n = Observable.Start(fun () -> exec n)
  3. list.ToObservable().SelectMany(execAsync).Last() |> ignore

C#

  1. // using System.Threading;
  2. // using System.Threading.Tasks;
  3. var e = Enumerable.Range(120);
  4. Action<intstring> write = (n, s) => Console.WriteLine(
  5.     string.Format("{0:HH:mm:ss.fff} {1:00}: {2} Thread-{3}",
  6.         DateTime.Now, n, s, Thread.CurrentThread.ManagedThreadId));
  7. Action<int> exec = n => {
  8.     write(n, "S ");
  9.     Thread.Sleep(TimeSpan.FromSeconds(3));
  10.     write(n, " E");
  11. };
C# 4.0以降でタスク並列ライブラリ(TPL)を使った場合
  1. Parallel.ForEach(e, exec);
  1. e.AsParallel().ForAll(exec);
  1. var actions = from n in e select new Action(() => exec(n));
  2. Parallel.Invoke(actions.ToArray());
  1. var tasks = from n in e select Task.Factory.StartNew(() => exec(n));
  2. Task.WaitAll(tasks.ToArray());
Reactive Extensions を使った場合
  1. // using System.Reactive.Linq;
  2. e.Select(exec.ToAsync()).Merge().Last();
  1. // using System.Reactive.Linq;
  2. e.ToObservable().SelectMany(exec.ToAsync()).Last();
  1. // using System.Reactive.Linq;
  2. (from n in e.ToObservable()
  3.  from u in exec.ToAsync()(n)
  4.  select u
  5. ).Last();

Go

  1. // import "time"
  2. array := [...]int{1234567891011121314151617181920}
  3. write := func(n int, s string) {
  4.     now := time.LocalTime().Format("15:04:05")
  5.     msec := (time.Nanoseconds() % 1e9) / 1e6
  6.     fmt.Printf("%s.%03d %02d: %s\n", now, msec, n, s)
  7. }
  8. replyCh := make(chan interface{})       // 完了の通知を受け取るチャネルは、1つだけ用意する
  9. exec := func(n int) {
  10.     write(n, "S ")
  11.     time.Sleep(3e9)                     // 3×10^9ナノ秒=3秒
  12.     write(n, " E")
  13.     replyCh <- nil                      // 完了したことを通知する
  14. }
  15. for _, n := range(array) {
  16.     go exec(n)
  17. }
  18. for _, _ = range(array) {               // 処理を呼び出した回数分だけ完了の通知を待つ
  19.     _ = <- replyCh
  20. }

Ruby

  1. range = 1..20
  2. def write n, s
  3.   Thread.exclusive do
  4.     puts sprintf('%s %02d: %s %s', Time.now.strftime('%H:%M:%S.%L'), n, s, Thread.current)
  5.   end
  6. end
  7. def exec n
  8.   write n, 'S '
  9.   sleep 3
  10.   write n, ' E'
  11. end
  12. threads = range.collect do |n|
  13.   Thread.start { exec n }
  14. end
  15. threads.each &:join

Python

  1. # from datetime import datetime
  2. # import threading
  3. # import time
  4. print_lock = threading.RLock()
  5. class Thread(threading.Thread):
  6.     def __init__(self, n):
  7.         threading.Thread.__init__(self)
  8.         self.n = n
  9.     def run(self):
  10.         self.write('S ')
  11.         time.sleep(3)
  12.         self.write(' E')
  13.     def write(self, s):
  14.         now = datetime.now().strftime('%H:%M:%S')
  15.         threadId = threading.current_thread().ident
  16.         with print_lock:
  17.             print('%s %02d: %s %s' % (now, self.n, s, threadId))
  1. l = range(020)
  2. for n in l:
  3.     Thread(n).start()

Perl

  1. # use threads;
  2. # use Time::Piece;
  3. my @array = (1..20);
  4. my $write = sub {
  5.     my $now = localtime->strftime('%H:%M:%S');
  6.     printf "%s %02d: %s Thread-%s\n"$now$_[0], $_[1], threads->tid;
  7. };
  8. my $exec = sub {
  9.     my $n = shift;
  10.     $write->($n'S ');
  11.     sleep 3;
  12.     $write->($n' E');
  13. };
  14. my @threads = map { threads->create($exec$_) } @array;
  15. $_->join for @threads;

リストの各要素を並列で変換処理して、その結果のリストを出力する

変換処理の内容は、(1) 開始(Start)のメッセージを出力する (2) スリープする (3) 終了(End)のメッセージを出力する (4) 元の値を-1倍して返す。

Java

Java 8以降
  1. public static void write(int n, String s) {
  2.     System.out.println(MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
  3.         new Date(), n, s, Thread.currentThread().getId()));
  4. }
  5. public static int exec(int n) {
  6.     write(n, "S ");
  7.     try {
  8.         Thread.sleep(3000);
  9.     } catch (InterruptedException e) {
  10.     }
  11.     write(n, " E");
  12.     return -n;
  13. }
  1. // import java.util.stream.Collectors;
  2. // import java.util.stream.IntStream;
  3. IntStream stream = IntStream.iterate(1, n -> n + 1).limit(20);          // 1から20までの数列
  4. IntStream result = stream.parallel().map(n -> exec(n));
  5. System.out.println(result.boxed().collect(Collectors.toList()));
Java 8以降で RxJava を使った場合
  1. // import java.util.concurrent.CountDownLatch;
  2. // import rx.Observable;
  3. // import rx.exceptions.OnErrorNotImplementedException;
  4. // import rx.functions.Action1;
  5. Map<Integer, Integer> m = new TreeMap<>();                      // 結果を入力値(1~20)の順で取得できるように TreeMap を使う
  6. for (int i = 1; i <= 20; i++) {
  7.     m.put(i, null);
  8. }
  9. CountDownLatch latch = new CountDownLatch(1);
  10. Action1<Throwable> rethrow = e -> { throw new OnErrorNotImplementedException(e); };
  11. Observable.from(m.keySet())
  12.     .parallel(o -> o.map(n -> m.put(n, exec(n))))
  13.     .subscribe(n -> {}, rethrow, () -> latch.countDown());
  14. latch.await();                                                  // countDown されるまで待つ
  15. System.out.println(m.values());

Groovy

Java 8以降で Groovy 2.2以降
  1. // import java.text.MessageFormat
  2. def write(n, s) {
  3.     println MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
  4.         new Date(), n, s, Thread.currentThread().id)
  5. }
  6. def exec(n) {
  7.     write n, "S "
  8.     Thread.sleep 3000
  9.     write n, " E"
  10.     -n
  11. }
  1. def list = 1..20
  2. def result = list.stream().parallel().map { exec it }.toArray()
  3. println result
Groovy 2.2以降で RxJava を使った場合
  1. // import java.util.concurrent.CountDownLatch
  2. // import rx.Observable
  3. // import rx.exceptions.OnErrorNotImplementedException
  4. def m = [:]
  5. (1..20).each { m[it] = null }
  6. def latch = new CountDownLatch(1)
  7. Observable.from(m.keySet())
  8.     .parallel { it.map { m[it] = exec it } }
  9.     .subscribe({}, { throw new OnErrorNotImplementedException(it) }, { latch.countDown() })
  10. latch.await()                                                   // countDown されるまで待つ
  11. println m.values()

Scala

Scala 2.9以降
  1. // import java.text.MessageFormat
  2. // import java.util.Date
  3. def write(n: Int, s: String) = println(
  4.     MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
  5.         new Date, Integer valueOf n, s, Thread.currentThread.getId.toString))
  6. def exec(n: Int) = {
  7.     write(n, "S ")
  8.     Thread sleep 3000
  9.     write(n, " E")
  10.     -n
  11. }
  1. val range = 1 to 20
  2. val result = range.par.map(exec)
  3. println(result)
RxJava を使った場合
  1. // import scala.concurrent.SyncVar
  2. // import rx.lang.scala.Observable
  3. val range = 1 to 20
  4. val sv = new SyncVar[List[Int]]
  5. Observable.from(range)
  6.     .parallel(_ map { n => n -> exec(n) })
  7.     .toSeq.subscribe { sv put _.sortBy(_._1).map(_._2).toList }
  8. val result = sv.get
  9. println(result)

Erlang

  1. List = lists:seq(120),
  2. Write = fun(N, S) ->
  3.             {Hour, Minute, Second} = time(),
  4.             {_, _, MicroSecs} = now(),
  5.             io:format("~2..0b:~2..0b:~2..0b.~3..0b ~2..0b: ~s Process~p~n",
  6.                 [Hour, Minute, Second, MicroSecs div 1000, N, S, self()])
  7.         end,
  8. Exec = fun(N, Caller) ->
  9.            Write(N, "S "),
  10.            timer:sleep(3000),
  11.            Write(N, " E"),
  12.            Caller ! {self(), -N}
  13.        end,
  14. Pids = lists:map(fun(N) ->
  15.                      Caller = self(),
  16.                      spawn(fun() -> Exec(N, Caller) end)
  17.                  end, List),
  18. Result = lists:map(fun(Pid) ->
  19.                        receive
  20.                            {Pid, N} -> N
  21.                        end
  22.                    end, Pids),
  23. io:format("~p~n", [Result]),

F#

  1. // open System
  2. // open System.Linq
  3. // open System.Threading
  4. // open System.Threading.Tasks
  5. let list = [1..20]
  6. let write n s =
  7.     let now = DateTime.Now.ToString "HH:mm:ss.fff"
  8.     sprintf "%s %02d: %s Thread-%d" now n s Thread.CurrentThread.ManagedThreadId |> Console.WriteLine
  9. let exec n =
  10.     write n "S "
  11.     TimeSpan.FromSeconds 3.0 |> Thread.Sleep
  12.     write n " E"
  13.     -n
非同期ワークフローを使った場合
  1. let tasks = [ for n in list -> async { return exec n } |> Async.StartAsTask ]
  2. let result = [ for task in tasks -> task.Result ]
  3. printfn "%A" result
  1. let result = [ for n in list -> async { return exec n } ] |> Async.Parallel |> Async.RunSynchronously
  2. result |> Array.toList |> printfn "%A"
タスク並列ライブラリ(TPL)を使った場合
  1. let result = list.AsParallel().Select exec |> Seq.toList
  2. printfn "%A" result
F# PowerPack を使った場合
  1. // open Microsoft.FSharp.Collections
  2. let result = list |> PSeq.mapi (fun i n -> i, exec n) |> PSeq.sort |> PSeq.map snd |> PSeq.toList
  3. printfn "%A" result
Reactive Extensions を使った場合
  1. // open System.Reactive.Linq
  2. let execAsync n = Observable.Start(fun () -> exec n)
  3. let result = list.Select(execAsync).Zip().Last()
  4. printfn "%A" result
  1. // open System.Reactive.Linq
  2. let execAsync n = Observable.Start(fun () -> exec n)
  3. let result = list.Select(execAsync).CombineLatest().Last()
  4. printfn "%A" result

C#

  1. // using System.Threading;
  2. var e = Enumerable.Range(120);
  3. Action<intstring> write = (n, s) => Console.WriteLine(
  4.     string.Format("{0:HH:mm:ss.fff} {1:00}: {2} Thread-{3}",
  5.         DateTime.Now, n, s, Thread.CurrentThread.ManagedThreadId));
  6. Func<intint> exec = n => {
  7.     write(n, "S ");
  8.     Thread.Sleep(TimeSpan.FromSeconds(3));
  9.     write(n, " E");
  10.     return -n;
  11. };
C# 4.0以降でタスク並列ライブラリ(TPL)を使った場合
  1. var result = from n in e.AsParallel().AsOrdered()                   // AsOrdered() を省くと変換結果の順序がばらばらになる
  2.              select exec(n);
  3. Console.WriteLine(string.Join(" ", result));
  1. var result = e.AsParallel().AsOrdered().Select(exec);               // AsOrdered() を省くと変換結果の順序がばらばらになる
  2. Console.WriteLine(string.Join(" ", result));
Reactive Extensions を使った場合
  1. // using System.Reactive.Linq;
  2. var result = e.Select(exec.ToAsync()).Zip().Last();
  3. Console.WriteLine(string.Join(" ", result));
  1. // using System.Reactive.Linq;
  2. var result = e.Select(exec.ToAsync()).CombineLatest().Last();
  3. Console.WriteLine(string.Join(" ", result));
  1. // using System.Reactive.Linq;
  2. var o = from n in e.ToObservable()
  3.         from m in exec.ToAsync()(n)
  4.         select new {n = n, m = m};
  5. var result = from r in o.ToEnumerable()
  6.              orderby r.n
  7.              select r.m;
  8. Console.WriteLine(string.Join(" ", result));

Go

  1. // import "time"
  2. array := [...]int{1234567891011121314151617181920}
  3. write := func(n int, s string) {
  4.     now := time.LocalTime().Format("15:04:05")
  5.     msec := (time.Nanoseconds() % 1e9) / 1e6
  6.     fmt.Printf("%s.%03d %02d: %s\n", now, msec, n, s)
  7. }
  8. replyChs := make([]chan intlen(array))        // 変換結果を受け取るためにチャネルの配列を作る
  9. exec := func(n int, replyCh chan<- int) {
  10.     write(n, "S ")
  11.     time.Sleep(3e9)                             // 3×10^9ナノ秒=3秒
  12.     write(n, " E")
  13.     replyCh <- -n                               // -1倍した値を返す
  14. }
  15. for i, n := range(array) {
  16.     replyChs[i] = make(chan int)                // 各要素に対応するチャネルを作る
  17.     go exec(n, replyChs[i])
  18. }
  19. result := make([]intlen(array))               // 変換結果の配列を作る
  20. for i, replyCh := range(replyChs) {
  21.     result[i] = <- replyCh
  22. }
  23. fmt.Printf("%v\n", result)                      // 変換結果を出力する

Ruby

Ruby 1.9以降
  1. range = 1..20
  2. def write n, s
  3.   Thread.exclusive do
  4.     puts sprintf('%s %02d: %s %s', Time.now.strftime('%H:%M:%S.%L'), n, s, Thread.current)
  5.   end
  6. end
  7. def exec n
  8.   write n, 'S '
  9.   sleep 3
  10.   write n, ' E'
  11.   -n
  12. end
  13. threads = range.collect do |n|
  14.   Thread.start { exec n }
  15. end
  16. result = threads.collect &:value
  17. print result

Python

  1. # from datetime import datetime
  2. # import threading
  3. # import time
  4. print_lock = threading.RLock()
  5. class Thread(threading.Thread):
  6.     def __init__(self, n):
  7.         threading.Thread.__init__(self)
  8.         self.n = n
  9.     def run(self):
  10.         self.write('S ')
  11.         time.sleep(3)
  12.         self.write(' E')
  13.         self.result = - self.n
  14.     def write(self, s):
  15.         now = datetime.now().strftime('%H:%M:%S')
  16.         threadId = threading.current_thread().ident
  17.         with print_lock:
  18.             print('%s %02d: %s %d' % (now, self.n, s, threadId))
  1. l = range(020)
  2. threads = [Thread(n) for n in l]
  3. for t in threads:
  4.     t.start()
  5. for t in threads:
  6.     t.join()
  7. result = [t.result for t in threads]
  8. print(result)

Perl

  1. # use threads;
  2. # use Time::Piece;
  3. my @array = (1..20);
  4. my $write = sub {
  5.     my $now = localtime->strftime('%H:%M:%S');
  6.     printf "%s %02d: %s Thread-%s\n"$now$_[0], $_[1], threads->tid;
  7. };
  8. my $exec = sub {
  9.     my $n = shift;
  10.     $write->($n'S ');
  11.     sleep 3;
  12.     $write->($n' E');
  13.     -$n;
  14. };
  15. my @threads = map { threads->create($exec$_) } @array;
  16. my @result = map { $_->join } @threads;
  17. print join(' '@result), "\n";

メッセージ(処理のリクエスト)を受け取る度に新しいスレッドを起動して処理を行うオブジェクト

Groovy

  1. // import java.util.concurrent.*
  2. class Server {
  3.     private executor = Executors.newCachedThreadPool()
  4.     // リクエストを処理するメソッド
  5.     private def processRequest(array) {
  6.         array.sort()
  7.     }
  8.     // サーバにリクエストを送るメソッド
  9.     def sendRequest(array) {
  10.         executor.submit new Callable() {
  11.             def call() {
  12.                 processRequest(array)
  13.             }
  14.         }
  15.     }
  16.     // サーバを停止するメソッド
  17.     def quit() {
  18.         executor.shutdown()
  19.     }
  20. }
処理が終わるのを待たない場合
  1. def server = new Server()
  2. server.sendRequest(1..20 as int[])
処理が終わるまで何もせずに待つ場合
  1. def server = new Server()
  2. def future = server.sendRequest(1..20 as int[])
  3. def result = future.get()
処理が終わるまで他のことをしながら待つ場合
  1. def server = new Server()
  2. def future = server.sendRequest(1..20 as int[])
  3. while (! future.done) {
  4.     println '処理中...'
  5. }
  6. def result = future.get()

Scala

Scala 2.10以降

akka を使った場合
  1. // import scala.concurrent._
  2. // import scala.concurrent.duration._
  3. // import akka.actor._
  4. // import akka.pattern.ask
  5. // import akka.util.Timeout
  6. class Server extends Actor {
  7.     override def receive = {
  8.         case list: List[Int] => context.actorOf(Props(classOf[Worker], this)).forward(list)
  9.     }
  10.     class Worker extends Actor {
  11.         override def receive = {
  12.             case list: List[Int] => sender() ! list.sorted
  13.         }
  14.     }
  15. }
  16. object Server {
  17.     def start(implicit system: ActorSystem) = new Ref(system.actorOf(Props[Server]))
  18.     class Ref(actor: ActorRef) {
  19.         def sendRequest(list: List[Int])(implicit timeout: Timeout) = (actor ? list).mapTo[List[Int]]
  20.     }
  21. }
  22. implicit val actorSystem = ActorSystem()
  23. val duration = 1 minutes
  24. implicit val timeout = Timeout(duration)
  25. val server = Server.start
処理が終わるのを待たない場合
  1. server sendRequest (1 to 20).toList
処理が終わるまで何もせずに待つ場合
  1. val future = server sendRequest (1 to 20).toList
  2. val result = Await.result(future, duration)
処理が終わるまで他のことをしながら待つ場合
  1. val future = server sendRequest (1 to 20).toList
  2. while (!future.isCompleted) {
  3.     println("処理中...")
  4. }
  5. val result = Await.result(future, duration)

Erlang

  1. %% サーバでリクエストを処理する関数
  2. process_request() ->
  3.     receive
  4.         {ServerPid, ClientPid, Request} ->
  5.             Result = lists:sort(Request),               %% リクエストを処理する
  6.             ClientPid ! {ServerPid, Result}             %% クライアントにレスポンスを返す
  7.     end.
  8. %% サーバで、停止命令が来るまでの間リクエストを受け続ける関数
  9. service_loop() ->
  10.     receive
  11.         {ClientPid, Request} ->                                                 %% リクエストが来たら、
  12.             spawn(fun process_request/0) ! {self(), ClientPid, Request},        %% 新しいプロセスを起動してリクエストの処理を任せ、
  13.             service_loop();                                                     %% また次のリクエストを待つ
  14.         exit -> void
  15.     end.
  16. %% サーバを起動する関数
  17. start_server() -> spawn(fun service_loop/0).
処理が終わるのを待たない場合
  1. ServerPid = start_server(),
  2. ServerPid ! {self(), lists:seq(120)}.
処理が終わるまで何もせずに待つ場合
  1. ServerPid = start_server(),
  2. ServerPid ! {self(), lists:seq(120)},
  3. receive
  4.     {ServerPid, Result} -> void
  5. end.
処理が終わるまで他のことをしながら待つ場合
  1. wait_result(Pid) ->
  2.     receive
  3.         {Pid, Result} -> Result
  4.     after 0 ->
  5.         io:format("処理中...~n", []),
  6.         wait_result(Pid)
  7.     end.
  1. ServerPid = start_server(),
  2. ServerPid ! {self(), lists:seq(120)},
  3. Result = wait_result(ServerPid).

F#

  1. // open System.Threading.Tasks
  2. type Message<'a> = Request of 'a * AsyncReplyChannel<Task<'a>>
  3.                  | Exit
  4. type Server() =
  5.     // リクエストを処理するメソッド
  6.     let processRequest list = Task.Factory.StartNew(fun () -> List.sort list)
  7.     // 停止命令が来るまでの間リクエストを受け続けるメソッド
  8.     let rec loopReceive (agent : MailboxProcessor<_>) =
  9.         async {
  10.             let! msg = agent.Receive()
  11.             match msg with
  12.             | Request(list, replyChannel) -> replyChannel.Reply <| processRequest list
  13.                                              return! loopReceive agent
  14.             | Exit -> return ()
  15.         }
  16.     let agent = MailboxProcessor.Start loopReceive
  17.     // サーバにリクエストを送るメソッド
  18.     member this.SendRequest list = agent.PostAndReply <| fun replyChannel -> Request(list, replyChannel)
  19.     // サーバを停止するメソッド
  20.     member this.Quit() = agent.Post Exit
処理が終わるのを待たない場合
  1. let server = Server()
  2. server.SendRequest [1..20] |> ignore
処理が終わるまで何もせずに待つ場合
  1. let server = Server()
  2. let task = server.SendRequest [1..20]
  3. let result = task.Result
処理が終わるまで他のことをしながら待つ場合
  1. let server = Server()
  2. let task = server.SendRequest [1..20]
  3. while not task.IsCompleted do printfn "処理中..."
  4. let result = task.Result

Go

  1. // import "sort"
  2. // リクエストは、処理対象であるintの配列と、返信先となるチャネルを含む
  3. type Request struct {
  4.     Array []int
  5.     ReplyCh chan []int
  6. }
  7. // サーバは、リクエストを受け付けるチャネルと、停止命令を受け付けるチャネルを持つ
  8. type Server struct {
  9.     requestCh chan *Request
  10.     quitCh chan bool
  11. }
  12. // サーバにリクエストを送るメソッド
  13. func (this *Server) SendRequest(array []int) *Request {
  14.     request := &Request{array, make(chan []int)}
  15.     this.requestCh <- request
  16.     return request
  17. }
  18. // サーバを停止するメソッド
  19. func (this *Server) Quit() {
  20.     this.quitCh <- true
  21. }
  22. // サーバでリクエストを処理するメソッド
  23. func (this *Server) processRequest(request *Request) {
  24.     sorted := make([]intlen(request.Array))
  25.     copy(sorted, request.Array)
  26.     sort.SortInts(sorted)
  27.     request.ReplyCh <- sorted                               // クライアントにレスポンスを返す
  28. }
  29. // サーバで、停止命令が来るまでの間リクエストを受け続けるメソッド
  30. func (this *Server) serviceLoop() {
  31.     for {
  32.         select {
  33.         case request := <- this.requestCh:                  // 通常のリクエストが来たら、配列をソートして返す
  34.             go this.processRequest(request)
  35.         case <- this.quitCh:                                // 終了のリクエストが来たら、後始末をしてループを抜ける
  36.             close(this.requestCh)
  37.             close(this.quitCh)
  38.             return
  39.         }
  40.     }
  41. }
  42. // サーバを起動するメソッド
  43. func StartServer() *Server {
  44.     server := &Server{make(chan *Request, 1000), make(chan bool)}
  45.     go server.serviceLoop()
  46.     return server
  47. }
処理が終わるのを待たない場合
  1. server := StartServer()
  2. request := server.SendRequest([]int{41250})
処理が終わるまで何もせずに待つ場合
  1. server := StartServer()
  2. request := server.SendRequest([]int{41250})
  3. reply := <- request.ReplyCh                                 // reply の型は []int
処理が終わるまで他のことをしながら待つ場合
  1. server := StartServer()
  2. request := server.SendRequest([]int{41250})
  3. var reply []int
  4. for {
  5.     var ok bool
  6.     reply, ok = <- request.ReplyCh                          // reply の型は []int
  7.     if ok { break }
  8.     fmt.Printf("処理中...")
  9. }

MapReduce

MapReduce により、テキストファイル内の単語をカウントする。

Java

Java 8以降
  1. // import java.util.Map.Entry;
  2. // import java.util.concurrent.ConcurrentMap;
  3. // import java.util.function.*;
  4. // import java.util.stream.*;
  5. public final class MapReduce {
  6.     public static <K1, V1, K2, V2, K3, V3> void execute(
  7.             Stream<Entry<K1, V1>> read,
  8.             Function<Entry<K1, V1>, Stream<Entry<K2, V2>>> map,
  9.             Function<Entry<K2, Stream<V2>>, Stream<Entry<K3, V3>>> reduce,
  10.             Consumer<Entry<K3, V3>> write) {
  11.         Collector<Entry<K2, V2>, ?, ConcurrentMap<K2, Stream<V2>>> multiMapCollector =
  12.             Collectors.toConcurrentMap(e -> e.getKey(), e -> Stream.of(e.getValue()), Stream::concat);
  13.         Stream<Entry<K2, V2>> mapped = processParallel("map", read, map);
  14.         Stream<Entry<K2, Stream<V2>>> shuffled = mapped.collect(multiMapCollector).entrySet().stream();
  15.         System.out.println("shuffled");
  16.         processParallel("reduce", shuffled, reduce).forEachOrdered(write);
  17.         System.out.println("completed");
  18.     }
  19.     private static <K1, V1, K2, V2> Stream<Entry<K2, V2>> processParallel(
  20.             String processName,
  21.             Stream<Entry<K1, V1>> input,
  22.             Function<Entry<K1, V1>, Stream<Entry<K2, V2>>> process) {
  23.         return input.parallel().flatMap(e -> {
  24.             try {
  25.                 System.out.printf("%s start key=[%s] value=[%s]%n", processName, e.getKey(), e.getValue());
  26.                 return process.apply(e);
  27.             } finally {
  28.                 System.out.printf("%s end%n", processName);
  29.             }
  30.         });
  31.     }
  32. }
  1. // import java.util.Map.Entry;
  2. // import java.util.function.*;
  3. // import java.util.stream.Stream;
  4. public final class MapReduce {
  5.     public static <K1, V1, K2, V2, K3, V3> void execute(
  6.             Stream<Entry<K1, V1>> read,
  7.             Function<Entry<K1, V1>, Stream<Entry<K2, V2>>> map,
  8.             Function<Entry<K2, Stream<V2>>, Stream<Entry<K3, V3>>> reduce,
  9.             Consumer<Entry<K3, V3>> write) {
  10.         Map<K2, Stream<V2>> m = new HashMap<>();
  11.         processParallel("map", read, map,
  12.             e -> m.merge(e.getKey(), Stream.of(e.getValue()), Stream::concat));
  13.         Stream<Entry<K2, Stream<V2>>> shuffled = m.entrySet().stream();
  14.         System.out.println("shuffled");
  15.         processParallel("reduce", shuffled, reduce, write);
  16.         System.out.println("completed");
  17.     }
  18.     private static <K1, V1, K2, V2> void processParallel(
  19.             String processName,
  20.             Stream<Entry<K1, V1>> input,
  21.             Function<Entry<K1, V1>, Stream<Entry<K2, V2>>> process,
  22.             Consumer<Entry<K2, V2>> output) {
  23.         input.parallel().flatMap(e -> {
  24.             try {
  25.                 System.out.printf("%s start key=[%s] value=[%s]%n", processName, e.getKey(), e.getValue());
  26.                 return process.apply(e);
  27.             } finally {
  28.                 System.out.printf("%s end%n", processName);
  29.             }
  30.         }).forEachOrdered(output);
  31.     }
  32. }
  1. // import java.util.AbstractMap.SimpleEntry;
  2. // import java.util.regex.Pattern;
  3. // import java.nio.file.*;
  4. // import java.util.stream.Stream;
  5. try (Stream<String> lines = Files.lines(Paths.get("in.txt"))) {
  6.     MapReduce.<Void, String, String, Integer, String, Long>execute(
  7.         // read : Stream<Entry<Void, String>>
  8.         lines.map(line -> new SimpleEntry<>(null, line)),
  9.         // map : Entry<Void, String> -> Stream<Entry<String, Integer>>
  10.         e -> Pattern.compile("\\W+").splitAsStream(e.getValue()).map(s -> new SimpleEntry<>(s, 1)),
  11.         // reduce : Entry<String, Stream<Integer>> -> Stream<Entry<String, Long>>
  12.         e -> Stream.of(new SimpleEntry<>(e.getKey(), e.getValue().count())),
  13.         // write : Entry<String, Long> -> void
  14.         e -> System.out.printf("%5d\t%s%n", e.getValue(), e.getKey())
  15.     );
  16. }
Java 8以降で RxJava を使った場合
  1. // import java.util.AbstractMap.SimpleEntry;
  2. // import java.util.Map.Entry;
  3. // import rx.Observable;
  4. // import rx.functions.*;
  5. public final class MapReduce {
  6.     public static <K1, V1, K2, V2, K3, V3> Observable<Integer> execute(
  7.             Observable<Entry<K1, V1>> reader,
  8.             Func1<Entry<K1, V1>, Observable<Entry<K2, V2>>> map,
  9.             Func1<Entry<K2, Observable<V2>>, Observable<Entry<K3, V3>>> reduce,
  10.             Action1<Entry<K3, V3>> writer) {
  11.         Observable<Entry<K2, V2>> mapped = processParallel("map", reader, map);
  12.         Observable<Entry<K2, Observable<V2>>> shuffled = mapped
  13.             .toMultimap(e -> e.getKey(), e -> e.getValue())
  14.             .flatMap(m -> Observable.from(m.entrySet()))
  15.             .map(e -> new SimpleEntry<>(e.getKey(), Observable.from(e.getValue())));
  16.         Observable<Entry<K3, V3>> reduced = processParallel("reduce", shuffled, reduce);
  17.         return reduced.doOnNext(writer).count();
  18.     }
  19.     private static <K1, V1, K2, V2> Observable<Entry<K2, V2>> processParallel(
  20.             String processName,
  21.             Observable<Entry<K1, V1>> input,
  22.             Func1<Entry<K1, V1>, Observable<Entry<K2, V2>>> process) {
  23.         return input.parallel(o -> o.flatMap(e -> {
  24.             try {
  25.                 System.out.printf("%s start key=[%s] value=[%s]%n", processName, e.getKey(), e.getValue());
  26.                 return process.call(e);
  27.             } finally {
  28.                 System.out.printf("%s end%n", processName);
  29.             }
  30.         }));
  31.     }
  32. }
  1. // import java.util.AbstractMap.SimpleEntry;
  2. // import java.util.Map.Entry;
  3. // import java.util.concurrent.CountDownLatch;
  4. // import java.util.regex.Pattern;
  5. // import java.nio.file.*;
  6. // import java.util.stream.Stream;
  7. // import rx.Observable;
  8. CountDownLatch latch = new CountDownLatch(1);
  9. try (Stream<String> lines = Files.lines(Paths.get("in.txt"))) {
  10.     MapReduce.<Void, String, String, Integer, String, Integer>execute(
  11.         // read : Observable<Entry<Void, String>>
  12.         Observable.from(() -> lines.map(line -> new SimpleEntry<>(null, line)).iterator()),
  13.         // map : Entry<Void, String> -> Observable<Entry<String, Integer>>
  14.         e -> Observable.from(e.getValue().split("\\W+"))
  15.                        .filter(s -> s.length() > 0)
  16.                        .map(s -> new SimpleEntry<>(s, 1)),
  17.         // reduce : Entry<String, Observable<Integer>> -> Observable<Entry<String, Integer>>
  18.         e -> e.getValue().count().map(n -> new SimpleEntry<>(e.getKey(), n)),
  19.         // write : Entry<String, Integer> -> void
  20.         e -> System.out.printf("%5d\t%s%n", e.getValue(), e.getKey())
  21.     ).subscribe(count -> {
  22.         System.out.printf("completed: %d%n", count);
  23.         latch.countDown();
  24.     });
  25. }
  26. latch.await();

C#

C# 4.0以降でタスク並列ライブラリ(TPL)を使った場合
  1. // using System;
  2. // using System.Collections.Generic;
  3. // using System.Linq;
  4. public sealed class MapReduce {
  5.     public static void Execute<K1, V1, K2, V2, K3, V3>(
  6.             IEnumerable<Tuple<K1, V1>> read,
  7.             Func<Tuple<K1, V1>, IEnumerable<Tuple<K2, V2>>> map,
  8.             Func<Tuple<K2, IEnumerable<V2>>, IEnumerable<Tuple<K3, V3>>> reduce,
  9.             Action<Tuple<K3, V3>> write) {
  10.         var mapped = ProcessParallel("map", read, map);
  11.         var shuffled = from grp in (from t in mapped group t.Item2 by t.Item1)
  12.                        select Tuple.Create(grp.Key, grp.AsEnumerable());
  13.         var reduced = ProcessParallel("reduce", shuffled, reduce);
  14.         foreach (var t in reduced) {
  15.             write(t);
  16.         }
  17.     }
  18.     private static ParallelQuery<Tuple<K2, V2>> ProcessParallel<K1, V1, K2, V2>(
  19.             string processName,
  20.             IEnumerable<Tuple<K1, V1>> input,
  21.             Func<Tuple<K1, V1>, IEnumerable<Tuple<K2, V2>>> process) {
  22.         return input.AsParallel().SelectMany(t => {
  23.             Console.WriteLine("{0} start key=[{1}] value=[{2}]", processName, t.Item1, t.Item2);
  24.             try {
  25.                 System.Threading.Thread.Sleep(200);
  26.                 return process(t);
  27.             } finally {
  28.                 Console.WriteLine("{0} end", processName);
  29.             }
  30.         });
  31.     }
  32. }
  1. // using System;
  2. // using System.IO;
  3. // using System.Linq;
  4. // using System.Text.RegularExpressions;
  5. MapReduce.Execute(
  6.     // read : IEnumerable<Tuple<object, string>>
  7.     from line in File.ReadLines("in.txt")
  8.     select Tuple.Create<objectstring>(null, line),
  9.     // map : Tuple<object, string> => IEnumerable<Tuple<string, int>>
  10.     t => from Match m in Regex.Matches(t.Item2, @"\w+")
  11.          select Tuple.Create(m.Value, 1),
  12.     // reduce : Tuple<string, IEnumerable<int>> => IEnumerable<Tuple<string, int>>
  13.     t => Enumerable.Repeat(Tuple.Create(t.Item1, t.Item2.Count()), 1),
  14.     // write : Tuple<string, int> => void
  15.     t => Console.WriteLine("{0,5}\t{1}", t.Item2, t.Item1)
  16. );
Reactive Extensions を使った場合
  1. // using System;
  2. // using System.Linq;
  3. // using System.Reactive.Linq;
  4. public sealed class MapReduce {
  5.     public static void Execute<K1, V1, K2, V2, K3, V3>(
  6.             IObservable<Tuple<K1, V1>> read,
  7.             Func<Tuple<K1, V1>, IObservable<Tuple<K2, V2>>> map,
  8.             Func<Tuple<K2, IObservable<V2>>, IObservable<Tuple<K3, V3>>> reduce,
  9.             Action<Tuple<K3, V3>> write) {
  10.         var mapped = ProcessParallel("map", read, map);
  11.         var shuffled = from lookup in mapped.ToLookup(t => t.Item1, t => t.Item2)
  12.                        from grp in lookup
  13.                        select Tuple.Create(grp.Key, grp.ToObservable());
  14.         var reduced = ProcessParallel("reduce", shuffled, reduce);
  15.         reduced.ForEach(write);
  16.     }
  17.     private static IObservable<Tuple<K2, V2>> ProcessParallel<K1, V1, K2, V2>(
  18.             string processName,
  19.             IObservable<Tuple<K1, V1>> input,
  20.             Func<Tuple<K1, V1>, IObservable<Tuple<K2, V2>>> process) {
  21.         Func<Tuple<K1, V1>, IObservable<Tuple<K2, V2>>> f = t => {
  22.             Console.WriteLine("{0} start key=[{1}] value=[{2}]", processName, t.Item1, t.Item2);
  23.             try {
  24.                 System.Threading.Thread.Sleep(200);
  25.                 return process(t);
  26.             } finally {
  27.                 Console.WriteLine("{0} end", processName);
  28.             }
  29.         };
  30.         return input.SelectMany(f.ToAsync()).Merge();
  31.     }
  32. }
  1. // using System;
  2. // using System.IO;
  3. // using System.Linq;
  4. // using System.Reactive.Linq;
  5. // using System.Text.RegularExpressions;
  6. MapReduce.Execute(
  7.     // read : IObservable<Tuple<object, string>>
  8.     from line in File.ReadLines("in.txt").ToObservable()
  9.     select Tuple.Create<objectstring>(null, line),
  10.     // map : Tuple<object, string> => IObservable<Tuple<string, int>>
  11.     t => from m in Regex.Matches(t.Item2, @"\w+").Cast<Match>().ToObservable()
  12.          select Tuple.Create(m.Value, 1),
  13.     // reduce : Tuple<string, IObservable<int>> => IObservable<Tuple<string, int>>
  14.     t => from count in t.Item2.Count()
  15.          select Tuple.Create(t.Item1, count),
  16.     // write : Tuple<string, int> => void
  17.     t => Console.WriteLine("{0,5}\t{1}", t.Item2, t.Item1)
  18. );

戻る

目次

別のスレッドで処理(リストを昇順にソートする)を行う Java Groovy Scala Erlang Haskell PowerShell F# C# C++ Go Dart ActionScript JavaScript CoffeeScript Lua Ruby Python PHP Perl
リストの各要素を並列で処理して(処理の順序は問わない)全ての完了を待つ Java Groovy Scala Erlang Haskell PowerShell F# C# C++ Go Dart ActionScript JavaScript CoffeeScript Lua Ruby Python PHP Perl
リストの各要素を並列で変換処理して、その結果のリストを出力する Java Groovy Scala Erlang Haskell PowerShell F# C# C++ Go Dart ActionScript JavaScript CoffeeScript Lua Ruby Python PHP Perl
メッセージ(処理のリクエスト)を受け取る度に新しいスレッドを起動して処理を行うオブジェクト Java Groovy Scala Erlang Haskell PowerShell F# C# C++ Go Dart ActionScript JavaScript CoffeeScript Lua Ruby Python PHP Perl
MapReduce Java Groovy Scala Erlang Haskell PowerShell F# C# C++ Go Dart ActionScript JavaScript CoffeeScript Lua Ruby Python PHP Perl