From 06b433e6cca2cb1535de8bbc8a1041d0bca0f3dd Mon Sep 17 00:00:00 2001 From: anhefti Date: Tue, 7 Dec 2021 15:43:05 +0100 Subject: [PATCH] SEBSERV-250 improved missing ping update --- .../impl/SEBClientConnectionServiceImpl.java | 20 ++++- .../impl/indicator/AbstractPingIndicator.java | 3 +- .../indicator/DistributedPingService.java | 85 ++++++++++++------- .../PingIntervalClientIndicator.java | 26 ++---- 4 files changed, 77 insertions(+), 57 deletions(-) diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java index 04e19675..db15c171 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java @@ -8,6 +8,7 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl; +import java.math.BigDecimal; import java.security.Principal; import java.util.Objects; import java.util.UUID; @@ -488,7 +489,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic // delete stored ping if this is a distributed setup if (this.isDistributedSetup) { this.distributedPingCache - .deletePingForConnection(updatedClientConnection.id); + .deletePingIndicator(updatedClientConnection.id); } reloadConnectionCache(connectionToken); @@ -542,7 +543,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic // delete stored ping if this is a distributed setup if (this.isDistributedSetup) { this.distributedPingCache - .deletePingForConnection(updatedClientConnection.id); + .deletePingIndicator(updatedClientConnection.id); } reloadConnectionCache(connectionToken); @@ -794,9 +795,20 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic } private Consumer missingPingUpdate(final long now) { + return connection -> { - final ClientEventRecord clientEventRecord = connection.pingIndicator.updateLogEvent(now); - if (clientEventRecord != null) { + + if (connection.pingIndicator.missingPingUpdate(now)) { + final boolean missingPing = connection.pingIndicator.isMissingPing(); + final ClientEventRecord clientEventRecord = new ClientEventRecord( + null, + connection.getConnectionId(), + (missingPing) ? EventType.ERROR_LOG.id : EventType.INFO_LOG.id, + now, + now, + new BigDecimal(connection.pingIndicator.getValue()), + (missingPing) ? "Missing Client Ping" : "Client Ping Back To Normal"); + // store event and and flush cache this.eventHandlingStrategy.accept(clientEventRecord); if (this.isDistributedSetup) { diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java index 5cb1a7dc..437c5694 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java @@ -17,7 +17,6 @@ import org.slf4j.LoggerFactory; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; -import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.DistributedPingService.PingUpdate; public abstract class AbstractPingIndicator extends AbstractClientIndicator { @@ -89,6 +88,6 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { return this.EMPTY_SET; } - public abstract ClientEventRecord updateLogEvent(final long now); + public abstract boolean missingPingUpdate(final long now); } diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingService.java index 91a1156d..57310444 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingService.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingService.java @@ -44,6 +44,20 @@ import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientIndicatorRec @Lazy @Component @WebServiceProfile +/** This service is only needed within a distributed setup where more then one webservice works + * simultaneously within one SEB Server and one persistent storage. + *

+ * This service handles the SEB client ping updates within such a setup and implements functionality to + * efficiently store and load ping time indicators form and to shared store. + *

+ * The update from the persistent store is done periodically within a batch while the ping time writes + * are done individually per SEB client when they arrive but within a dedicated task executor with minimal task + * queue to do not overflow other executor services when it comes to a leak on storing lot of ping times. + * In this case some ping time updates will be just dropped and not go to the persistent store until the leak + * is resolved. + *

