SEBSERV-60 implemented

This commit is contained in:
anhefti 2019-09-02 12:56:32 +02:00
parent cf948d5827
commit 7a8cdf0ee3
7 changed files with 119 additions and 44 deletions

View file

@ -40,18 +40,23 @@ public class AsyncService {
timeToRecover); timeToRecover);
} }
public <T> MemoizingCircuitBreaker<T> createMemoizingCircuitBreaker( // public <T> MemoizingCircuitBreaker<T> createMemoizingCircuitBreaker(
final Supplier<T> blockingSupplier) { // final Supplier<T> blockingSupplier) {
//
return new MemoizingCircuitBreaker<>(this.asyncRunner, blockingSupplier, true); // return new MemoizingCircuitBreaker<>(
} // this.asyncRunner,
// blockingSupplier,
// true,
// Constants.HOUR_IN_MILLIS);
// }
public <T> MemoizingCircuitBreaker<T> createMemoizingCircuitBreaker( public <T> MemoizingCircuitBreaker<T> createMemoizingCircuitBreaker(
final Supplier<T> blockingSupplier, final Supplier<T> blockingSupplier,
final int maxFailingAttempts, final int maxFailingAttempts,
final long maxBlockingTime, final long maxBlockingTime,
final long timeToRecover, final long timeToRecover,
final boolean momoized) { final boolean momoized,
final long maxMemoizingTime) {
return new MemoizingCircuitBreaker<>( return new MemoizingCircuitBreaker<>(
this.asyncRunner, this.asyncRunner,
@ -59,7 +64,8 @@ public class AsyncService {
maxFailingAttempts, maxFailingAttempts,
maxBlockingTime, maxBlockingTime,
timeToRecover, timeToRecover,
momoized); momoized,
maxMemoizingTime);
} }
} }

View file

