SEBSERV-250 improved missing ping update

This commit is contained in:
anhefti 2021-12-07 15:43:05 +01:00
parent dfb8afb740
commit 06b433e6cc
4 changed files with 77 additions and 57 deletions

View file

@ -8,6 +8,7 @@
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl; package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl;
import java.math.BigDecimal;
import java.security.Principal; import java.security.Principal;
import java.util.Objects; import java.util.Objects;
import java.util.UUID; import java.util.UUID;
@ -488,7 +489,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
// delete stored ping if this is a distributed setup // delete stored ping if this is a distributed setup
if (this.isDistributedSetup) { if (this.isDistributedSetup) {
this.distributedPingCache this.distributedPingCache
.deletePingForConnection(updatedClientConnection.id); .deletePingIndicator(updatedClientConnection.id);
} }
reloadConnectionCache(connectionToken); reloadConnectionCache(connectionToken);
@ -542,7 +543,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
// delete stored ping if this is a distributed setup // delete stored ping if this is a distributed setup
if (this.isDistributedSetup) { if (this.isDistributedSetup) {
this.distributedPingCache this.distributedPingCache
.deletePingForConnection(updatedClientConnection.id); .deletePingIndicator(updatedClientConnection.id);
} }
reloadConnectionCache(connectionToken); reloadConnectionCache(connectionToken);
@ -794,9 +795,20 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
} }
private Consumer<ClientConnectionDataInternal> missingPingUpdate(final long now) { private Consumer<ClientConnectionDataInternal> missingPingUpdate(final long now) {
return connection -> { return connection -> {
final ClientEventRecord clientEventRecord = connection.pingIndicator.updateLogEvent(now);
if (clientEventRecord != null) { if (connection.pingIndicator.missingPingUpdate(now)) {
final boolean missingPing = connection.pingIndicator.isMissingPing();
final ClientEventRecord clientEventRecord = new ClientEventRecord(
null,
connection.getConnectionId(),
(missingPing) ? EventType.ERROR_LOG.id : EventType.INFO_LOG.id,
now,
now,
new BigDecimal(connection.pingIndicator.getValue()),
(missingPing) ? "Missing Client Ping" : "Client Ping Back To Normal");
// store event and and flush cache // store event and and flush cache
this.eventHandlingStrategy.accept(clientEventRecord); this.eventHandlingStrategy.accept(clientEventRecord);
if (this.isDistributedSetup) { if (this.isDistributedSetup) {

View file

@ -17,7 +17,6 @@ import org.slf4j.LoggerFactory;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.DistributedPingService.PingUpdate; import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.DistributedPingService.PingUpdate;
public abstract class AbstractPingIndicator extends AbstractClientIndicator { public abstract class AbstractPingIndicator extends AbstractClientIndicator {
@ -89,6 +88,6 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
return this.EMPTY_SET; return this.EMPTY_SET;
} }
public abstract ClientEventRecord updateLogEvent(final long now); public abstract boolean missingPingUpdate(final long now);
} }

View file

@ -44,6 +44,20 @@ import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientIndicatorRec
@Lazy @Lazy
@Component @Component
@WebServiceProfile @WebServiceProfile
/** This service is only needed within a distributed setup where more then one webservice works
* simultaneously within one SEB Server and one persistent storage.
* </p>
* This service handles the SEB client ping updates within such a setup and implements functionality to
* efficiently store and load ping time indicators form and to shared store.
* </p>
* The update from the persistent store is done periodically within a batch while the ping time writes
* are done individually per SEB client when they arrive but within a dedicated task executor with minimal task
* queue to do not overflow other executor services when it comes to a leak on storing lot of ping times.
* In this case some ping time updates will be just dropped and not go to the persistent store until the leak
* is resolved.
* </p>
* Note that the ping time update and read operations are also not within a transaction for performance reasons
* and because it is not a big deal to loose one ore two ping updates for a SEB client. */
public class DistributedPingService implements DisposableBean { public class DistributedPingService implements DisposableBean {
private static final Logger log = LoggerFactory.getLogger(DistributedPingService.class); private static final Logger log = LoggerFactory.getLogger(DistributedPingService.class);
@ -67,6 +81,10 @@ public class DistributedPingService implements DisposableBean {
this.clientPingMapper = clientPingMapper; this.clientPingMapper = clientPingMapper;
} }
/** Initializes the service by attaching it to the scheduler for periodical update.
* If the webservice is not initialized within a distributed setup, this will do nothing
*
* @param initEvent the SEB Server webservice init event */
@EventListener(SEBServerInitEvent.class) @EventListener(SEBServerInitEvent.class)
public void init(final SEBServerInitEvent initEvent) { public void init(final SEBServerInitEvent initEvent) {
final ApplicationContext applicationContext = initEvent.webserviceInit.getApplicationContext(); final ApplicationContext applicationContext = initEvent.webserviceInit.getApplicationContext();
@ -86,7 +104,7 @@ public class DistributedPingService implements DisposableBean {
try { try {
this.taskRef = taskScheduler.scheduleAtFixedRate( this.taskRef = taskScheduler.scheduleAtFixedRate(
this::persistentPingUpdate, this::updatePingCache,
distributedPingUpdateInterval); distributedPingUpdateInterval);
SEBServerInit.INIT_LOGGER.info("------> distributed ping service successfully initialized!"); SEBServerInit.INIT_LOGGER.info("------> distributed ping service successfully initialized!");
@ -101,10 +119,13 @@ public class DistributedPingService implements DisposableBean {
} }
} }
public ClientPingMapper getClientPingMapper() { /** This initializes a SEB client ping indicator on the persistent storage for a given SEB client
return this.clientPingMapper; * connection identifier.
} * If there is already such a ping indicator for the specified SEB client connection identifier, returns
* the id of the existing one.
*
* @param connectionId SEB client connection identifier
* @return SEB client ping indicator identifier (PK) */
@Transactional @Transactional
public Long initPingForConnection(final Long connectionId) { public Long initPingForConnection(final Long connectionId) {
try { try {
@ -149,21 +170,12 @@ public class DistributedPingService implements DisposableBean {
} }
} }
@Transactional(readOnly = true) /** Deletes a existing SEB client ping indicator for a given SEB client connection identifier
public Long getPingRecordIdForConnectionId(final Long connectionId) { * on the persistent storage.
try { *
* @param connectionId SEB client connection identifier */
return this.clientPingMapper
.pingRecordIdByConnectionId(connectionId);
} catch (final Exception e) {
log.error("Failed to get ping record for connection id: {} cause: {}", connectionId, e.getMessage());
return null;
}
}
@Transactional @Transactional
public void deletePingForConnection(final Long connectionId) { public void deletePingIndicator(final Long connectionId) {
try { try {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
@ -198,11 +210,18 @@ public class DistributedPingService implements DisposableBean {
} }
} }
public Long getLastPing(final Long pingRecordId, final boolean missing) { /** Use this to get the last ping time indicator value with a given indicator identifier (PK)
* This fist tries to get the ping time from internal cache. If not present, tries to get
* the ping indicator value from persistent storage and put it to the cache.
*
* @param pingRecordId The ping indicator record id (PK). Get one for a given SEB client connection identifier by
* calling: initPingForConnection
* @return The actual (last) ping time. */
public Long getLastPing(final Long pingRecordId) {
try { try {
Long ping = this.pingCache.get(pingRecordId); Long ping = this.pingCache.get(pingRecordId);
if (ping == null && !missing) { if (ping == null) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("*** Get and cache ping time: {}", pingRecordId); log.debug("*** Get and cache ping time: {}", pingRecordId);
@ -211,11 +230,6 @@ public class DistributedPingService implements DisposableBean {
ping = this.clientPingMapper.selectPingTimeByPrimaryKey(pingRecordId); ping = this.clientPingMapper.selectPingTimeByPrimaryKey(pingRecordId);
} }
// if we have a missing ping we need to check new ping from next update even if the cache was empty
if (ping != null || missing) {
this.pingCache.put(pingRecordId, ping);
}
return ping; return ping;
} catch (final Exception e) { } catch (final Exception e) {
log.error("Error while trying to get last ping from storage: {}", e.getMessage()); log.error("Error while trying to get last ping from storage: {}", e.getMessage());
@ -223,7 +237,12 @@ public class DistributedPingService implements DisposableBean {
} }
} }
public void persistentPingUpdate() { /** Updates the internal ping cache by loading all actual SEB client ping indicators from persistent storage
* and put it in the cache.
* This is internally periodically scheduled by the task scheduler but also implements an execution drop if
* the last update was less then 2/3 of the schedule interval ago. This is to prevent task queue overflows
* and wait with update when there is a persistent storage leak or a lot of network latency. */
private void updatePingCache() {
if (this.pingCache.isEmpty()) { if (this.pingCache.isEmpty()) {
return; return;
} }
@ -263,8 +282,8 @@ public class DistributedPingService implements DisposableBean {
this.lastUpdate = millisecondsNow; this.lastUpdate = millisecondsNow;
} }
// Update last ping time on persistent storage asynchronously within a defines thread pool with no /** Update last ping time on persistent storage asynchronously within a defines thread pool with no
// waiting queue to skip further ping updates if all update threads are busy * waiting queue to skip further ping updates if all update threads are busy **/
void updatePingAsync(final PingUpdate pingUpdate) { void updatePingAsync(final PingUpdate pingUpdate) {
try { try {
this.pingUpdateExecutor.execute(pingUpdate); this.pingUpdateExecutor.execute(pingUpdate);
@ -275,12 +294,17 @@ public class DistributedPingService implements DisposableBean {
} }
} }
/** Create a PingUpdate for a specified SEB client connectionId.
*
* @param connectionId SEB client connection identifier
* @return PingUpdate for a specified SEB client connectionId */
PingUpdate createPingUpdate(final Long connectionId) { PingUpdate createPingUpdate(final Long connectionId) {
return new PingUpdate( return new PingUpdate(
this.getClientPingMapper(), this.clientPingMapper,
this.initPingForConnection(connectionId)); this.initPingForConnection(connectionId));
} }
/** Encapsulates a SEB client ping update on persistent storage */
static final class PingUpdate implements Runnable { static final class PingUpdate implements Runnable {
private final ClientPingMapper clientPingMapper; private final ClientPingMapper clientPingMapper;
@ -292,6 +316,7 @@ public class DistributedPingService implements DisposableBean {
} }
@Override @Override
/** Processes the ping update on persistent storage by using the current time stamp. */
public void run() { public void run() {
try { try {
this.clientPingMapper this.clientPingMapper

View file

@ -8,7 +8,6 @@
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator; package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator;
import java.math.BigDecimal;
import java.util.Comparator; import java.util.Comparator;
import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeUtils;
@ -25,7 +24,6 @@ import ch.ethz.seb.sebserver.gbl.Constants;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent; import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
@Lazy @Lazy
@ -129,7 +127,7 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
public final double computeValueAt(final long timestamp) { public final double computeValueAt(final long timestamp) {
if (!this.cachingEnabled && super.pingUpdate != null) { if (!this.cachingEnabled && super.pingUpdate != null) {
final Long lastPing = this.distributedPingCache.getLastPing(super.pingUpdate.pingRecord, this.missingPing); final Long lastPing = this.distributedPingCache.getLastPing(super.pingUpdate.pingRecord);
if (lastPing != null) { if (lastPing != null) {
final double doubleValue = lastPing.doubleValue(); final double doubleValue = lastPing.doubleValue();
return Math.max(Double.isNaN(this.currentValue) ? doubleValue : this.currentValue, doubleValue); return Math.max(Double.isNaN(this.currentValue) ? doubleValue : this.currentValue, doubleValue);
@ -142,35 +140,21 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
} }
@Override @Override
public ClientEventRecord updateLogEvent(final long now) { public boolean missingPingUpdate(final long now) {
final long value = now - (long) super.currentValue; final long value = now - (long) super.currentValue;
if (this.missingPing) { if (this.missingPing) {
if (this.pingErrorThreshold > value) { if (this.pingErrorThreshold > value) {
this.missingPing = false; this.missingPing = false;
return new ClientEventRecord( return true;
null,
this.connectionId,
EventType.INFO_LOG.id,
now,
now,
new BigDecimal(value),
"Client Ping Back To Normal");
} }
} else { } else {
if (this.pingErrorThreshold < value) { if (this.pingErrorThreshold < value) {
this.missingPing = true; this.missingPing = true;
return new ClientEventRecord( return true;
null,
this.connectionId,
EventType.ERROR_LOG.id,
now,
now,
new BigDecimal(value),
"Missing Client Ping");
} }
} }
return null; return false;
} }
} }