From ed7ae28a0d37d48dc11d033ae033dadc07f9e344 Mon Sep 17 00:00:00 2001 From: anhefti Date: Wed, 28 Jul 2021 14:29:21 +0200 Subject: [PATCH] fixed distributed ping cache --- .../session/ExamSessionService.java | 10 ++++ .../session/impl/ExamSessionServiceImpl.java | 13 +++-- .../impl/SEBClientConnectionServiceImpl.java | 10 ++-- .../indicator/AbstractClientIndicator.java | 3 +- .../impl/indicator/AbstractLogIndicator.java | 1 + .../indicator/AbstractLogNumberIndicator.java | 1 + .../impl/indicator/AbstractPingIndicator.java | 35 ++++++++++-- .../impl/indicator/DistributedPingCache.java | 53 +++++++++++++++++-- .../config/application-dev-gui.properties | 4 +- 9 files changed, 110 insertions(+), 20 deletions(-) diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/ExamSessionService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/ExamSessionService.java index 14b362ce..10395008 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/ExamSessionService.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/ExamSessionService.java @@ -24,6 +24,7 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientConnectionDAO; import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ExamDAO; import ch.ethz.seb.sebserver.webservice.servicelayer.dao.FilterMap; import ch.ethz.seb.sebserver.webservice.servicelayer.lms.LmsAPIService; +import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.ClientConnectionDataInternal; import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.ExamSessionCacheService; /** A Service to handle running exam sessions */ @@ -178,6 +179,15 @@ public interface ExamSessionService { * @return Result with reference to the given Exam or to an error if happened */ Result flushCache(final Exam exam); + /** Is is supposed to be the single access point to internally get client connection + * data for a specified connection token. + * This uses the client connection data cache for lookup and also synchronizes asynchronous + * cache calls to prevent parallel creation of ClientConnectionDataInternal + * + * @param connectionToken the connection token of the active SEB client connection + * @return ClientConnectionDataInternal by synchronized cache lookup or null if not available */ + ClientConnectionDataInternal getConnectionDataInternal(String connectionToken); + /** Checks if the given ClientConnectionData is an active SEB client connection. * * @param connection ClientConnectionData instance diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java index e09969a3..67a62448 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java @@ -308,13 +308,20 @@ public class ExamSessionServiceImpl implements ExamSessionService { } } + @Override + public ClientConnectionDataInternal getConnectionDataInternal(final String connectionToken) { + synchronized (this.examSessionCacheService) { + return this.examSessionCacheService.getClientConnection(connectionToken); + } + } + @Override public Result getConnectionData(final String connectionToken) { return Result.tryCatch(() -> { - final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService - .getClientConnection(connectionToken); + final ClientConnectionDataInternal activeClientConnection = + getConnectionDataInternal(connectionToken); if (activeClientConnection == null) { throw new NoSuchElementException("Client Connection with token: " + connectionToken); @@ -403,7 +410,7 @@ public class ExamSessionServiceImpl implements ExamSessionService { .getConnectionTokens(examId) .getOrThrow() .stream() - .map(this.examSessionCacheService::getClientConnection) + .map(this::getConnectionDataInternal) .filter(Objects::nonNull) .map(cc -> cc.getClientConnection().updateTime) .collect(Collectors.toSet()); diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java index a056b7d9..d498b69d 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java @@ -159,8 +159,8 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic .getOrThrow(); // load client connection data into cache - final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService - .getClientConnection(connectionToken); + final ClientConnectionDataInternal activeClientConnection = this.examSessionService + .getConnectionDataInternal(connectionToken); if (activeClientConnection == null) { log.warn("Failed to load ClientConnectionDataInternal into cache on update"); @@ -567,7 +567,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic final int pingNumber) { final ClientConnectionDataInternal activeClientConnection = - this.examSessionCacheService.getClientConnection(connectionToken); + this.examSessionService.getConnectionDataInternal(connectionToken); if (activeClientConnection != null) { activeClientConnection.notifyPing(timestamp, pingNumber); @@ -583,7 +583,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic try { final ClientConnectionDataInternal activeClientConnection = - this.examSessionCacheService.getClientConnection(connectionToken); + this.examSessionService.getConnectionDataInternal(connectionToken); if (activeClientConnection != null) { @@ -748,7 +748,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic // evict cached ClientConnection this.examSessionCacheService.evictClientConnection(connectionToken); // and load updated ClientConnection into cache - return this.examSessionCacheService.getClientConnection(connectionToken); + return this.examSessionService.getConnectionDataInternal(connectionToken); } private Consumer missingPingUpdate(final long now) { diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java index db1d2301..9790c05a 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java @@ -23,6 +23,7 @@ public abstract class AbstractClientIndicator implements ClientIndicator { protected Long connectionId; protected boolean cachingEnabled; protected boolean active = true; + protected long persistentUpdateInterval = PERSISTENT_UPDATE_INTERVAL; protected long lastPersistentUpdate = 0; protected boolean valueInitializes = false; @@ -72,7 +73,7 @@ public abstract class AbstractClientIndicator implements ClientIndicator { } if (!this.cachingEnabled && this.active) { - if (now - this.lastPersistentUpdate > PERSISTENT_UPDATE_INTERVAL) { + if (now - this.lastPersistentUpdate > this.persistentUpdateInterval) { this.currentValue = computeValueAt(now); this.lastPersistentUpdate = now; } diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogIndicator.java index 605db6ab..743361e3 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogIndicator.java @@ -44,6 +44,7 @@ public abstract class AbstractLogIndicator extends AbstractClientIndicator { final boolean cachingEnabled) { super.init(indicatorDefinition, connectionId, active, cachingEnabled); + super.persistentUpdateInterval = 2 * Constants.SECOND_IN_MILLIS; if (indicatorDefinition == null || StringUtils.isBlank(indicatorDefinition.tags)) { this.tags = null; diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogNumberIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogNumberIndicator.java index a92ed3d9..bb9d13dc 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogNumberIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogNumberIndicator.java @@ -86,6 +86,7 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator { } else { return super.currentValue; } + } catch (final Exception e) { log.error("Failed to get indicator number from persistent storage: {}", e.getMessage()); return this.currentValue; diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java index 754232ff..553d972d 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java @@ -15,6 +15,8 @@ import java.util.Set; import org.joda.time.DateTime; import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import ch.ethz.seb.sebserver.gbl.Constants; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; @@ -23,6 +25,8 @@ import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; public abstract class AbstractPingIndicator extends AbstractClientIndicator { + private static final Logger log = LoggerFactory.getLogger(AbstractPingIndicator.class); + private static final long INTERVAL_FOR_PERSISTENT_UPDATE = Constants.SECOND_IN_MILLIS; private final Set EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class)); @@ -48,10 +52,10 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { super.init(indicatorDefinition, connectionId, active, cachingEnabled); if (!this.cachingEnabled && this.active) { - this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId); - if (this.pingRecord == null) { - // try once again + try { this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId); + } catch (final Exception e) { + this.pingRecord = this.distributedPingCache.getPingRecordIdForConnectionId(connectionId); } } } @@ -61,7 +65,14 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { super.currentValue = now; super.lastPersistentUpdate = now; - if (!this.cachingEnabled && this.pingRecord != null) { + if (!this.cachingEnabled) { + + if (this.pingRecord == null) { + tryRecoverPingRecord(); + if (this.pingRecord == null) { + return; + } + } // Update last ping time on persistent storage final long millisecondsNow = DateTimeUtils.currentTimeMillis(); @@ -71,6 +82,22 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { } } + private void tryRecoverPingRecord() { + + if (log.isWarnEnabled()) { + log.warn("*** Missing ping record for connection: {}. Try to recover...", this.connectionId); + } + + try { + this.pingRecord = this.distributedPingCache.getPingRecordIdForConnectionId(this.connectionId); + if (this.pingRecord == null) { + this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId); + } + } catch (final Exception e) { + log.error("Failed to recover ping record for connection: {}", this.connectionId, e); + } + } + @Override public Set observedEvents() { return this.EMPTY_SET; diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java index c60d99bc..e35bbdf5 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java @@ -70,7 +70,12 @@ public class DistributedPingCache implements DisposableBean { @Transactional public Long initPingForConnection(final Long connectionId) { try { - Long recordId = this.clientEventLastPingMapper + + if (log.isDebugEnabled()) { + log.trace("*** Initialize ping record for SEB connection: {}", connectionId); + } + + final Long recordId = this.clientEventLastPingMapper .pingRecordIdByConnectionId(connectionId); if (recordId == null) { @@ -82,12 +87,41 @@ public class DistributedPingCache implements DisposableBean { clientEventRecord.setServerTime(millisecondsNow); this.clientEventRecordMapper.insert(clientEventRecord); - recordId = this.clientEventLastPingMapper.pingRecordIdByConnectionId(connectionId); + try { + // This also double-check by trying again. If we have more then one entry here + // this will throw an exception that causes a rollback + return this.clientEventLastPingMapper + .pingRecordIdByConnectionId(connectionId); + + } catch (final Exception e) { + + log.warn("Detected multiple client ping entries for connection: " + connectionId + + ". Force rollback to prevent"); + + // force rollback + throw new RuntimeException("Detected multiple client ping entries"); + } } return recordId; } catch (final Exception e) { + log.error("Failed to initialize ping for connection -> {}", connectionId, e); + + // force rollback + throw new RuntimeException("Failed to initialize ping for connection -> " + connectionId, e); + } + } + + @Transactional(readOnly = true) + public Long getPingRecordIdForConnectionId(final Long connectionId) { + try { + + return this.clientEventLastPingMapper + .pingRecordIdByConnectionId(connectionId); + + } catch (final Exception e) { + log.error("Failed to get ping record for connection id: {} cause: {}", connectionId, e.getMessage()); return null; } } @@ -108,6 +142,10 @@ public class DistributedPingCache implements DisposableBean { public void deletePingForConnection(final Long connectionId) { try { + if (log.isDebugEnabled()) { + log.debug("*** Delete ping record for SEB connection: {}", connectionId); + } + this.clientEventRecordMapper .deleteByExample() .where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId)) @@ -124,7 +162,11 @@ public class DistributedPingCache implements DisposableBean { try { Long ping = this.pingCache.get(pingRecordId); if (ping == null) { - log.debug("******* Get and cache ping time: {}", pingRecordId); + + if (log.isDebugEnabled()) { + log.debug("*** Get and cache ping time: {}", pingRecordId); + } + ping = this.clientEventLastPingMapper.selectPingTimeByPrimaryKey(pingRecordId); if (ping != null) { this.pingCache.put(pingRecordId, ping); @@ -145,7 +187,9 @@ public class DistributedPingCache implements DisposableBean { return; } - log.debug("****** Update distributed ping cache: {}", this.pingCache); + if (log.isDebugEnabled()) { + log.trace("*** Update distributed ping cache: {}", this.pingCache); + } try { final ArrayList pks = new ArrayList<>(this.pingCache.keySet()); @@ -181,7 +225,6 @@ public class DistributedPingCache implements DisposableBean { log.error("Failed to cancel distributed ping cache update task: ", e); } } - } } diff --git a/src/main/resources/config/application-dev-gui.properties b/src/main/resources/config/application-dev-gui.properties index 8069b264..a22c98bc 100644 --- a/src/main/resources/config/application-dev-gui.properties +++ b/src/main/resources/config/application-dev-gui.properties @@ -1,11 +1,11 @@ server.address=localhost -server.port=8080 +server.port=8090 sebserver.gui.http.external.scheme=http sebserver.gui.entrypoint=/gui sebserver.gui.webservice.protocol=http sebserver.gui.webservice.address=localhost -sebserver.gui.webservice.port=8080 +sebserver.gui.webservice.port=8090 sebserver.gui.webservice.apipath=/admin-api/v1 # defines the polling interval that is used to poll the webservice for client connection data on a monitored exam page sebserver.gui.webservice.poll-interval=1000