@ -102,6 +102,26 @@ public final class CircuitBreaker<T> {
this.timeToRecover = timeToRecover; this.timeToRecover = timeToRecover;
} }
public int getMaxFailingAttempts() {
return this.maxFailingAttempts;
}
public long getMaxBlockingTime() {
return this.maxBlockingTime;
}
public long getTimeToRecover() {
return this.timeToRecover;
}
public AtomicInteger getFailingCount() {
return this.failingCount;
}
public long getLastSuccessTime() {
return this.lastSuccessTime;
}
public Result<T> protectedRun(final Supplier<T> supplier) { public Result<T> protectedRun(final Supplier<T> supplier) {
final long currentTime = System.currentTimeMillis(); final long currentTime = System.currentTimeMillis();

View file

@ -54,21 +54,42 @@ public final class MemoizingCircuitBreaker<T> implements Supplier<Result<T>> {
private final Supplier<T> supplier; private final Supplier<T> supplier;
private final boolean memoizing; private final boolean memoizing;
private final long maxMemoizingTime;
private long lastMemoizingTime = 0;
private Result<T> cached = null; private Result<T> cached = null;
/** Create new CircuitBreakerSupplier. /** Create new CircuitBreakerSupplier.
* *
* @param asyncRunner the AsyncRunner used to create asynchronous calls on the given supplier function * @param asyncRunner the AsyncRunner used to create asynchronous calls on the given supplier function
* @param supplier The Supplier function that can fail or block for a long time * @param supplier The Supplier function that can fail or block for a long time
* @param memoizing whether the memoizing functionality is on or off */ * @param memoizing whether the memoizing functionality is on or off
* @param maxMemoizingTime the maximal time memorized data is valid */
MemoizingCircuitBreaker( MemoizingCircuitBreaker(
final AsyncRunner asyncRunner, final AsyncRunner asyncRunner,
final Supplier<T> supplier, final Supplier<T> supplier,
final boolean memoizing) { final boolean memoizing,
final long maxMemoizingTime) {
this.delegate = new CircuitBreaker<>(asyncRunner); this.delegate = new CircuitBreaker<>(asyncRunner);
this.supplier = supplier; this.supplier = supplier;
this.memoizing = memoizing; this.memoizing = memoizing;
this.maxMemoizingTime = maxMemoizingTime;
}
public CircuitBreaker<T> getDelegate() {
return this.delegate;
}
public boolean isMemoizing() {
return this.memoizing;
}
public long getMaxMemoizingTime() {
return this.maxMemoizingTime;
}
public long getLastMemoizingTime() {
return this.lastMemoizingTime;
} }
/** Create new CircuitBreakerSupplier. /** Create new CircuitBreakerSupplier.
@ -79,14 +100,16 @@ public final class MemoizingCircuitBreaker<T> implements Supplier<Result<T>> {
* @param maxBlockingTime the maximal time that an call attempt can block until an error is responded * @param maxBlockingTime the maximal time that an call attempt can block until an error is responded
* @param timeToRecover the time the circuit breaker needs to cool-down on OPEN-STATE before going back to HALF_OPEN * @param timeToRecover the time the circuit breaker needs to cool-down on OPEN-STATE before going back to HALF_OPEN
* state * state
* @param memoizing whether the memoizing functionality is on or off */ * @param memoizing whether the memoizing functionality is on or off
* @param maxMemoizingTime the maximal time memorized data is valid */
MemoizingCircuitBreaker( MemoizingCircuitBreaker(
final AsyncRunner asyncRunner, final AsyncRunner asyncRunner,
final Supplier<T> supplier, final Supplier<T> supplier,
final int maxFailingAttempts, final int maxFailingAttempts,
final long maxBlockingTime, final long maxBlockingTime,
final long timeToRecover, final long timeToRecover,
final boolean memoizing) { final boolean memoizing,
final long maxMemoizingTime) {
this.delegate = new CircuitBreaker<>( this.delegate = new CircuitBreaker<>(
asyncRunner, asyncRunner,
@ -95,6 +118,7 @@ public final class MemoizingCircuitBreaker<T> implements Supplier<Result<T>> {
timeToRecover); timeToRecover);
this.supplier = supplier; this.supplier = supplier;
this.memoizing = memoizing; this.memoizing = memoizing;
this.maxMemoizingTime = maxMemoizingTime;
} }
@Override @Override
@ -102,10 +126,15 @@ public final class MemoizingCircuitBreaker<T> implements Supplier<Result<T>> {
final Result<T> result = this.delegate.protectedRun(this.supplier); final Result<T> result = this.delegate.protectedRun(this.supplier);
if (result.hasError()) { if (result.hasError()) {
if (this.memoizing && this.cached != null) { if (this.memoizing && this.cached != null) {
if (log.isDebugEnabled()) { final long currentTimeMillis = System.currentTimeMillis();
log.debug("Return cached at: {}", System.currentTimeMillis()); if (currentTimeMillis - this.lastMemoizingTime > this.maxMemoizingTime) {
if (log.isDebugEnabled()) {
log.debug("Max memoizing time reached. Return original error");
}
return result;
} }
log.warn("Return cached at: {}", System.currentTimeMillis());
return this.cached; return this.cached;
} }
@ -117,6 +146,7 @@ public final class MemoizingCircuitBreaker<T> implements Supplier<Result<T>> {
} }
this.cached = result; this.cached = result;
this.lastMemoizingTime = System.currentTimeMillis();
} }
return result; return result;
} }

View file

@ -46,6 +46,7 @@ import org.springframework.security.oauth2.common.OAuth2AccessToken;
import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
import ch.ethz.seb.sebserver.gbl.Constants;
import ch.ethz.seb.sebserver.gbl.api.APIMessage; import ch.ethz.seb.sebserver.gbl.api.APIMessage;
import ch.ethz.seb.sebserver.gbl.async.AsyncService; import ch.ethz.seb.sebserver.gbl.async.AsyncService;
import ch.ethz.seb.sebserver.gbl.async.MemoizingCircuitBreaker; import ch.ethz.seb.sebserver.gbl.async.MemoizingCircuitBreaker;
@ -97,7 +98,13 @@ final class OpenEdxLmsAPITemplate implements LmsAPITemplate {
this.knownTokenAccessPaths.addAll(Arrays.asList(alternativeTokenRequestPaths)); this.knownTokenAccessPaths.addAll(Arrays.asList(alternativeTokenRequestPaths));
} }
this.allQuizzesSupplier = asyncService.createMemoizingCircuitBreaker(allQuizzesSupplier()); this.allQuizzesSupplier = asyncService.createMemoizingCircuitBreaker(
allQuizzesSupplier(),
3,
Constants.MINUTE_IN_MILLIS,
Constants.MINUTE_IN_MILLIS,
true,
Constants.HOUR_IN_MILLIS);
} }
@Override @Override

