CircuitBreaker implementation and testing

This commit is contained in:
anhefti 2019-03-15 23:25:51 +01:00
parent 6e020b8e6f
commit c89d813dca
8 changed files with 526 additions and 74 deletions

View file

@ -0,0 +1,27 @@
/*
* Copyright (c) 2019 ETH Zürich, Educational Development and Technology (LET)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package ch.ethz.seb.sebserver.gbl.async;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
@Component
@EnableAsync
public class AsyncRunner {
@Async(AsyncServiceSpringConfig.EXECUTOR_BEAN_NAME)
public <T> CompletableFuture<T> runAsync(final Supplier<T> supplier) {
return CompletableFuture.completedFuture(supplier.get());
}
}

View file

@ -0,0 +1,56 @@
/*
* Copyright (c) 2019 ETH Zürich, Educational Development and Technology (LET)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package ch.ethz.seb.sebserver.gbl.async;
import java.util.function.Supplier;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
@Lazy
@Service
public class AsyncService {
private final AsyncRunner asyncRunner;
protected AsyncService(final AsyncRunner asyncRunner) {
this.asyncRunner = asyncRunner;
}
public <T> MemoizingCircuitBreaker<T> createCircuitBreaker(final Supplier<T> blockingSupplier) {
return new MemoizingCircuitBreaker<>(this.asyncRunner, blockingSupplier);
}
public <T> MemoizingCircuitBreaker<T> createCircuitBreaker(
final Supplier<T> blockingSupplier,
final long maxBlockingTime) {
return new MemoizingCircuitBreaker<>(
this.asyncRunner,
blockingSupplier,
MemoizingCircuitBreaker.DEFAULT_MAX_FAILING_ATTEMPTS,
maxBlockingTime,
MemoizingCircuitBreaker.DEFAULT_TIME_TO_RECOVER);
}
public <T> MemoizingCircuitBreaker<T> createCircuitBreaker(
final Supplier<T> blockingSupplier,
final int maxFailingAttempts,
final long maxBlockingTime,
final long timeToRecover) {
return new MemoizingCircuitBreaker<>(
this.asyncRunner,
blockingSupplier,
maxFailingAttempts,
maxBlockingTime,
timeToRecover);
}
}

View file

@ -0,0 +1,33 @@
/*
* Copyright (c) 2019 ETH Zürich, Educational Development and Technology (LET)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package ch.ethz.seb.sebserver.gbl.async;
import java.util.concurrent.Executor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class AsyncServiceSpringConfig {
public static final String EXECUTOR_BEAN_NAME = "AsyncServiceExecutorBean";
@Bean(name = EXECUTOR_BEAN_NAME)
public Executor threadPoolTaskExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(7);
executor.setMaxPoolSize(42);
executor.setQueueCapacity(11);
executor.setThreadNamePrefix("asyncService-");
executor.initialize();
return executor;
}
}

View file

@ -0,0 +1,227 @@
/*
* Copyright (c) 2019 ETH Zürich, Educational Development and Technology (LET)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package ch.ethz.seb.sebserver.gbl.async;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ch.ethz.seb.sebserver.gbl.Constants;
import ch.ethz.seb.sebserver.gbl.util.Result;
/** A circuit breaker with three states (CLOSED, HALF_OPEN, OPEN) and memoizing.
*
* // TODO more docu:
*
* @param <T> */
public class MemoizingCircuitBreaker<T> implements Supplier<Result<T>> {
private static final Logger log = LoggerFactory.getLogger(MemoizingCircuitBreaker.class);
public static final int DEFAULT_MAX_FAILING_ATTEMPTS = 5;
public static final long DEFAULT_MAX_BLOCKING_TIME = Constants.MINUTE_IN_MILLIS;
public static final long DEFAULT_TIME_TO_RECOVER = Constants.MINUTE_IN_MILLIS * 10;
public enum State {
CLOSED,
HALF_OPEN,
OPEN
}
private final AsyncRunner asyncRunner;
private final Supplier<T> supplierThatCanFailOrBlock;
private final int maxFailingAttempts;
private final long maxBlockingTime;
private final long timeToRecover;
private State state = State.CLOSED;
private final AtomicInteger failingCount = new AtomicInteger(0);
private long lastSuccessTime;
private final Result<T> notAvailable = Result.ofRuntimeError("No chached resource available");
private Result<T> cached = null;
MemoizingCircuitBreaker(
final AsyncRunner asyncRunner,
final Supplier<T> supplierThatCanFailOrBlock) {
this(
asyncRunner,
supplierThatCanFailOrBlock,
DEFAULT_MAX_FAILING_ATTEMPTS,
DEFAULT_MAX_BLOCKING_TIME,
DEFAULT_TIME_TO_RECOVER);
}
MemoizingCircuitBreaker(
final AsyncRunner asyncRunner,
final Supplier<T> supplierThatCanFailOrBlock,
final int maxFailingAttempts,
final long maxBlockingTime,
final long timeToRecover) {
this.asyncRunner = asyncRunner;
this.supplierThatCanFailOrBlock = supplierThatCanFailOrBlock;
this.maxFailingAttempts = maxFailingAttempts;
this.maxBlockingTime = maxBlockingTime;
this.timeToRecover = timeToRecover;
}
@Override
public Result<T> get() {
final long currentTime = System.currentTimeMillis();
if (log.isDebugEnabled()) {
log.debug("Called on: {} current state is: {} failing count: {}",
currentTime,
this.state,
this.failingCount);
}
switch (this.state) {
case CLOSED:
return handleClosed(currentTime);
case HALF_OPEN:
return handleHalfOpen(currentTime);
case OPEN:
return handelOpen(currentTime);
default:
throw new IllegalStateException();
}
}
public State getState() {
return this.state;
}
T getChached() {
if (this.cached == null) {
return null;
}
return this.cached.get();
}
private Result<T> handleClosed(final long currentTime) {
if (log.isDebugEnabled()) {
log.debug("Handle Closed on: {}", currentTime);
}
final Result<T> result = attempt();
if (result.hasError()) {
if (log.isDebugEnabled()) {
log.debug("Attempt failed. failing count: {}", this.failingCount);
}
final int failing = this.failingCount.incrementAndGet();
if (failing > this.maxFailingAttempts) {
if (log.isDebugEnabled()) {
log.debug("Changing state from Open to Half Open and return cached value");
}
this.state = State.HALF_OPEN;
this.failingCount.set(0);
return getCached(result);
} else {
return get();
}
} else {
this.lastSuccessTime = System.currentTimeMillis();
this.cached = result;
return result;
}
}
private Result<T> handleHalfOpen(final long currentTime) {
if (log.isDebugEnabled()) {
log.debug("Handle Half Open on: {}", currentTime);
}
final Result<T> result = attempt();
if (result.hasError()) {
final int fails = this.failingCount.incrementAndGet();
if (fails > this.maxFailingAttempts) {
if (log.isDebugEnabled()) {
log.debug("Changing state from Half Open to Open and return cached value");
}
this.state = State.OPEN;
this.failingCount.set(0);
}
return getCached(result);
} else {
if (log.isDebugEnabled()) {
log.debug("Changing state from Half Open to Closed and return value");
}
this.state = State.CLOSED;
this.failingCount.set(0);
return result;
}
}
/** As long as time to recover is not reached, return from cache
* If time to recover is reached go to half open state and try again */
private Result<T> handelOpen(final long currentTime) {
if (log.isDebugEnabled()) {
log.debug("Handle Open on: {}", currentTime);
}
if (currentTime - this.lastSuccessTime < this.timeToRecover) {
return getCached(this.notAvailable);
} else {
if (log.isDebugEnabled()) {
log.debug("Time to recover reached. Changing state from Closed to Half Open and try agian");
}
this.state = State.HALF_OPEN;
this.failingCount.set(0);
return get();
}
}
private Result<T> attempt() {
try {
return Result.of(this.asyncRunner.runAsync(this.supplierThatCanFailOrBlock)
.get(this.maxBlockingTime, TimeUnit.MILLISECONDS));
} catch (final Exception e) {
return Result.ofError(e);
}
}
private Result<T> getCached(final Result<T> error) {
if (this.cached != null) {
return this.cached;
} else {
return error;
}
}
@Override
public String toString() {
return "MemoizingCircuitBreaker [maxFailingAttempts=" + this.maxFailingAttempts + ", maxBlockingTime="
+ this.maxBlockingTime + ", timeToRecover=" + this.timeToRecover + ", state=" + this.state
+ ", failingCount="
+ this.failingCount + ", lastSuccessTime=" + this.lastSuccessTime + ", cached=" + this.cached + "]";
}
}

