feature: Added FanOut/FanIn Pattern (#1800)

* Added FanOut/FanIn Pattern (#8)

* #1627 adding fanout-fanin pattern

* #1627 adding class diagram image

* #1627 adding readme

* #1627 adding license

* #1627 updating relations

* #1627 interrupting the thread

* #1627 fixing sonar issues

* #1627 fixing sonar issues

* #1627 adding more info in README.md

* Added FanOut/FanIn (#9)

* #1627 adding fanout-fanin pattern

* #1627 adding class diagram image

* #1627 adding readme

* #1627 adding license

* #1627 updating relations

* #1627 interrupting the thread

* #1627 fixing sonar issues

* #1627 fixing sonar issues

* #1627 adding more info in README.md

* #1627 adding programmatic examples in README.md
This commit is contained in:
karthikbhat13
2021-08-08 13:51:27 +01:00
committed by GitHub
parent c5a4068e84
commit d87e8cf10d
14 changed files with 646 additions and 0 deletions

View File

@ -0,0 +1,74 @@
/*
* The MIT License
* Copyright © 2014-2021 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.fanout.fanin;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
/**
* FanOut/FanIn pattern is a concurrency pattern that refers to executing multiple instances of the
* activity function concurrently. The "fan out" part is essentially splitting the data into
* multiple chunks and then calling the activity function multiple times, passing the chunks.
*
* <p>When each chunk has been processed, the "fan in" takes place that aggregates results from each
* instance of function and forms a single final result.
*
* <p>This pattern is only really useful if you can “chunk” the workload in a meaningful way for
* splitting up to be processed in parallel.
*/
@Slf4j
public class App {
/**
* Entry point.
*
* <p>Implementation provided has a list of numbers that has to be squared and added. The list can
* be chunked in any way and the "activity function" {@link
* SquareNumberRequest#delayedSquaring(Consumer)} i.e. squaring the number ca be done
* concurrently. The "fan in" part is handled by the {@link Consumer} that takes in the result
* from each instance of activity and aggregates it whenever that particular activity function
* gets over.
*/
public static void main(String[] args) {
final List<Long> numbers = Arrays.asList(1L, 3L, 4L, 7L, 8L);
LOGGER.info("Numbers to be squared and get sum --> {}", numbers);
final List<SquareNumberRequest> requests =
numbers.stream().map(SquareNumberRequest::new).collect(Collectors.toList());
var consumer = new Consumer(0L);
// Pass the request and the consumer to fanOutFanIn or sometimes referred as Orchestrator
// function
final Long sumOfSquaredNumbers = FanOutFanIn.fanOutFanIn(requests, consumer);
LOGGER.info("Sum of all squared numbers --> {}", sumOfSquaredNumbers);
}
}

View File

@ -0,0 +1,48 @@
/*
* The MIT License
* Copyright © 2014-2021 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.fanout.fanin;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
/**
* Consumer or callback class that will be called everytime a request is complete This will
* aggregate individual result to form a final result.
*/
@Getter
public class Consumer {
private final AtomicLong sumOfSquaredNumbers;
Consumer(Long init) {
sumOfSquaredNumbers = new AtomicLong(init);
}
public Long add(final Long num) {
return sumOfSquaredNumbers.addAndGet(num);
}
}

View File

@ -0,0 +1,62 @@
/*
* The MIT License
* Copyright © 2014-2021 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.fanout.fanin;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
* FanOutFanIn class processes long running requests, when any of the processes gets over, result is
* passed over to the consumer or the callback function. Consumer will aggregate the results as they
* keep on completing.
*/
public class FanOutFanIn {
/**
* the main fanOutFanIn function or orchestrator function.
* @param requests List of numbers that need to be squared and summed up
* @param consumer Takes in the squared number from {@link SquareNumberRequest} and sums it up
* @return Aggregated sum of all squared numbers.
*/
public static Long fanOutFanIn(
final List<SquareNumberRequest> requests, final Consumer consumer) {
ExecutorService service = Executors.newFixedThreadPool(requests.size());
// fanning out
List<CompletableFuture<Void>> futures =
requests.stream()
.map(
request ->
CompletableFuture.runAsync(() -> request.delayedSquaring(consumer), service))
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
return consumer.getSumOfSquaredNumbers().get();
}
}

View File

@ -0,0 +1,63 @@
/*
* The MIT License
* Copyright © 2014-2021 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.fanout.fanin;
import java.security.SecureRandom;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* Squares the number with a little timeout to give impression of long running process that return
* at different times.
*/
@Slf4j
@AllArgsConstructor
public class SquareNumberRequest {
private final Long number;
/**
* Squares the number with a little timeout to give impression of long running process that return
* at different times.
* @param consumer callback class that takes the result after the delay.
* */
public void delayedSquaring(final Consumer consumer) {
var minTimeOut = 5000L;
SecureRandom secureRandom = new SecureRandom();
var randomTimeOut = secureRandom.nextInt(2000);
try {
// this will make the thread sleep from 5-7s.
Thread.sleep(minTimeOut + randomTimeOut);
} catch (InterruptedException e) {
LOGGER.error("Exception while sleep ", e);
Thread.currentThread().interrupt();
} finally {
consumer.add(number * number);
}
}
}

View File

@ -0,0 +1,36 @@
/*
* The MIT License
* Copyright © 2014-2021 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.fanout.fanin;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
class AppTest {
@Test
void shouldLaunchApp() {
assertDoesNotThrow(() -> App.main(new String[]{}));
}
}

View File

@ -0,0 +1,47 @@
/*
* The MIT License
* Copyright © 2014-2021 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.fanout.fanin;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
class FanOutFanInTest {
@Test
void fanOutFanInTest() {
final List<Long> numbers = Arrays.asList(1L, 3L, 4L, 7L, 8L);
final List<SquareNumberRequest> requests =
numbers.stream().map(SquareNumberRequest::new).collect(Collectors.toList());
final Consumer consumer = new Consumer(0L);
final Long sumOfSquaredNumbers = FanOutFanIn.fanOutFanIn(requests, consumer);
Assertions.assertEquals(139, sumOfSquaredNumbers);
}
}

View File

@ -0,0 +1,41 @@
/*
* The MIT License
* Copyright © 2014-2021 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.fanout.fanin;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
class SquareNumberRequestTest {
@Test
void delayedSquaringTest() {
Consumer consumer = new Consumer(10L);
SquareNumberRequest squareNumberRequest = new SquareNumberRequest(5L);
squareNumberRequest.delayedSquaring(consumer);
Assertions.assertEquals(35, consumer.getSumOfSquaredNumbers().get());
}
}