fixed distributed ping cache

This commit is contained in:
anhefti 2021-07-28 14:29:21 +02:00
parent 64536fd909
commit ed7ae28a0d
9 changed files with 110 additions and 20 deletions

View file

@ -24,6 +24,7 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientConnectionDAO;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ExamDAO;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.FilterMap;
import ch.ethz.seb.sebserver.webservice.servicelayer.lms.LmsAPIService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.ClientConnectionDataInternal;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.ExamSessionCacheService;
/** A Service to handle running exam sessions */
@ -178,6 +179,15 @@ public interface ExamSessionService {
* @return Result with reference to the given Exam or to an error if happened */
Result<Exam> flushCache(final Exam exam);
/** Is is supposed to be the single access point to internally get client connection
* data for a specified connection token.
* This uses the client connection data cache for lookup and also synchronizes asynchronous
* cache calls to prevent parallel creation of ClientConnectionDataInternal
*
* @param connectionToken the connection token of the active SEB client connection
* @return ClientConnectionDataInternal by synchronized cache lookup or null if not available */
ClientConnectionDataInternal getConnectionDataInternal(String connectionToken);
/** Checks if the given ClientConnectionData is an active SEB client connection.
*
* @param connection ClientConnectionData instance

View file

@ -308,13 +308,20 @@ public class ExamSessionServiceImpl implements ExamSessionService {
}
}
@Override
public ClientConnectionDataInternal getConnectionDataInternal(final String connectionToken) {
synchronized (this.examSessionCacheService) {
return this.examSessionCacheService.getClientConnection(connectionToken);
}
}
@Override
public Result<ClientConnectionData> getConnectionData(final String connectionToken) {
return Result.tryCatch(() -> {
final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService
.getClientConnection(connectionToken);
final ClientConnectionDataInternal activeClientConnection =
getConnectionDataInternal(connectionToken);
if (activeClientConnection == null) {
throw new NoSuchElementException("Client Connection with token: " + connectionToken);
@ -403,7 +410,7 @@ public class ExamSessionServiceImpl implements ExamSessionService {
.getConnectionTokens(examId)
.getOrThrow()
.stream()
.map(this.examSessionCacheService::getClientConnection)
.map(this::getConnectionDataInternal)
.filter(Objects::nonNull)
.map(cc -> cc.getClientConnection().updateTime)
.collect(Collectors.toSet());

View file

@ -159,8 +159,8 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
.getOrThrow();
// load client connection data into cache
final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService
.getClientConnection(connectionToken);
final ClientConnectionDataInternal activeClientConnection = this.examSessionService
.getConnectionDataInternal(connectionToken);
if (activeClientConnection == null) {
log.warn("Failed to load ClientConnectionDataInternal into cache on update");
@ -567,7 +567,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
final int pingNumber) {
final ClientConnectionDataInternal activeClientConnection =
this.examSessionCacheService.getClientConnection(connectionToken);
this.examSessionService.getConnectionDataInternal(connectionToken);
if (activeClientConnection != null) {
activeClientConnection.notifyPing(timestamp, pingNumber);
@ -583,7 +583,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
try {
final ClientConnectionDataInternal activeClientConnection =
this.examSessionCacheService.getClientConnection(connectionToken);
this.examSessionService.getConnectionDataInternal(connectionToken);
if (activeClientConnection != null) {
@ -748,7 +748,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
// evict cached ClientConnection
this.examSessionCacheService.evictClientConnection(connectionToken);
// and load updated ClientConnection into cache
return this.examSessionCacheService.getClientConnection(connectionToken);
return this.examSessionService.getConnectionDataInternal(connectionToken);
}
private Consumer<ClientConnectionDataInternal> missingPingUpdate(final long now) {

View file

@ -23,6 +23,7 @@ public abstract class AbstractClientIndicator implements ClientIndicator {
protected Long connectionId;
protected boolean cachingEnabled;
protected boolean active = true;
protected long persistentUpdateInterval = PERSISTENT_UPDATE_INTERVAL;
protected long lastPersistentUpdate = 0;
protected boolean valueInitializes = false;
@ -72,7 +73,7 @@ public abstract class AbstractClientIndicator implements ClientIndicator {
}
if (!this.cachingEnabled && this.active) {
if (now - this.lastPersistentUpdate > PERSISTENT_UPDATE_INTERVAL) {
if (now - this.lastPersistentUpdate > this.persistentUpdateInterval) {
this.currentValue = computeValueAt(now);
this.lastPersistentUpdate = now;
}

View file

@ -44,6 +44,7 @@ public abstract class AbstractLogIndicator extends AbstractClientIndicator {
final boolean cachingEnabled) {
super.init(indicatorDefinition, connectionId, active, cachingEnabled);
super.persistentUpdateInterval = 2 * Constants.SECOND_IN_MILLIS;
if (indicatorDefinition == null || StringUtils.isBlank(indicatorDefinition.tags)) {
this.tags = null;

View file

@ -86,6 +86,7 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator {
} else {
return super.currentValue;
}
} catch (final Exception e) {
log.error("Failed to get indicator number from persistent storage: {}", e.getMessage());
return this.currentValue;

View file

@ -15,6 +15,8 @@ import java.util.Set;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ch.ethz.seb.sebserver.gbl.Constants;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
@ -23,6 +25,8 @@ import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
public abstract class AbstractPingIndicator extends AbstractClientIndicator {
private static final Logger log = LoggerFactory.getLogger(AbstractPingIndicator.class);
private static final long INTERVAL_FOR_PERSISTENT_UPDATE = Constants.SECOND_IN_MILLIS;
private final Set<EventType> EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class));
@ -48,10 +52,10 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
super.init(indicatorDefinition, connectionId, active, cachingEnabled);
if (!this.cachingEnabled && this.active) {
this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId);
if (this.pingRecord == null) {
// try once again
try {
this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId);
} catch (final Exception e) {
this.pingRecord = this.distributedPingCache.getPingRecordIdForConnectionId(connectionId);
}
}
}
@ -61,7 +65,14 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
super.currentValue = now;
super.lastPersistentUpdate = now;
if (!this.cachingEnabled && this.pingRecord != null) {
if (!this.cachingEnabled) {
if (this.pingRecord == null) {
tryRecoverPingRecord();
if (this.pingRecord == null) {
return;
}
}
// Update last ping time on persistent storage
final long millisecondsNow = DateTimeUtils.currentTimeMillis();
@ -71,6 +82,22 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
}
}
private void tryRecoverPingRecord() {
if (log.isWarnEnabled()) {
log.warn("*** Missing ping record for connection: {}. Try to recover...", this.connectionId);
}
try {
this.pingRecord = this.distributedPingCache.getPingRecordIdForConnectionId(this.connectionId);
if (this.pingRecord == null) {
this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId);
}
} catch (final Exception e) {
log.error("Failed to recover ping record for connection: {}", this.connectionId, e);
}
}
@Override
public Set<EventType> observedEvents() {
return this.EMPTY_SET;

View file

@ -70,7 +70,12 @@ public class DistributedPingCache implements DisposableBean {
@Transactional
public Long initPingForConnection(final Long connectionId) {
try {
Long recordId = this.clientEventLastPingMapper
if (log.isDebugEnabled()) {
log.trace("*** Initialize ping record for SEB connection: {}", connectionId);
}
final Long recordId = this.clientEventLastPingMapper
.pingRecordIdByConnectionId(connectionId);
if (recordId == null) {
@ -82,12 +87,41 @@ public class DistributedPingCache implements DisposableBean {
clientEventRecord.setServerTime(millisecondsNow);
this.clientEventRecordMapper.insert(clientEventRecord);
recordId = this.clientEventLastPingMapper.pingRecordIdByConnectionId(connectionId);
try {
// This also double-check by trying again. If we have more then one entry here
// this will throw an exception that causes a rollback
return this.clientEventLastPingMapper
.pingRecordIdByConnectionId(connectionId);
} catch (final Exception e) {
log.warn("Detected multiple client ping entries for connection: " + connectionId
+ ". Force rollback to prevent");
// force rollback
throw new RuntimeException("Detected multiple client ping entries");
}
}
return recordId;
} catch (final Exception e) {
log.error("Failed to initialize ping for connection -> {}", connectionId, e);
// force rollback
throw new RuntimeException("Failed to initialize ping for connection -> " + connectionId, e);
}
}
@Transactional(readOnly = true)
public Long getPingRecordIdForConnectionId(final Long connectionId) {
try {
return this.clientEventLastPingMapper
.pingRecordIdByConnectionId(connectionId);
} catch (final Exception e) {
log.error("Failed to get ping record for connection id: {} cause: {}", connectionId, e.getMessage());
return null;
}
}
@ -108,6 +142,10 @@ public class DistributedPingCache implements DisposableBean {
public void deletePingForConnection(final Long connectionId) {
try {
if (log.isDebugEnabled()) {
log.debug("*** Delete ping record for SEB connection: {}", connectionId);
}
this.clientEventRecordMapper
.deleteByExample()
.where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId))
@ -124,7 +162,11 @@ public class DistributedPingCache implements DisposableBean {
try {
Long ping = this.pingCache.get(pingRecordId);
if (ping == null) {
log.debug("******* Get and cache ping time: {}", pingRecordId);
if (log.isDebugEnabled()) {
log.debug("*** Get and cache ping time: {}", pingRecordId);
}
ping = this.clientEventLastPingMapper.selectPingTimeByPrimaryKey(pingRecordId);
if (ping != null) {
this.pingCache.put(pingRecordId, ping);
@ -145,7 +187,9 @@ public class DistributedPingCache implements DisposableBean {
return;
}
log.debug("****** Update distributed ping cache: {}", this.pingCache);
if (log.isDebugEnabled()) {
log.trace("*** Update distributed ping cache: {}", this.pingCache);
}
try {
final ArrayList<Long> pks = new ArrayList<>(this.pingCache.keySet());
@ -181,7 +225,6 @@ public class DistributedPingCache implements DisposableBean {
log.error("Failed to cancel distributed ping cache update task: ", e);
}
}
}
}

View file

@ -1,11 +1,11 @@
server.address=localhost
server.port=8080
server.port=8090
sebserver.gui.http.external.scheme=http
sebserver.gui.entrypoint=/gui
sebserver.gui.webservice.protocol=http
sebserver.gui.webservice.address=localhost
sebserver.gui.webservice.port=8080
sebserver.gui.webservice.port=8090
sebserver.gui.webservice.apipath=/admin-api/v1
# defines the polling interval that is used to poll the webservice for client connection data on a monitored exam page
sebserver.gui.webservice.poll-interval=1000