View file

@ -1,42 +0,0 @@
/*
* Copyright (c) 2019 ETH Zürich, Educational Development and Technology (LET)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package ch.ethz.seb.sebserver.gbl.util;
import java.util.function.Supplier;
public class SupplierWithCircuitBreaker<T> implements Supplier<Result<T>> {
private final Supplier<T> supplierThatCanFailOrBlock;
private final int maxFailingAttempts;
private final long maxBlockingTime;
private final T cached = null;
public SupplierWithCircuitBreaker(
final Supplier<T> supplierThatCanFailOrBlock,
final int maxFailingAttempts,
final long maxBlockingTime) {
this.supplierThatCanFailOrBlock = supplierThatCanFailOrBlock;
this.maxFailingAttempts = maxFailingAttempts;
this.maxBlockingTime = maxBlockingTime;
}
@Override
public Result<T> get() {
// TODO start an async task that calls the supplierThatCanFailOrBlock and returns a Future
// try to get the result periodically until maxBlockingTime
// if the supplier returns error, try for maxFailingAttempts
// if success cache and return the result
// if failed return the cached values
return Result.tryCatch(() -> this.supplierThatCanFailOrBlock.get());
}
}

View file

@ -24,6 +24,7 @@ import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.stereotype.Service;
import ch.ethz.seb.sebserver.gbl.Constants;
import ch.ethz.seb.sebserver.gbl.async.AsyncService;
import ch.ethz.seb.sebserver.gbl.model.Page;
import ch.ethz.seb.sebserver.gbl.model.exam.QuizData;
import ch.ethz.seb.sebserver.gbl.model.institution.LmsSetup;
@ -44,6 +45,7 @@ public class LmsAPIServiceImpl implements LmsAPIService {
private static final Logger log = LoggerFactory.getLogger(LmsAPIServiceImpl.class);
private final AsyncService asyncService;
private final LmsSetupDAO lmsSetupDAO;
private final ClientCredentialService clientCredentialService;
private final ClientHttpRequestFactory clientHttpRequestFactory;
@ -52,11 +54,13 @@ public class LmsAPIServiceImpl implements LmsAPIService {
private final Map<CacheKey, LmsAPITemplate> cache = new ConcurrentHashMap<>();
public LmsAPIServiceImpl(
final AsyncService asyncService,
final LmsSetupDAO lmsSetupDAO,
final ClientCredentialService clientCredentialService,
final ClientHttpRequestFactory clientHttpRequestFactory,
@Value("${sebserver.lms.openedix.api.token.request.paths}") final String alternativeTokenRequestPaths) {
this.asyncService = asyncService;
this.lmsSetupDAO = lmsSetupDAO;
this.clientCredentialService = clientCredentialService;
this.clientHttpRequestFactory = clientHttpRequestFactory;
@ -102,6 +106,7 @@ public class LmsAPIServiceImpl implements LmsAPIService {
private Result<List<QuizData>> getAllQuizzesFromLMSSetups(final FilterMap filterMap) {
return Result.tryCatch(() -> {
// case 1. if lmsSetupId is available only get quizzes from specified LmsSetup
final Long lmsSetupId = filterMap.getLmsSetupId();
if (lmsSetupId != null) {
return getLmsAPITemplate(lmsSetupId)
@ -110,6 +115,7 @@ public class LmsAPIServiceImpl implements LmsAPIService {
.getOrThrow();
}
// case 2. get quizzes from all LmsSetups of specified institution
final Long institutionId = filterMap.getInstitutionId();
if (institutionId == null) {
throw new IllegalAPIArgumentException("Missing institution identifier");
@ -182,6 +188,7 @@ public class LmsAPIServiceImpl implements LmsAPIService {
this.clientCredentialService);
case OPEN_EDX:
return new OpenEdxLmsAPITemplate(
this.asyncService,
lmsSetup,
credentials,
this.clientCredentialService,

View file

@ -15,6 +15,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@ -38,11 +39,12 @@ import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import ch.ethz.seb.sebserver.gbl.api.APIMessage;
import ch.ethz.seb.sebserver.gbl.async.AsyncService;
import ch.ethz.seb.sebserver.gbl.async.MemoizingCircuitBreaker;
import ch.ethz.seb.sebserver.gbl.model.exam.QuizData;
import ch.ethz.seb.sebserver.gbl.model.institution.LmsSetup;
import ch.ethz.seb.sebserver.gbl.model.institution.LmsSetupTestResult;
import ch.ethz.seb.sebserver.gbl.util.Result;
import ch.ethz.seb.sebserver.gbl.util.SupplierWithCircuitBreaker;
import ch.ethz.seb.sebserver.webservice.servicelayer.client.ClientCredentialService;
import ch.ethz.seb.sebserver.webservice.servicelayer.client.ClientCredentials;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.FilterMap;
@ -67,9 +69,10 @@ final class OpenEdxLmsAPITemplate implements LmsAPITemplate {
private final Set<String> knownTokenAccessPaths;
private OAuth2RestTemplate restTemplate = null;
private SupplierWithCircuitBreaker<List<QuizData>> allQuizzesSupplier = null;
private final MemoizingCircuitBreaker<List<QuizData>> allQuizzesSupplier;
OpenEdxLmsAPITemplate(
final AsyncService asyncService,
final LmsSetup lmsSetup,
final ClientCredentials credentials,
final ClientCredentialService clientCredentialService,
@ -85,6 +88,8 @@ final class OpenEdxLmsAPITemplate implements LmsAPITemplate {
if (alternativeTokenRequestPaths != null) {
this.knownTokenAccessPaths.addAll(Arrays.asList(alternativeTokenRequestPaths));
}
this.allQuizzesSupplier = asyncService.createCircuitBreaker(allQuizzesSupplier(lmsSetup));
}
@Override
@ -115,20 +120,10 @@ final class OpenEdxLmsAPITemplate implements LmsAPITemplate {
@Override
public Result<List<QuizData>> getQuizzes(final FilterMap filterMap) {
return this.initRestTemplateAndRequestAccessToken()
.flatMap(this::getAllQuizes)
return this.allQuizzesSupplier.get()
.map(LmsAPIService.quizzesFilterFunction(filterMap));
}
public ResponseEntity<EdXPage> getEdxPage(final String pageURI) {
final HttpHeaders httpHeaders = new HttpHeaders();
return this.restTemplate.exchange(
pageURI,
HttpMethod.GET,
new HttpEntity<>(httpHeaders),
EdXPage.class);
}
@Override
public Collection<Result<QuizData>> getQuizzes(final Set<String> ids) {
// TODO Auto-generated method stub
@ -197,26 +192,49 @@ final class OpenEdxLmsAPITemplate implements LmsAPITemplate {
return template;
}
private Result<List<QuizData>> getAllQuizes(final LmsSetup lmsSetup) {
if (this.allQuizzesSupplier == null) {
this.allQuizzesSupplier = new SupplierWithCircuitBreaker<>(
() -> collectAllCourses(lmsSetup.lmsApiUrl + OPEN_EDX_DEFAULT_COURSE_ENDPOINT)
.stream()
.reduce(
new ArrayList<QuizData>(),
(list, courseData) -> {
list.add(quizDataOf(lmsSetup, courseData));
return list;
},
(list1, list2) -> {
list1.addAll(list2);
return list1;
}),
5, 1000L); // TODO specify better CircuitBreaker params
}
// private Result<List<QuizData>> getAllQuizes(final LmsSetup lmsSetup) {
// if (this.allQuizzesSupplier == null) {
// this.allQuizzesSupplier = new CircuitBreaker<>(
// () -> collectAllCourses(lmsSetup.lmsApiUrl + OPEN_EDX_DEFAULT_COURSE_ENDPOINT)
// .stream()
// .reduce(
// new ArrayList<QuizData>(),
// (list, courseData) -> {
// list.add(quizDataOf(lmsSetup, courseData));
// return list;
// },
// (list1, list2) -> {
// list1.addAll(list2);
// return list1;
// }),
// 5, 1000L); // TODO specify better CircuitBreaker params
// }
//
// return this.allQuizzesSupplier.get();
//
// }
return this.allQuizzesSupplier.get();
private Supplier<List<QuizData>> allQuizzesSupplier(final LmsSetup lmsSetup) {
return () -> {
return initRestTemplateAndRequestAccessToken()
.map(this::collectAllQuizzes)
.getOrThrow();
};
}
private ArrayList<QuizData> collectAllQuizzes(final LmsSetup lmsSetup) {
return collectAllCourses(lmsSetup.lmsApiUrl + OPEN_EDX_DEFAULT_COURSE_ENDPOINT)
.stream()
.reduce(
new ArrayList<QuizData>(),
(list, courseData) -> {
list.add(quizDataOf(lmsSetup, courseData));
return list;
},
(list1, list2) -> {
list1.addAll(list2);
return list1;
});
}
private List<CourseData> collectAllCourses(final String pageURI) {
@ -233,7 +251,16 @@ final class OpenEdxLmsAPITemplate implements LmsAPITemplate {
return collector;
}
private QuizData quizDataOf(
private ResponseEntity<EdXPage> getEdxPage(final String pageURI) {
final HttpHeaders httpHeaders = new HttpHeaders();
return this.restTemplate.exchange(
pageURI,
HttpMethod.GET,
new HttpEntity<>(httpHeaders),
EdXPage.class);
}
private static QuizData quizDataOf(
final LmsSetup lmsSetup,
final CourseData courseData) {

View file

@ -0,0 +1,117 @@
/*
* Copyright (c) 2019 ETH Zürich, Educational Development and Technology (LET)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package ch.ethz.seb.sebserver.gbl.async;
import static org.junit.Assert.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import ch.ethz.seb.sebserver.gbl.async.MemoizingCircuitBreaker.State;
import ch.ethz.seb.sebserver.gbl.util.Result;
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = { AsyncServiceSpringConfig.class, AsyncRunner.class, AsyncService.class })
public class MemoizingCircuitBreakerTest {
private static final Logger log = LoggerFactory.getLogger(MemoizingCircuitBreakerTest.class);
@Autowired
AsyncService asyncService;
@Test
public void testInit() {
assertNotNull(this.asyncService);
}
@Test
public void roundtrip1() throws InterruptedException {
final MemoizingCircuitBreaker<String> circuitBreaker = this.asyncService.createCircuitBreaker(
tester(100, 5, 15), 3, 500, 1000);
assertNull(circuitBreaker.getChached());
Result<String> result = circuitBreaker.get(); // 1. call...
assertFalse(result.hasError());
assertEquals("Hello", result.get());
assertEquals("Hello", circuitBreaker.getChached());
assertEquals(State.CLOSED, circuitBreaker.getState());
circuitBreaker.get(); // 2. call...
circuitBreaker.get(); // 3. call...
circuitBreaker.get(); // 4. call...
result = circuitBreaker.get(); // 5. call... still available
assertFalse(result.hasError());
assertEquals("Hello", result.get());
assertEquals("Hello", circuitBreaker.getChached());
assertEquals(State.CLOSED, circuitBreaker.getState());
result = circuitBreaker.get(); // 6. call... after the 5. call the tester is unavailable until the 15. call.. 3 try calls
assertFalse(result.hasError());
assertEquals("Hello", result.get());
assertEquals("Hello", circuitBreaker.getChached());
assertEquals(State.HALF_OPEN, circuitBreaker.getState());
circuitBreaker.get(); // 9. call... 1. call in HalfOpen state
circuitBreaker.get(); // 10. call... 2. call in HalfOpen state
result = circuitBreaker.get(); // 11. call... 3. call in HalfOpen state.. still available in Half Open state
assertEquals(State.HALF_OPEN, circuitBreaker.getState());
result = circuitBreaker.get(); // 12. call... 4. is changing to Open state
assertEquals(State.OPEN, circuitBreaker.getState());
// now the time to recover comes into play
Thread.sleep(1100);
result = circuitBreaker.get(); // 13. call... 1. call in Open state... after time to recover ended get back to Half Open
assertEquals(State.HALF_OPEN, circuitBreaker.getState());
assertEquals("Hello", result.get());
result = circuitBreaker.get(); // 14. call... 1. call in Half Open state...
assertEquals(State.HALF_OPEN, circuitBreaker.getState());
assertEquals("Hello", result.get());
result = circuitBreaker.get(); // 15. call... 2. call in Half Open state...
assertEquals(State.CLOSED, circuitBreaker.getState());
assertEquals("Hello back again", result.get());
}
private Supplier<String> tester(final long delay, final int unavailableAfter, final int unavailableUntil) {
final AtomicInteger count = new AtomicInteger(0);
final AtomicBoolean wasUnavailable = new AtomicBoolean(false);
return () -> {
final int attempts = count.getAndIncrement();
log.info("tester answers {} {}", attempts, Thread.currentThread());
try {
Thread.sleep(delay);
} catch (final InterruptedException e) {
e.printStackTrace();
}
if (attempts >= unavailableAfter && attempts < unavailableUntil) {
wasUnavailable.set(true);
throw new RuntimeException("Error");
}
return (wasUnavailable.get()) ? "Hello back again" : "Hello";
};
}
}