SEBSERV-209 fixed indicators for distributed setup

also fixed instructions for distributed setup
This commit is contained in:
anhefti 2021-07-14 13:21:19 +02:00
parent ef72400656
commit eb7042acf6
24 changed files with 248 additions and 414 deletions

View file

@ -11,6 +11,7 @@ package ch.ethz.seb.sebserver.gui.service.push;
import java.util.function.Consumer;
import org.eclipse.rap.rwt.service.ServerPushSession;
import org.eclipse.swt.SWTException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
@ -70,6 +71,8 @@ public class ServerPushService {
business.accept(context);
doUpdate(context, update);
} catch (final SWTException swte) {
log.error("Disposed GUI widget(s) while update: {}", swte.getMessage());
} catch (final Exception e) {
log.error("Unexpected error while do business for server push service", e);
context.internalStop = context.errorHandler.apply(e);

View file

@ -17,6 +17,7 @@ import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
import ch.ethz.seb.sebserver.gbl.model.session.ClientNotification;
import ch.ethz.seb.sebserver.gbl.model.session.ExtendedClientEvent;
import ch.ethz.seb.sebserver.gbl.util.Result;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
public interface ClientEventDAO extends EntityDAO<ClientEvent, ClientEvent> {
@ -54,4 +55,10 @@ public interface ClientEventDAO extends EntityDAO<ClientEvent, ClientEvent> {
* @return Result refer to the confirmed notification or to en error when happened */
Result<ClientNotification> confirmPendingNotification(Long notificationId, Long clientConnectionId);
Result<ClientEventRecord> initPingEvent(Long connectionId);
void updatePingEvent(ClientEventRecord pingRecord);
Result<Long> getLastPing(Long pk);
}

View file

@ -243,7 +243,8 @@ public class ClientEventDAOImpl implements ClientEventDAO {
public Result<ClientNotification> confirmPendingNotification(final Long notificationId,
final Long clientConnectionId) {
return Result.tryCatch(() -> {
final Long pk = this.clientEventRecordMapper.selectIdsByExample()
final Long pk = this.clientEventRecordMapper
.selectIdsByExample()
.where(ClientEventRecordDynamicSqlSupport.id, isEqualTo(notificationId))
.and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.NOTIFICATION.id))
.build()
@ -312,6 +313,64 @@ public class ClientEventDAOImpl implements ClientEventDAO {
});
}
@Override
@Transactional
public Result<ClientEventRecord> initPingEvent(final Long connectionId) {
return Result.tryCatch(() -> {
final List<ClientEventRecord> lastPingRec = this.clientEventRecordMapper
.selectByExample()
.where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId))
.and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.LAST_PING.id))
.build()
.execute();
if (lastPingRec != null && !lastPingRec.isEmpty()) {
return lastPingRec.get(0);
}
final long millisecondsNow = Utils.getMillisecondsNow();
final ClientEventRecord clientEventRecord = new ClientEventRecord();
clientEventRecord.setClientConnectionId(connectionId);
clientEventRecord.setType(EventType.LAST_PING.id);
clientEventRecord.setClientTime(millisecondsNow);
clientEventRecord.setServerTime(millisecondsNow);
this.clientEventRecordMapper.insert(clientEventRecord);
try {
return this.clientEventRecordMapper
.selectByExample()
.where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId))
.and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.LAST_PING.id))
.build()
.execute()
.get(0);
} catch (final Exception e) {
return clientEventRecord;
}
});
}
@Override
@Transactional
public void updatePingEvent(final ClientEventRecord pingRecord) {
try {
this.clientEventRecordMapper.updateByPrimaryKeySelective(pingRecord);
} catch (final Exception e) {
log.error("Failed to update ping event: {}", e.getMessage());
}
}
@Override
@Transactional(readOnly = true)
public Result<Long> getLastPing(final Long pk) {
return Result.tryCatch(() -> this.clientEventRecordMapper
.selectByPrimaryKey(pk)
.getClientTime());
}
private Result<ClientEventRecord> recordById(final Long id) {
return Result.tryCatch(() -> {

View file

@ -27,8 +27,9 @@ public interface ClientIndicator extends IndicatorValue {
*
* @param indicatorDefinition The indicator definition that defines type and thresholds of the indicator
* @param connectionId the connection identifier to that this ClientIndicator is associated to
* @param active indicates whether the connection is still an a active state or not
* @param cachingEnabled defines whether indicator value caching is enabled or not. */
void init(Indicator indicatorDefinition, Long connectionId, boolean cachingEnabled);
void init(Indicator indicatorDefinition, Long connectionId, boolean active, boolean cachingEnabled);
/** Get the exam identifier of the client connection of this ClientIndicator
*

View file

@ -81,6 +81,7 @@ public class ClientIndicatorFactory {
indicator.init(
indicatorDef,
clientConnection.id,
clientConnection.status.clientActiveStatus,
this.enableCaching);
result.add(indicator);
@ -105,9 +106,11 @@ public class ClientIndicatorFactory {
null,
null,
Arrays.asList(new Indicator.Threshold(5000d, null, null)));
pingIndicator.init(
indicator,
clientConnection.id,
clientConnection.status.clientActiveStatus,
this.enableCaching);
result.add(pingIndicator);
}

View file

@ -1,81 +0,0 @@
/*
* Copyright (c) 2019 ETH Zürich, Educational Development and Technology (LET)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl;
import java.math.BigDecimal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.gbl.util.Utils;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordMapper;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.PingHandlingStrategy;
@Lazy
@Component
@WebServiceProfile
public class DistributedServerPingHandler implements PingHandlingStrategy {
private static final Logger log = LoggerFactory.getLogger(DistributedServerPingHandler.class);
private final ExamSessionCacheService examSessionCacheService;
private final ClientEventRecordMapper clientEventRecordMapper;
protected DistributedServerPingHandler(
final ExamSessionCacheService examSessionCacheService,
final ClientEventRecordMapper clientEventRecordMapper) {
this.examSessionCacheService = examSessionCacheService;
this.clientEventRecordMapper = clientEventRecordMapper;
}
@Override
@Transactional
public void notifyPing(final String connectionToken, final long timestamp, final int pingNumber) {
// store last ping in event
final ClientEventRecord pingRecord = this.examSessionCacheService.getPingRecord(connectionToken);
if (pingRecord != null) {
pingRecord.setClientTime(timestamp);
pingRecord.setServerTime(Utils.getMillisecondsNow());
pingRecord.setNumericValue(new BigDecimal(pingNumber));
this.clientEventRecordMapper.updateByPrimaryKeySelective(pingRecord);
}
// update ping indicators
final ClientConnectionDataInternal activeClientConnection =
this.examSessionCacheService.getClientConnection(connectionToken);
if (activeClientConnection != null) {
activeClientConnection.notifyPing(timestamp, pingNumber);
}
}
@Override
public void initForConnection(final Long connectionId, final String connectionToken) {
if (log.isDebugEnabled()) {
log.debug("Initialize distributed ping handler for connection: {}", connectionId);
}
final ClientEventRecord clientEventRecord = new ClientEventRecord();
clientEventRecord.setClientConnectionId(connectionId);
clientEventRecord.setType(EventType.LAST_PING.id);
clientEventRecord.setClientTime(Utils.getMillisecondsNow());
clientEventRecord.setServerTime(Utils.getMillisecondsNow());
this.clientEventRecordMapper.insertSelective(clientEventRecord);
}
}

View file

@ -15,12 +15,10 @@ import java.util.Collection;
import java.util.Collections;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.BooleanUtils;
import org.mybatis.dynamic.sql.SqlBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
@ -38,9 +36,6 @@ import ch.ethz.seb.sebserver.gbl.model.session.ClientConnection;
import ch.ethz.seb.sebserver.gbl.model.session.ClientConnectionData;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.gbl.util.Result;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientConnectionMinMapper;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientConnectionMinMapper.ClientConnectionMinRecord;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientConnectionRecordDynamicSqlSupport;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientConnectionDAO;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ExamConfigurationMapDAO;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ExamDAO;
@ -49,7 +44,6 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.dao.IndicatorDAO;
import ch.ethz.seb.sebserver.webservice.servicelayer.lms.LmsAPIService;
import ch.ethz.seb.sebserver.webservice.servicelayer.lms.SEBRestrictionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.ExamSessionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.IndicatorDistributedRequestCache;
@Lazy
@Service
@ -59,37 +53,31 @@ public class ExamSessionServiceImpl implements ExamSessionService {
private static final Logger log = LoggerFactory.getLogger(ExamSessionServiceImpl.class);
private final ClientConnectionDAO clientConnectionDAO;
private final ClientConnectionMinMapper clientConnectionMinMapper;
private final IndicatorDAO indicatorDAO;
private final ExamSessionCacheService examSessionCacheService;
private final ExamDAO examDAO;
private final ExamConfigurationMapDAO examConfigurationMapDAO;
private final CacheManager cacheManager;
private final SEBRestrictionService sebRestrictionService;
private final IndicatorDistributedRequestCache indicatorDistributedRequestCache;
private final boolean distributedSetup;
protected ExamSessionServiceImpl(
final ExamSessionCacheService examSessionCacheService,
final ClientConnectionMinMapper clientConnectionMinMapper,
final ExamDAO examDAO,
final ExamConfigurationMapDAO examConfigurationMapDAO,
final ClientConnectionDAO clientConnectionDAO,
final IndicatorDAO indicatorDAO,
final CacheManager cacheManager,
final SEBRestrictionService sebRestrictionService,
final IndicatorDistributedRequestCache indicatorDistributedRequestCache,
@Value("${sebserver.webservice.distributed:false}") final boolean distributedSetup) {
this.examSessionCacheService = examSessionCacheService;
this.clientConnectionMinMapper = clientConnectionMinMapper;
this.examDAO = examDAO;
this.examConfigurationMapDAO = examConfigurationMapDAO;
this.clientConnectionDAO = clientConnectionDAO;
this.cacheManager = cacheManager;
this.indicatorDAO = indicatorDAO;
this.sebRestrictionService = sebRestrictionService;
this.indicatorDistributedRequestCache = indicatorDistributedRequestCache;
this.distributedSetup = distributedSetup;
}
@ -349,35 +337,13 @@ public class ExamSessionServiceImpl implements ExamSessionService {
final Long examId,
final Predicate<ClientConnectionData> filter) {
if (this.distributedSetup) {
// if we run in distributed mode, we have to get the connection tokens of the exam
// always from the persistent storage and update the client connection cache
// before by remove out-dated client connection. This is done within the update_time
// of the client connection record that is set on every update in the persistent
// storage. So if the update_time of the cached client connection doesen't match the
// update_time from persistent, we need to flush this particular client connection from the cache
this.indicatorDistributedRequestCache.evictPingTimes(examId);
return Result.tryCatch(() -> this.clientConnectionMinMapper.selectByExample()
.where(
ClientConnectionRecordDynamicSqlSupport.examId,
SqlBuilder.isEqualTo(examId))
.build()
.execute()
.stream()
.map(this.distributedClientConnectionUpdateFunction(filter))
.filter(filter)
.collect(Collectors.toList()));
} else {
return Result.tryCatch(() -> this.clientConnectionDAO
.getConnectionTokens(examId)
.getOrThrow()
.stream()
.map(this.examSessionCacheService::getClientConnection)
.filter(filter)
.collect(Collectors.toList()));
}
return Result.tryCatch(() -> this.clientConnectionDAO
.getConnectionTokens(examId)
.getOrThrow()
.stream()
.map(this.examSessionCacheService::getClientConnection)
.filter(filter)
.collect(Collectors.toList()));
}
@Override
@ -423,24 +389,24 @@ public class ExamSessionServiceImpl implements ExamSessionService {
});
}
private Function<ClientConnectionMinRecord, ClientConnectionDataInternal> distributedClientConnectionUpdateFunction(
final Predicate<ClientConnectionData> filter) {
return cd -> {
ClientConnectionDataInternal clientConnection = this.examSessionCacheService
.getClientConnection(cd.connection_token);
if (filter.test(clientConnection)) {
if (cd.update_time != null &&
!cd.update_time.equals(clientConnection.clientConnection.updateTime)) {
this.examSessionCacheService.evictClientConnection(cd.connection_token);
clientConnection = this.examSessionCacheService
.getClientConnection(cd.connection_token);
}
}
return clientConnection;
};
}
// private Function<ClientConnectionMinRecord, ClientConnectionDataInternal> distributedClientConnectionUpdateFunction(
// final Predicate<ClientConnectionData> filter) {
//
// return cd -> {
// ClientConnectionDataInternal clientConnection = this.examSessionCacheService
// .getClientConnection(cd.connection_token);
//
// if (filter.test(clientConnection)) {
// if (cd.update_time != null &&
// !cd.update_time.equals(clientConnection.clientConnection.updateTime)) {
//
// this.examSessionCacheService.evictClientConnection(cd.connection_token);
// clientConnection = this.examSessionCacheService
// .getClientConnection(cd.connection_token);
// }
// }
// return clientConnection;
// };
// }
}

View file

@ -1,44 +0,0 @@
/*
* Copyright (c) 2019 ETH Zürich, Educational Development and Technology (LET)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.webservice.WebserviceInfo;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.PingHandlingStrategy;
@Lazy
@Service
@WebServiceProfile
public class PingHandlingStrategyFactory {
private final SingleServerPingHandler singleServerPingHandler;
private final DistributedServerPingHandler distributedServerPingHandler;
private final WebserviceInfo webserviceInfo;
protected PingHandlingStrategyFactory(
final SingleServerPingHandler singleServerPingHandler,
final DistributedServerPingHandler distributedServerPingHandler,
final WebserviceInfo webserviceInfo) {
this.singleServerPingHandler = singleServerPingHandler;
this.distributedServerPingHandler = distributedServerPingHandler;
this.webserviceInfo = webserviceInfo;
}
public PingHandlingStrategy get() {
if (this.webserviceInfo.isDistributed()) {
return this.distributedServerPingHandler;
} else {
return this.singleServerPingHandler;
}
}
}

View file

@ -43,7 +43,6 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.dao.SEBClientConfigDAO;
import ch.ethz.seb.sebserver.webservice.servicelayer.exam.ExamAdminService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.EventHandlingStrategy;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.ExamSessionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.PingHandlingStrategy;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientConnectionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientInstructionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientNotificationService;
@ -68,7 +67,6 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
private final CacheManager cacheManager;
private final EventHandlingStrategy eventHandlingStrategy;
private final ClientConnectionDAO clientConnectionDAO;
private final PingHandlingStrategy pingHandlingStrategy;
private final SEBClientConfigDAO sebClientConfigDAO;
private final SEBClientInstructionService sebInstructionService;
private final SEBClientNotificationService sebClientNotificationService;
@ -78,7 +76,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
protected SEBClientConnectionServiceImpl(
final ExamSessionService examSessionService,
final EventHandlingStrategyFactory eventHandlingStrategyFactory,
final PingHandlingStrategyFactory pingHandlingStrategyFactory,
final SEBClientConfigDAO sebClientConfigDAO,
final SEBClientInstructionService sebInstructionService,
final SEBClientNotificationService sebClientNotificationService,
@ -88,7 +86,6 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
this.examSessionCacheService = examSessionService.getExamSessionCacheService();
this.cacheManager = examSessionService.getCacheManager();
this.clientConnectionDAO = examSessionService.getClientConnectionDAO();
this.pingHandlingStrategy = pingHandlingStrategyFactory.get();
this.eventHandlingStrategy = eventHandlingStrategyFactory.get();
this.sebClientConfigDAO = sebClientConfigDAO;
this.sebInstructionService = sebInstructionService;
@ -368,11 +365,6 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
updatedClientConnection);
}
// notify ping handler about established connection
this.pingHandlingStrategy.initForConnection(
updatedClientConnection.id,
connectionToken);
return updatedClientConnection;
});
}
@ -557,7 +549,13 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
final long timestamp,
final int pingNumber) {
this.pingHandlingStrategy.notifyPing(connectionToken, timestamp, pingNumber);
final ClientConnectionDataInternal activeClientConnection =
this.examSessionCacheService.getClientConnection(connectionToken);
if (activeClientConnection != null) {
activeClientConnection.notifyPing(timestamp, pingNumber);
}
return this.sebInstructionService.getInstructionJSON(connectionToken);
}

View file

@ -48,6 +48,7 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
private static final Logger log = LoggerFactory.getLogger(SEBClientInstructionServiceImpl.class);
private static final long PERSISTENT_UPDATE_INTERVAL = 2 * Constants.SECOND_IN_MILLIS;
private static final int INSTRUCTION_QUEUE_MAX_SIZE = 10;
private static final String JSON_INST = "instruction";
private static final String JSON_ATTR = "attributes";
@ -281,9 +282,9 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
// Since the queue is empty check periodically if there are active instructions on the persistent storage
final long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis - this.lastRefresh > Constants.SECOND_IN_MILLIS) {
if (currentTimeMillis - this.lastRefresh > PERSISTENT_UPDATE_INTERVAL) {
this.lastRefresh = currentTimeMillis;
loadInstructions(connectionToken)
loadInstructions()
.onError(error -> log.error(
"Failed load instructions from persistent storage and to refresh cache: ",
error));
@ -300,8 +301,10 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
// check if there are still queues in the cache, whether they are empty or not,
// for closed or disposed client connections and remove them from cache
synchronized (this.instructions) {
final Result<Collection<String>> result = this.clientConnectionDAO
.getInactiveConnctionTokens(this.instructions.keySet());
if (result.hasValue()) {
result.get().stream().forEach(token -> this.instructions.remove(token));
}
@ -325,12 +328,6 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
}
}
private Result<Void> loadInstructions(final String connectionToken) {
return Result.tryCatch(() -> this.clientInstructionDAO.getAllActive(connectionToken)
.getOrThrow()
.forEach(this::putToCacheIfAbsent));
}
private Result<Void> loadInstructions() {
return Result.tryCatch(() -> this.clientInstructionDAO.getAllActive()
.getOrThrow()

View file

@ -1,44 +0,0 @@
/*
* Copyright (c) 2019 ETH Zürich, Educational Development and Technology (LET)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.PingHandlingStrategy;
@Lazy
@Component
@WebServiceProfile
public class SingleServerPingHandler implements PingHandlingStrategy {
private final ExamSessionCacheService examSessionCacheService;
protected SingleServerPingHandler(final ExamSessionCacheService examSessionCacheService) {
this.examSessionCacheService = examSessionCacheService;
}
@Override
public void notifyPing(final String connectionToken, final long timestamp, final int pingNumber) {
// update ping indicators
final ClientConnectionDataInternal activeClientConnection =
this.examSessionCacheService.getClientConnection(connectionToken);
if (activeClientConnection != null) {
activeClientConnection.notifyPing(timestamp, pingNumber);
}
}
@Override
public void initForConnection(final Long connectionId, final String connectionToken) {
// nothing to do here
}
}

View file

@ -8,18 +8,21 @@
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import ch.ethz.seb.sebserver.gbl.Constants;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
import ch.ethz.seb.sebserver.gbl.util.Utils;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.ClientIndicator;
public abstract class AbstractClientIndicator implements ClientIndicator {
private static final long PERSISTENT_UPDATE_INTERVAL = 3 * Constants.SECOND_IN_MILLIS;
protected Long indicatorId;
protected Long examId;
protected Long connectionId;
protected boolean cachingEnabled;
protected boolean active = true;
protected long lastPersistentUpdate = 0;
protected boolean valueInitializes = false;
protected double currentValue = Double.NaN;
@ -28,11 +31,13 @@ public abstract class AbstractClientIndicator implements ClientIndicator {
public void init(
final Indicator indicatorDefinition,
final Long connectionId,
final boolean active,
final boolean cachingEnabled) {
this.indicatorId = (indicatorDefinition != null) ? indicatorDefinition.id : -1;
this.examId = (indicatorDefinition != null) ? indicatorDefinition.examId : -1;
this.connectionId = connectionId;
this.active = active;
this.cachingEnabled = cachingEnabled;
}
@ -58,11 +63,21 @@ public abstract class AbstractClientIndicator implements ClientIndicator {
@Override
public double getValue() {
if (!this.valueInitializes || !this.cachingEnabled) {
this.currentValue = computeValueAt(DateTime.now(DateTimeZone.UTC).getMillis());
if (!this.valueInitializes) {
final long now = Utils.getMillisecondsNow();
this.currentValue = computeValueAt(now);
this.lastPersistentUpdate = now;
this.valueInitializes = true;
}
if (!this.cachingEnabled && this.active) {
final long now = System.currentTimeMillis();
if (now - this.lastPersistentUpdate > PERSISTENT_UPDATE_INTERVAL) {
this.currentValue = computeValueAt(now);
this.lastPersistentUpdate = now;
}
}
return this.currentValue;
}

View file

@ -37,8 +37,14 @@ public abstract class AbstractLogIndicator extends AbstractClientIndicator {
}
@Override
public void init(final Indicator indicatorDefinition, final Long connectionId, final boolean cachingEnabled) {
super.init(indicatorDefinition, connectionId, cachingEnabled);
public void init(
final Indicator indicatorDefinition,
final Long connectionId,
final boolean active,
final boolean cachingEnabled) {
super.init(indicatorDefinition, connectionId, active, cachingEnabled);
if (indicatorDefinition == null || StringUtils.isBlank(indicatorDefinition.tags)) {
this.tags = null;
} else {

View file

@ -56,14 +56,11 @@ public abstract class AbstractLogLevelCountIndicator extends AbstractLogIndicato
@Override
public double computeValueAt(final long timestamp) {
try {
// TODO to boost performance here within a distributed setup, invent a new cache for all log count values
// of the running exam. So all indicators get the values from cache and only one single SQL call
// is needed for one update.
// This cache then is only valid for one (GUI) update cycle and the cache must to be flushed before
final Long errors = this.clientEventRecordMapper.countByExample()
final Long errors = this.clientEventRecordMapper
.countByExample()
.where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(this.connectionId))
.and(ClientEventRecordDynamicSqlSupport.type, isIn(this.eventTypeIds))
.and(ClientEventRecordDynamicSqlSupport.serverTime, isLessThan(timestamp))

View file

@ -64,11 +64,6 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator {
public double computeValueAt(final long timestamp) {
try {
// TODO to boost performance here within a distributed setup, invent a new cache for all log count values
// of the running exam. So all indicators get the values from cache and only one single SQL call
// is needed for one update.
// This cache then is only valid for one (GUI) update cycle and the cache must to be flushed before
final List<ClientEventRecord> execute = this.clientEventRecordMapper.selectByExample()
.where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(this.connectionId))
.and(ClientEventRecordDynamicSqlSupport.type, isIn(this.eventTypeIds))

View file

@ -17,28 +17,45 @@ import org.joda.time.DateTimeZone;
import com.fasterxml.jackson.annotation.JsonIgnore;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventExtensionMapper;
import ch.ethz.seb.sebserver.gbl.util.Utils;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientEventDAO;
public abstract class AbstractPingIndicator extends AbstractClientIndicator {
private static final int PING_COUNT_INTERVAL_FOR_PERSISTENT_UPDATE = 3;
private final Set<EventType> EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class));
protected final ClientEventExtensionMapper clientEventExtensionMapper;
protected final IndicatorDistributedRequestCache indicatorDistributedRequestCache;
protected final ClientEventDAO clientEventDAO;
protected long pingLatency;
protected int pingCount = 0;
protected int pingNumber = 0;
protected AbstractPingIndicator(
final ClientEventExtensionMapper clientEventExtensionMapper,
final IndicatorDistributedRequestCache indicatorDistributedRequestCache) {
protected ClientEventRecord pingRecord = null;
protected AbstractPingIndicator(final ClientEventDAO clientEventDAO) {
super();
this.clientEventExtensionMapper = clientEventExtensionMapper;
this.indicatorDistributedRequestCache = indicatorDistributedRequestCache;
this.clientEventDAO = clientEventDAO;
}
@Override
public void init(
final Indicator indicatorDefinition,
final Long connectionId,
final boolean active,
final boolean cachingEnabled) {
super.init(indicatorDefinition, connectionId, active, cachingEnabled);
if (!this.cachingEnabled) {
this.pingRecord = this.clientEventDAO
.initPingEvent(this.connectionId)
.getOr(null);
}
}
public final void notifyPing(final long timestamp, final int pingNumber) {
@ -47,21 +64,17 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
super.currentValue = now;
this.pingCount++;
this.pingNumber = pingNumber;
}
super.lastPersistentUpdate = now;
@Override
public final double computeValueAt(final long timestamp) {
if (this.cachingEnabled) {
return timestamp;
} else {
try {
return this.indicatorDistributedRequestCache
.getPingTimes(this.examId)
.getOrDefault(this.connectionId, 0L);
if (!this.cachingEnabled &&
this.pingCount > PING_COUNT_INTERVAL_FOR_PERSISTENT_UPDATE &&
this.pingRecord != null) {
} catch (final Exception e) {
return Double.NaN;
}
// Update last ping time on persistent storage
this.pingRecord.setClientTime(timestamp);
this.pingRecord.setServerTime(Utils.getMillisecondsNow());
this.clientEventDAO.updatePingEvent(this.pingRecord);
this.pingCount = 0;
}
}
@ -70,11 +83,6 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
return this.EMPTY_SET;
}
@JsonIgnore
public int getPingCount() {
return this.pingCount;
}
@JsonIgnore
public int getPingNumber() {
return this.pingNumber;

View file

@ -30,8 +30,13 @@ public class BatteryStatusIndicator extends AbstractLogNumberIndicator {
}
@Override
public void init(final Indicator indicatorDefinition, final Long connectionId, final boolean cachingEnabled) {
super.init(indicatorDefinition, connectionId, cachingEnabled);
public void init(
final Indicator indicatorDefinition,
final Long connectionId,
final boolean active,
final boolean cachingEnabled) {
super.init(indicatorDefinition, connectionId, active, cachingEnabled);
super.tags = new String[] { API.LOG_EVENT_TAG_BATTERY_STATUS };
}

View file

@ -1,71 +0,0 @@
/*
* Copyright (c) 2021 ETH Zürich, Educational Development and Technology (LET)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator;
import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo;
import java.util.stream.Collectors;
import org.ehcache.impl.internal.concurrent.ConcurrentHashMap;
import org.mybatis.dynamic.sql.SqlBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientConnectionRecordDynamicSqlSupport;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport;
@Lazy
@Service
@WebServiceProfile
public class IndicatorDistributedRequestCache {
public static final String LAST_PING_TIME_CACHE = "LAST_PING_TIME_CACHE";
private static final Logger log = LoggerFactory.getLogger(IndicatorDistributedRequestCache.class);
private final ClientEventLastPingMapper clientEventLastPingMapper;
public IndicatorDistributedRequestCache(final ClientEventLastPingMapper clientEventLastPingMapper) {
this.clientEventLastPingMapper = clientEventLastPingMapper;
}
@Cacheable(
cacheNames = LAST_PING_TIME_CACHE,
key = "#examId",
unless = "#result == null")
public ConcurrentHashMap<Long, Long> getPingTimes(final Long examId) {
return new ConcurrentHashMap<>(this.clientEventLastPingMapper.selectByExample()
.join(ClientConnectionRecordDynamicSqlSupport.clientConnectionRecord)
.on(
ClientEventRecordDynamicSqlSupport.clientConnectionId,
SqlBuilder.equalTo(ClientConnectionRecordDynamicSqlSupport.id))
.where(ClientConnectionRecordDynamicSqlSupport.examId, isEqualTo(examId))
.and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.LAST_PING.id))
.build()
.execute()
.stream().collect(Collectors.toMap(rec -> rec.connectionId, rec -> rec.lastPingTime)));
}
@CacheEvict(
cacheNames = LAST_PING_TIME_CACHE,
key = "#examId")
public void evictPingTimes(final Long examId) {
if (log.isDebugEnabled()) {
log.debug("Evict LAST_PING_TIME_CACHE for examId: {}", examId);
}
}
}

View file

@ -27,9 +27,10 @@ import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.gbl.util.Result;
import ch.ethz.seb.sebserver.gbl.util.Utils;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventExtensionMapper;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientEventDAO;
@Lazy
@Component(IndicatorType.Names.LAST_PING)
@ -38,22 +39,29 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
private static final Logger log = LoggerFactory.getLogger(PingIntervalClientIndicator.class);
long pingErrorThreshold;
boolean missingPing = false;
boolean hidden = false;
// This is the default ping error threshold that is set if the threshold cannot be get
// from the ping threshold settings. If the last ping is older then this interval back in time
// then the ping is considered and marked as missing
private static final long DEFAULT_PING_ERROR_THRESHOLD = Constants.SECOND_IN_MILLIS * 5;
public PingIntervalClientIndicator(
final ClientEventExtensionMapper clientEventExtensionMapper,
final IndicatorDistributedRequestCache indicatorDistributedRequestCache) {
private long pingErrorThreshold;
private boolean missingPing = false;
private boolean hidden = false;
super(clientEventExtensionMapper, indicatorDistributedRequestCache);
public PingIntervalClientIndicator(final ClientEventDAO clientEventDAO) {
super(clientEventDAO);
this.cachingEnabled = true;
this.currentValue = computeValueAt(Utils.getMillisecondsNow());
}
@Override
public void init(final Indicator indicatorDefinition, final Long connectionId, final boolean cachingEnabled) {
super.init(indicatorDefinition, connectionId, cachingEnabled);
public void init(
final Indicator indicatorDefinition,
final Long connectionId,
final boolean active,
final boolean cachingEnabled) {
super.init(indicatorDefinition, connectionId, active, cachingEnabled);
try {
indicatorDefinition
@ -64,7 +72,7 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
} catch (final Exception e) {
log.error("Failed to initialize pingErrorThreshold: {}", e.getMessage());
this.pingErrorThreshold = Constants.SECOND_IN_MILLIS * 5;
this.pingErrorThreshold = DEFAULT_PING_ERROR_THRESHOLD;
}
if (!cachingEnabled) {
@ -114,6 +122,30 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
}
@Override
public final double computeValueAt(final long timestamp) {
if (!this.cachingEnabled && super.pingRecord != null) {
// if this indicator is not missing ping
if (!this.isMissingPing()) {
final Result<Long> lastPing = this.clientEventDAO
.getLastPing(super.pingRecord.getId());
if (!lastPing.hasError()) {
return lastPing.get();
} else {
log.error("Failed to get last ping from persistent: {}", lastPing.getError().getMessage());
}
}
return this.currentValue;
}
return !this.valueInitializes ? timestamp : this.currentValue;
}
@Override
public ClientEventRecord updateLogEvent(final long now) {
final long value = now - (long) super.currentValue;

View file

@ -30,8 +30,13 @@ public class WLANStatusIndicator extends AbstractLogNumberIndicator {
}
@Override
public void init(final Indicator indicatorDefinition, final Long connectionId, final boolean cachingEnabled) {
super.init(indicatorDefinition, connectionId, cachingEnabled);
public void init(
final Indicator indicatorDefinition,
final Long connectionId,
final boolean active,
final boolean cachingEnabled) {
super.init(indicatorDefinition, connectionId, active, cachingEnabled);
super.tags = new String[] { API.LOG_EVENT_TAG_WLAN_STATUS };
}

View file

@ -270,7 +270,9 @@ public class ExamAPI_V1_Controller {
final String instructionConfirm = request.getParameter(API.EXAM_API_PING_INSTRUCTION_CONFIRM);
if (log.isTraceEnabled()) {
log.trace("****************** SEB client connection: {}", connectionToken);
log.trace("****************** SEB client connection: {} ip: ",
connectionToken,
getClientAddress(request));
}
if (instructionConfirm != null) {
@ -352,32 +354,6 @@ public class ExamAPI_V1_Controller {
final ServletOutputStream outputStream = response.getOutputStream();
// try {
//
// final ClientConnectionData connection = this.examSessionService
// .getConnectionData(connectionToken)
// .getOrThrow();
//
// // exam integrity check
// if (connection.clientConnection.examId == null ||
// !this.examSessionService.isExamRunning(connection.clientConnection.examId)) {
//
// log.error("Missing exam identifier or requested exam is not running for connection: {}",
// connection);
// throw new IllegalStateException("Missing exam identifier or requested exam is not running");
// }
// } catch (final Exception e) {
//
// log.error("Unexpected error: ", e);
//
// final APIMessage errorMessage = APIMessage.ErrorMessage.GENERIC.of(e.getMessage());
// outputStream.write(Utils.toByteArray(this.jsonMapper.writeValueAsString(errorMessage)));
// response.setStatus(HttpStatus.BAD_REQUEST.value());
// outputStream.flush();
// outputStream.close();
// return;
// }
try {
this.examSessionService

View file

@ -11,7 +11,7 @@ logging.level.ch=INFO
logging.level.org.springframework.cache=INFO
logging.level.ch.ethz.seb.sebserver.webservice.servicelayer.lms.impl=DEBUG
logging.level.ch.ethz.seb.sebserver.webservice.servicelayer.session=DEBUG
logging.level.ch.ethz.seb.sebserver.webservice.weblayer.api.ExamAPI_V1_Controller=TRACE
#logging.level.ch.ethz.seb.sebserver.webservice.weblayer.api.ExamAPI_V1_Controller=TRACE
sebserver.http.client.connect-timeout=150000
sebserver.http.client.connection-request-timeout=100000

View file

@ -22,6 +22,7 @@ spring.datasource.url=jdbc:mariadb://${datastore.mariadb.server.address}:${datas
spring.flyway.enabled=true
spring.flyway.locations=classpath:config/sql/base
spring.flyway.cleanDisabled=true
spring.flyway.ignoreIgnoredMigrations=true
spring.datasource.driver-class-name=org.mariadb.jdbc.Driver
spring.datasource.hikari.initializationFailTimeout=3000
spring.datasource.hikari.connectionTimeout=30000

View file

@ -18,7 +18,7 @@ import org.mockito.Mockito;
import com.fasterxml.jackson.core.JsonProcessingException;
import ch.ethz.seb.sebserver.gbl.api.JSONMapper;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventExtensionMapper;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientEventDAO;
public class PingIntervalClientIndicatorTest {
@ -32,10 +32,10 @@ public class PingIntervalClientIndicatorTest {
DateTimeUtils.setCurrentMillisProvider(() -> 1L);
final ClientEventExtensionMapper clientEventExtensionMapper = Mockito.mock(ClientEventExtensionMapper.class);
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
final PingIntervalClientIndicator pingIntervalClientIndicator =
new PingIntervalClientIndicator(clientEventExtensionMapper, null);
new PingIntervalClientIndicator(clientEventDAO);
assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue()));
}
@ -44,10 +44,10 @@ public class PingIntervalClientIndicatorTest {
DateTimeUtils.setCurrentMillisProvider(() -> 1L);
final ClientEventExtensionMapper clientEventExtensionMapper = Mockito.mock(ClientEventExtensionMapper.class);
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
final PingIntervalClientIndicator pingIntervalClientIndicator =
new PingIntervalClientIndicator(clientEventExtensionMapper, null);
new PingIntervalClientIndicator(clientEventDAO);
assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue()));
DateTimeUtils.setCurrentMillisProvider(() -> 10L);
@ -59,10 +59,10 @@ public class PingIntervalClientIndicatorTest {
public void testSerialization() throws JsonProcessingException {
DateTimeUtils.setCurrentMillisProvider(() -> 1L);
final ClientEventExtensionMapper clientEventExtensionMapper = Mockito.mock(ClientEventExtensionMapper.class);
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
final PingIntervalClientIndicator pingIntervalClientIndicator =
new PingIntervalClientIndicator(clientEventExtensionMapper, null);
new PingIntervalClientIndicator(clientEventDAO);
final JSONMapper jsonMapper = new JSONMapper();
final String json = jsonMapper.writeValueAsString(pingIntervalClientIndicator);
assertEquals("{\"indicatorValue\":0.0,\"indicatorType\":\"LAST_PING\"}", json);