refactoring of CircuitBreaker and MemoizingCircuitBreaker, Tests

This commit is contained in:
anhefti 2019-04-10 10:01:59 +02:00
parent ed3ed44aff
commit fb6894e17c
6 changed files with 337 additions and 115 deletions

View file

@ -8,20 +8,27 @@
package ch.ethz.seb.sebserver.gbl.async;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
/** An asynchronous runner that can be used to run a task asynchronously within the Spring context */
@Component
@EnableAsync
public class AsyncRunner {
/** Calls a given Supplier asynchronously in a new thread and returns a CompletableFuture
* to get and handle the result later
*
* @param supplier The Supplier that gets called asynchronously
* @return CompletableFuture of the result of the Supplier */
@Async(AsyncServiceSpringConfig.EXECUTOR_BEAN_NAME)
public <T> CompletableFuture<T> runAsync(final Supplier<T> supplier) {
return CompletableFuture.completedFuture(supplier.get());
public <T> Future<T> runAsync(final Supplier<T> supplier) {
return new AsyncResult<>(supplier.get());
}
}

View file

@ -23,39 +23,37 @@ public class AsyncService {
this.asyncRunner = asyncRunner;
}
public <T> CircuitBreakerSupplier<T> createMemoizingCircuitBreaker(
final Supplier<T> blockingSupplier) {
return new CircuitBreakerSupplier<>(this.asyncRunner, blockingSupplier, true);
public <T> CircuitBreaker<T> createCircuitBreaker() {
return new CircuitBreaker<>(this.asyncRunner);
}
public <T> CircuitBreakerSupplier<T> createCircuitBreaker(
final Supplier<T> blockingSupplier,
final boolean momoized) {
return new CircuitBreakerSupplier<>(this.asyncRunner, blockingSupplier, momoized);
}
public <T> CircuitBreakerSupplier<T> createCircuitBreaker(
final Supplier<T> blockingSupplier,
public <T> CircuitBreaker<T> createCircuitBreaker(
final int maxFailingAttempts,
final long maxBlockingTime,
final boolean momoized) {
final long timeToRecover) {
return new CircuitBreakerSupplier<>(
return new CircuitBreaker<>(
this.asyncRunner,
blockingSupplier,
CircuitBreakerSupplier.DEFAULT_MAX_FAILING_ATTEMPTS,
maxFailingAttempts,
maxBlockingTime,
CircuitBreakerSupplier.DEFAULT_TIME_TO_RECOVER,
momoized);
timeToRecover);
}
public <T> CircuitBreakerSupplier<T> createCircuitBreaker(
public <T> MemoizingCircuitBreaker<T> createMemoizingCircuitBreaker(
final Supplier<T> blockingSupplier) {
return new MemoizingCircuitBreaker<>(this.asyncRunner, blockingSupplier, true);
}
public <T> MemoizingCircuitBreaker<T> createMemoizingCircuitBreaker(
final Supplier<T> blockingSupplier,
final int maxFailingAttempts,
final long maxBlockingTime,
final long timeToRecover,
final boolean momoized) {
return new CircuitBreakerSupplier<>(
return new MemoizingCircuitBreaker<>(
this.asyncRunner,
blockingSupplier,
maxFailingAttempts,

View file

@ -8,6 +8,7 @@
package ch.ethz.seb.sebserver.gbl.async;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@ -18,13 +19,11 @@ import org.slf4j.LoggerFactory;
import ch.ethz.seb.sebserver.gbl.Constants;
import ch.ethz.seb.sebserver.gbl.util.Result;
/** A circuit breaker with three states (CLOSED, HALF_OPEN, OPEN) and memoizing functionality
* that wraps and safe a Supplier function of the same type.
/** A circuit breaker with three states (CLOSED, HALF_OPEN, OPEN)
* <p>
* A <code>CircuitBreakerSupplier</code> can be used to make a save call within a Supplier function. This Supplier
* function
* usually access remote data on different processes probably on different machines over the Internet
* and that can fail or hang with unexpected result.
* A <code>CircuitBreaker</code> can be used to make a save call within a Supplier function.
* This Supplier function usually access remote data on different processes probably on different
* machines over the Internet and that can fail or hang with unexpected result.
* <p>
* The circuit breaker pattern can safe resources like Threads, Data-Base-Connections, from being taken and
* hold for long time not released. What normally lead into some kind of no resource available error state.
@ -46,14 +45,17 @@ import ch.ethz.seb.sebserver.gbl.util.Result;
*
*
* @param <T> The of the result of the suppling function */
public class CircuitBreakerSupplier<T> implements Supplier<Result<T>> {
public final class CircuitBreaker<T> {
private static final Logger log = LoggerFactory.getLogger(CircuitBreakerSupplier.class);
private static final Logger log = LoggerFactory.getLogger(CircuitBreaker.class);
public static final int DEFAULT_MAX_FAILING_ATTEMPTS = 5;
public static final long DEFAULT_MAX_BLOCKING_TIME = Constants.MINUTE_IN_MILLIS;
public static final long DEFAULT_TIME_TO_RECOVER = Constants.MINUTE_IN_MILLIS * 10;
public static final RuntimeException OPEN_STATE_EXCEPTION =
new RuntimeException("Open CircuitBreaker");
public enum State {
CLOSED,
HALF_OPEN,
@ -61,7 +63,6 @@ public class CircuitBreakerSupplier<T> implements Supplier<Result<T>> {
}
private final AsyncRunner asyncRunner;
private final Supplier<T> supplierThatCanFailOrBlock;
private final int maxFailingAttempts;
private final long maxBlockingTime;
private final long timeToRecover;
@ -70,57 +71,38 @@ public class CircuitBreakerSupplier<T> implements Supplier<Result<T>> {
private final AtomicInteger failingCount = new AtomicInteger(0);
private long lastSuccessTime;
private final boolean memoizing;
private final Result<T> notAvailable = Result.ofRuntimeError("No chached resource available");
private Result<T> cached = null;
/** Create new CircuitBreakerSupplier.
*
* @param asyncRunner the AsyncRunner used to create asynchronous calls on the given supplier function
* @param supplierThatCanFailOrBlock The Supplier function that can fail or block for a long time
* @param memoizing whether the memoizing functionality is on or off */
CircuitBreakerSupplier(
final AsyncRunner asyncRunner,
final Supplier<T> supplierThatCanFailOrBlock,
final boolean memoizing) {
* @param asyncRunner the AsyncRunner used to create asynchronous calls on the given supplier function */
CircuitBreaker(final AsyncRunner asyncRunner) {
this(
asyncRunner,
supplierThatCanFailOrBlock,
DEFAULT_MAX_FAILING_ATTEMPTS,
DEFAULT_MAX_BLOCKING_TIME,
DEFAULT_TIME_TO_RECOVER,
memoizing);
DEFAULT_TIME_TO_RECOVER);
}
/** Create new CircuitBreakerSupplier.
*
* @param asyncRunner the AsyncRunner used to create asynchronous calls on the given supplier function
* @param supplierThatCanFailOrBlock The Supplier function that can fail or block for a long time
* @param maxFailingAttempts the number of maximal failing attempts before go form CLOSE into HALF_OPEN state
* @param maxBlockingTime the maximal time that an call attempt can block until an error is responded
* @param timeToRecover the time the circuit breaker needs to cool-down on OPEN-STATE before going back to HALF_OPEN
* state
* @param memoizing whether the memoizing functionality is on or off */
CircuitBreakerSupplier(
* state */
CircuitBreaker(
final AsyncRunner asyncRunner,
final Supplier<T> supplierThatCanFailOrBlock,
final int maxFailingAttempts,
final long maxBlockingTime,
final long timeToRecover,
final boolean memoizing) {
final long timeToRecover) {
this.asyncRunner = asyncRunner;
this.supplierThatCanFailOrBlock = supplierThatCanFailOrBlock;
this.maxFailingAttempts = maxFailingAttempts;
this.maxBlockingTime = maxBlockingTime;
this.timeToRecover = timeToRecover;
this.memoizing = memoizing;
}
@Override
public Result<T> get() {
public Result<T> protectedRun(final Supplier<T> supplier) {
final long currentTime = System.currentTimeMillis();
if (log.isDebugEnabled()) {
@ -132,13 +114,13 @@ public class CircuitBreakerSupplier<T> implements Supplier<Result<T>> {
switch (this.state) {
case CLOSED:
return handleClosed(currentTime);
return handleClosed(currentTime, supplier);
case HALF_OPEN:
return handleHalfOpen(currentTime);
return handleHalfOpen(currentTime, supplier);
case OPEN:
return handelOpen(currentTime);
return handelOpen(currentTime, supplier);
default:
throw new IllegalStateException();
return Result.ofError(new IllegalStateException());
}
}
@ -146,57 +128,63 @@ public class CircuitBreakerSupplier<T> implements Supplier<Result<T>> {
return this.state;
}
private Result<T> handleClosed(final long currentTime) {
private Result<T> handleClosed(
final long startTime,
final Supplier<T> supplier) {
if (log.isDebugEnabled()) {
log.debug("Handle Closed on: {}", currentTime);
log.debug("Handle Closed on: {}", startTime);
}
final Result<T> result = attempt();
// try once
final Result<T> result = attempt(supplier);
if (result.hasError()) {
if (log.isDebugEnabled()) {
log.debug("Attempt failed. failing count: {}", this.failingCount);
}
final long currentBlockingTime = System.currentTimeMillis() - startTime;
final int failing = this.failingCount.incrementAndGet();
if (failing > this.maxFailingAttempts) {
if (failing > this.maxFailingAttempts || currentBlockingTime > this.maxBlockingTime) {
// brake thought to HALF_OPEN state and return error
if (log.isDebugEnabled()) {
log.debug("Changing state from Open to Half Open and return cached value");
}
this.state = State.HALF_OPEN;
this.failingCount.set(0);
return getCached(result);
return Result.ofError(OPEN_STATE_EXCEPTION);
} else {
return get();
// try again
return protectedRun(supplier);
}
} else {
this.lastSuccessTime = System.currentTimeMillis();
if (this.memoizing) {
this.cached = result;
}
return result;
}
}
private Result<T> handleHalfOpen(final long currentTime) {
private Result<T> handleHalfOpen(
final long startTime,
final Supplier<T> supplier) {
if (log.isDebugEnabled()) {
log.debug("Handle Half Open on: {}", currentTime);
log.debug("Handle Half Open on: {}", startTime);
}
final Result<T> result = attempt();
// try once
final Result<T> result = attempt(supplier);
if (result.hasError()) {
// on fail go to OPEN state
if (log.isDebugEnabled()) {
log.debug("Changing state from Half Open to Open and return cached value");
}
this.state = State.OPEN;
return getCached(result);
return Result.ofError(OPEN_STATE_EXCEPTION);
} else {
// on success go to CLODED state
if (log.isDebugEnabled()) {
log.debug("Changing state from Half Open to Closed and return value");
}
@ -209,58 +197,43 @@ public class CircuitBreakerSupplier<T> implements Supplier<Result<T>> {
/** As long as time to recover is not reached, return from cache
* If time to recover is reached go to half open state and try again */
private Result<T> handelOpen(final long currentTime) {
private Result<T> handelOpen(
final long startTime,
final Supplier<T> supplier) {
if (log.isDebugEnabled()) {
log.debug("Handle Open on: {}", currentTime);
log.debug("Handle Open on: {}", startTime);
}
if (currentTime - this.lastSuccessTime >= this.timeToRecover) {
if (startTime - this.lastSuccessTime >= this.timeToRecover) {
// if cool-down period is over, go back to HALF_OPEN state and try again
if (log.isDebugEnabled()) {
log.debug("Time to recover reached. Changing state from Open to Half Open");
}
this.state = State.HALF_OPEN;
return protectedRun(supplier);
}
return getCached(this.notAvailable);
return Result.ofError(OPEN_STATE_EXCEPTION);
}
private Result<T> attempt() {
private Result<T> attempt(final Supplier<T> supplier) {
final Future<T> future = this.asyncRunner.runAsync(supplier);
try {
return Result.of(this.asyncRunner.runAsync(this.supplierThatCanFailOrBlock)
.get(this.maxBlockingTime, TimeUnit.MILLISECONDS));
return Result.of(future.get(this.maxBlockingTime, TimeUnit.MILLISECONDS));
} catch (final Exception e) {
future.cancel(false);
return Result.ofError(e);
}
}
private Result<T> getCached(final Result<T> error) {
if (!this.memoizing) {
return error;
}
if (this.cached != null) {
return this.cached;
} else {
return error;
}
}
T getChached() {
if (this.cached == null) {
return null;
}
return this.cached.get();
}
@Override
public String toString() {
return "MemoizingCircuitBreaker [maxFailingAttempts=" + this.maxFailingAttempts + ", maxBlockingTime="
+ this.maxBlockingTime + ", timeToRecover=" + this.timeToRecover + ", state=" + this.state
+ ", failingCount="
+ this.failingCount + ", lastSuccessTime=" + this.lastSuccessTime + ", cached=" + this.cached + "]";
return "CircuitBreaker [asyncRunner=" + this.asyncRunner + ", maxFailingAttempts=" + this.maxFailingAttempts
+ ", maxBlockingTime=" + this.maxBlockingTime + ", timeToRecover=" + this.timeToRecover + ", state="
+ this.state
+ ", failingCount=" + this.failingCount + ", lastSuccessTime=" + this.lastSuccessTime + "]";
}
}

View file

@ -0,0 +1,135 @@
/*
* Copyright (c) 2019 ETH Zürich, Educational Development and Technology (LET)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package ch.ethz.seb.sebserver.gbl.async;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ch.ethz.seb.sebserver.gbl.async.CircuitBreaker.State;
import ch.ethz.seb.sebserver.gbl.util.Result;
/** A circuit breaker with three states (CLOSED, HALF_OPEN, OPEN) and memoizing functionality
* that wraps and safe a Supplier function of the same type.
* <p>
* A <code>CircuitBreakerSupplier</code> can be used to make a save call within a Supplier function. This Supplier
* function
* usually access remote data on different processes probably on different machines over the Internet
* and that can fail or hang with unexpected result.
* <p>
* The circuit breaker pattern can safe resources like Threads, Data-Base-Connections, from being taken and
* hold for long time not released. What normally lead into some kind of no resource available error state.
* For more information please visit: https://martinfowler.com/bliki/CircuitBreaker.html
* <p>
* This circuit breaker implementation has three states, CLODED, HALF_OPEN and OPEN. The normal and initial state of the
* circuit breaker is CLOSED. A call on the circuit breaker triggers a asynchronous call on the given supplier waiting
* a given time-period for a response or on fail, trying a given number of times to call again.
* If the time to wait went out or if the failing count is reached, the circuit breaker goes into the HALF-OPEN state
* and respond with an error.
* A call on a circuit breaker in HALF-OPEN state will trigger another single try to get the result from given supplier
* and on success the circuit breaker goes back to CLOSED state and on fail, the circuit breaker goes to OPEN state
* In the OPEN state the time to recover comes into play. The circuit breaker stays at least for this time into the
* OPEN state and respond to all calls within this time period with an error. After the time to recover has reached
* the circuit breaker goes back to HALF_OPEN state.
* <p>
* This circuit breaker implementation comes with a memoizing functionality where on successful calls the result get
* cached and the circuit breaker respond on error cases with the cached result if available.
*
*
* @param <T> The of the result of the suppling function */
public final class MemoizingCircuitBreaker<T> implements Supplier<Result<T>> {
private static final Logger log = LoggerFactory.getLogger(MemoizingCircuitBreaker.class);
private final CircuitBreaker<T> delegate;
private final Supplier<T> supplier;
private final boolean memoizing;
private Result<T> cached = null;
/** Create new CircuitBreakerSupplier.
*
* @param asyncRunner the AsyncRunner used to create asynchronous calls on the given supplier function
* @param supplier The Supplier function that can fail or block for a long time
* @param memoizing whether the memoizing functionality is on or off */
MemoizingCircuitBreaker(
final AsyncRunner asyncRunner,
final Supplier<T> supplier,
final boolean memoizing) {
this.delegate = new CircuitBreaker<>(asyncRunner);
this.supplier = supplier;
this.memoizing = memoizing;
}
/** Create new CircuitBreakerSupplier.
*
* @param asyncRunner the AsyncRunner used to create asynchronous calls on the given supplier function
* @param supplier The Supplier function that can fail or block for a long time
* @param maxFailingAttempts the number of maximal failing attempts before go form CLOSE into HALF_OPEN state
* @param maxBlockingTime the maximal time that an call attempt can block until an error is responded
* @param timeToRecover the time the circuit breaker needs to cool-down on OPEN-STATE before going back to HALF_OPEN
* state
* @param memoizing whether the memoizing functionality is on or off */
MemoizingCircuitBreaker(
final AsyncRunner asyncRunner,
final Supplier<T> supplier,
final int maxFailingAttempts,
final long maxBlockingTime,
final long timeToRecover,
final boolean memoizing) {
this.delegate = new CircuitBreaker<>(
asyncRunner,
maxFailingAttempts,
maxBlockingTime,
timeToRecover);
this.supplier = supplier;
this.memoizing = memoizing;
}
@Override
public Result<T> get() {
final Result<T> result = this.delegate.protectedRun(this.supplier);
if (result.hasError()) {
if (this.memoizing && this.cached != null) {
if (log.isDebugEnabled()) {
log.debug("Return cached at: {}", System.currentTimeMillis());
}
return this.cached;
}
return result;
} else {
if (this.memoizing) {
if (log.isDebugEnabled()) {
log.debug("Memoizing result at: {}", System.currentTimeMillis());
}
this.cached = result;
}
return result;
}
}
public State getState() {
return this.delegate.getState();
}
T getChached() {
if (this.cached == null) {
return null;
}
return this.cached.get();
}
}

View file

@ -0,0 +1,108 @@
/*
* Copyright (c) 2019 ETH Zürich, Educational Development and Technology (LET)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package ch.ethz.seb.sebserver.gbl.async;
import static org.junit.Assert.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import ch.ethz.seb.sebserver.gbl.async.CircuitBreaker.State;
import ch.ethz.seb.sebserver.gbl.util.Result;
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = { AsyncServiceSpringConfig.class, AsyncRunner.class, AsyncService.class })
public class CircuitBreakerTest {
private static final Logger log = LoggerFactory.getLogger(CircuitBreakerTest.class);
@Autowired
AsyncService asyncService;
@Test
public void testInit() {
assertNotNull(this.asyncService);
}
@Test
public void roundtrip1() throws InterruptedException {
final CircuitBreaker<String> circuitBreaker =
this.asyncService.createCircuitBreaker(3, 500, 1000);
final Supplier<String> tester = tester(100, 5, 10);
Result<String> result = circuitBreaker.protectedRun(tester); // 1. call...
assertFalse(result.hasError());
assertEquals("Hello", result.get());
assertEquals(State.CLOSED, circuitBreaker.getState());
circuitBreaker.protectedRun(tester); // 2. call...
circuitBreaker.protectedRun(tester); // 3. call...
circuitBreaker.protectedRun(tester); // 4. call...
result = circuitBreaker.protectedRun(tester); // 5. call... still available
assertFalse(result.hasError());
assertEquals("Hello", result.get());
assertEquals(State.CLOSED, circuitBreaker.getState());
result = circuitBreaker.protectedRun(tester); // 6. call... after the 5. call the tester is unavailable until the 10. call...
assertTrue(result.hasError());
assertEquals(CircuitBreaker.OPEN_STATE_EXCEPTION, result.getError());
assertEquals(State.HALF_OPEN, circuitBreaker.getState());
result = circuitBreaker.protectedRun(tester); // 9. call... after fail again, go to OPEN state
assertEquals(State.OPEN, circuitBreaker.getState());
assertEquals(CircuitBreaker.OPEN_STATE_EXCEPTION, result.getError());
// not cooled down yet
Thread.sleep(100);
result = circuitBreaker.protectedRun(tester); // 10. call...
assertEquals(State.OPEN, circuitBreaker.getState());
assertEquals(CircuitBreaker.OPEN_STATE_EXCEPTION, result.getError());
// wait time to recover
Thread.sleep(500);
result = circuitBreaker.protectedRun(tester); // 11. call...
assertEquals(State.CLOSED, circuitBreaker.getState());
assertEquals("Hello back again", result.get());
}
private Supplier<String> tester(final long delay, final int unavailableAfter, final int unavailableUntil) {
final AtomicInteger count = new AtomicInteger(0);
final AtomicBoolean wasUnavailable = new AtomicBoolean(false);
return () -> {
final int attempts = count.getAndIncrement();
log.info("tester answers {} {}", attempts, Thread.currentThread());
try {
Thread.sleep(delay);
} catch (final InterruptedException e) {
e.printStackTrace();
}
if (attempts >= unavailableAfter && attempts < unavailableUntil) {
wasUnavailable.set(true);
throw new RuntimeException("Error");
}
return (wasUnavailable.get()) ? "Hello back again" : "Hello";
};
}
}

View file

@ -22,7 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import ch.ethz.seb.sebserver.gbl.async.CircuitBreakerSupplier.State;
import ch.ethz.seb.sebserver.gbl.async.CircuitBreaker.State;
import ch.ethz.seb.sebserver.gbl.util.Result;
@RunWith(SpringRunner.class)
@ -41,7 +41,7 @@ public class MemoizingCircuitBreakerTest {
@Test
public void roundtrip1() throws InterruptedException {
final CircuitBreakerSupplier<String> circuitBreaker = this.asyncService.createCircuitBreaker(
final MemoizingCircuitBreaker<String> circuitBreaker = this.asyncService.createMemoizingCircuitBreaker(
tester(100, 5, 10), 3, 500, 1000, true);
assertNull(circuitBreaker.getChached());
@ -71,16 +71,17 @@ public class MemoizingCircuitBreakerTest {
result = circuitBreaker.get(); // 9. call... after fail again, go to OPEN state
assertEquals(State.OPEN, circuitBreaker.getState());
// now the time to recover comes into play
Thread.sleep(1100);
// not cooled down yet
Thread.sleep(100);
result = circuitBreaker.get(); // 10. call...
assertEquals(State.HALF_OPEN, circuitBreaker.getState());
assertEquals("Hello", result.get());
assertEquals(State.OPEN, circuitBreaker.getState());
// back again
result = circuitBreaker.get(); // 15. call... 2. call in Half Open state...
// wait time to recover
Thread.sleep(500);
result = circuitBreaker.get(); // 11. call...
assertEquals(State.CLOSED, circuitBreaker.getState());
assertEquals("Hello back again", result.get());
}
private Supplier<String> tester(final long delay, final int unavailableAfter, final int unavailableUntil) {