Skip to content

Commit d0e5b7a

Browse files
committed
2 parents 65baed7 + bed9477 commit d0e5b7a

File tree

7 files changed

+420
-0
lines changed

7 files changed

+420
-0
lines changed

‎LICENSE‎

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
The MIT License (MIT)
2+
3+
Copyright (c) 2013 Raoul-Gabriel Urma, Mario Fusco, Alan Mycroft
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
packagelambdasinaction.chap6;
2+
3+
importjava.util.concurrent.RecursiveTask;
4+
importjava.util.concurrent.ForkJoinPool;
5+
importjava.util.concurrent.ForkJoinTask;
6+
importjava.util.stream.LongStream;
7+
8+
importstaticlambdasinaction.chap6.ParallelStreamsHarness.FORK_JOIN_POOL;
9+
10+
publicclassForkJoinSumCalculatorextendsRecursiveTask<Long>{
11+
12+
publicstaticfinallongTHRESHOLD = 10_000;
13+
14+
privatefinallong[] numbers;
15+
privatefinalintstart;
16+
privatefinalintend;
17+
18+
publicForkJoinSumCalculator(long[] numbers){
19+
this(numbers, 0, numbers.length);
20+
}
21+
22+
privateForkJoinSumCalculator(long[] numbers, intstart, intend){
23+
this.numbers = numbers;
24+
this.start = start;
25+
this.end = end;
26+
}
27+
28+
@Override
29+
protectedLongcompute(){
30+
intlength = end - start;
31+
if (length <= THRESHOLD){
32+
returncomputeSequentially();
33+
}
34+
ForkJoinSumCalculatorleftTask = newForkJoinSumCalculator(numbers, start, start + length/2);
35+
leftTask.fork();
36+
ForkJoinSumCalculatorrightTask = newForkJoinSumCalculator(numbers, start + length/2, end);
37+
LongrightResult = rightTask.compute();
38+
LongleftResult = leftTask.join();
39+
returnleftResult + rightResult;
40+
}
41+
42+
privatelongcomputeSequentially(){
43+
longsum = 0;
44+
for (inti = start; i < end; i++){
45+
sum += numbers[i];
46+
}
47+
returnsum;
48+
}
49+
50+
publicstaticlongforkJoinSum(longn){
51+
long[] numbers = LongStream.rangeClosed(1, n).toArray();
52+
ForkJoinTask<Long> task = newForkJoinSumCalculator(numbers);
53+
returnFORK_JOIN_POOL.invoke(task);
54+
}
55+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
packagelambdasinaction.chap6;
2+
3+
importjava.util.stream.*;
4+
5+
publicclassParallelStreams{
6+
7+
publicstaticlongsum(longn){
8+
returnStream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get();
9+
}
10+
11+
publicstaticlongparallelSum(longn){
12+
returnStream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();
13+
}
14+
15+
publicstaticlongrangedSum(longn){
16+
returnLongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong();
17+
}
18+
19+
publicstaticlongparallelRangedSum(longn){
20+
returnLongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong();
21+
}
22+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
packagelambdasinaction.chap6;
2+
3+
importjava.util.concurrent.*;
4+
importjava.util.function.*;
5+
6+
publicclassParallelStreamsHarness{
7+
8+
publicstaticfinalForkJoinPoolFORK_JOIN_POOL = newForkJoinPool();
9+
10+
publicstaticvoidmain(String[] args){
11+
System.out.println("Sum done in: " + measurePerf(ParallelStreams::sum, 10_000_000L) + " msecs");
12+
System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs" );
13+
System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs");
14+
System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs" );
15+
System.out.println("ForkJoin sum done in: " + measurePerf(ForkJoinSumCalculator::forkJoinSum, 10_000_000L) + " msecs" );
16+
}
17+
18+
publicstatic <T, R> longmeasurePerf(Function<T, R> f, Tinput){
19+
longfastest = Long.MAX_VALUE;
20+
for (inti = 0; i < 10; i++){
21+
longstart = System.nanoTime();
22+
Rresult = f.apply(input);
23+
longduration = (System.nanoTime() - start) / 1_000_000;
24+
System.out.println("Result: " + result);
25+
if (duration < fastest) fastest = duration;
26+
}
27+
returnfastest;
28+
}
29+
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
packagelambdasinaction.chap6;
2+
3+
importjava.util.*;
4+
importjava.util.concurrent.*;
5+
importjava.util.function.*;
6+
importjava.util.stream.*;
7+
8+
/**
9+
* Adapted from http://mail.openjdk.java.net/pipermail/lambda-dev/2013-November/011516.html
10+
*/
11+
publicclassStreamForker<T>{
12+
13+
privatefinalStream<T> stream;
14+
privatefinalMap<Object, Function<Stream<T>, ?>> forks = newHashMap<>();
15+
16+
publicStreamForker(Stream<T> stream){
17+
this.stream = stream;
18+
}
19+
20+
publicStreamForker<T> fork(Objectkey, Function<Stream<T>, ?> f){
21+
forks.put(key, f);
22+
returnthis;
23+
}
24+
25+
publicResultsgetResults(){
26+
ForkingStreamConsumer<T> consumer = build();
27+
try{
28+
stream.sequential().forEach(consumer);
29+
} finally{
30+
consumer.finish();
31+
}
32+
returnconsumer;
33+
}
34+
35+
privateForkingStreamConsumer<T> build(){
36+
List<BlockingQueue<T>> queues = newArrayList<>();
37+
38+
Map<Object, Future<?>> actions =
39+
forks.entrySet().stream().reduce(
40+
newHashMap<Object, Future<?>>(),
41+
(map, e) ->{
42+
map.put(e.getKey(),
43+
getOperationResult(queues, e.getValue()));
44+
returnmap;
45+
},
46+
(m1, m2) ->{
47+
m1.putAll(m2);
48+
returnm1;
49+
});
50+
51+
returnnewForkingStreamConsumer<>(queues, actions);
52+
}
53+
54+
privateFuture<?> getOperationResult(List<BlockingQueue<T>> queues, Function<Stream<T>, ?> f){
55+
BlockingQueue<T> queue = newLinkedBlockingQueue<>();
56+
queues.add(queue);
57+
Spliterator<T> spliterator = newBlockingQueueSpliterator<>(queue);
58+
Stream<T> source = StreamSupport.stream(spliterator, false);
59+
returnCompletableFuture.supplyAsync( () -> f.apply(source) );
60+
}
61+
62+
publicstaticinterfaceResults{
63+
public <R> Rget(Objectkey);
64+
}
65+
66+
privatestaticclassForkingStreamConsumer<T> implementsConsumer<T>, Results{
67+
staticfinalObjectEND_OF_STREAM = newObject();
68+
69+
privatefinalList<BlockingQueue<T>> queues;
70+
privatefinalMap<Object, Future<?>> actions;
71+
72+
ForkingStreamConsumer(List<BlockingQueue<T>> queues, Map<Object, Future<?>> actions){
73+
this.queues = queues;
74+
this.actions = actions;
75+
}
76+
77+
@Override
78+
publicvoidaccept(Tt){
79+
queues.forEach(q -> q.add(t));
80+
}
81+
82+
@Override
83+
public <R> Rget(Objectkey){
84+
try{
85+
return ((Future<R>) actions.get(key)).get();
86+
} catch (Exceptione){
87+
thrownewRuntimeException(e);
88+
}
89+
}
90+
91+
voidfinish(){
92+
accept((T) END_OF_STREAM);
93+
}
94+
}
95+
96+
privatestaticclassBlockingQueueSpliterator<T> implementsSpliterator<T>{
97+
privatefinalBlockingQueue<T> q;
98+
99+
BlockingQueueSpliterator(BlockingQueue<T> q){
100+
this.q = q;
101+
}
102+
103+
@Override
104+
publicbooleantryAdvance(Consumer<? superT> action){
105+
Tt;
106+
while (true){
107+
try{
108+
t = q.take();
109+
break;
110+
}
111+
catch (InterruptedExceptione){
112+
}
113+
}
114+
115+
if (t != ForkingStreamConsumer.END_OF_STREAM){
116+
action.accept(t);
117+
returntrue;
118+
}
119+
120+
returnfalse;
121+
}
122+
123+
@Override
124+
publicSpliterator<T> trySplit(){
125+
returnnull;
126+
}
127+
128+
@Override
129+
publiclongestimateSize(){
130+
return0;
131+
}
132+
133+
@Override
134+
publicintcharacteristics(){
135+
return0;
136+
}
137+
}
138+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
packagelambdasinaction.chap6;
2+
3+
importlambdasinaction.chap5.*;
4+
5+
importstaticjava.util.stream.Collectors.*;
6+
importstaticlambdasinaction.chap5.Dish.menu;
7+
8+
importjava.util.*;
9+
importjava.util.stream.*;
10+
11+
publicclassStreamForkerExample{
12+
13+
publicstaticvoidmain(String[] args) throwsException{
14+
processMenu();
15+
}
16+
17+
privatestaticvoidprocessMenu(){
18+
Stream<Dish> menuStream = menu.stream();
19+
20+
StreamForker.Resultsresults = newStreamForker<Dish>(menuStream)
21+
.fork("shortMenu", s -> s.map(Dish::getName).collect(joining(", ")))
22+
.fork("totalCalories", s -> s.mapToInt(Dish::getCalories).sum())
23+
.fork("mostCaloricDish", s -> s.collect(
24+
reducing((d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2))
25+
.get())
26+
.fork("dishesByType", s -> s.collect(groupingBy(Dish::getType)))
27+
.getResults();
28+
29+
StringshortMeny = results.get("shortMenu");
30+
inttotalCalories = results.get("totalCalories");
31+
DishmostCaloricDish = results.get("mostCaloricDish");
32+
Map<Dish.Type, List<Dish>> dishesByType = results.get("dishesByType");
33+
34+
System.out.println("Short menu: " + shortMeny);
35+
System.out.println("Total calories: " + totalCalories);
36+
System.out.println("Most caloric dish: " + mostCaloricDish);
37+
System.out.println("Dishes by type: " + dishesByType);
38+
}
39+
}

0 commit comments

Comments
(0)