+ * Note that the ping time update and read operations are also not within a transaction for performance reasons + * and because it is not a big deal to loose one ore two ping updates for a SEB client. */ public class DistributedPingService implements DisposableBean { private static final Logger log = LoggerFactory.getLogger(DistributedPingService.class); @@ -67,6 +81,10 @@ public class DistributedPingService implements DisposableBean { this.clientPingMapper = clientPingMapper; } + /** Initializes the service by attaching it to the scheduler for periodical update. + * If the webservice is not initialized within a distributed setup, this will do nothing + * + * @param initEvent the SEB Server webservice init event */ @EventListener(SEBServerInitEvent.class) public void init(final SEBServerInitEvent initEvent) { final ApplicationContext applicationContext = initEvent.webserviceInit.getApplicationContext(); @@ -86,7 +104,7 @@ public class DistributedPingService implements DisposableBean { try { this.taskRef = taskScheduler.scheduleAtFixedRate( - this::persistentPingUpdate, + this::updatePingCache, distributedPingUpdateInterval); SEBServerInit.INIT_LOGGER.info("------> distributed ping service successfully initialized!"); @@ -101,10 +119,13 @@ public class DistributedPingService implements DisposableBean { } } - public ClientPingMapper getClientPingMapper() { - return this.clientPingMapper; - } - + /** This initializes a SEB client ping indicator on the persistent storage for a given SEB client + * connection identifier. + * If there is already such a ping indicator for the specified SEB client connection identifier, returns + * the id of the existing one. + * + * @param connectionId SEB client connection identifier + * @return SEB client ping indicator identifier (PK) */ @Transactional public Long initPingForConnection(final Long connectionId) { try { @@ -149,21 +170,12 @@ public class DistributedPingService implements DisposableBean { } } - @Transactional(readOnly = true) - public Long getPingRecordIdForConnectionId(final Long connectionId) { - try { - - return this.clientPingMapper - .pingRecordIdByConnectionId(connectionId); - - } catch (final Exception e) { - log.error("Failed to get ping record for connection id: {} cause: {}", connectionId, e.getMessage()); - return null; - } - } - + /** Deletes a existing SEB client ping indicator for a given SEB client connection identifier + * on the persistent storage. + * + * @param connectionId SEB client connection identifier */ @Transactional - public void deletePingForConnection(final Long connectionId) { + public void deletePingIndicator(final Long connectionId) { try { if (log.isDebugEnabled()) { @@ -198,11 +210,18 @@ public class DistributedPingService implements DisposableBean { } } - public Long getLastPing(final Long pingRecordId, final boolean missing) { + /** Use this to get the last ping time indicator value with a given indicator identifier (PK) + * This fist tries to get the ping time from internal cache. If not present, tries to get + * the ping indicator value from persistent storage and put it to the cache. + * + * @param pingRecordId The ping indicator record id (PK). Get one for a given SEB client connection identifier by + * calling: initPingForConnection + * @return The actual (last) ping time. */ + public Long getLastPing(final Long pingRecordId) { try { Long ping = this.pingCache.get(pingRecordId); - if (ping == null && !missing) { + if (ping == null) { if (log.isDebugEnabled()) { log.debug("*** Get and cache ping time: {}", pingRecordId); @@ -211,11 +230,6 @@ public class DistributedPingService implements DisposableBean { ping = this.clientPingMapper.selectPingTimeByPrimaryKey(pingRecordId); } - // if we have a missing ping we need to check new ping from next update even if the cache was empty - if (ping != null || missing) { - this.pingCache.put(pingRecordId, ping); - } - return ping; } catch (final Exception e) { log.error("Error while trying to get last ping from storage: {}", e.getMessage()); @@ -223,7 +237,12 @@ public class DistributedPingService implements DisposableBean { } } - public void persistentPingUpdate() { + /** Updates the internal ping cache by loading all actual SEB client ping indicators from persistent storage + * and put it in the cache. + * This is internally periodically scheduled by the task scheduler but also implements an execution drop if + * the last update was less then 2/3 of the schedule interval ago. This is to prevent task queue overflows + * and wait with update when there is a persistent storage leak or a lot of network latency. */ + private void updatePingCache() { if (this.pingCache.isEmpty()) { return; } @@ -263,8 +282,8 @@ public class DistributedPingService implements DisposableBean { this.lastUpdate = millisecondsNow; } - // Update last ping time on persistent storage asynchronously within a defines thread pool with no - // waiting queue to skip further ping updates if all update threads are busy + /** Update last ping time on persistent storage asynchronously within a defines thread pool with no + * waiting queue to skip further ping updates if all update threads are busy **/ void updatePingAsync(final PingUpdate pingUpdate) { try { this.pingUpdateExecutor.execute(pingUpdate); @@ -275,12 +294,17 @@ public class DistributedPingService implements DisposableBean { } } + /** Create a PingUpdate for a specified SEB client connectionId. + * + * @param connectionId SEB client connection identifier + * @return PingUpdate for a specified SEB client connectionId */ PingUpdate createPingUpdate(final Long connectionId) { return new PingUpdate( - this.getClientPingMapper(), + this.clientPingMapper, this.initPingForConnection(connectionId)); } + /** Encapsulates a SEB client ping update on persistent storage */ static final class PingUpdate implements Runnable { private final ClientPingMapper clientPingMapper; @@ -292,6 +316,7 @@ public class DistributedPingService implements DisposableBean { } @Override + /** Processes the ping update on persistent storage by using the current time stamp. */ public void run() { try { this.clientPingMapper diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java index cb2b7382..1377de38 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java @@ -8,7 +8,6 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator; -import java.math.BigDecimal; import java.util.Comparator; import org.joda.time.DateTimeUtils; @@ -25,7 +24,6 @@ import ch.ethz.seb.sebserver.gbl.Constants; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType; import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent; -import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; @Lazy @@ -129,7 +127,7 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator { public final double computeValueAt(final long timestamp) { if (!this.cachingEnabled && super.pingUpdate != null) { - final Long lastPing = this.distributedPingCache.getLastPing(super.pingUpdate.pingRecord, this.missingPing); + final Long lastPing = this.distributedPingCache.getLastPing(super.pingUpdate.pingRecord); if (lastPing != null) { final double doubleValue = lastPing.doubleValue(); return Math.max(Double.isNaN(this.currentValue) ? doubleValue : this.currentValue, doubleValue); @@ -142,35 +140,21 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator { } @Override - public ClientEventRecord updateLogEvent(final long now) { + public boolean missingPingUpdate(final long now) { final long value = now - (long) super.currentValue; if (this.missingPing) { if (this.pingErrorThreshold > value) { this.missingPing = false; - return new ClientEventRecord( - null, - this.connectionId, - EventType.INFO_LOG.id, - now, - now, - new BigDecimal(value), - "Client Ping Back To Normal"); + return true; } } else { if (this.pingErrorThreshold < value) { this.missingPing = true; - return new ClientEventRecord( - null, - this.connectionId, - EventType.ERROR_LOG.id, - now, - now, - new BigDecimal(value), - "Missing Client Ping"); + return true; } } - return null; + return false; } }