circuit breaker
This commit is contained in:
parent
c89d813dca
commit
beba9850b8
4 changed files with 108 additions and 64 deletions
|
@ -23,34 +23,45 @@ public class AsyncService {
|
||||||
this.asyncRunner = asyncRunner;
|
this.asyncRunner = asyncRunner;
|
||||||
}
|
}
|
||||||
|
|
||||||
public <T> MemoizingCircuitBreaker<T> createCircuitBreaker(final Supplier<T> blockingSupplier) {
|
public <T> CircuitBreakerSupplier<T> createMemoizingCircuitBreaker(
|
||||||
return new MemoizingCircuitBreaker<>(this.asyncRunner, blockingSupplier);
|
final Supplier<T> blockingSupplier) {
|
||||||
|
return new CircuitBreakerSupplier<>(this.asyncRunner, blockingSupplier, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public <T> MemoizingCircuitBreaker<T> createCircuitBreaker(
|
public <T> CircuitBreakerSupplier<T> createCircuitBreaker(
|
||||||
final Supplier<T> blockingSupplier,
|
final Supplier<T> blockingSupplier,
|
||||||
final long maxBlockingTime) {
|
final boolean momoized) {
|
||||||
|
return new CircuitBreakerSupplier<>(this.asyncRunner, blockingSupplier, momoized);
|
||||||
|
}
|
||||||
|
|
||||||
return new MemoizingCircuitBreaker<>(
|
public <T> CircuitBreakerSupplier<T> createCircuitBreaker(
|
||||||
|
final Supplier<T> blockingSupplier,
|
||||||
|
final long maxBlockingTime,
|
||||||
|
final boolean momoized) {
|
||||||
|
|
||||||
|
return new CircuitBreakerSupplier<>(
|
||||||
this.asyncRunner,
|
this.asyncRunner,
|
||||||
blockingSupplier,
|
blockingSupplier,
|
||||||
MemoizingCircuitBreaker.DEFAULT_MAX_FAILING_ATTEMPTS,
|
CircuitBreakerSupplier.DEFAULT_MAX_FAILING_ATTEMPTS,
|
||||||
maxBlockingTime,
|
maxBlockingTime,
|
||||||
MemoizingCircuitBreaker.DEFAULT_TIME_TO_RECOVER);
|
CircuitBreakerSupplier.DEFAULT_TIME_TO_RECOVER,
|
||||||
|
momoized);
|
||||||
}
|
}
|
||||||
|
|
||||||
public <T> MemoizingCircuitBreaker<T> createCircuitBreaker(
|
public <T> CircuitBreakerSupplier<T> createCircuitBreaker(
|
||||||
final Supplier<T> blockingSupplier,
|
final Supplier<T> blockingSupplier,
|
||||||
final int maxFailingAttempts,
|
final int maxFailingAttempts,
|
||||||
final long maxBlockingTime,
|
final long maxBlockingTime,
|
||||||
final long timeToRecover) {
|
final long timeToRecover,
|
||||||
|
final boolean momoized) {
|
||||||
|
|
||||||
return new MemoizingCircuitBreaker<>(
|
return new CircuitBreakerSupplier<>(
|
||||||
this.asyncRunner,
|
this.asyncRunner,
|
||||||
blockingSupplier,
|
blockingSupplier,
|
||||||
maxFailingAttempts,
|
maxFailingAttempts,
|
||||||
maxBlockingTime,
|
maxBlockingTime,
|
||||||
timeToRecover);
|
timeToRecover,
|
||||||
|
momoized);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,14 +18,37 @@ import org.slf4j.LoggerFactory;
|
||||||
import ch.ethz.seb.sebserver.gbl.Constants;
|
import ch.ethz.seb.sebserver.gbl.Constants;
|
||||||
import ch.ethz.seb.sebserver.gbl.util.Result;
|
import ch.ethz.seb.sebserver.gbl.util.Result;
|
||||||
|
|
||||||
/** A circuit breaker with three states (CLOSED, HALF_OPEN, OPEN) and memoizing.
|
/** A circuit breaker with three states (CLOSED, HALF_OPEN, OPEN) and memoizing functionality
|
||||||
|
* that wraps and safe a Supplier function of the same type.
|
||||||
|
* <p>
|
||||||
|
* A <code>CircuitBreakerSupplier</code> can be used to make a save call within a Supplier function. This Supplier
|
||||||
|
* function
|
||||||
|
* usually access remote data on different processes probably on different machines over the Internet
|
||||||
|
* and that can fail or hang with unexpected result.
|
||||||
|
* <p>
|
||||||
|
* The circuit breaker pattern can safe resources like Threads, Data-Base-Connections, from being taken and
|
||||||
|
* hold for long time not released. What normally lead into some kind of no resource available error state.
|
||||||
|
* For more information please visit: https://martinfowler.com/bliki/CircuitBreaker.html
|
||||||
|
* <p>
|
||||||
|
* This circuit breaker implementation has three states, CLODED, HALF_OPEN and OPEN. The normal and initial state of the
|
||||||
|
* circuit breaker is CLOSED. A call on the circuit breaker triggers a asynchronous call on the given supplier waiting
|
||||||
|
* a given time-period for a response or on fail, trying a given number of times to call again.
|
||||||
|
* If the time to wait went out or if the failing count is reached, the circuit breaker goes into the HALF-OPEN state
|
||||||
|
* and respond with an error.
|
||||||
|
* A call on a circuit breaker in HALF-OPEN state will trigger another single try to get the result from given supplier
|
||||||
|
* and on success the circuit breaker goes back to CLOSED state and on fail, the circuit breaker goes to OPEN state
|
||||||
|
* In the OPEN state the time to recover comes into play. The circuit breaker stays at least for this time into the
|
||||||
|
* OPEN state and respond to all calls within this time period with an error. After the time to recover has reached
|
||||||
|
* the circuit breaker goes back to HALF_OPEN state.
|
||||||
|
* <p>
|
||||||
|
* This circuit breaker implementation comes with a memoizing functionality where on successful calls the result get
|
||||||
|
* cached and the circuit breaker respond on error cases with the cached result if available.
|
||||||
*
|
*
|
||||||
* // TODO more docu:
|
|
||||||
*
|
*
|
||||||
* @param <T> */
|
* @param <T> The of the result of the suppling function */
|
||||||
public class MemoizingCircuitBreaker<T> implements Supplier<Result<T>> {
|
public class CircuitBreakerSupplier<T> implements Supplier<Result<T>> {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(MemoizingCircuitBreaker.class);
|
private static final Logger log = LoggerFactory.getLogger(CircuitBreakerSupplier.class);
|
||||||
|
|
||||||
public static final int DEFAULT_MAX_FAILING_ATTEMPTS = 5;
|
public static final int DEFAULT_MAX_FAILING_ATTEMPTS = 5;
|
||||||
public static final long DEFAULT_MAX_BLOCKING_TIME = Constants.MINUTE_IN_MILLIS;
|
public static final long DEFAULT_MAX_BLOCKING_TIME = Constants.MINUTE_IN_MILLIS;
|
||||||
|
@ -47,33 +70,52 @@ public class MemoizingCircuitBreaker<T> implements Supplier<Result<T>> {
|
||||||
private final AtomicInteger failingCount = new AtomicInteger(0);
|
private final AtomicInteger failingCount = new AtomicInteger(0);
|
||||||
private long lastSuccessTime;
|
private long lastSuccessTime;
|
||||||
|
|
||||||
|
private final boolean memoizing;
|
||||||
private final Result<T> notAvailable = Result.ofRuntimeError("No chached resource available");
|
private final Result<T> notAvailable = Result.ofRuntimeError("No chached resource available");
|
||||||
private Result<T> cached = null;
|
private Result<T> cached = null;
|
||||||
|
|
||||||
MemoizingCircuitBreaker(
|
/** Create new CircuitBreakerSupplier.
|
||||||
|
*
|
||||||
|
* @param asyncRunner the AsyncRunner used to create asynchronous calls on the given supplier function
|
||||||
|
* @param supplierThatCanFailOrBlock The Supplier function that can fail or block for a long time
|
||||||
|
* @param memoizing whether the memoizing functionality is on or off */
|
||||||
|
CircuitBreakerSupplier(
|
||||||
final AsyncRunner asyncRunner,
|
final AsyncRunner asyncRunner,
|
||||||
final Supplier<T> supplierThatCanFailOrBlock) {
|
final Supplier<T> supplierThatCanFailOrBlock,
|
||||||
|
final boolean memoizing) {
|
||||||
|
|
||||||
this(
|
this(
|
||||||
asyncRunner,
|
asyncRunner,
|
||||||
supplierThatCanFailOrBlock,
|
supplierThatCanFailOrBlock,
|
||||||
DEFAULT_MAX_FAILING_ATTEMPTS,
|
DEFAULT_MAX_FAILING_ATTEMPTS,
|
||||||
DEFAULT_MAX_BLOCKING_TIME,
|
DEFAULT_MAX_BLOCKING_TIME,
|
||||||
DEFAULT_TIME_TO_RECOVER);
|
DEFAULT_TIME_TO_RECOVER,
|
||||||
|
memoizing);
|
||||||
}
|
}
|
||||||
|
|
||||||
MemoizingCircuitBreaker(
|
/** Create new CircuitBreakerSupplier.
|
||||||
|
*
|
||||||
|
* @param asyncRunner the AsyncRunner used to create asynchronous calls on the given supplier function
|
||||||
|
* @param supplierThatCanFailOrBlock The Supplier function that can fail or block for a long time
|
||||||
|
* @param maxFailingAttempts the number of maximal failing attempts before go form CLOSE into HALF_OPEN state
|
||||||
|
* @param maxBlockingTime the maximal time that an call attempt can block until an error is responded
|
||||||
|
* @param timeToRecover the time the circuit breaker needs to cool-down on OPEN-STATE before going back to HALF_OPEN
|
||||||
|
* state
|
||||||
|
* @param memoizing whether the memoizing functionality is on or off */
|
||||||
|
CircuitBreakerSupplier(
|
||||||
final AsyncRunner asyncRunner,
|
final AsyncRunner asyncRunner,
|
||||||
final Supplier<T> supplierThatCanFailOrBlock,
|
final Supplier<T> supplierThatCanFailOrBlock,
|
||||||
final int maxFailingAttempts,
|
final int maxFailingAttempts,
|
||||||
final long maxBlockingTime,
|
final long maxBlockingTime,
|
||||||
final long timeToRecover) {
|
final long timeToRecover,
|
||||||
|
final boolean memoizing) {
|
||||||
|
|
||||||
this.asyncRunner = asyncRunner;
|
this.asyncRunner = asyncRunner;
|
||||||
this.supplierThatCanFailOrBlock = supplierThatCanFailOrBlock;
|
this.supplierThatCanFailOrBlock = supplierThatCanFailOrBlock;
|
||||||
this.maxFailingAttempts = maxFailingAttempts;
|
this.maxFailingAttempts = maxFailingAttempts;
|
||||||
this.maxBlockingTime = maxBlockingTime;
|
this.maxBlockingTime = maxBlockingTime;
|
||||||
this.timeToRecover = timeToRecover;
|
this.timeToRecover = timeToRecover;
|
||||||
|
this.memoizing = memoizing;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -104,14 +146,6 @@ public class MemoizingCircuitBreaker<T> implements Supplier<Result<T>> {
|
||||||
return this.state;
|
return this.state;
|
||||||
}
|
}
|
||||||
|
|
||||||
T getChached() {
|
|
||||||
if (this.cached == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
return this.cached.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
private Result<T> handleClosed(final long currentTime) {
|
private Result<T> handleClosed(final long currentTime) {
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
|
@ -140,7 +174,9 @@ public class MemoizingCircuitBreaker<T> implements Supplier<Result<T>> {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
this.lastSuccessTime = System.currentTimeMillis();
|
this.lastSuccessTime = System.currentTimeMillis();
|
||||||
|
if (this.memoizing) {
|
||||||
this.cached = result;
|
this.cached = result;
|
||||||
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -153,16 +189,11 @@ public class MemoizingCircuitBreaker<T> implements Supplier<Result<T>> {
|
||||||
|
|
||||||
final Result<T> result = attempt();
|
final Result<T> result = attempt();
|
||||||
if (result.hasError()) {
|
if (result.hasError()) {
|
||||||
final int fails = this.failingCount.incrementAndGet();
|
|
||||||
if (fails > this.maxFailingAttempts) {
|
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Changing state from Half Open to Open and return cached value");
|
log.debug("Changing state from Half Open to Open and return cached value");
|
||||||
}
|
}
|
||||||
|
|
||||||
this.state = State.OPEN;
|
this.state = State.OPEN;
|
||||||
this.failingCount.set(0);
|
|
||||||
}
|
|
||||||
return getCached(result);
|
return getCached(result);
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
|
@ -184,19 +215,16 @@ public class MemoizingCircuitBreaker<T> implements Supplier<Result<T>> {
|
||||||
log.debug("Handle Open on: {}", currentTime);
|
log.debug("Handle Open on: {}", currentTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (currentTime - this.lastSuccessTime < this.timeToRecover) {
|
if (currentTime - this.lastSuccessTime >= this.timeToRecover) {
|
||||||
return getCached(this.notAvailable);
|
|
||||||
} else {
|
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Time to recover reached. Changing state from Closed to Half Open and try agian");
|
log.debug("Time to recover reached. Changing state from Open to Half Open");
|
||||||
}
|
}
|
||||||
|
|
||||||
this.state = State.HALF_OPEN;
|
this.state = State.HALF_OPEN;
|
||||||
this.failingCount.set(0);
|
|
||||||
return get();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return getCached(this.notAvailable);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Result<T> attempt() {
|
private Result<T> attempt() {
|
||||||
|
@ -209,6 +237,9 @@ public class MemoizingCircuitBreaker<T> implements Supplier<Result<T>> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private Result<T> getCached(final Result<T> error) {
|
private Result<T> getCached(final Result<T> error) {
|
||||||
|
if (!this.memoizing) {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
if (this.cached != null) {
|
if (this.cached != null) {
|
||||||
return this.cached;
|
return this.cached;
|
||||||
} else {
|
} else {
|
||||||
|
@ -216,6 +247,14 @@ public class MemoizingCircuitBreaker<T> implements Supplier<Result<T>> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
T getChached() {
|
||||||
|
if (this.cached == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return this.cached.get();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "MemoizingCircuitBreaker [maxFailingAttempts=" + this.maxFailingAttempts + ", maxBlockingTime="
|
return "MemoizingCircuitBreaker [maxFailingAttempts=" + this.maxFailingAttempts + ", maxBlockingTime="
|
|
@ -40,7 +40,7 @@ import org.springframework.util.MultiValueMap;
|
||||||
|
|
||||||
import ch.ethz.seb.sebserver.gbl.api.APIMessage;
|
import ch.ethz.seb.sebserver.gbl.api.APIMessage;
|
||||||
import ch.ethz.seb.sebserver.gbl.async.AsyncService;
|
import ch.ethz.seb.sebserver.gbl.async.AsyncService;
|
||||||
import ch.ethz.seb.sebserver.gbl.async.MemoizingCircuitBreaker;
|
import ch.ethz.seb.sebserver.gbl.async.CircuitBreakerSupplier;
|
||||||
import ch.ethz.seb.sebserver.gbl.model.exam.QuizData;
|
import ch.ethz.seb.sebserver.gbl.model.exam.QuizData;
|
||||||
import ch.ethz.seb.sebserver.gbl.model.institution.LmsSetup;
|
import ch.ethz.seb.sebserver.gbl.model.institution.LmsSetup;
|
||||||
import ch.ethz.seb.sebserver.gbl.model.institution.LmsSetupTestResult;
|
import ch.ethz.seb.sebserver.gbl.model.institution.LmsSetupTestResult;
|
||||||
|
@ -69,7 +69,7 @@ final class OpenEdxLmsAPITemplate implements LmsAPITemplate {
|
||||||
private final Set<String> knownTokenAccessPaths;
|
private final Set<String> knownTokenAccessPaths;
|
||||||
|
|
||||||
private OAuth2RestTemplate restTemplate = null;
|
private OAuth2RestTemplate restTemplate = null;
|
||||||
private final MemoizingCircuitBreaker<List<QuizData>> allQuizzesSupplier;
|
private final CircuitBreakerSupplier<List<QuizData>> allQuizzesSupplier;
|
||||||
|
|
||||||
OpenEdxLmsAPITemplate(
|
OpenEdxLmsAPITemplate(
|
||||||
final AsyncService asyncService,
|
final AsyncService asyncService,
|
||||||
|
@ -89,7 +89,7 @@ final class OpenEdxLmsAPITemplate implements LmsAPITemplate {
|
||||||
this.knownTokenAccessPaths.addAll(Arrays.asList(alternativeTokenRequestPaths));
|
this.knownTokenAccessPaths.addAll(Arrays.asList(alternativeTokenRequestPaths));
|
||||||
}
|
}
|
||||||
|
|
||||||
this.allQuizzesSupplier = asyncService.createCircuitBreaker(allQuizzesSupplier(lmsSetup));
|
this.allQuizzesSupplier = asyncService.createMemoizingCircuitBreaker(allQuizzesSupplier(lmsSetup));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -22,7 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.test.context.ContextConfiguration;
|
import org.springframework.test.context.ContextConfiguration;
|
||||||
import org.springframework.test.context.junit4.SpringRunner;
|
import org.springframework.test.context.junit4.SpringRunner;
|
||||||
|
|
||||||
import ch.ethz.seb.sebserver.gbl.async.MemoizingCircuitBreaker.State;
|
import ch.ethz.seb.sebserver.gbl.async.CircuitBreakerSupplier.State;
|
||||||
import ch.ethz.seb.sebserver.gbl.util.Result;
|
import ch.ethz.seb.sebserver.gbl.util.Result;
|
||||||
|
|
||||||
@RunWith(SpringRunner.class)
|
@RunWith(SpringRunner.class)
|
||||||
|
@ -41,8 +41,8 @@ public class MemoizingCircuitBreakerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void roundtrip1() throws InterruptedException {
|
public void roundtrip1() throws InterruptedException {
|
||||||
final MemoizingCircuitBreaker<String> circuitBreaker = this.asyncService.createCircuitBreaker(
|
final CircuitBreakerSupplier<String> circuitBreaker = this.asyncService.createCircuitBreaker(
|
||||||
tester(100, 5, 15), 3, 500, 1000);
|
tester(100, 5, 10), 3, 500, 1000, true);
|
||||||
|
|
||||||
assertNull(circuitBreaker.getChached());
|
assertNull(circuitBreaker.getChached());
|
||||||
|
|
||||||
|
@ -62,30 +62,22 @@ public class MemoizingCircuitBreakerTest {
|
||||||
assertEquals("Hello", circuitBreaker.getChached());
|
assertEquals("Hello", circuitBreaker.getChached());
|
||||||
assertEquals(State.CLOSED, circuitBreaker.getState());
|
assertEquals(State.CLOSED, circuitBreaker.getState());
|
||||||
|
|
||||||
result = circuitBreaker.get(); // 6. call... after the 5. call the tester is unavailable until the 15. call.. 3 try calls
|
result = circuitBreaker.get(); // 6. call... after the 5. call the tester is unavailable until the 10. call...
|
||||||
assertFalse(result.hasError());
|
assertFalse(result.hasError());
|
||||||
assertEquals("Hello", result.get());
|
assertEquals("Hello", result.get());
|
||||||
assertEquals("Hello", circuitBreaker.getChached());
|
assertEquals("Hello", circuitBreaker.getChached());
|
||||||
assertEquals(State.HALF_OPEN, circuitBreaker.getState());
|
assertEquals(State.HALF_OPEN, circuitBreaker.getState());
|
||||||
|
|
||||||
circuitBreaker.get(); // 9. call... 1. call in HalfOpen state
|
result = circuitBreaker.get(); // 9. call... after fail again, go to OPEN state
|
||||||
circuitBreaker.get(); // 10. call... 2. call in HalfOpen state
|
|
||||||
result = circuitBreaker.get(); // 11. call... 3. call in HalfOpen state.. still available in Half Open state
|
|
||||||
assertEquals(State.HALF_OPEN, circuitBreaker.getState());
|
|
||||||
|
|
||||||
result = circuitBreaker.get(); // 12. call... 4. is changing to Open state
|
|
||||||
assertEquals(State.OPEN, circuitBreaker.getState());
|
assertEquals(State.OPEN, circuitBreaker.getState());
|
||||||
|
|
||||||
// now the time to recover comes into play
|
// now the time to recover comes into play
|
||||||
Thread.sleep(1100);
|
Thread.sleep(1100);
|
||||||
result = circuitBreaker.get(); // 13. call... 1. call in Open state... after time to recover ended get back to Half Open
|
result = circuitBreaker.get(); // 10. call...
|
||||||
assertEquals(State.HALF_OPEN, circuitBreaker.getState());
|
|
||||||
assertEquals("Hello", result.get());
|
|
||||||
|
|
||||||
result = circuitBreaker.get(); // 14. call... 1. call in Half Open state...
|
|
||||||
assertEquals(State.HALF_OPEN, circuitBreaker.getState());
|
assertEquals(State.HALF_OPEN, circuitBreaker.getState());
|
||||||
assertEquals("Hello", result.get());
|
assertEquals("Hello", result.get());
|
||||||
|
|
||||||
|
// back again
|
||||||
result = circuitBreaker.get(); // 15. call... 2. call in Half Open state...
|
result = circuitBreaker.get(); // 15. call... 2. call in Half Open state...
|
||||||
assertEquals(State.CLOSED, circuitBreaker.getState());
|
assertEquals(State.CLOSED, circuitBreaker.getState());
|
||||||
assertEquals("Hello back again", result.get());
|
assertEquals("Hello back again", result.get());
|
||||||
|
@ -114,4 +106,6 @@ public class MemoizingCircuitBreakerTest {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO timeout test: test also the behavior on timeout, is the thread being interrupted and released or not (should!)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue