SEBSERV-209 fixed exam session service for distributed setup

apply client connection sync check over time-interval (per exam) instead
of for every monitoring request and every client connection. So I
implemented batching for that to avoid heavy db load
This commit is contained in:
anhefti 2021-07-15 07:52:06 +02:00
parent 885e39d1cc
commit bc1e933087
11 changed files with 96 additions and 126 deletions

View file

@ -195,10 +195,6 @@ public final class ClientConnectionTable {
this.table.layout();
}
// public int getUpdateErrors() {
// return this.updateErrors;
// }
public WidgetFactory getWidgetFactory() {
return this.pageService.getWidgetFactory();
}
@ -337,6 +333,7 @@ public final class ClientConnectionTable {
try {
// TODO forceUpdateAll doeasn't work on distributed
if (this.statusFilterChanged || this.forceUpdateAll || needsSync) {
this.toDelete.clear();
this.toDelete.addAll(this.tableMapping.keySet());

View file

@ -42,6 +42,11 @@ public interface ClientConnectionDAO extends
unless = "#result.hasError()")
Result<Collection<String>> getConnectionTokens(Long examId);
@CacheEvict(cacheNames = CONNECTION_TOKENS_CACHE, key = "#examId")
default void evictConnectionTokenCache(final Long examId) {
}
/** Get a list of all connection tokens of all connections (no matter what state)
* of an exam.
*
@ -151,6 +156,8 @@ public interface ClientConnectionDAO extends
* @return Result refer to true if the given ClientConnection is up to date */
Result<Boolean> isUpToDate(ClientConnection clientConnection);
Result<Set<String>> getClientConnectionsOutOfSyc(Long examId, Set<Long> timestamps);
/** Indicates if the client connection for given exam and connection token is
* in a ready state to send instructions.
*

View file

@ -13,6 +13,7 @@ import static org.mybatis.dynamic.sql.SqlBuilder.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
@ -553,9 +554,10 @@ public class ClientConnectionDAOImpl implements ClientConnectionDAO {
}
@Override
@Transactional(readOnly = true)
public Result<Boolean> isUpToDate(final ClientConnection clientConnection) {
return Result.tryCatch(() -> this.clientConnectionRecordMapper
.selectByExample()
.countByExample()
.where(
ClientConnectionRecordDynamicSqlSupport.connectionToken,
SqlBuilder.isEqualTo(clientConnection.connectionToken))
@ -563,10 +565,31 @@ public class ClientConnectionDAOImpl implements ClientConnectionDAO {
ClientConnectionRecordDynamicSqlSupport.updateTime,
SqlBuilder.isEqualTo(clientConnection.updateTime))
.build()
.execute()
.stream()
.findFirst()
.isPresent());
.execute() > 0);
}
@Override
@Transactional(readOnly = true)
public Result<Set<String>> getClientConnectionsOutOfSyc(final Long examId, final Set<Long> timestamps) {
return Result.tryCatch(() -> {
final Set<String> result = new HashSet<>();
this.clientConnectionRecordMapper
.selectByExample()
.where(
ClientConnectionRecordDynamicSqlSupport.examId,
SqlBuilder.isEqualTo(examId))
.build()
.execute()
.stream()
.forEach(cc -> {
if (!timestamps.contains(cc.getUpdateTime())) {
result.add(cc.getConnectionToken());
}
});
return result;
});
}
@Override

View file

@ -10,25 +10,18 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl;
import java.io.ByteArrayOutputStream;
import org.mybatis.dynamic.sql.SqlBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import ch.ethz.seb.sebserver.gbl.model.exam.Exam;
import ch.ethz.seb.sebserver.gbl.model.exam.Exam.ExamStatus;
import ch.ethz.seb.sebserver.gbl.model.session.ClientConnection;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.gbl.util.Result;
import ch.ethz.seb.sebserver.gbl.util.Utils;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordMapper;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientConnectionDAO;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ExamDAO;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.RemoteProctoringRoomDAO;
@ -48,7 +41,6 @@ public class ExamSessionCacheService {
public static final String CACHE_NAME_RUNNING_EXAM = "RUNNING_EXAM";
public static final String CACHE_NAME_ACTIVE_CLIENT_CONNECTION = "ACTIVE_CLIENT_CONNECTION";
public static final String CACHE_NAME_SEB_CONFIG_EXAM = "SEB_CONFIG_EXAM";
public static final String CACHE_NAME_PING_RECORD = "CACHE_NAME_PING_RECORD";
private static final Logger log = LoggerFactory.getLogger(ExamSessionCacheService.class);
@ -56,7 +48,6 @@ public class ExamSessionCacheService {
private final ClientConnectionDAO clientConnectionDAO;
private final InternalClientConnectionDataFactory internalClientConnectionDataFactory;
private final ExamConfigService sebExamConfigService;
private final ClientEventRecordMapper clientEventRecordMapper;
private final ExamUpdateHandler examUpdateHandler;
protected ExamSessionCacheService(
@ -64,7 +55,6 @@ public class ExamSessionCacheService {
final ClientConnectionDAO clientConnectionDAO,
final InternalClientConnectionDataFactory internalClientConnectionDataFactory,
final ExamConfigService sebExamConfigService,
final ClientEventRecordMapper clientEventRecordMapper,
final ExamUpdateHandler examUpdateHandler,
final RemoteProctoringRoomDAO remoteProctoringRoomDAO) {
@ -72,7 +62,6 @@ public class ExamSessionCacheService {
this.clientConnectionDAO = clientConnectionDAO;
this.internalClientConnectionDataFactory = internalClientConnectionDataFactory;
this.sebExamConfigService = sebExamConfigService;
this.clientEventRecordMapper = clientEventRecordMapper;
this.examUpdateHandler = examUpdateHandler;
}
@ -125,7 +114,7 @@ public class ExamSessionCacheService {
}
public boolean isRunning(final Exam exam) {
if (exam == null) {
if (exam == null || !exam.active) {
return false;
}
@ -202,56 +191,13 @@ public class ExamSessionCacheService {
}
}
@Cacheable(
cacheNames = CACHE_NAME_PING_RECORD,
key = "#connectionToken",
unless = "#result == null")
@Transactional
public ClientEventRecord getPingRecord(final String connectionToken) {
if (log.isDebugEnabled()) {
log.debug("Verify ClientConnection for ping record to cache by connectionToken: {}", connectionToken);
}
final ClientConnection clientConnection = getClientConnectionByToken(connectionToken);
if (clientConnection == null) {
return null;
} else {
try {
return this.clientEventRecordMapper.selectByExample()
.where(
ClientEventRecordDynamicSqlSupport.clientConnectionId,
SqlBuilder.isEqualTo(clientConnection.getId()))
.and(
ClientEventRecordDynamicSqlSupport.type,
SqlBuilder.isEqualTo(EventType.LAST_PING.id))
.build()
.execute()
.stream()
.collect(Utils.toSingleton());
} catch (final Exception e) {
log.error("Unexpected error: ", e);
return null;
}
}
}
@CacheEvict(
cacheNames = CACHE_NAME_PING_RECORD,
key = "#connectionToken")
public void evictPingRecord(final String connectionToken) {
if (log.isTraceEnabled()) {
log.trace("Eviction of ReusableClientEventRecord from cache for connection token: {}", connectionToken);
}
}
private ClientConnection getClientConnectionByToken(final String connectionToken) {
final Result<ClientConnection> result = this.clientConnectionDAO
.byConnectionToken(connectionToken);
if (result.hasError()) {
log.error("Failed to find/load ClientConnection with connectionToken {}", connectionToken,
log.error("Failed to find/load ClientConnection with connectionToken {}",
connectionToken,
result.getError());
return null;
}

View file

@ -15,6 +15,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@ -61,6 +62,8 @@ public class ExamSessionServiceImpl implements ExamSessionService {
private final SEBRestrictionService sebRestrictionService;
private final boolean distributedSetup;
private long lastConnectionTokenCacheUpdate = 0;
protected ExamSessionServiceImpl(
final ExamSessionCacheService examSessionCacheService,
final ExamDAO examDAO,
@ -312,21 +315,11 @@ public class ExamSessionServiceImpl implements ExamSessionService {
final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService
.getClientConnection(connectionToken);
if (activeClientConnection == null) {
throw new NoSuchElementException("Client Connection with token: " + connectionToken);
}
if (this.distributedSetup) {
final Boolean upToDate = this.clientConnectionDAO
.isUpToDate(activeClientConnection.clientConnection)
.getOr(false);
if (!upToDate) {
this.examSessionCacheService.evictClientConnection(connectionToken);
return this.examSessionCacheService.getClientConnection(connectionToken);
}
}
return activeClientConnection;
});
@ -337,13 +330,20 @@ public class ExamSessionServiceImpl implements ExamSessionService {
final Long examId,
final Predicate<ClientConnectionData> filter) {
return Result.tryCatch(() -> this.clientConnectionDAO
.getConnectionTokens(examId)
.getOrThrow()
.stream()
.map(this.examSessionCacheService::getClientConnection)
.filter(filter)
.collect(Collectors.toList()));
return Result.tryCatch(() -> {
updateClientConnections(examId);
return this.clientConnectionDAO
.getConnectionTokens(examId)
.getOrThrow()
.stream()
.map(token -> getConnectionData(token).getOr(null))
.filter(Objects::nonNull)
.filter(filter)
.collect(Collectors.toList());
});
}
@Override
@ -381,32 +381,43 @@ public class ExamSessionServiceImpl implements ExamSessionService {
.forEach(token -> {
// evict client connection
this.examSessionCacheService.evictClientConnection(token);
// evict also cached ping record
this.examSessionCacheService.evictPingRecord(token);
});
return exam;
});
}
// private Function<ClientConnectionMinRecord, ClientConnectionDataInternal> distributedClientConnectionUpdateFunction(
// final Predicate<ClientConnectionData> filter) {
//
// return cd -> {
// ClientConnectionDataInternal clientConnection = this.examSessionCacheService
// .getClientConnection(cd.connection_token);
//
// if (filter.test(clientConnection)) {
// if (cd.update_time != null &&
// !cd.update_time.equals(clientConnection.clientConnection.updateTime)) {
//
// this.examSessionCacheService.evictClientConnection(cd.connection_token);
// clientConnection = this.examSessionCacheService
// .getClientConnection(cd.connection_token);
// }
// }
// return clientConnection;
// };
// }
// If we are in a distributed setup the active connection token cache get flushed
// at least every second. This allows caching over multiple monitoring requests but
// ensure an update every second for new incoming connections
private void updateClientConnections(final Long examId) {
try {
if (this.distributedSetup &&
System.currentTimeMillis() - this.lastConnectionTokenCacheUpdate > Constants.SECOND_IN_MILLIS) {
// go trough all client connection and update the ones that not up to date
this.clientConnectionDAO.evictConnectionTokenCache(examId);
final Set<Long> timestamps = this.clientConnectionDAO
.getConnectionTokens(examId)
.getOrThrow()
.stream()
.map(this.examSessionCacheService::getClientConnection)
.filter(Objects::nonNull)
.map(cc -> cc.getClientConnection().updateTime)
.collect(Collectors.toSet());
this.clientConnectionDAO.getClientConnectionsOutOfSyc(examId, timestamps)
.getOrElse(() -> Collections.emptySet())
.stream()
.forEach(this.examSessionCacheService::evictClientConnection);
this.lastConnectionTokenCacheUpdate = System.currentTimeMillis();
}
} catch (final Exception e) {
log.error("Unexpected error while trying to update client connections: ", e);
}
}
}

View file

@ -16,8 +16,6 @@ import java.util.function.Predicate;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.Cache;
@ -35,6 +33,7 @@ import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.gbl.util.Result;
import ch.ethz.seb.sebserver.gbl.util.Utils;
import ch.ethz.seb.sebserver.webservice.WebserviceInfo;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientConnectionRecord;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
@ -512,7 +511,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
try {
final Cache cache = this.cacheManager.getCache(ExamSessionCacheService.CACHE_NAME_ACTIVE_CLIENT_CONNECTION);
final long now = DateTime.now(DateTimeZone.UTC).getMillis();
final long now = Utils.getMillisecondsNow();
this.examSessionService
.getExamDAO()
.allRunningExamIds()
@ -730,8 +729,6 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
private ClientConnectionDataInternal reloadConnectionCache(final String connectionToken) {
// evict cached ClientConnection
this.examSessionCacheService.evictClientConnection(connectionToken);
// evict also cached ping record
this.examSessionCacheService.evictPingRecord(connectionToken);
// and load updated ClientConnection into cache
return this.examSessionCacheService.getClientConnection(connectionToken);
}

View file

@ -15,7 +15,7 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.session.ClientIndicator;
public abstract class AbstractClientIndicator implements ClientIndicator {
private static final long PERSISTENT_UPDATE_INTERVAL = 3 * Constants.SECOND_IN_MILLIS;
private static final long PERSISTENT_UPDATE_INTERVAL = 1 * Constants.SECOND_IN_MILLIS;
protected Long indicatorId;
protected Long examId;

View file

@ -25,7 +25,7 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientEventDAO;
public abstract class AbstractPingIndicator extends AbstractClientIndicator {
private static final int PING_COUNT_INTERVAL_FOR_PERSISTENT_UPDATE = 3;
private static final int PING_COUNT_INTERVAL_FOR_PERSISTENT_UPDATE = 2;
private final Set<EventType> EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class));

View file

@ -1,11 +1,11 @@
server.address=localhost
server.port=8080
server.port=8090
sebserver.gui.http.external.scheme=http
sebserver.gui.entrypoint=/gui
sebserver.gui.webservice.protocol=http
sebserver.gui.webservice.address=localhost
sebserver.gui.webservice.port=8080
sebserver.gui.webservice.port=8090
sebserver.gui.webservice.apipath=/admin-api/v1
# defines the polling interval that is used to poll the webservice for client connection data on a monitored exam page
sebserver.gui.webservice.poll-interval=1000

View file

@ -22,7 +22,7 @@ sebserver.webservice.clean-db-on-startup=false
# webservice configuration
sebserver.init.adminaccount.gen-on-init=false
sebserver.webservice.distributed=false
sebserver.webservice.distributed=true
sebserver.webservice.master.delay.threshold=10000
sebserver.webservice.http.external.scheme=http
sebserver.webservice.http.external.servername=localhost

View file

@ -82,17 +82,6 @@
</resources>
</cache>
<cache alias="LAST_PING_TIME_CACHE">
<key-type>java.lang.Long</key-type>
<value-type>org.ehcache.impl.internal.concurrent.ConcurrentHashMap</value-type>
<expiry>
<ttl unit="hours">24</ttl>
</expiry>
<resources>
<heap unit="entries">10</heap>
</resources>
</cache>
<cache alias="QUIZ_DATA_CACHE">
<key-type>java.lang.String</key-type>
<value-type>ch.ethz.seb.sebserver.gbl.model.exam.QuizData</value-type>