Compare commits
1 Commits
updateThro
...
event-aggr
Author | SHA1 | Date | |
---|---|---|---|
2ddb24e698 |
@ -9,6 +9,10 @@ tags:
|
|||||||
- Reactive
|
- Reactive
|
||||||
---
|
---
|
||||||
|
|
||||||
|
## Name
|
||||||
|
|
||||||
|
Event Aggregator
|
||||||
|
|
||||||
## Intent
|
## Intent
|
||||||
A system with lots of objects can lead to complexities when a
|
A system with lots of objects can lead to complexities when a
|
||||||
client wants to subscribe to events. The client has to find and register for
|
client wants to subscribe to events. The client has to find and register for
|
||||||
@ -17,6 +21,136 @@ requires a separate subscription. An Event Aggregator acts as a single source
|
|||||||
of events for many objects. It registers for all the events of the many objects
|
of events for many objects. It registers for all the events of the many objects
|
||||||
allowing clients to register with just the aggregator.
|
allowing clients to register with just the aggregator.
|
||||||
|
|
||||||
|
## Explanation
|
||||||
|
|
||||||
|
Real-world example
|
||||||
|
|
||||||
|
> King Joffrey sits on the iron throne and rules the seven kingdoms of Westeros. He receives most
|
||||||
|
> of his critical information from King's Hand, the second in command. King's hand has many
|
||||||
|
> close advisors himself, feeding him with relevant information about events occurring in the
|
||||||
|
> kingdom.
|
||||||
|
|
||||||
|
In Plain Words
|
||||||
|
|
||||||
|
> Event Aggregator is an event mediator that collects events from multiple sources and delivers
|
||||||
|
> them to registered observers.
|
||||||
|
|
||||||
|
**Programmatic Example**
|
||||||
|
|
||||||
|
In our programmatic example, we demonstrate the implementation of an event aggregator pattern. Some of
|
||||||
|
the objects are event listeners, some are event emitters, and the event aggregator does both.
|
||||||
|
|
||||||
|
```java
|
||||||
|
public interface EventObserver {
|
||||||
|
void onEvent(Event e);
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract class EventEmitter {
|
||||||
|
|
||||||
|
private final Map<Event, List<EventObserver>> observerLists;
|
||||||
|
|
||||||
|
public EventEmitter() {
|
||||||
|
observerLists = new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public final void registerObserver(EventObserver obs, Event e) {
|
||||||
|
...
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void notifyObservers(Event e) {
|
||||||
|
...
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
`KingJoffrey` is listening to events from `KingsHand`.
|
||||||
|
|
||||||
|
```java
|
||||||
|
@Slf4j
|
||||||
|
public class KingJoffrey implements EventObserver {
|
||||||
|
@Override
|
||||||
|
public void onEvent(Event e) {
|
||||||
|
LOGGER.info("Received event from the King's Hand: {}", e.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
`KingsHand` is listening to events from his subordinates `LordBaelish`, `LordVarys`, and `Scout`.
|
||||||
|
Whatever he hears from them, he delivers to `KingJoffrey`.
|
||||||
|
|
||||||
|
```java
|
||||||
|
public class KingsHand extends EventEmitter implements EventObserver {
|
||||||
|
|
||||||
|
public KingsHand() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public KingsHand(EventObserver obs, Event e) {
|
||||||
|
super(obs, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onEvent(Event e) {
|
||||||
|
notifyObservers(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
For example, `LordVarys` finds a traitor every Sunday and notifies the `KingsHand`.
|
||||||
|
|
||||||
|
```java
|
||||||
|
@Slf4j
|
||||||
|
public class LordVarys extends EventEmitter implements EventObserver {
|
||||||
|
@Override
|
||||||
|
public void timePasses(Weekday day) {
|
||||||
|
if (day == Weekday.SATURDAY) {
|
||||||
|
notifyObservers(Event.TRAITOR_DETECTED);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The following snippet demonstrates how the objects are constructed and wired together.
|
||||||
|
|
||||||
|
```java
|
||||||
|
var kingJoffrey = new KingJoffrey();
|
||||||
|
|
||||||
|
var kingsHand = new KingsHand();
|
||||||
|
kingsHand.registerObserver(kingJoffrey, Event.TRAITOR_DETECTED);
|
||||||
|
kingsHand.registerObserver(kingJoffrey, Event.STARK_SIGHTED);
|
||||||
|
kingsHand.registerObserver(kingJoffrey, Event.WARSHIPS_APPROACHING);
|
||||||
|
kingsHand.registerObserver(kingJoffrey, Event.WHITE_WALKERS_SIGHTED);
|
||||||
|
|
||||||
|
var varys = new LordVarys();
|
||||||
|
varys.registerObserver(kingsHand, Event.TRAITOR_DETECTED);
|
||||||
|
varys.registerObserver(kingsHand, Event.WHITE_WALKERS_SIGHTED);
|
||||||
|
|
||||||
|
var scout = new Scout();
|
||||||
|
scout.registerObserver(kingsHand, Event.WARSHIPS_APPROACHING);
|
||||||
|
scout.registerObserver(varys, Event.WHITE_WALKERS_SIGHTED);
|
||||||
|
|
||||||
|
var baelish = new LordBaelish(kingsHand, Event.STARK_SIGHTED);
|
||||||
|
|
||||||
|
var emitters = List.of(
|
||||||
|
kingsHand,
|
||||||
|
baelish,
|
||||||
|
varys,
|
||||||
|
scout
|
||||||
|
);
|
||||||
|
|
||||||
|
Arrays.stream(Weekday.values())
|
||||||
|
.<Consumer<? super EventEmitter>>map(day -> emitter -> emitter.timePasses(day))
|
||||||
|
.forEachOrdered(emitters::forEach);
|
||||||
|
```
|
||||||
|
|
||||||
|
The console output after running the example.
|
||||||
|
|
||||||
|
```
|
||||||
|
18:21:52.955 [main] INFO com.iluwatar.event.aggregator.KingJoffrey - Received event from the King's Hand: Warships approaching
|
||||||
|
18:21:52.960 [main] INFO com.iluwatar.event.aggregator.KingJoffrey - Received event from the King's Hand: White walkers sighted
|
||||||
|
18:21:52.960 [main] INFO com.iluwatar.event.aggregator.KingJoffrey - Received event from the King's Hand: Stark sighted
|
||||||
|
18:21:52.960 [main] INFO com.iluwatar.event.aggregator.KingJoffrey - Received event from the King's Hand: Traitor detected
|
||||||
|
```
|
||||||
|
|
||||||
## Class diagram
|
## Class diagram
|
||||||

|

|
||||||
|
|
||||||
@ -26,9 +160,13 @@ Use the Event Aggregator pattern when
|
|||||||
* Event Aggregator is a good choice when you have lots of objects that are
|
* Event Aggregator is a good choice when you have lots of objects that are
|
||||||
potential event sources. Rather than have the observer deal with registering
|
potential event sources. Rather than have the observer deal with registering
|
||||||
with them all, you can centralize the registration logic to the Event
|
with them all, you can centralize the registration logic to the Event
|
||||||
Aggregator. As well as simplifying registration, a Event Aggregator also
|
Aggregator. As well as simplifying registration, an Event Aggregator also
|
||||||
simplifies the memory management issues in using observers.
|
simplifies the memory management issues in using observers.
|
||||||
|
|
||||||
|
## Related patterns
|
||||||
|
|
||||||
|
* [Observer](https://java-design-patterns.com/patterns/observer/)
|
||||||
|
|
||||||
## Credits
|
## Credits
|
||||||
|
|
||||||
* [Martin Fowler - Event Aggregator](http://martinfowler.com/eaaDev/EventAggregator.html)
|
* [Martin Fowler - Event Aggregator](http://martinfowler.com/eaaDev/EventAggregator.html)
|
||||||
|
@ -18,11 +18,11 @@ threads and eliminating the latency of creating new threads.
|
|||||||
|
|
||||||
## Explanation
|
## Explanation
|
||||||
|
|
||||||
Real-world example
|
Real world example
|
||||||
|
|
||||||
> We have a large number of relatively short tasks at hand. We need to peel huge amounts of potatoes
|
> We have a large number of relatively short tasks at hand. We need to peel huge amounts of potatoes
|
||||||
> and serve a mighty amount of coffee cups. Creating a new thread for each task would be a waste so
|
> and serve mighty amount of coffee cups. Creating a new thread for each task would be a waste so we
|
||||||
> we establish a thread pool.
|
> establish a thread pool.
|
||||||
|
|
||||||
In plain words
|
In plain words
|
||||||
|
|
||||||
@ -99,7 +99,7 @@ public class PotatoPeelingTask extends Task {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
Next, we present a runnable `Worker` class that the thread pool will utilize to handle all the potato
|
Next we present a runnable `Worker` class that the thread pool will utilize to handle all the potato
|
||||||
peeling and coffee making.
|
peeling and coffee making.
|
||||||
|
|
||||||
```java
|
```java
|
||||||
|
@ -16,12 +16,10 @@ Ensure that a given client is not able to access service resources more than the
|
|||||||
|
|
||||||
## Explanation
|
## Explanation
|
||||||
|
|
||||||
Real-world example
|
Real world example
|
||||||
|
|
||||||
> A young human and an old dwarf walk into a bar. They start ordering beers from the bartender.
|
> A large multinational corporation offers API to its customers. The API is rate-limited and each
|
||||||
> The bartender immediately sees that the young human shouldn't consume too many drinks too fast
|
> customer can only make certain amount of calls per second.
|
||||||
> and refuses to serve if enough time has not passed. For the old dwarf, the serving rate can
|
|
||||||
> be higher.
|
|
||||||
|
|
||||||
In plain words
|
In plain words
|
||||||
|
|
||||||
@ -35,25 +33,30 @@ In plain words
|
|||||||
|
|
||||||
**Programmatic Example**
|
**Programmatic Example**
|
||||||
|
|
||||||
`BarCustomer` class presents the clients of the `Bartender` API. `CallsCount` tracks the number of
|
Tenant class presents the clients of the API. CallsCount tracks the number of API calls per tenant.
|
||||||
calls per `BarCustomer`.
|
|
||||||
|
|
||||||
```java
|
```java
|
||||||
public class BarCustomer {
|
public class Tenant {
|
||||||
|
|
||||||
@Getter
|
private final String name;
|
||||||
private final String name;
|
private final int allowedCallsPerSecond;
|
||||||
@Getter
|
|
||||||
private final int allowedCallsPerSecond;
|
|
||||||
|
|
||||||
public BarCustomer(String name, int allowedCallsPerSecond, CallsCount callsCount) {
|
public Tenant(String name, int allowedCallsPerSecond, CallsCount callsCount) {
|
||||||
if (allowedCallsPerSecond < 0) {
|
if (allowedCallsPerSecond < 0) {
|
||||||
throw new InvalidParameterException("Number of calls less than 0 not allowed");
|
throw new InvalidParameterException("Number of calls less than 0 not allowed");
|
||||||
}
|
|
||||||
this.name = name;
|
|
||||||
this.allowedCallsPerSecond = allowedCallsPerSecond;
|
|
||||||
callsCount.addTenant(name);
|
|
||||||
}
|
}
|
||||||
|
this.name = name;
|
||||||
|
this.allowedCallsPerSecond = allowedCallsPerSecond;
|
||||||
|
callsCount.addTenant(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getAllowedCallsPerSecond() {
|
||||||
|
return allowedCallsPerSecond;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -73,14 +76,14 @@ public final class CallsCount {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void reset() {
|
public void reset() {
|
||||||
|
LOGGER.debug("Resetting the map.");
|
||||||
tenantCallsCount.replaceAll((k, v) -> new AtomicLong(0));
|
tenantCallsCount.replaceAll((k, v) -> new AtomicLong(0));
|
||||||
LOGGER.info("reset counters");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
Next, the service that the tenants are calling is introduced. To track the call count, a throttler
|
Next we introduce the service that the tenants are calling. To track the call count we use the
|
||||||
timer is used.
|
throttler timer.
|
||||||
|
|
||||||
```java
|
```java
|
||||||
public interface Throttler {
|
public interface Throttler {
|
||||||
@ -108,103 +111,71 @@ public class ThrottleTimerImpl implements Throttler {
|
|||||||
}, 0, throttlePeriod);
|
}, 0, throttlePeriod);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
|
||||||
|
|
||||||
`Bartender` offers the `orderDrink` service to the `BarCustomer`s. The customers probably don't
|
class B2BService {
|
||||||
know that the beer serving rate is limited by their appearances.
|
|
||||||
|
|
||||||
```java
|
private static final Logger LOGGER = LoggerFactory.getLogger(B2BService.class);
|
||||||
class Bartender {
|
private final CallsCount callsCount;
|
||||||
|
|
||||||
private static final Logger LOGGER = LoggerFactory.getLogger(Bartender.class);
|
public B2BService(Throttler timer, CallsCount callsCount) {
|
||||||
private final CallsCount callsCount;
|
this.callsCount = callsCount;
|
||||||
|
timer.start();
|
||||||
|
}
|
||||||
|
|
||||||
public Bartender(Throttler timer, CallsCount callsCount) {
|
public int dummyCustomerApi(Tenant tenant) {
|
||||||
this.callsCount = callsCount;
|
var tenantName = tenant.getName();
|
||||||
timer.start();
|
var count = callsCount.getCount(tenantName);
|
||||||
|
LOGGER.debug("Counter for {} : {} ", tenant.getName(), count);
|
||||||
|
if (count >= tenant.getAllowedCallsPerSecond()) {
|
||||||
|
LOGGER.error("API access per second limit reached for: {}", tenantName);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
callsCount.incrementCount(tenantName);
|
||||||
|
return getRandomCustomerId();
|
||||||
|
}
|
||||||
|
|
||||||
public int orderDrink(BarCustomer barCustomer) {
|
private int getRandomCustomerId() {
|
||||||
var tenantName = barCustomer.getName();
|
return ThreadLocalRandom.current().nextInt(1, 10000);
|
||||||
var count = callsCount.getCount(tenantName);
|
}
|
||||||
if (count >= barCustomer.getAllowedCallsPerSecond()) {
|
|
||||||
LOGGER.error("I'm sorry {}, you've had enough for today!", tenantName);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
callsCount.incrementCount(tenantName);
|
|
||||||
LOGGER.debug("Serving beer to {} : [{} consumed] ", barCustomer.getName(), count+1);
|
|
||||||
return getRandomCustomerId();
|
|
||||||
}
|
|
||||||
|
|
||||||
private int getRandomCustomerId() {
|
|
||||||
return ThreadLocalRandom.current().nextInt(1, 10000);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
Now it is possible to see the full example in action. `BarCustomer` young human is rate-limited to 2
|
Now we are ready to see the full example in action. Tenant Adidas is rate-limited to 5 calls per
|
||||||
calls per second and the old dwarf to 4.
|
second and Nike to 6.
|
||||||
|
|
||||||
```java
|
```java
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
var callsCount = new CallsCount();
|
var callsCount = new CallsCount();
|
||||||
var human = new BarCustomer("young human", 2, callsCount);
|
var adidas = new Tenant("Adidas", 5, callsCount);
|
||||||
var dwarf = new BarCustomer("dwarf soldier", 4, callsCount);
|
var nike = new Tenant("Nike", 6, callsCount);
|
||||||
|
|
||||||
var executorService = Executors.newFixedThreadPool(2);
|
var executorService = Executors.newFixedThreadPool(2);
|
||||||
|
executorService.execute(() -> makeServiceCalls(adidas, callsCount));
|
||||||
executorService.execute(() -> makeServiceCalls(human, callsCount));
|
executorService.execute(() -> makeServiceCalls(nike, callsCount));
|
||||||
executorService.execute(() -> makeServiceCalls(dwarf, callsCount));
|
|
||||||
|
|
||||||
executorService.shutdown();
|
executorService.shutdown();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
executorService.awaitTermination(10, TimeUnit.SECONDS);
|
executorService.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOGGER.error("Executor service terminated: {}", e.getMessage());
|
LOGGER.error("Executor Service terminated: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void makeServiceCalls(BarCustomer barCustomer, CallsCount callsCount) {
|
private static void makeServiceCalls(Tenant tenant, CallsCount callsCount) {
|
||||||
var timer = new ThrottleTimerImpl(1000, callsCount);
|
var timer = new ThrottleTimerImpl(10, callsCount);
|
||||||
var service = new Bartender(timer, callsCount);
|
var service = new B2BService(timer, callsCount);
|
||||||
// Sleep is introduced to keep the output in check and easy to view and analyze the results.
|
// Sleep is introduced to keep the output in check and easy to view and analyze the results.
|
||||||
IntStream.range(0, 50).forEach(i -> {
|
IntStream.range(0, 20).forEach(i -> {
|
||||||
service.orderDrink(barCustomer);
|
service.dummyCustomerApi(tenant);
|
||||||
try {
|
try {
|
||||||
Thread.sleep(100);
|
Thread.sleep(1);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOGGER.error("Thread interrupted: {}", e.getMessage());
|
LOGGER.error("Thread interrupted: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
An excerpt from the example's console output:
|
|
||||||
|
|
||||||
```
|
|
||||||
18:46:36.218 [Timer-0] INFO com.iluwatar.throttling.CallsCount - reset counters
|
|
||||||
18:46:36.218 [Timer-1] INFO com.iluwatar.throttling.CallsCount - reset counters
|
|
||||||
18:46:36.242 [pool-1-thread-2] DEBUG com.iluwatar.throttling.Bartender - Serving beer to dwarf soldier : [1 consumed]
|
|
||||||
18:46:36.242 [pool-1-thread-1] DEBUG com.iluwatar.throttling.Bartender - Serving beer to young human : [1 consumed]
|
|
||||||
18:46:36.342 [pool-1-thread-2] DEBUG com.iluwatar.throttling.Bartender - Serving beer to dwarf soldier : [2 consumed]
|
|
||||||
18:46:36.342 [pool-1-thread-1] DEBUG com.iluwatar.throttling.Bartender - Serving beer to young human : [2 consumed]
|
|
||||||
18:46:36.443 [pool-1-thread-1] ERROR com.iluwatar.throttling.Bartender - I'm sorry young human, you've had enough for today!
|
|
||||||
18:46:36.443 [pool-1-thread-2] DEBUG com.iluwatar.throttling.Bartender - Serving beer to dwarf soldier : [3 consumed]
|
|
||||||
18:46:36.544 [pool-1-thread-1] ERROR com.iluwatar.throttling.Bartender - I'm sorry young human, you've had enough for today!
|
|
||||||
18:46:36.544 [pool-1-thread-2] DEBUG com.iluwatar.throttling.Bartender - Serving beer to dwarf soldier : [4 consumed]
|
|
||||||
18:46:36.645 [pool-1-thread-2] ERROR com.iluwatar.throttling.Bartender - I'm sorry dwarf soldier, you've had enough for today!
|
|
||||||
18:46:36.645 [pool-1-thread-1] ERROR com.iluwatar.throttling.Bartender - I'm sorry young human, you've had enough for today!
|
|
||||||
18:46:36.745 [pool-1-thread-1] ERROR com.iluwatar.throttling.Bartender - I'm sorry young human, you've had enough for today!
|
|
||||||
18:46:36.745 [pool-1-thread-2] ERROR com.iluwatar.throttling.Bartender - I'm sorry dwarf soldier, you've had enough for today!
|
|
||||||
18:46:36.846 [pool-1-thread-1] ERROR com.iluwatar.throttling.Bartender - I'm sorry young human, you've had enough for today!
|
|
||||||
18:46:36.846 [pool-1-thread-2] ERROR com.iluwatar.throttling.Bartender - I'm sorry dwarf soldier, you've had enough for today!
|
|
||||||
18:46:36.947 [pool-1-thread-2] ERROR com.iluwatar.throttling.Bartender - I'm sorry dwarf soldier, you've had enough for today!
|
|
||||||
18:46:36.947 [pool-1-thread-1] ERROR com.iluwatar.throttling.Bartender - I'm sorry young human, you've had enough for today!
|
|
||||||
18:46:37.048 [pool-1-thread-2] ERROR com.iluwatar.throttling.Bartender - I'm sorry dwarf soldier, you've had enough for today!
|
|
||||||
18:46:37.048 [pool-1-thread-1] ERROR com.iluwatar.throttling.Bartender - I'm sorry young human, you've had enough for today!
|
|
||||||
18:46:37.148 [pool-1-thread-1] ERROR com.iluwatar.throttling.Bartender - I'm sorry young human, you've had enough for today!
|
|
||||||
18:46:37.148 [pool-1-thread-2] ERROR com.iluwatar.throttling.Bartender - I'm sorry dwarf soldier, you've had enough for today!
|
|
||||||
```
|
|
||||||
|
|
||||||
## Class diagram
|
## Class diagram
|
||||||
|
|
||||||
@ -214,7 +185,7 @@ An excerpt from the example's console output:
|
|||||||
|
|
||||||
The Throttling pattern should be used:
|
The Throttling pattern should be used:
|
||||||
|
|
||||||
* When service access needs to be restricted not to have high impact on the performance of the service.
|
* When a service access needs to be restricted to not have high impacts on the performance of the service.
|
||||||
* When multiple clients are consuming the same service resources and restriction has to be made according to the usage per client.
|
* When multiple clients are consuming the same service resources and restriction has to be made according to the usage per client.
|
||||||
|
|
||||||
## Credits
|
## Credits
|
||||||
|
@ -34,11 +34,11 @@ import lombok.extern.slf4j.Slf4j;
|
|||||||
* complete service by users or a particular tenant. This can allow systems to continue to function
|
* complete service by users or a particular tenant. This can allow systems to continue to function
|
||||||
* and meet service level agreements, even when an increase in demand places load on resources.
|
* and meet service level agreements, even when an increase in demand places load on resources.
|
||||||
* <p>
|
* <p>
|
||||||
* In this example there is a {@link Bartender} serving beer to {@link BarCustomer}s. This is a time
|
* In this example we have ({@link App}) as the initiating point of the service. This is a time
|
||||||
* based throttling, i.e. only a certain number of calls are allowed per second.
|
* based throttling, i.e. only a certain number of calls are allowed per second.
|
||||||
* </p>
|
* </p>
|
||||||
* ({@link BarCustomer}) is the service tenant class having a name and the number of calls allowed.
|
* ({@link Tenant}) is the Tenant POJO class with which many tenants can be created ({@link
|
||||||
* ({@link Bartender}) is the service which is consumed by the tenants and is throttled.
|
* B2BService}) is the service which is consumed by the tenants and is throttled.
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public class App {
|
public class App {
|
||||||
@ -50,35 +50,33 @@ public class App {
|
|||||||
*/
|
*/
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
var callsCount = new CallsCount();
|
var callsCount = new CallsCount();
|
||||||
var human = new BarCustomer("young human", 2, callsCount);
|
var adidas = new Tenant("Adidas", 5, callsCount);
|
||||||
var dwarf = new BarCustomer("dwarf soldier", 4, callsCount);
|
var nike = new Tenant("Nike", 6, callsCount);
|
||||||
|
|
||||||
var executorService = Executors.newFixedThreadPool(2);
|
var executorService = Executors.newFixedThreadPool(2);
|
||||||
|
|
||||||
executorService.execute(() -> makeServiceCalls(human, callsCount));
|
executorService.execute(() -> makeServiceCalls(adidas, callsCount));
|
||||||
executorService.execute(() -> makeServiceCalls(dwarf, callsCount));
|
executorService.execute(() -> makeServiceCalls(nike, callsCount));
|
||||||
|
|
||||||
executorService.shutdown();
|
executorService.shutdown();
|
||||||
try {
|
try {
|
||||||
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
|
executorService.awaitTermination(10, TimeUnit.SECONDS);
|
||||||
executorService.shutdownNow();
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
executorService.shutdownNow();
|
LOGGER.error("Executor Service terminated: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Make calls to the bartender.
|
* Make calls to the B2BService dummy API.
|
||||||
*/
|
*/
|
||||||
private static void makeServiceCalls(BarCustomer barCustomer, CallsCount callsCount) {
|
private static void makeServiceCalls(Tenant tenant, CallsCount callsCount) {
|
||||||
var timer = new ThrottleTimerImpl(1000, callsCount);
|
var timer = new ThrottleTimerImpl(10, callsCount);
|
||||||
var service = new Bartender(timer, callsCount);
|
var service = new B2BService(timer, callsCount);
|
||||||
// Sleep is introduced to keep the output in check and easy to view and analyze the results.
|
// Sleep is introduced to keep the output in check and easy to view and analyze the results.
|
||||||
IntStream.range(0, 50).forEach(i -> {
|
IntStream.range(0, 20).forEach(i -> {
|
||||||
service.orderDrink(barCustomer);
|
service.dummyCustomerApi(tenant);
|
||||||
try {
|
try {
|
||||||
Thread.sleep(100);
|
Thread.sleep(1);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOGGER.error("Thread interrupted: {}", e.getMessage());
|
LOGGER.error("Thread interrupted: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
|
@ -29,32 +29,33 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Bartender is a service which accepts a BarCustomer (tenant) and throttles
|
* A service which accepts a tenant and throttles the resource based on the time given to the
|
||||||
* the resource based on the time given to the tenant.
|
* tenant.
|
||||||
*/
|
*/
|
||||||
class Bartender {
|
class B2BService {
|
||||||
|
|
||||||
private static final Logger LOGGER = LoggerFactory.getLogger(Bartender.class);
|
private static final Logger LOGGER = LoggerFactory.getLogger(B2BService.class);
|
||||||
private final CallsCount callsCount;
|
private final CallsCount callsCount;
|
||||||
|
|
||||||
public Bartender(Throttler timer, CallsCount callsCount) {
|
public B2BService(Throttler timer, CallsCount callsCount) {
|
||||||
this.callsCount = callsCount;
|
this.callsCount = callsCount;
|
||||||
timer.start();
|
timer.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Orders a drink from the bartender.
|
* Calls dummy customer api.
|
||||||
|
*
|
||||||
* @return customer id which is randomly generated
|
* @return customer id which is randomly generated
|
||||||
*/
|
*/
|
||||||
public int orderDrink(BarCustomer barCustomer) {
|
public int dummyCustomerApi(Tenant tenant) {
|
||||||
var tenantName = barCustomer.getName();
|
var tenantName = tenant.getName();
|
||||||
var count = callsCount.getCount(tenantName);
|
var count = callsCount.getCount(tenantName);
|
||||||
if (count >= barCustomer.getAllowedCallsPerSecond()) {
|
LOGGER.debug("Counter for {} : {} ", tenant.getName(), count);
|
||||||
LOGGER.error("I'm sorry {}, you've had enough for today!", tenantName);
|
if (count >= tenant.getAllowedCallsPerSecond()) {
|
||||||
|
LOGGER.error("API access per second limit reached for: {}", tenantName);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
callsCount.incrementCount(tenantName);
|
callsCount.incrementCount(tenantName);
|
||||||
LOGGER.debug("Serving beer to {} : [{} consumed] ", barCustomer.getName(), count + 1);
|
|
||||||
return getRandomCustomerId();
|
return getRandomCustomerId();
|
||||||
}
|
}
|
||||||
|
|
@ -69,7 +69,7 @@ public final class CallsCount {
|
|||||||
* Resets the count of all the tenants in the map.
|
* Resets the count of all the tenants in the map.
|
||||||
*/
|
*/
|
||||||
public void reset() {
|
public void reset() {
|
||||||
|
LOGGER.debug("Resetting the map.");
|
||||||
tenantCallsCount.replaceAll((k, v) -> new AtomicLong(0));
|
tenantCallsCount.replaceAll((k, v) -> new AtomicLong(0));
|
||||||
LOGGER.info("reset counters");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -25,26 +25,22 @@ package com.iluwatar.throttling;
|
|||||||
|
|
||||||
import java.security.InvalidParameterException;
|
import java.security.InvalidParameterException;
|
||||||
|
|
||||||
import lombok.Getter;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* BarCustomer is a tenant with a name and a number of allowed calls per second.
|
* A Pojo class to create a basic Tenant with the allowed calls per second.
|
||||||
*/
|
*/
|
||||||
public class BarCustomer {
|
public class Tenant {
|
||||||
|
|
||||||
@Getter
|
|
||||||
private final String name;
|
private final String name;
|
||||||
@Getter
|
|
||||||
private final int allowedCallsPerSecond;
|
private final int allowedCallsPerSecond;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
*
|
*
|
||||||
* @param name Name of the BarCustomer
|
* @param name Name of the tenant
|
||||||
* @param allowedCallsPerSecond The number of calls allowed for this particular tenant.
|
* @param allowedCallsPerSecond The number of calls allowed for a particular tenant.
|
||||||
* @throws InvalidParameterException If number of calls is less than 0, throws exception.
|
* @throws InvalidParameterException If number of calls is less than 0, throws exception.
|
||||||
*/
|
*/
|
||||||
public BarCustomer(String name, int allowedCallsPerSecond, CallsCount callsCount) {
|
public Tenant(String name, int allowedCallsPerSecond, CallsCount callsCount) {
|
||||||
if (allowedCallsPerSecond < 0) {
|
if (allowedCallsPerSecond < 0) {
|
||||||
throw new InvalidParameterException("Number of calls less than 0 not allowed");
|
throw new InvalidParameterException("Number of calls less than 0 not allowed");
|
||||||
}
|
}
|
||||||
@ -52,4 +48,12 @@ public class BarCustomer {
|
|||||||
this.allowedCallsPerSecond = allowedCallsPerSecond;
|
this.allowedCallsPerSecond = allowedCallsPerSecond;
|
||||||
callsCount.addTenant(name);
|
callsCount.addTenant(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getAllowedCallsPerSecond() {
|
||||||
|
return allowedCallsPerSecond;
|
||||||
|
}
|
||||||
}
|
}
|
@ -32,18 +32,19 @@ import org.junit.jupiter.api.Test;
|
|||||||
/**
|
/**
|
||||||
* B2BServiceTest class to test the B2BService
|
* B2BServiceTest class to test the B2BService
|
||||||
*/
|
*/
|
||||||
public class BartenderTest {
|
public class B2BServiceTest {
|
||||||
|
|
||||||
private final CallsCount callsCount = new CallsCount();
|
private final CallsCount callsCount = new CallsCount();
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void dummyCustomerApiTest() {
|
void dummyCustomerApiTest() {
|
||||||
var tenant = new BarCustomer("pirate", 2, callsCount);
|
var tenant = new Tenant("testTenant", 2, callsCount);
|
||||||
// In order to assure that throttling limits will not be reset, we use an empty throttling implementation
|
// In order to assure that throttling limits will not be reset, we use an empty throttling implementation
|
||||||
var timer = (Throttler) () -> {};
|
var timer = (Throttler) () -> {
|
||||||
var service = new Bartender(timer, callsCount);
|
};
|
||||||
|
var service = new B2BService(timer, callsCount);
|
||||||
|
|
||||||
IntStream.range(0, 5).mapToObj(i -> tenant).forEach(service::orderDrink);
|
IntStream.range(0, 5).mapToObj(i -> tenant).forEach(service::dummyCustomerApi);
|
||||||
var counter = callsCount.getCount(tenant.getName());
|
var counter = callsCount.getCount(tenant.getName());
|
||||||
assertEquals(2, counter, "Counter limit must be reached");
|
assertEquals(2, counter, "Counter limit must be reached");
|
||||||
}
|
}
|
@ -23,21 +23,20 @@
|
|||||||
|
|
||||||
package com.iluwatar.throttling;
|
package com.iluwatar.throttling;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
|
||||||
import java.security.InvalidParameterException;
|
import java.security.InvalidParameterException;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* TenantTest to test the creation of Tenant with valid parameters.
|
* TenantTest to test the creation of Tenant with valid parameters.
|
||||||
*/
|
*/
|
||||||
public class BarCustomerTest {
|
public class TenantTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void constructorTest() {
|
void constructorTest() {
|
||||||
assertThrows(InvalidParameterException.class, () -> {
|
assertThrows(InvalidParameterException.class, () -> {
|
||||||
new BarCustomer("sirBrave", -1, new CallsCount());
|
new Tenant("FailTenant", -1, new CallsCount());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
Reference in New Issue
Block a user