fixes and fine-tuning for distributed setup

This commit is contained in:
anhefti 2021-11-18 08:21:03 +01:00
parent 66475b84d9
commit 3e35a7745b
14 changed files with 64 additions and 33 deletions

View file

@ -86,8 +86,15 @@ public class WebserviceInit implements ApplicationListener<ApplicationReadyEvent
SEBServerInit.INIT_LOGGER.info("---->");
SEBServerInit.INIT_LOGGER.info("----> *** Info:");
SEBServerInit.INIT_LOGGER.info("----> JDBC connection pool max size: {}",
this.environment.getProperty("spring.datasource.hikari.maximumPoolSize"));
if (this.webserviceInfo.isDistributed()) {
SEBServerInit.INIT_LOGGER.info("----> Distributed Setup: {}", this.webserviceInfo.getWebserviceUUID());
SEBServerInit.INIT_LOGGER.info("------> Ping update time: {}",
this.environment.getProperty("sebserver.webservice.distributed.pingUpdate"));
SEBServerInit.INIT_LOGGER.info("------> Connection update time: {}",
this.environment.getProperty("sebserver.webservice.distributed.connectionUpdate"));
}
try {

View file

@ -40,7 +40,7 @@ public class WebserviceInfoDAOImpl implements WebserviceInfoDAO {
public WebserviceInfoDAOImpl(
final WebserviceServerInfoRecordMapper webserviceServerInfoRecordMapper,
@Value("${sebserver.webservice.forceMaster:false}") final boolean forceMaster,
@Value("${sebserver.webservice.master.delay.threshold:10000}") final long masterDelayTimeThreshold) {
@Value("${sebserver.webservice.master.delay.threshold:30000}") final long masterDelayTimeThreshold) {
this.webserviceServerInfoRecordMapper = webserviceServerInfoRecordMapper;
this.masterDelayTimeThreshold = masterDelayTimeThreshold;

View file

@ -61,6 +61,7 @@ public class ExamSessionServiceImpl implements ExamSessionService {
private final CacheManager cacheManager;
private final SEBRestrictionService sebRestrictionService;
private final boolean distributedSetup;
private final long distributedConnectionUpdate;
private long lastConnectionTokenCacheUpdate = 0;
@ -72,7 +73,8 @@ public class ExamSessionServiceImpl implements ExamSessionService {
final IndicatorDAO indicatorDAO,
final CacheManager cacheManager,
final SEBRestrictionService sebRestrictionService,
@Value("${sebserver.webservice.distributed:false}") final boolean distributedSetup) {
@Value("${sebserver.webservice.distributed:false}") final boolean distributedSetup,
@Value("${sebserver.webservice.distributed.connectionUpdate:2000}") final long distributedConnectionUpdate) {
this.examSessionCacheService = examSessionCacheService;
this.examDAO = examDAO;
@ -82,6 +84,7 @@ public class ExamSessionServiceImpl implements ExamSessionService {
this.indicatorDAO = indicatorDAO;
this.sebRestrictionService = sebRestrictionService;
this.distributedSetup = distributedSetup;
this.distributedConnectionUpdate = distributedConnectionUpdate;
}
@Override
@ -114,9 +117,11 @@ public class ExamSessionServiceImpl implements ExamSessionService {
return Result.tryCatch(() -> {
final Collection<APIMessage> result = new ArrayList<>();
final Exam exam = this.examDAO
.byPK(examId)
.getOrThrow();
final Exam exam = (this.isExamRunning(examId))
? this.examSessionCacheService.getRunningExam(examId)
: this.examDAO
.byPK(examId)
.getOrThrow();
// check lms connection
if (exam.status == ExamStatus.CORRUPT_NO_LMS_CONNECTION) {
@ -193,7 +198,7 @@ public class ExamSessionServiceImpl implements ExamSessionService {
}
@Override
public synchronized Result<Exam> getRunningExam(final Long examId) {
public /* synchronized */ Result<Exam> getRunningExam(final Long examId) {
if (log.isTraceEnabled()) {
log.trace("Running exam request for exam {}", examId);
}
@ -361,6 +366,7 @@ public class ExamSessionServiceImpl implements ExamSessionService {
@Override
public Result<Exam> updateExamCache(final Long examId) {
final Exam exam = this.examSessionCacheService.getRunningExam(examId);
if (exam == null) {
return Result.ofEmpty();
@ -398,10 +404,10 @@ public class ExamSessionServiceImpl implements ExamSessionService {
// at least every second. This allows caching over multiple monitoring requests but
// ensure an update every second for new incoming connections
private void updateClientConnections(final Long examId) {
try {
final long currentTimeMillis = System.currentTimeMillis();
if (this.distributedSetup &&
System.currentTimeMillis() - this.lastConnectionTokenCacheUpdate > Constants.SECOND_IN_MILLIS) {
currentTimeMillis - this.lastConnectionTokenCacheUpdate > this.distributedConnectionUpdate) {
// go trough all client connection and update the ones that not up to date
this.clientConnectionDAO.evictConnectionTokenCache(examId);
@ -420,7 +426,7 @@ public class ExamSessionServiceImpl implements ExamSessionService {
.stream()
.forEach(this.examSessionCacheService::evictClientConnection);
this.lastConnectionTokenCacheUpdate = System.currentTimeMillis();
this.lastConnectionTokenCacheUpdate = currentTimeMillis;
}
} catch (final Exception e) {
log.error("Unexpected error while trying to update client connections: ", e);

View file

@ -555,6 +555,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
.getOrThrow()
.stream()
.flatMap(examId -> distributed
// TODO fetch only the connection tokens form active connections here
? this.clientConnectionDAO
.getConnectionTokensNoCache(examId)
.getOrThrow()

View file

@ -30,6 +30,7 @@ import org.springframework.transaction.annotation.Transactional;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.gbl.util.Utils;
import ch.ethz.seb.sebserver.webservice.WebserviceInfo;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport;
@ -45,9 +46,11 @@ public class DistributedPingCache implements DisposableBean {
private final ClientEventLastPingMapper clientEventLastPingMapper;
private final ClientEventRecordMapper clientEventRecordMapper;
private ScheduledFuture<?> taskRef;
private final long pingUpdateTolerance;
private ScheduledFuture<?> taskRef;
private final Map<Long, Long> pingCache = new ConcurrentHashMap<>();
private long lastUpdate = 0L;
public DistributedPingCache(
final ClientEventLastPingMapper clientEventLastPingMapper,
@ -58,6 +61,7 @@ public class DistributedPingCache implements DisposableBean {
this.clientEventLastPingMapper = clientEventLastPingMapper;
this.clientEventRecordMapper = clientEventRecordMapper;
this.pingUpdateTolerance = pingUpdate * 2 / 3;
if (webserviceInfo.isDistributed()) {
try {
this.taskRef = taskScheduler.scheduleAtFixedRate(this::updateCache, pingUpdate);
@ -166,7 +170,6 @@ public class DistributedPingCache implements DisposableBean {
try {
Long ping = this.pingCache.get(pingRecordId);
if (ping == null) {
if (log.isDebugEnabled()) {
log.debug("*** Get and cache ping time: {}", pingRecordId);
}
@ -187,6 +190,12 @@ public class DistributedPingCache implements DisposableBean {
@Transactional(readOnly = true, isolation = Isolation.READ_UNCOMMITTED)
public void updateCache() {
final long millisecondsNow = Utils.getMillisecondsNow();
if (millisecondsNow - this.lastUpdate < this.pingUpdateTolerance) {
log.warn("Skip ping update schedule because the last one was less then 2 seconds ago");
return;
}
if (this.pingCache.isEmpty()) {
return;
}
@ -210,6 +219,7 @@ public class DistributedPingCache implements DisposableBean {
if (mapping != null) {
this.pingCache.clear();
this.pingCache.putAll(mapping);
this.lastUpdate = millisecondsNow;
}
} catch (final Exception e) {

View file

@ -270,20 +270,20 @@ public class ExamAPI_V1_Controller {
final String pingNumString = request.getParameter(API.EXAM_API_PING_NUMBER);
final String instructionConfirm = request.getParameter(API.EXAM_API_PING_INSTRUCTION_CONFIRM);
if (log.isTraceEnabled()) {
log.trace("****************** SEB client connection: {} ip: {}",
connectionToken,
getClientAddress(request));
if (connectionToken == null) {
log.warn("Missing connection token on ping. Ignore the request");
return;
}
if (instructionConfirm != null) {
this.sebClientConnectionService.confirmInstructionDone(connectionToken, instructionConfirm);
}
final Long clientTime = timeStampString != null ? Long.parseLong(timeStampString) : 0L;
final String instruction = this.sebClientConnectionService
.notifyPing(
connectionToken,
Long.parseLong(timeStampString),
clientTime,
pingNumString != null ? Integer.parseInt(pingNumString) : -1);
if (instruction == null) {

View file

@ -8,7 +8,7 @@ sebserver.gui.webservice.address=localhost
sebserver.gui.webservice.port=8080
sebserver.gui.webservice.apipath=/admin-api/v1
# defines the polling interval that is used to poll the webservice for client connection data on a monitored exam page
#sebserver.gui.webservice.poll-interval=1000
sebserver.gui.webservice.poll-interval=1000
sebserver.gui.theme=css/sebserver.css
sebserver.gui.list.page.size=15

View file

@ -13,12 +13,14 @@ spring.datasource.hikari.initializationFailTimeout=30000
spring.datasource.hikari.connectionTimeout=30000
spring.datasource.hikari.idleTimeout=600000
spring.datasource.hikari.maxLifetime=1800000
spring.datasource.hikari.maximumPoolSize=5
spring.datasource.hikari.maximumPoolSize=10
spring.datasource.hikari.leakDetectionThreshold=2000
sebserver.http.client.connect-timeout=15000
sebserver.http.client.connection-request-timeout=10000
sebserver.http.client.read-timeout=20000
sebserver.webservice.distributed.pingUpdate=1000
sebserver.webservice.distributed.connectionUpdate=1000
sebserver.webservice.clean-db-on-startup=false
# webservice configuration

View file

@ -10,12 +10,16 @@ server.tomcat.uri-encoding=UTF-8
logging.level.ch=INFO
logging.level.ch.ethz.seb.sebserver.webservice.datalayer=INFO
logging.level.org.springframework.cache=INFO
logging.level.ch.ethz.seb.sebserver.webservice.servicelayer.lms.impl=TRACE
logging.level.ch.ethz.seb.sebserver.webservice.servicelayer.session=DEBUG
logging.level.ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.proctoring=TRACE
logging.level.ch.ethz.seb.sebserver.webservice.servicelayer.lms.impl=INFO
logging.level.ch.ethz.seb.sebserver.webservice.servicelayer.session=INFO
logging.level.ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.proctoring=INFO
logging.level.ch.ethz.seb.sebserver.webservice.servicelayer.dao.impl=DEBUG
#logging.level.ch.ethz.seb.sebserver.webservice.datalayer.batis=DEBUG
#logging.level.ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper=DEBUG
#logging.level.ch.ethz.seb.sebserver.webservice.weblayer.api.ExamAPI_V1_Controller=TRACE
logging.level.com.zaxxer.hikari=DEBUG
sebserver.http.client.connect-timeout=150000
sebserver.http.client.connection-request-timeout=100000
sebserver.http.client.read-timeout=200000
sebserver.http.client.connect-timeout=15000
sebserver.http.client.connection-request-timeout=10000
sebserver.http.client.read-timeout=60000

View file

@ -24,9 +24,9 @@ sebserver.gui.http.webservice.contextPath=${server.servlet.context-path}
sebserver.gui.entrypoint=/gui
sebserver.http.client.connect-timeout=60000
sebserver.http.client.connection-request-timeout=100000
sebserver.http.client.read-timeout=200000
sebserver.http.client.connect-timeout=15000
sebserver.http.client.connection-request-timeout=10000
sebserver.http.client.read-timeout=60000
sebserver.gui.webservice.apipath=${sebserver.webservice.api.admin.endpoint}
# defines the polling interval that is used to poll the webservice for client connection data on a monitored exam page

View file

@ -32,6 +32,7 @@ spring.datasource.hikari.connectionTimeout=30000
spring.datasource.hikari.idleTimeout=600000
spring.datasource.hikari.maxLifetime=1800000
spring.datasource.hikari.maximumPoolSize=100
spring.datasource.hikari.leakDetectionThreshold=10000
### webservice security
spring.datasource.password=${sebserver.mariadb.password}

View file

@ -86,12 +86,11 @@
<key-type>java.lang.String</key-type>
<value-type>ch.ethz.seb.sebserver.gbl.model.exam.QuizData</value-type>
<expiry>
<ttl unit="minutes">5</ttl>
<ttl unit="minutes">10</ttl>
</expiry>
<resources>
<heap unit="entries">10000</heap>
</resources>
</cache>
</config>

View file

@ -67,7 +67,7 @@ public class WebserviceTest extends AdministrationAPIIntegrationTester {
assertFalse(this.webserviceInfoDAO.isMaster(WEBSERVICE_2));
try {
Thread.sleep(5000);
Thread.sleep(500);
} catch (final InterruptedException e) {
}
@ -75,7 +75,7 @@ public class WebserviceTest extends AdministrationAPIIntegrationTester {
assertFalse(this.webserviceInfoDAO.isMaster(WEBSERVICE_2));
try {
Thread.sleep(6000);
Thread.sleep(600);
} catch (final InterruptedException e) {
}

View file

@ -41,4 +41,5 @@ sebserver.webservice.api.redirect.unauthorized=none
sebserver.webservice.lms.openedx.api.token.request.paths=/oauth2/access_token
sebserver.webservice.lms.moodle.api.token.request.paths
management.endpoints.web.base-path=/actuator
management.endpoints.web.base-path=/actuator
sebserver.webservice.master.delay.threshold=1000