View file

@ -42,7 +42,7 @@ public class MemoizingCircuitBreakerTest {
@Test @Test
public void roundtrip1() throws InterruptedException { public void roundtrip1() throws InterruptedException {
final MemoizingCircuitBreaker<String> circuitBreaker = this.asyncService.createMemoizingCircuitBreaker( final MemoizingCircuitBreaker<String> circuitBreaker = this.asyncService.createMemoizingCircuitBreaker(
tester(100, 5, 10), 3, 500, 1000, true); tester(100, 5, 10), 3, 500, 1000, true, 1000);
assertNull(circuitBreaker.getChached()); assertNull(circuitBreaker.getChached());
@ -84,13 +84,39 @@ public class MemoizingCircuitBreakerTest {
} }
@Test
public void failing() throws InterruptedException {
final MemoizingCircuitBreaker<String> circuitBreaker = this.asyncService.createMemoizingCircuitBreaker(
tester(50, 1, 10), 3, 100, 100, true, 1000);
assertNull(circuitBreaker.getChached());
// fist call okay.. for memoizing
Result<String> result = circuitBreaker.get(); // 1. call...
assertFalse(result.hasError());
assertEquals("Hello", result.get());
assertTrue(circuitBreaker.getLastMemoizingTime() > 0);
// second call is okay from memoizing
Thread.sleep(100);
result = circuitBreaker.get(); // 2. call...
assertFalse(result.hasError());
assertEquals("Hello", result.get());
assertTrue(circuitBreaker.getLastMemoizingTime() > 0);
// third call is failing because of memoizing timeout
Thread.sleep(1000);
result = circuitBreaker.get(); // 3. call...
assertTrue(result.hasError());
}
private Supplier<String> tester(final long delay, final int unavailableAfter, final int unavailableUntil) { private Supplier<String> tester(final long delay, final int unavailableAfter, final int unavailableUntil) {
final AtomicInteger count = new AtomicInteger(0); final AtomicInteger count = new AtomicInteger(0);
final AtomicBoolean wasUnavailable = new AtomicBoolean(false); final AtomicBoolean wasUnavailable = new AtomicBoolean(false);
return () -> { return () -> {
final int attempts = count.getAndIncrement(); final int attempts = count.getAndIncrement();
log.info("tester answers {} {}", attempts, Thread.currentThread()); log.debug("tester answers {} {}", attempts, Thread.currentThread());
try { try {
Thread.sleep(delay); Thread.sleep(delay);
@ -107,6 +133,4 @@ public class MemoizingCircuitBreakerTest {
}; };
} }
// TODO timeout test: test also the behavior on timeout, is the thread being interrupted and released or not (should!)
} }

View file

@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false" scan="false">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} %-5level [%thread]:[%logger] %msg%n</pattern>
</encoder>
</appender>
<Logger name="org.eth.demo.sebserver" level="TRACE" additivity="true" />
<root level="INFO" additivity="true">
<appender-ref ref="STDOUT" />
</root>
</configuration>

View file

@ -1,26 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration debug="false" scan="false">
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} %-5level [%thread]:[%logger] %msg%n</pattern>
</encoder>
</appender>
<Logger name="org.eth.demo.sebserver" level="INFO" additivity="true" />
<Logger name="org.mybatis.generator" level="INFO" additivity="true" />
<Logger name="org.eth.demo.sebserver.batis" level="INFO" additivity="true" />
<Logger name="org.springframework.security" level="INFO" additivity="true" />
<Logger name="org.springframework.security.oauth2" level="INFO" additivity="true" />
<Logger name="org.springframework.web.socket.messaging" level="INFO" additivity="true" />
<Logger name="org.springframework.messaging" level="INFO" additivity="true" />
<Logger name="org.springframework" level="INFO" additivity="true" />
<Logger name="org.springframework.web" level="INFO" additivity="true" />
<Logger name="org.eth.demo.sebserver.batis.gen.mapper.ExamRecordMapper" level="INFO" additivity="true" />
<root level="INFO" additivity="true">
<appender-ref ref="STDOUT" />
</root>
</configuration>