Java
- class Sorter<E> implements Callable<E[]> {
- private final E[] original;
- public Sorter(E... original) {
- this.original = original;
- }
- @Override
- public E[] call() throws Exception {
- E[] sorted = Arrays.copyOf(original, original.length);
- Arrays.sort(sorted);
- return sorted;
- }
- }
処理が終わるのを待たない場合
- ExecutorService executor = Executors.newCachedThreadPool();
- executor.submit(new Sorter<Integer>(4, 1, 2, 5, 0));
- executor.shutdown();
処理が終わるまで何もせずに待つ場合
- ExecutorService executor = Executors.newCachedThreadPool();
- Future<Integer[]> future = executor.submit(new Sorter<Integer>(4, 1, 2, 5, 0));
- Integer[] result = future.get();
- executor.shutdown();
処理が終わるまで他のことをしながら待つ場合
- ExecutorService executor = Executors.newCachedThreadPool();
- Future<Integer[]> future = executor.submit(new Sorter<Integer>(4, 1, 2, 5, 0));
- while (! future.isDone()) {
- System.out.println("処理中...");
- }
- Integer[] result = future.get();
- executor.shutdown();
- IntStream stream = IntStream.of(4, 1, 2, 5, 0);
- Single<IntStream> single = Single.just(stream)
- .delay(0, TimeUnit.SECONDS)
- .map(s-> s.sorted());
処理が終わるのを待たない場合
処理が終わるまで何もせずに待つ場合
- IntStream result = single.blockingGet();
処理が終わるまで他のことをしながら待つ場合
- Future<IntStream> future = single.toFuture();
- while (! future.isDone()) {
- System.out.println("処理中...");
- }
- IntStream result = future.get();
Groovy
- class Sorter implements Callable {
- private original
- Sorter(... original) {
- this.original = original
- }
- def call() {
- original.sort()
- }
- }
- def sorter = new Sorter(4, 1, 2, 5, 0)
- def executor = Executors.newSingleThreadExecutor()
処理が終わるのを待たない場合
- executor.submit(sorter)
- executor.shutdown()
処理が終わるまで何もせずに待つ場合
- def future = executor.submit(sorter)
- def result = future.get()
- executor.shutdown()
処理が終わるまで他のことをしながら待つ場合
- def future = executor.submit(sorter)
- while (! future.done) {
- println '処理中...'
- }
- def result = future.get()
- executor.shutdown()
- def list = [4, 1, 2, 5, 0]
- def single = Single.just(list)
- .delay(0, TimeUnit.SECONDS)
- .map { it.sort() }
処理が終わるのを待たない場合
処理が終わるまで何もせずに待つ場合
- def result = single.blockingGet()
処理が終わるまで他のことをしながら待つ場合
- def future = single.toFuture()
- while (! future.done) {
- println '処理中...'
- }
- def result = future.get()
Kotlin
- val list = listOf(4, 1, 2, 5, 0)
処理が終わるのを待たない場合
処理が終わるまで何もせずに待つ場合
- val deffered = async {
- list.sorted()
- }
- val result = deffered.await()
処理が終わるまで他のことをしながら待つ場合
- val deffered = async {
- list.sorted()
- }
- while (deffered.isActive) {
- println("処理中...")
- delay(1)
- }
- val result = deffered.await()
- val list = listOf(4, 1, 2, 5, 0)
- val single = Single.just(list)
- .delay(0, TimeUnit.SECONDS)
- .map { it.sorted() }
処理が終わるのを待たない場合
処理が終わるまで何もせずに待つ場合
- val result = single.blockingGet()
処理が終わるまで他のことをしながら待つ場合
- val future = single.toFuture()
- while (! future.isDone()) {
- println("処理中...")
- }
- val result = future.get()
Scala
Future を使う
- val list = List(4, 1, 2, 5, 0)
処理が終わるのを待たない場合
処理が終わるまで何もせずに待つ場合
- val future = Future { list.sorted }
- val result = Await.result(future, Duration.Inf)
処理が終わるまで他のことをしながら待つ場合
- val future = Future { list.sorted }
- while (!future.isCompleted) {
- println("処理中...")
- }
- val result = Await.result(future, Duration.Zero)
Akka を使った場合(無名のアクター)
- implicit val actorSystem = ActorSystem()
- val duration = 1 minutes
- implicit val timeout = Timeout(duration)
- val s = actor(new Act {
- become {
- case list: List[Int] => sender() ! list.sorted
- }
- })
- val list = List(4, 1, 2, 5, 0)
処理が終わるのを待たない場合
処理が終わるまで何もせずに待つ場合
- val result = Await.result((s ? list).mapTo[List[Int]], duration)
処理が終わるまで他のことをしながら待つ場合
- val future = (s ? list).mapTo[List[Int]]
- while (!future.isCompleted) {
- println("処理中...")
- }
- val result = Await.result(future, duration)
Akka を使った場合(アクタークラスを定義)
- class Sorter[A](implicit ord: Ordering[A]) extends Actor {
- override def receive = {
- case list: List[A] => sender() ! list.sorted
- }
- }
- val actorSystem = ActorSystem()
- val duration = 1 minutes
- implicit val timeout = Timeout(duration)
- val s = actorSystem.actorOf(Props(classOf[Sorter[_]], Ordering.Int))
- val list = List(4, 1, 2, 5, 0)
処理が終わるのを待たない場合
処理が終わるまで何もせずに待つ場合
- val result = Await.result((s ? list).mapTo[List[Int]], duration)
処理が終わるまで他のことをしながら待つ場合
- val future = (s ? list).mapTo[List[Int]]
- while (!future.isCompleted) {
- println("処理中...")
- }
- val result = Await.result(future, duration)
- val list = List(4, 1, 2, 5, 0)
- val single = Single.just(list)
- .delay(0, TimeUnit.SECONDS)
- .map { _.sorted }
処理が終わるのを待たない場合
処理が終わるまで何もせずに待つ場合
- val result = single.blockingGet
処理が終わるまで他のことをしながら待つ場合
- val future = single.toFuture
- while (! future.isDone) {
- println("処理中...")
- }
- val result = future.get
Erlang
- SorterPid = spawn(fun() ->
- receive
- {Pid, List} -> Pid ! {self(), lists:sort(List)}
- end
- end),
処理が終わるのを待たない場合
- SorterPid ! {self(), [4, 1, 2, 5, 0]}.
処理が終わるまで何もせずに待つ場合
- SorterPid ! {self(), [4, 1, 2, 5, 0]},
- receive
- {SorterPid, Result} -> void
- end.
処理が終わるまで他のことをしながら待つ場合
- wait_result(Pid) ->
- receive
- {Pid, Result} -> Result
- after 0 ->
- io:format("処理中...~n", []),
- wait_result(Pid)
- end.
- SorterPid ! {self(), [4, 1, 2, 5, 0]},
- Result = wait_result(SorterPid).
F#
非同期ワークフローを使った場合
処理が終わるのを待たない場合
- let list = [4; 1; 2; 5; 0]
- async {
- list |> List.sort |> ignore
- } |> Async.Start
処理が終わるまで何もせずに待つ場合
- let list = [4; 1; 2; 5; 0]
- let a = async {
- return list |> List.sort
- }
- let result = (Async.StartAsTask a).Result
- let list = [4; 1; 2; 5; 0]
- let a = async {
- return list |> List.sort
- }
- let result = Async.Parallel [a] |> Async.RunSynchronously |> Seq.head
- let asyncFunc func arg =
- let _delegate = new Func<'T, 'TResult>(func)
- async { return! Async.FromBeginEnd(arg, _delegate.BeginInvoke, _delegate.EndInvoke) }
- let list = [4; 1; 2; 5; 0]
- let result = list |> asyncFunc List.sort |> Async.RunSynchronously
処理が終わるまで他のことをしながら待つ場合
- let list = [4; 1; 2; 5; 0]
- let a = async {
- return list |> List.sort
- }
- let task = Async.StartAsTask a
- while not task.IsCompleted do printfn "処理中..."
- let result = task.Result
- let array = [|4; 1; 2; 5; 0|]
- let result = ref null
- let a = async {
- result := Array.sort array
- }
- let b = async {
- while !result = null do printfn "処理中..."
- }
- Async.Parallel [a; b] |> Async.RunSynchronously |> ignore
タスク並列ライブラリ(TPL)を使った場合
処理が終わるのを待たない場合
- let list = [4; 1; 2; 5; 0]
- Task.Run(fun () -> List.sort list) |> ignore
処理が終わるまで何もせずに待つ場合
- let list = [4; 1; 2; 5; 0]
- let task = Task.Run(fun () -> List.sort list)
- let result = task.Result
- let array = [|4; 1; 2; 5; 0|]
- let result = ref null
- let a() =
- result := Array.sort array
- Parallel.Invoke [| new Action(a) |]
処理が終わるまで他のことをしながら待つ場合
- let list = [4; 1; 2; 5; 0]
- let task = Task.Run(fun () -> List.sort list)
- while not task.IsCompleted do printfn "処理中..."
- let result = task.Result
- let array = [|4; 1; 2; 5; 0|]
- let result = ref null
- let a() =
- result := Array.sort array
- let b() =
- while !result = null do printfn "処理中..."
- Parallel.Invoke [| new Action(a); new Action(b) |]
- let array = [|4; 1; 2; 5; 0|]
処理が終わるのを待たない場合
- Observable.Start(fun () -> Array.sort array) |> ignore
処理が終わるまで何もせずに待つ場合
- let result = Observable.Start(fun () -> Array.sort array).Last()
処理が終わるまで他のことをしながら待つ場合
- let result = ref null
- Observable.Start(fun () -> Array.sort array).Subscribe(fun array -> result := array) |> ignore
- while !result = null do printfn "処理中..."
C#
Thread クラスを使った場合
- int[] array = { 4, 1, 2, 5, 0 };
- int[] result = null;
- Thread t = new Thread(() => {
- int[] sorted = (int[])array.Clone();
- Array.Sort(sorted);
- result = sorted;
- });
処理が終わるのを待たない場合
処理が終わるまで何もせずに待つ場合
処理が終わるまで他のことをしながら待つ場合
- t.Start();
- while (t.IsAlive) {
- Console.WriteLine("処理中...");
- }
ThreadPool クラスを使った場合
- int[] array = { 4, 1, 2, 5, 0 };
- int[] result = null;
- WaitCallback callback = state => {
- int[] sorted = (int[])array.Clone();
- Array.Sort(sorted);
- result = sorted;
- };
処理が終わるのを待たない場合
- ThreadPool.QueueUserWorkItem(callback);
処理が終わるまで何もせずに待つ場合
- ThreadPool.QueueUserWorkItem(callback);
- TimeSpan sleepTime = TimeSpan.FromMilliseconds(100);
- while (result == null) {
- Thread.sleep(sleepTime);
- }
処理が終わるまで他のことをしながら待つ場合
- ThreadPool.QueueUserWorkItem(callback);
- while (result == null) {
- Console.WriteLine("処理中...");
- }
デリゲートの BeginInvoke メソッドを使った場合
- int[] array = { 4, 1, 2, 5, 0 };
- Converter<int[], int[]> c = input => {
- int[] sorted = (int[])input.Clone();
- Array.Sort(sorted);
- return sorted;
- };
処理が終わるのを待たない場合
- c.BeginInvoke(array, null, null);
- AsyncCallback callback = ar => {
- int[] result = c.EndInvoke(ar);
- };
- c.BeginInvoke(array, callback, null);
処理が終わるまで何もせずに待つ場合
- IAsyncResult ar = c.BeginInvoke(array, null, null);
- int[] result = c.EndInvoke(ar);
- IAsyncResult ar = c.BeginInvoke(array, null, null);
- using (WaitHandle wh = ar.AsyncWaitHandle) {
- wh.WaitOne();
- int[] result = c.EndInvoke(ar);
- }
処理が終わるまで他のことをしながら待つ場合
- IAsyncResult ar = c.BeginInvoke(array, null, null);
- while (! ar.IsCompleted) {
- Console.WriteLine("処理中...");
- }
- int[] result = c.EndInvoke(ar);
タスク並列ライブラリ(TPL)を使った場合
- int[] array = { 4, 1, 2, 5, 0 };
- Func<int[], int[]> sorter = input => {
- int[] sorted = (int[])input.Clone();
- Array.Sort(sorted);
- return sorted;
- };
処理が終わるのを待たない場合
- Task.Run(() => sorter(array));
処理が終わるまで何もせずに待つ場合
- var task = Task.Run(() => sorter(array));
- var result = task.Result;
- var task = Task.Run(() => sorter(array));
- var result = await task;
処理が終わるまで他のことをしながら待つ場合
- var task = Task.Run(() => sorter(array));
- while (! task.IsCompleted) {
- Console.WriteLine("処理中...");
- }
- var result = task.Result;
- int[] array = { 4, 1, 2, 5, 0 };
- Func<int[], int[]> sorter = input => {
- int[] sorted = (int[])input.Clone();
- Array.Sort(sorted);
- return sorted;
- };
処理が終わるのを待たない場合
- Observable.Start(() => sorter(array));
処理が終わるまで何もせずに待つ場合
- var result = sorter.ToAsync()(array).Last();
- var result = Observable.Start(() => sorter(array)).Last();
処理が終わるまで他のことをしながら待つ場合
- int[] result = null;
- sorter.ToAsync()(array).Subscribe(list => result = list);
- while (result == null) {
- Console.WriteLine("処理中...");
- }
Go
- func doSort(in <-chan []int, out chan<- []int) {
- array := <- in
- sorted := make([]int, len(array))
- copy(sorted, array)
- sort.SortInts(sorted)
- out <- sorted
- }
- in := make(chan []int)
- out := make(chan []int)
- go doSort(in, out)
処理が終わるのを待たない場合
- in <- []int{4, 1, 2, 5, 0}
処理が終わるまで何もせずに待つ場合
- in <- []int{4, 1, 2, 5, 0}
- result := <- out
処理が終わるまで他のことをしながら待つ場合
- in <- []int{4, 1, 2, 5, 0}
- var result []int
- for {
- var ok bool
- result, ok = <- out
- if ok { break }
- fmt.Printf("処理中...")
- }
Rust
- let vec = vec![4, 1, 2, 5, 0];
処理が終わるのを待たない場合
- thread::spawn(move || {
- let mut v = vec.clone();
- v.sort()
- });
処理が終わるまで何もせずに待つ場合
- let t = thread::spawn(move || {
- let mut v = vec.clone();
- v.sort();
- v
- });
- while ! t.is_finished() {
- println!("処理中...");
- }
処理が終わるまで他のことをしながら待つ場合
- let t = thread::spawn(move || {
- let mut v = vec.clone();
- v.sort();
- v
- });
- let result = t.join().unwrap();
Dart
- var list = [4, 1, 2, 5, 0];
処理が終わるのを待たない場合
- Future(() {
- var sorted = List.from(list);
- sorted.sort();
- return sorted;
- });
処理が終わるまで何もせずに待つ場合
- var future = Future(() {
- var sorted = List.from(list);
- sorted.sort();
- return sorted;
- });
- var result = await future;
Ruby
処理が終わるのを待たない場合
- array = [4, 1, 2, 5, 0]
- Thread.start { array.sort }
- array = [4, 1, 2, 5, 0]
- Thread.start(array) {|a| a.sort }
処理が終わるまで何もせずに待つ場合
- array = [4, 1, 2, 5, 0]
- result = Thread.start { array.sort }.value
- array = [4, 1, 2, 5, 0]
- result = Thread.start(array) {|a| a.sort }.value
処理が終わるまで他のことをしながら待つ場合
- array = [4, 1, 2, 5, 0]
- t = Thread.start { array.sort }
- while t.alive?
- puts '処理中...'
- end
- result = t.value
- array = [4, 1, 2, 5, 0]
- t = Thread.start(array) {|a| a.sort }
- while t.alive?
- puts '処理中...'
- end
- result = t.value
Python
専用のスレッドクラスを作る
- class Sorter(threading.Thread):
- def __init__(self, *original):
- threading.Thread.__init__(self)
- self.original = original
- def run(self):
- self.sorted = sorted(self.original)
- sorter = Sorter(4, 1, 2, 5, 0)
- sorter.name = 'Sorting'
処理が終わるのを待たない場合
処理が終わるまで何もせずに待つ場合
- sorter.start()
- sorter.join()
- result = sorter.sorted
処理が終わるまで他のことをしながら待つ場合
- sorter.start()
- while sorter.is_alive():
- print('処理中...')
- result = sorter.sorted
汎用的なスレッドクラスを作る
- class Thread(threading.Thread):
- def __init__(self, func, *args):
- threading.Thread.__init__(self)
- self.func = func
- self.args = args
- def run(self):
- self.result = self.func(*self.args)
- sorter = Thread(sorted, [4, 1, 2, 5, 0])
- sorter.name = 'Sorting'
処理が終わるのを待たない場合
処理が終わるまで何もせずに待つ場合
- sorter.start()
- sorter.join()
- result = sorter.result
処理が終わるまで他のことをしながら待つ場合
- sorter.start()
- while sorter.is_alive():
- print('処理中...')
- result = sorter.result
Perl
処理が終わるのを待たない場合
- threads->create(sub { sort @_ }, 4, 1, 2, 5, 0)->detach;
- async { sort 4, 1, 2, 5, 0 }->detach;
処理が終わるまで何もせずに待つ場合
- my ($t) = threads->create(sub { sort @_ }, 4, 1, 2, 5, 0);
- my @result = $t->join;
処理が終わるまで他のことをしながら待つ場合
- my ($t) = threads->create(sub { sort @_ }, 4, 1, 2, 5, 0);
- print "処理中...\n" while $t->is_running;
- my @result = $t->join;
処理内容は、(1) 開始(Start)のメッセージを出力する (2) スリープする (3) 終了(End)のメッセージを出力する。
Java
- public static void write(int n, String s) {
- System.out.println(MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
- new Date(), n, s, Thread.currentThread().getId()));
- }
- public static void exec(int n) {
- write(n, "S ");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- }
- write(n, " E");
- }
- IntStream stream = IntStream.range(0, 20)
- stream.parallel().forEach(n -> exec(n));
RxJava を使った場合
- Flowable.range(0, 20)
- .parallel()
- .runOn(Schedulers.newThread())
- .doOnNext(n -> exec(n))
- .sequential()
- .blockingSubscribe();
Groovy
- def write(n, s) {
- println MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
- new Date(), n, s, Thread.currentThread().id)
- }
- def exec(n) {
- write n, "S "
- Thread.sleep 3000
- write n, " E"
- }
- def list = 1..20
- list.stream().parallel().forEach { exec it }
RxJava を使った場合
- def list = 1..20
- Flowable.fromIterable(list)
- .parallel()
- .runOn(Schedulers.newThread())
- .doOnNext { exec it }
- .sequential()
- .blockingSubscribe()
Kotlin
- fun write(n: Int, s: String) {
- println(MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}", Date(), n, s, Thread.currentThread().getId()))
- }
- suspend fun exec(n: Int) {
- write(n, "S ")
- delay(3000)
- write(n, " E")
- }
- val jobs = Array(20) { i ->
- launch { exec(i + 1) }
- }
- joinAll(*jobs)
RxJava を使った場合
- fun write(n: Int, s: String) {
- println(MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}", Date(), n, s, Thread.currentThread().getId()))
- }
- fun exec(n: Int) {
- write(n, "S ")
- Thread.sleep(3000)
- write(n, " E")
- }
- val range = 1..20
- Flowable.fromIterable(range)
- .parallel()
- .runOn(Schedulers.newThread())
- .doOnNext { exec(it) }
- .sequential()
- .blockingSubscribe()
Scala
- def write(n: Int, s: String) = println(
- MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
- new Date, Integer valueOf n, s, Thread.currentThread.getId.toString))
- def exec(n: Int) = {
- write(n, "S ")
- Thread sleep 3000
- write(n, " E")
- }
- val range = 1 to 20
- range.par.foreach(exec)
RxJava を使った場合
- val range = 1 to 20
- Flowable.fromIterable(range.asJava)
- .parallel
- .runOn(Schedulers.newThread)
- .doOnNext(exec)
- .sequential
- .blockingSubscribe
Erlang
- List = lists:seq(1, 20),
- Write = fun(N, S) ->
- {Hour, Minute, Second} = time(),
- {_, _, MicroSecs} = now(),
- io:format("~2..0b:~2..0b:~2..0b.~3..0b ~2..0b: ~s Process~p~n",
- [Hour, Minute, Second, MicroSecs div 1000, N, S, self()])
- end,
- Exec = fun(N, Caller) ->
- Write(N, "S "),
- timer:sleep(3000),
- Write(N, " E"),
- Caller ! self()
- end,
- Pids = lists:map(fun(N) ->
- Caller = self(),
- spawn(fun() -> Exec(N, Caller) end)
- end, List),
- lists:foreach(fun(Pid) ->
- receive
- Pid -> ok
- end
- end, Pids).
F#
- let list = [1..20]
- let write n s =
- let now = DateTime.Now.ToString "HH:mm:ss.fff"
- sprintf "%s %02d: %s Thread-%d" now n s Thread.CurrentThread.ManagedThreadId |> Console.WriteLine
- let exec n =
- write n "S "
- TimeSpan.FromSeconds 3.0 |> Thread.Sleep
- write n " E"
非同期ワークフローを使った場合
- let tasks = [ for n in list -> async { exec n } |> Async.StartAsTask ]
- for task in tasks do task.Wait()
- [ for n in list -> async { exec n } ] |> Async.Parallel |> Async.RunSynchronously |> ignore
タスク並列ライブラリ(TPL)を使った場合
- Parallel.ForEach(list, exec) |> ignore
- list.AsParallel().ForAll(new Action<_>(exec))
- list.AsParallel().ForAll(fun n -> exec n)
- Parallel.Invoke [| for n in list -> new Action(fun () -> exec n) |]
- Task.WaitAll [| for n in list -> Task.Run(fun () -> exec n) |]
F# PowerPack を使った場合
The Reactive Extensions for .NET を使った場合
- let execAsync n = Observable.Start(fun () -> exec n)
- list.Select(execAsync).Merge().Last() |> ignore
- let execAsync n = Observable.Start(fun () -> exec n)
- list.ToObservable().SelectMany(execAsync).Last() |> ignore
C#
- var e = Enumerable.Range(1, 20);
- Action<int, string> write = (n, s) => Console.WriteLine(
- string.Format("{0:HH:mm:ss.fff} {1:00}: {2} Thread-{3}",
- DateTime.Now, n, s, Thread.CurrentThread.ManagedThreadId));
- Action<int> exec = n => {
- write(n, "S ");
- Thread.Sleep(TimeSpan.FromSeconds(3));
- write(n, " E");
- };
タスク並列ライブラリ(TPL)を使った場合
- Parallel.ForEach(e, exec);
- e.AsParallel().ForAll(exec);
- var actions = from n in e select new Action(() => exec(n));
- Parallel.Invoke(actions.ToArray());
- var tasks = from n in e select Task.Run(() => exec(n));
- Task.WaitAll(tasks.ToArray());
- var tasks = from n in e select Task.Run(() => exec(n));
- await Task.WhenAll(tasks);
The Reactive Extensions for .NET を使った場合
- e.Select(exec.ToAsync()).Merge().Last();
- e.ToObservable().SelectMany(exec.ToAsync()).Last();
- (from n in e.ToObservable()
- from u in exec.ToAsync()(n)
- select u
- ).Last();
Go
- array := [...]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
- write := func(n int, s string) {
- now := time.LocalTime().Format("15:04:05")
- msec := (time.Nanoseconds() % 1e9) / 1e6
- fmt.Printf("%s.%03d %02d: %s\n", now, msec, n, s)
- }
- replyCh := make(chan interface{})
- exec := func(n int) {
- write(n, "S ")
- time.Sleep(3e9)
- write(n, " E")
- replyCh <- nil
- }
- for _, n := range(array) {
- go exec(n)
- }
- for _, _ = range(array) {
- _ = <- replyCh
- }
Rust
- fn write(n: i32, s: &str) {
- println!("{} {:>02}: {} {:?}", Local::now().format("%H:%M:%S%.3f"), n, s, thread::current().id());
- }
- fn exec(n: i32) {
- write(n, "S ");
- thread::sleep(time::Duration::from_secs(3));
- write(n, " E");
- }
- let handles: Vec<_> = (0..20).map(move |i| thread::spawn(move || exec(i))).collect();
- for handle in handles {
- handle.join().unwrap();
- }
Ruby
- range = 1..20
- def write n, s
- Thread.exclusive do
- puts sprintf('%s %02d: %s %s', Time.now.strftime('%H:%M:%S.%L'), n, s, Thread.current)
- end
- end
- def exec n
- write n, 'S '
- sleep 3
- write n, ' E'
- end
- threads = range.collect do |n|
- Thread.start { exec n }
- end
- threads.each &:join
Python
- print_lock = threading.RLock()
- class Thread(threading.Thread):
- def __init__(self, n):
- threading.Thread.__init__(self)
- self.n = n
- def run(self):
- self.write('S ')
- time.sleep(3)
- self.write(' E')
- def write(self, s):
- now = datetime.now().strftime('%H:%M:%S')
- threadId = threading.current_thread().ident
- with print_lock:
- print('%s %02d: %s %s' % (now, self.n, s, threadId))
- l = range(0, 20)
- for n in l:
- Thread(n).start()
Perl
- my @array = (1..20);
- my $write = sub {
- my $now = localtime->strftime('%H:%M:%S');
- printf "%s %02d: %s Thread-%s\n", $now, $_[0], $_[1], threads->tid;
- };
- my $exec = sub {
- my $n = shift;
- $write->($n, 'S ');
- sleep 3;
- $write->($n, ' E');
- };
- my @threads = map { threads->create($exec, $_) } @array;
- $_->join for @threads;
変換処理の内容は、(1) 開始(Start)のメッセージを出力する (2) スリープする (3) 終了(End)のメッセージを出力する (4) 元の値を-1倍して返す。
Java
- public static void write(int n, String s) {
- System.out.println(MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
- new Date(), n, s, Thread.currentThread().getId()));
- }
- public static int exec(int n) {
- write(n, "S ");
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- }
- write(n, " E");
- return -n;
- }
- IntStream stream = IntStream.iterate(1, n -> n + 1).limit(20);
- IntStream result = stream.parallel().map(n -> exec(n));
- System.out.println(result.boxed().collect(Collectors.toList()));
RxJava を使った場合
- Map<Integer, Integer> map = Collections.synchronizedMap(new TreeMap<>());
- Flowable.range(0, 20)
- .parallel()
- .runOn(Schedulers.newThread())
- .doOnNext(n -> map.put(n, exec(n)))
- .sequential()
- .blockingSubscribe();
- Collection<Integer> result = map.values();
- System.out.println(result);
Groovy
- def write(n, s) {
- println MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
- new Date(), n, s, Thread.currentThread().id)
- }
- def exec(n) {
- write n, "S "
- Thread.sleep 3000
- write n, " E"
- -n
- }
- def list = 1..20
- def result = list.stream().parallel().map { exec it }.toArray()
- println result
RxJava を使った場合
- def map = Collections.synchronizedMap(new TreeMap())
- def list = 1..20
- Flowable.fromIterable(list)
- .parallel()
- .runOn(Schedulers.newThread())
- .doOnNext { map[it] = exec it }
- .sequential()
- .blockingSubscribe()
- println map.values()
Kotlin
- fun write(n: Int, s: String) {
- println(MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}", Date(), n, s, Thread.currentThread().getId()))
- }
- suspend fun exec(n: Int): Int {
- write(n, "S ")
- delay(3000)
- write(n, " E")
- return -n
- }
- val deffereds = Array(20) { i ->
- async { exec(i + 1) }
- }
- val result = awaitAll(*deffereds)
- println(result)
RxJava を使った場合
- fun write(n: Int, s: String) {
- println(MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}", Date(), n, s, Thread.currentThread().getId()))
- }
- fun exec(n: Int): Int {
- write(n, "S ")
- Thread.sleep(3000)
- write(n, " E")
- return -n
- }
- val map = Collections.synchronizedMap(mutableMapOf<Int, Int>())
- val range = 1..20
- Flowable.fromIterable(range)
- .parallel()
- .runOn(Schedulers.newThread())
- .doOnNext { map += it to exec(it) }
- .sequential()
- .blockingSubscribe()
- println(map.values)
Scala
- def write(n: Int, s: String) = println(
- MessageFormat.format("{0,date,HH:mm:ss.SSS} {1,number,00}: {2} Thread-{3}",
- new Date, Integer valueOf n, s, Thread.currentThread.getId.toString))
- def exec(n: Int) = {
- write(n, "S ")
- Thread sleep 3000
- write(n, " E")
- -n
- }
- val range = 1 to 20
- val result = range.par.map(exec)
- println(result)
RxJava を使った場合
- val map = Collections.synchronizedMap(new TreeMap[Int, Int])
- val range = 1 to 20
- Flowable.fromIterable(range.asJava)
- .parallel
- .runOn(Schedulers.newThread)
- .doOnNext { n => map.put(n, exec(n)) }
- .sequential
- .blockingSubscribe
- println(map.values)
Erlang
- List = lists:seq(1, 20),
- Write = fun(N, S) ->
- {Hour, Minute, Second} = time(),
- {_, _, MicroSecs} = now(),
- io:format("~2..0b:~2..0b:~2..0b.~3..0b ~2..0b: ~s Process~p~n",
- [Hour, Minute, Second, MicroSecs div 1000, N, S, self()])
- end,
- Exec = fun(N, Caller) ->
- Write(N, "S "),
- timer:sleep(3000),
- Write(N, " E"),
- Caller ! {self(), -N}
- end,
- Pids = lists:map(fun(N) ->
- Caller = self(),
- spawn(fun() -> Exec(N, Caller) end)
- end, List),
- Result = lists:map(fun(Pid) ->
- receive
- {Pid, N} -> N
- end
- end, Pids),
- io:format("~p~n", [Result]),
F#
- let list = [1..20]
- let write n s =
- let now = DateTime.Now.ToString "HH:mm:ss.fff"
- sprintf "%s %02d: %s Thread-%d" now n s Thread.CurrentThread.ManagedThreadId |> Console.WriteLine
- let exec n =
- write n "S "
- TimeSpan.FromSeconds 3.0 |> Thread.Sleep
- write n " E"
- -n
非同期ワークフローを使った場合
- let tasks = [ for n in list -> async { return exec n } |> Async.StartAsTask ]
- let result = [ for task in tasks -> task.Result ]
- printfn "%A" result
- let result = [ for n in list -> async { return exec n } ] |> Async.Parallel |> Async.RunSynchronously
- result |> Array.toList |> printfn "%A"
タスク並列ライブラリ(TPL)を使った場合
- let result = list.AsParallel().Select exec |> Seq.toList
- printfn "%A" result
F# PowerPack を使った場合
- let result = list |> PSeq.mapi (fun i n -> i, exec n) |> PSeq.sort |> PSeq.map snd |> PSeq.toList
- printfn "%A" result
The Reactive Extensions for .NET を使った場合
- let execAsync n = Observable.Start(fun () -> exec n)
- let result = list.Select(execAsync).Zip().Last()
- printfn "%A" result
- let execAsync n = Observable.Start(fun () -> exec n)
- let result = list.Select(execAsync).CombineLatest().Last()
- printfn "%A" result
C#
- var e = Enumerable.Range(1, 20);
- Action<int, string> write = (n, s) => Console.WriteLine(
- string.Format("{0:HH:mm:ss.fff} {1:00}: {2} Thread-{3}",
- DateTime.Now, n, s, Thread.CurrentThread.ManagedThreadId));
- Func<int, int> exec = n => {
- write(n, "S ");
- Thread.Sleep(TimeSpan.FromSeconds(3));
- write(n, " E");
- return -n;
- };
タスク並列ライブラリ(TPL)を使った場合
- var result = from n in e.AsParallel().AsOrdered()
- select exec(n);
- Console.WriteLine(string.Join(" ", result));
- var result = e.AsParallel().AsOrdered().Select(exec);
- Console.WriteLine(string.Join(" ", result));
The Reactive Extensions for .NET を使った場合
- var result = e.Select(exec.ToAsync()).Zip().Last();
- Console.WriteLine(string.Join(" ", result));
- var result = e.Select(exec.ToAsync()).CombineLatest().Last();
- Console.WriteLine(string.Join(" ", result));
- var o = from n in e.ToObservable()
- from m in exec.ToAsync()(n)
- select new {n = n, m = m};
- var result = from r in o.ToEnumerable()
- orderby r.n
- select r.m;
- Console.WriteLine(string.Join(" ", result));
Go
- array := [...]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}
- write := func(n int, s string) {
- now := time.LocalTime().Format("15:04:05")
- msec := (time.Nanoseconds() % 1e9) / 1e6
- fmt.Printf("%s.%03d %02d: %s\n", now, msec, n, s)
- }
- replyChs := make([]chan int, len(array))
- exec := func(n int, replyCh chan<- int) {
- write(n, "S ")
- time.Sleep(3e9)
- write(n, " E")
- replyCh <- -n
- }
- for i, n := range(array) {
- replyChs[i] = make(chan int)
- go exec(n, replyChs[i])
- }
- result := make([]int, len(array))
- for i, replyCh := range(replyChs) {
- result[i] = <- replyCh
- }
- fmt.Printf("%v\n", result)
Rust
- fn write(n: i32, s: &str) {
- println!("{} {:>02}: {} {:?}", Local::now().format("%H:%M:%S%.3f"), n, s, thread::current().id());
- }
- fn exec(n: i32) -> i32 {
- write(n, "S ");
- thread::sleep(time::Duration::from_secs(3));
- write(n, " E");
- -n
- }
- let handles: Vec<_> = (0..20).map(move |i| thread::spawn(move || exec(i))).collect();
- let result: Vec<_> = handles.into_iter().map(|handle| handle.join().unwrap()).collect();
- println!("{:?}", result);
Ruby
- range = 1..20
- def write n, s
- Thread.exclusive do
- puts sprintf('%s %02d: %s %s', Time.now.strftime('%H:%M:%S.%L'), n, s, Thread.current)
- end
- end
- def exec n
- write n, 'S '
- sleep 3
- write n, ' E'
- -n
- end
- threads = range.collect do |n|
- Thread.start { exec n }
- end
- result = threads.collect &:value
- print result
Python
- print_lock = threading.RLock()
- class Thread(threading.Thread):
- def __init__(self, n):
- threading.Thread.__init__(self)
- self.n = n
- def run(self):
- self.write('S ')
- time.sleep(3)
- self.write(' E')
- self.result = - self.n
- def write(self, s):
- now = datetime.now().strftime('%H:%M:%S')
- threadId = threading.current_thread().ident
- with print_lock:
- print('%s %02d: %s %d' % (now, self.n, s, threadId))
- l = range(0, 20)
- threads = [Thread(n) for n in l]
- for t in threads:
- t.start()
- for t in threads:
- t.join()
- result = [t.result for t in threads]
- print(result)
Perl
- my @array = (1..20);
- my $write = sub {
- my $now = localtime->strftime('%H:%M:%S');
- printf "%s %02d: %s Thread-%s\n", $now, $_[0], $_[1], threads->tid;
- };
- my $exec = sub {
- my $n = shift;
- $write->($n, 'S ');
- sleep 3;
- $write->($n, ' E');
- -$n;
- };
- my @threads = map { threads->create($exec, $_) } @array;
- my @result = map { $_->join } @threads;
- print join(' ', @result), "\n";
Groovy
- class Server {
- private executor = Executors.newCachedThreadPool()
-
- private def processRequest(array) {
- array.sort()
- }
-
- def sendRequest(array) {
- executor.submit new Callable() {
- def call() {
- processRequest(array)
- }
- }
- }
-
- def quit() {
- executor.shutdown()
- }
- }
処理が終わるのを待たない場合
- def server = new Server()
- server.sendRequest(1..20 as int[])
処理が終わるまで何もせずに待つ場合
- def server = new Server()
- def future = server.sendRequest(1..20 as int[])
- def result = future.get()
処理が終わるまで他のことをしながら待つ場合
- def server = new Server()
- def future = server.sendRequest(1..20 as int[])
- while (! future.done) {
- println '処理中...'
- }
- def result = future.get()
Scala
Akka を使った場合
- class Server extends Actor {
- override def receive = {
- case list: List[Int] => context.actorOf(Props(classOf[Worker], this)).forward(list)
- }
- class Worker extends Actor {
- override def receive = {
- case list: List[Int] => sender() ! list.sorted
- }
- }
- }
- object Server {
- def start(implicit system: ActorSystem) = new Ref(system.actorOf(Props[Server]))
- class Ref(actor: ActorRef) {
- def sendRequest(list: List[Int])(implicit timeout: Timeout) = (actor ? list).mapTo[List[Int]]
- }
- }
- implicit val actorSystem = ActorSystem()
- val duration = 1 minutes
- implicit val timeout = Timeout(duration)
- val server = Server.start
処理が終わるのを待たない場合
- server sendRequest (1 to 20).toList
処理が終わるまで何もせずに待つ場合
- val future = server sendRequest (1 to 20).toList
- val result = Await.result(future, duration)
処理が終わるまで他のことをしながら待つ場合
- val future = server sendRequest (1 to 20).toList
- while (!future.isCompleted) {
- println("処理中...")
- }
- val result = Await.result(future, duration)
Erlang
- process_request() ->
- receive
- {ServerPid, ClientPid, Request} ->
- Result = lists:sort(Request),
- ClientPid ! {ServerPid, Result}
- end.
- service_loop() ->
- receive
- {ClientPid, Request} ->
- spawn(fun process_request/0) ! {self(), ClientPid, Request},
- service_loop();
- exit -> void
- end.
- start_server() -> spawn(fun service_loop/0).
処理が終わるのを待たない場合
- ServerPid = start_server(),
- ServerPid ! {self(), lists:seq(1, 20)}.
処理が終わるまで何もせずに待つ場合
- ServerPid = start_server(),
- ServerPid ! {self(), lists:seq(1, 20)},
- receive
- {ServerPid, Result} -> void
- end.
処理が終わるまで他のことをしながら待つ場合
- wait_result(Pid) ->
- receive
- {Pid, Result} -> Result
- after 0 ->
- io:format("処理中...~n", []),
- wait_result(Pid)
- end.
- ServerPid = start_server(),
- ServerPid ! {self(), lists:seq(1, 20)},
- Result = wait_result(ServerPid).
F#
- type Message<'a> = Request of 'a * AsyncReplyChannel<Task<'a>>
- | Exit
- type Server() =
-
- let processRequest list = Task.Run(fun () -> List.sort list)
-
- let rec loopReceive (agent : MailboxProcessor<_>) =
- async {
- let! msg = agent.Receive()
- match msg with
- | Request(list, replyChannel) -> replyChannel.Reply <| processRequest list
- return! loopReceive agent
- | Exit -> return ()
- }
- let agent = MailboxProcessor.Start loopReceive
-
- member this.SendRequest list = agent.PostAndReply <| fun replyChannel -> Request(list, replyChannel)
-
- member this.Quit() = agent.Post Exit
処理が終わるのを待たない場合
- let server = Server()
- server.SendRequest [1..20] |> ignore
処理が終わるまで何もせずに待つ場合
- let server = Server()
- let task = server.SendRequest [1..20]
- let result = task.Result
処理が終わるまで他のことをしながら待つ場合
- let server = Server()
- let task = server.SendRequest [1..20]
- while not task.IsCompleted do printfn "処理中..."
- let result = task.Result
Go
- type Request struct {
- Array []int
- ReplyCh chan []int
- }
- type Server struct {
- requestCh chan *Request
- quitCh chan bool
- }
- func (this *Server) SendRequest(array []int) *Request {
- request := &Request{array, make(chan []int)}
- this.requestCh <- request
- return request
- }
- func (this *Server) Quit() {
- this.quitCh <- true
- }
- func (this *Server) processRequest(request *Request) {
- sorted := make([]int, len(request.Array))
- copy(sorted, request.Array)
- sort.SortInts(sorted)
- request.ReplyCh <- sorted
- }
- func (this *Server) serviceLoop() {
- for {
- select {
- case request := <- this.requestCh:
- go this.processRequest(request)
- case <- this.quitCh:
- close(this.requestCh)
- close(this.quitCh)
- return
- }
- }
- }
- func StartServer() *Server {
- server := &Server{make(chan *Request, 1000), make(chan bool)}
- go server.serviceLoop()
- return server
- }
処理が終わるのを待たない場合
- server := StartServer()
- request := server.SendRequest([]int{4, 1, 2, 5, 0})
処理が終わるまで何もせずに待つ場合
- server := StartServer()
- request := server.SendRequest([]int{4, 1, 2, 5, 0})
- reply := <- request.ReplyCh
処理が終わるまで他のことをしながら待つ場合
- server := StartServer()
- request := server.SendRequest([]int{4, 1, 2, 5, 0})
- var reply []int
- for {
- var ok bool
- reply, ok = <- request.ReplyCh
- if ok { break }
- fmt.Printf("処理中...")
- }
MapReduce により、テキストファイル内の単語をカウントする。
Java
- public final class MapReduce {
- public static <K1, V1, K2, V2, K3, V3> void execute(
- Stream<Entry<K1, V1>> read,
- Function<Entry<K1, V1>, Stream<Entry<K2, V2>>> map,
- Function<Entry<K2, Stream<V2>>, Stream<Entry<K3, V3>>> reduce,
- Consumer<Entry<K3, V3>> write) {
- Collector<Entry<K2, V2>, ?, ConcurrentMap<K2, Stream<V2>>> multiMapCollector =
- Collectors.toConcurrentMap(e -> e.getKey(), e -> Stream.of(e.getValue()), Stream::concat);
- Stream<Entry<K2, V2>> mapped = processParallel("map", read, map);
- Stream<Entry<K2, Stream<V2>>> shuffled = mapped.collect(multiMapCollector).entrySet().stream();
- System.out.println("shuffled");
- processParallel("reduce", shuffled, reduce).forEachOrdered(write);
- System.out.println("completed");
- }
- private static <K1, V1, K2, V2> Stream<Entry<K2, V2>> processParallel(
- String processName,
- Stream<Entry<K1, V1>> input,
- Function<Entry<K1, V1>, Stream<Entry<K2, V2>>> process) {
- return input.parallel().flatMap(e -> {
- try {
- System.out.printf("%s start key=[%s] value=[%s]%n", processName, e.getKey(), e.getValue());
- return process.apply(e);
- } finally {
- System.out.printf("%s end%n", processName);
- }
- });
- }
- }
- public final class MapReduce {
- public static <K1, V1, K2, V2, K3, V3> void execute(
- Stream<Entry<K1, V1>> read,
- Function<Entry<K1, V1>, Stream<Entry<K2, V2>>> map,
- Function<Entry<K2, Stream<V2>>, Stream<Entry<K3, V3>>> reduce,
- Consumer<Entry<K3, V3>> write) {
- Map<K2, Stream<V2>> m = new HashMap<>();
- processParallel("map", read, map,
- e -> m.merge(e.getKey(), Stream.of(e.getValue()), Stream::concat));
- Stream<Entry<K2, Stream<V2>>> shuffled = m.entrySet().stream();
- System.out.println("shuffled");
- processParallel("reduce", shuffled, reduce, write);
- System.out.println("completed");
- }
- private static <K1, V1, K2, V2> void processParallel(
- String processName,
- Stream<Entry<K1, V1>> input,
- Function<Entry<K1, V1>, Stream<Entry<K2, V2>>> process,
- Consumer<Entry<K2, V2>> output) {
- input.parallel().flatMap(e -> {
- try {
- System.out.printf("%s start key=[%s] value=[%s]%n", processName, e.getKey(), e.getValue());
- return process.apply(e);
- } finally {
- System.out.printf("%s end%n", processName);
- }
- }).forEachOrdered(output);
- }
- }
- try (Stream<String> lines = Files.lines(Path.of("in.txt"))) {
- MapReduce.<Void, String, String, Integer, String, Long>execute(
-
- lines.map(line -> new SimpleEntry<>(null, line)),
-
- e -> Pattern.compile("\\W+").splitAsStream(e.getValue()).map(s -> new SimpleEntry<>(s, 1)),
-
- e -> Stream.of(new SimpleEntry<>(e.getKey(), e.getValue().count())),
-
- e -> System.out.printf("%5d\t%s%n", e.getValue(), e.getKey())
- );
- }
RxJava を使った場合
- public final class MapReduce {
- public static <K1, V1, K2, V2, K3, V3> Single<Long> execute(
- Flowable<Entry<K1, V1>> reader,
- Function<Entry<K1, V1>, Flowable<Entry<K2, V2>>> map,
- Function<Entry<K2, Flowable<V2>>, Flowable<Entry<K3, V3>>> reduce,
- Consumer<Entry<K3, V3>> writer) {
- ParallelFlowable<Entry<K2, V2>> mapped = processParallel("map", reader, map);
- Flowable<Entry<K2, Flowable<V2>>> shuffled = mapped.sequential()
- .toMultimap(e -> e.getKey(), e -> e.getValue())
- .toFlowable()
- .flatMap(m -> Flowable.fromIterable(m.entrySet()))
- .map(e -> new SimpleEntry<>(e.getKey(), Flowable.fromIterable(e.getValue())));
- ParallelFlowable<Entry<K3, V3>> reduced = processParallel("reduce", shuffled, reduce);
- return reduced.sequential()
- .doOnNext(writer)
- .count();
- }
- private static <K1, V1, K2, V2> ParallelFlowable<Entry<K2, V2>> processParallel(
- String processName,
- Flowable<Entry<K1, V1>> input,
- Function<Entry<K1, V1>, Flowable<Entry<K2, V2>>> process) {
- return input.parallel()
- .runOn(Schedulers.newThread())
- .flatMap(e -> {
- try {
- System.out.printf("%s start key=[%s] value=[%s]%n", processName, e.getKey(), e.getValue());
- return process.apply(e);
- } finally {
- System.out.printf("%s end%n", processName);
- }
- });
- }
- }
- try (Stream<String> lines = Files.lines(Path.of("in.txt"))) {
- Long count = MapReduce.<Void, String, String, Integer, String, Long>execute(
-
- Flowable.fromStream(lines.map(line -> new SimpleEntry<>(null, line))),
-
- e -> Flowable.fromArray(e.getValue().split("\\W+"))
- .filter(s -> s.length() > 0)
- .map(s -> new SimpleEntry<>(s, 1)),
-
- e -> e.getValue().count().toFlowable().map(n -> new SimpleEntry<>(e.getKey(), n)),
-
- e -> System.out.printf("%5d\t%s%n", e.getValue(), e.getKey())
- ).blockingGet();
- System.out.printf("completed: %d%n", count);
- }
C#
タスク並列ライブラリ(TPL)を使った場合
- public sealed class MapReduce {
- public static void Execute<K1, V1, K2, V2, K3, V3>(
- IEnumerable<(K1, V1)> read,
- Func<(K1, V1), IEnumerable<(K2, V2)>> map,
- Func<(K2, IEnumerable<V2>), IEnumerable<(K3, V3)>> reduce,
- Action<(K3, V3)> write) {
- var mapped = ProcessParallel("map", read, map);
- var shuffled = from grp in (from t in mapped group t.Item2 by t.Item1)
- select (grp.Key, grp.AsEnumerable());
- var reduced = ProcessParallel("reduce", shuffled, reduce);
- foreach (var t in reduced) {
- write(t);
- }
- }
- private static ParallelQuery<(K2, V2)> ProcessParallel<K1, V1, K2, V2>(
- string processName,
- IEnumerable<(K1, V1)> input,
- Func<(K1, V1), IEnumerable<(K2, V2)>> process) {
- return input.AsParallel().SelectMany(t => {
- Console.WriteLine($"{processName} start key=[{t.Item1}] value=[{t.Item2}]");
- try {
- System.Threading.Thread.Sleep(200);
- return process(t);
- } finally {
- Console.WriteLine($"{processName} end");
- }
- });
- }
- }
- MapReduce.Execute(
-
- from line in File.ReadLines("in.txt") select ((object)null, line),
-
- t => from Match m in Regex.Matches(t.Item2, @"\w+") select (m.Value, 1),
-
- t => Enumerable.Repeat((t.Item1, t.Item2.Count()), 1),
-
- t => Console.WriteLine($"{t.Item2,5}\t{t.Item1}")
- );
The Reactive Extensions for .NET を使った場合
- public sealed class MapReduce {
- public static void Execute<K1, V1, K2, V2, K3, V3>(
- IObservable<(K1, V1)> read,
- Func<(K1, V1), IObservable<(K2, V2)>> map,
- Func<(K2, IObservable<V2>), IObservable<(K3, V3)>> reduce,
- Action<(K3, V3)> write) {
- var mapped = ProcessParallel("map", read, map);
- var shuffled = from lookup in mapped.ToLookup(t => t.Item1, t => t.Item2)
- from grp in lookup
- select (grp.Key, grp.ToObservable());
- var reduced = ProcessParallel("reduce", shuffled, reduce);
- foreach (var t in reduced.ToEnumerable()) {
- write(t);
- }
- }
- private static IObservable<(K2, V2)> ProcessParallel<K1, V1, K2, V2>(
- string processName,
- IObservable<(K1, V1)> input,
- Func<(K1, V1), IObservable<(K2, V2)>> process) {
- Func<(K1, V1), IObservable<(K2, V2)>> f = t => {
- Console.WriteLine($"{processName} start key=[{t.Item1}] value=[{t.Item2}]");
- try {
- System.Threading.Thread.Sleep(200);
- return process(t);
- } finally {
- Console.WriteLine($"{processName} end");
- }
- };
- return input.SelectMany(f.ToAsync()).Merge();
- }
- }
- MapReduce.Execute(
-
- from line in File.ReadLines("in.txt").ToObservable() select ((object)null, line),
-
- t => from m in Regex.Matches(t.Item2, @"\w+").ToObservable() select (m.Value, 1),
-
- t => t.Item2.Count().Select(n => (t.Item1, n)),
-
- t => Console.WriteLine($"{t.Item2,5}\t{t.Item1}")
- );