Merge remote-tracking branch 'origin/distributed-ping-cache' into

dev-1.2

Conflicts:
	pom.xml
This commit is contained in:
anhefti 2021-07-28 14:33:00 +02:00
commit a9810e3864
16 changed files with 390 additions and 68 deletions

View file

@ -18,7 +18,7 @@
<packaging>jar</packaging> <packaging>jar</packaging>
<properties> <properties>
<sebserver-version>1.2.0</sebserver-version> <sebserver-version>1.2.1-SNAPSHOT</sebserver-version>
<build-version>${sebserver-version}</build-version> <build-version>${sebserver-version}</build-version>
<revision>${sebserver-version}</revision> <revision>${sebserver-version}</revision>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

View file

@ -8,6 +8,9 @@
package ch.ethz.seb.sebserver.webservice.datalayer.batis; package ch.ethz.seb.sebserver.webservice.datalayer.batis;
import static ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport.*;
import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo;
import java.util.Collection; import java.util.Collection;
import org.apache.ibatis.annotations.Arg; import org.apache.ibatis.annotations.Arg;
@ -15,39 +18,85 @@ import org.apache.ibatis.annotations.ConstructorArgs;
import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.ResultType; import org.apache.ibatis.annotations.ResultType;
import org.apache.ibatis.annotations.SelectProvider; import org.apache.ibatis.annotations.SelectProvider;
import org.apache.ibatis.annotations.UpdateProvider;
import org.apache.ibatis.type.JdbcType; import org.apache.ibatis.type.JdbcType;
import org.mybatis.dynamic.sql.select.MyBatis3SelectModelAdapter; import org.mybatis.dynamic.sql.select.MyBatis3SelectModelAdapter;
import org.mybatis.dynamic.sql.select.QueryExpressionDSL; import org.mybatis.dynamic.sql.select.QueryExpressionDSL;
import org.mybatis.dynamic.sql.select.SelectDSL; import org.mybatis.dynamic.sql.select.SelectDSL;
import org.mybatis.dynamic.sql.select.render.SelectStatementProvider; import org.mybatis.dynamic.sql.select.render.SelectStatementProvider;
import org.mybatis.dynamic.sql.update.UpdateDSL;
import org.mybatis.dynamic.sql.update.render.UpdateStatementProvider;
import org.mybatis.dynamic.sql.util.SqlProviderAdapter; import org.mybatis.dynamic.sql.util.SqlProviderAdapter;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport; import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport;
@Mapper @Mapper
public interface ClientEventLastPingMapper { public interface ClientEventLastPingMapper {
@SelectProvider(type = SqlProviderAdapter.class, method = "select") @SelectProvider(type = SqlProviderAdapter.class, method = "select")
Long num(SelectStatementProvider selectStatement); @ConstructorArgs({ @Arg(column = "server_time", javaType = Long.class, jdbcType = JdbcType.BIGINT) })
Collection<Long> selectPingTimes(SelectStatementProvider selectStatement);
@SelectProvider(type = SqlProviderAdapter.class, method = "select")
@ConstructorArgs({ @Arg(column = "server_time", javaType = Long.class, jdbcType = JdbcType.BIGINT) })
Long selectPingTime(SelectStatementProvider selectStatement);
@SelectProvider(type = SqlProviderAdapter.class, method = "select")
@ConstructorArgs({ @Arg(column = "id", javaType = Long.class, jdbcType = JdbcType.BIGINT, id = true) })
Long selectPK(SelectStatementProvider selectStatement);
@UpdateProvider(type = SqlProviderAdapter.class, method = "update")
int update(UpdateStatementProvider updateStatement);
@SelectProvider(type = SqlProviderAdapter.class, method = "select") @SelectProvider(type = SqlProviderAdapter.class, method = "select")
@ResultType(ClientEventLastPingRecord.class) @ResultType(ClientEventLastPingRecord.class)
@ConstructorArgs({ @ConstructorArgs({
@Arg(column = "id", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Arg(column = "id", javaType = Long.class, jdbcType = JdbcType.BIGINT),
@Arg(column = "server_time", javaType = Long.class, jdbcType = JdbcType.BIGINT), @Arg(column = "server_time", javaType = Long.class, jdbcType = JdbcType.BIGINT)
}) })
Collection<ClientEventLastPingRecord> selectMany(SelectStatementProvider select); Collection<ClientEventLastPingRecord> selectMany(SelectStatementProvider select);
default Long selectPingTimeByPrimaryKey(final Long id_) {
return SelectDSL.selectWithMapper(
this::selectPingTime,
ClientEventRecordDynamicSqlSupport.serverTime.as("server_time"))
.from(ClientEventRecordDynamicSqlSupport.clientEventRecord)
.where(ClientEventRecordDynamicSqlSupport.id, isEqualTo(id_))
.build()
.execute();
}
default Long pingRecordIdByConnectionId(final Long connectionId) {
return SelectDSL.selectDistinctWithMapper(
this::selectPK,
ClientEventRecordDynamicSqlSupport.id.as("id"))
.from(ClientEventRecordDynamicSqlSupport.clientEventRecord)
.where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId))
.and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.LAST_PING.id))
.build()
.execute();
}
default QueryExpressionDSL<MyBatis3SelectModelAdapter<Collection<ClientEventLastPingRecord>>> selectByExample() { default QueryExpressionDSL<MyBatis3SelectModelAdapter<Collection<ClientEventLastPingRecord>>> selectByExample() {
return SelectDSL.selectWithMapper( return SelectDSL.selectWithMapper(
this::selectMany, this::selectMany,
ClientEventRecordDynamicSqlSupport.clientConnectionId.as("id"), ClientEventRecordDynamicSqlSupport.id.as("id"),
ClientEventRecordDynamicSqlSupport.serverTime.as("server_time")) ClientEventRecordDynamicSqlSupport.serverTime.as("server_time"))
.from(ClientEventRecordDynamicSqlSupport.clientEventRecord); .from(ClientEventRecordDynamicSqlSupport.clientEventRecord);
} }
default int updatePingTime(final Long _id, final Long pingTime) {
return UpdateDSL.updateWithMapper(this::update, clientEventRecord)
.set(serverTime).equalTo(pingTime)
.where(id, isEqualTo(_id))
.build()
.execute();
}
final class ClientEventLastPingRecord { final class ClientEventLastPingRecord {
public final Long id; public final Long id;

View file

@ -18,6 +18,7 @@ import java.util.Set;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.joda.time.DateTimeUtils;
import org.mybatis.dynamic.sql.SqlBuilder; import org.mybatis.dynamic.sql.SqlBuilder;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -312,21 +313,6 @@ public class ClientEventDAOImpl implements ClientEventDAO {
.stream() .stream()
.map(pk -> new EntityKey(String.valueOf(pk), EntityType.CLIENT_EVENT)) .map(pk -> new EntityKey(String.valueOf(pk), EntityType.CLIENT_EVENT))
.collect(Collectors.toList()); .collect(Collectors.toList());
// return all
// .stream()
// .map(EntityKey::getModelId)
// .map(Long::parseLong)
// .map(pk -> {
// final int deleted = this.clientEventRecordMapper.deleteByPrimaryKey(pk);
// if (deleted == 1) {
// return new EntityKey(String.valueOf(pk), EntityType.CLIENT_EVENT);
// } else {
// return null;
// }
// })
// .filter(Objects::nonNull)
// .collect(Collectors.toList());
}); });
} }
@ -345,7 +331,7 @@ public class ClientEventDAOImpl implements ClientEventDAO {
return lastPingRec.get(0); return lastPingRec.get(0);
} }
final long millisecondsNow = Utils.getMillisecondsNow(); final long millisecondsNow = DateTimeUtils.currentTimeMillis();
final ClientEventRecord clientEventRecord = new ClientEventRecord(); final ClientEventRecord clientEventRecord = new ClientEventRecord();
clientEventRecord.setClientConnectionId(connectionId); clientEventRecord.setClientConnectionId(connectionId);
clientEventRecord.setType(EventType.LAST_PING.id); clientEventRecord.setType(EventType.LAST_PING.id);

View file

@ -24,6 +24,7 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientConnectionDAO;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ExamDAO; import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ExamDAO;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.FilterMap; import ch.ethz.seb.sebserver.webservice.servicelayer.dao.FilterMap;
import ch.ethz.seb.sebserver.webservice.servicelayer.lms.LmsAPIService; import ch.ethz.seb.sebserver.webservice.servicelayer.lms.LmsAPIService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.ClientConnectionDataInternal;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.ExamSessionCacheService; import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.ExamSessionCacheService;
/** A Service to handle running exam sessions */ /** A Service to handle running exam sessions */
@ -178,6 +179,15 @@ public interface ExamSessionService {
* @return Result with reference to the given Exam or to an error if happened */ * @return Result with reference to the given Exam or to an error if happened */
Result<Exam> flushCache(final Exam exam); Result<Exam> flushCache(final Exam exam);
/** Is is supposed to be the single access point to internally get client connection
* data for a specified connection token.
* This uses the client connection data cache for lookup and also synchronizes asynchronous
* cache calls to prevent parallel creation of ClientConnectionDataInternal
*
* @param connectionToken the connection token of the active SEB client connection
* @return ClientConnectionDataInternal by synchronized cache lookup or null if not available */
ClientConnectionDataInternal getConnectionDataInternal(String connectionToken);
/** Checks if the given ClientConnectionData is an active SEB client connection. /** Checks if the given ClientConnectionData is an active SEB client connection.
* *
* @param connection ClientConnectionData instance * @param connection ClientConnectionData instance

View file

@ -308,13 +308,20 @@ public class ExamSessionServiceImpl implements ExamSessionService {
} }
} }
@Override
public ClientConnectionDataInternal getConnectionDataInternal(final String connectionToken) {
synchronized (this.examSessionCacheService) {
return this.examSessionCacheService.getClientConnection(connectionToken);
}
}
@Override @Override
public Result<ClientConnectionData> getConnectionData(final String connectionToken) { public Result<ClientConnectionData> getConnectionData(final String connectionToken) {
return Result.tryCatch(() -> { return Result.tryCatch(() -> {
final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService final ClientConnectionDataInternal activeClientConnection =
.getClientConnection(connectionToken); getConnectionDataInternal(connectionToken);
if (activeClientConnection == null) { if (activeClientConnection == null) {
throw new NoSuchElementException("Client Connection with token: " + connectionToken); throw new NoSuchElementException("Client Connection with token: " + connectionToken);
@ -403,7 +410,7 @@ public class ExamSessionServiceImpl implements ExamSessionService {
.getConnectionTokens(examId) .getConnectionTokens(examId)
.getOrThrow() .getOrThrow()
.stream() .stream()
.map(this.examSessionCacheService::getClientConnection) .map(this::getConnectionDataInternal)
.filter(Objects::nonNull) .filter(Objects::nonNull)
.map(cc -> cc.getClientConnection().updateTime) .map(cc -> cc.getClientConnection().updateTime)
.collect(Collectors.toSet()); .collect(Collectors.toSet());

View file

@ -45,6 +45,7 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.session.ExamSessionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientConnectionService; import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientConnectionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientInstructionService; import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientInstructionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientNotificationService; import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientNotificationService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.DistributedPingCache;
import ch.ethz.seb.sebserver.webservice.weblayer.api.APIConstraintViolationException; import ch.ethz.seb.sebserver.webservice.weblayer.api.APIConstraintViolationException;
@Lazy @Lazy
@ -71,6 +72,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
private final SEBClientNotificationService sebClientNotificationService; private final SEBClientNotificationService sebClientNotificationService;
private final WebserviceInfo webserviceInfo; private final WebserviceInfo webserviceInfo;
private final ExamAdminService examAdminService; private final ExamAdminService examAdminService;
private final DistributedPingCache distributedPingCache;
protected SEBClientConnectionServiceImpl( protected SEBClientConnectionServiceImpl(
final ExamSessionService examSessionService, final ExamSessionService examSessionService,
@ -79,7 +81,8 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
final SEBClientConfigDAO sebClientConfigDAO, final SEBClientConfigDAO sebClientConfigDAO,
final SEBClientInstructionService sebInstructionService, final SEBClientInstructionService sebInstructionService,
final SEBClientNotificationService sebClientNotificationService, final SEBClientNotificationService sebClientNotificationService,
final ExamAdminService examAdminService) { final ExamAdminService examAdminService,
final DistributedPingCache distributedPingCache) {
this.examSessionService = examSessionService; this.examSessionService = examSessionService;
this.examSessionCacheService = examSessionService.getExamSessionCacheService(); this.examSessionCacheService = examSessionService.getExamSessionCacheService();
@ -91,6 +94,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
this.sebClientNotificationService = sebClientNotificationService; this.sebClientNotificationService = sebClientNotificationService;
this.webserviceInfo = sebInstructionService.getWebserviceInfo(); this.webserviceInfo = sebInstructionService.getWebserviceInfo();
this.examAdminService = examAdminService; this.examAdminService = examAdminService;
this.distributedPingCache = distributedPingCache;
} }
@Override @Override
@ -155,8 +159,8 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
.getOrThrow(); .getOrThrow();
// load client connection data into cache // load client connection data into cache
final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService final ClientConnectionDataInternal activeClientConnection = this.examSessionService
.getClientConnection(connectionToken); .getConnectionDataInternal(connectionToken);
if (activeClientConnection == null) { if (activeClientConnection == null) {
log.warn("Failed to load ClientConnectionDataInternal into cache on update"); log.warn("Failed to load ClientConnectionDataInternal into cache on update");
@ -453,6 +457,12 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
updatedClientConnection = clientConnection; updatedClientConnection = clientConnection;
} }
// delete stored ping if this is a distributed setup
if (this.webserviceInfo.isDistributed()) {
this.distributedPingCache
.deletePingForConnection(updatedClientConnection.id);
}
reloadConnectionCache(connectionToken); reloadConnectionCache(connectionToken);
return updatedClientConnection; return updatedClientConnection;
}); });
@ -501,6 +511,12 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
updatedClientConnection = clientConnection; updatedClientConnection = clientConnection;
} }
// delete stored ping if this is a distributed setup
if (this.webserviceInfo.isDistributed()) {
this.distributedPingCache
.deletePingForConnection(updatedClientConnection.id);
}
reloadConnectionCache(connectionToken); reloadConnectionCache(connectionToken);
return updatedClientConnection; return updatedClientConnection;
}); });
@ -510,6 +526,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
public void updatePingEvents() { public void updatePingEvents() {
try { try {
final boolean distributed = this.webserviceInfo.isDistributed();
final Cache cache = this.cacheManager.getCache(ExamSessionCacheService.CACHE_NAME_ACTIVE_CLIENT_CONNECTION); final Cache cache = this.cacheManager.getCache(ExamSessionCacheService.CACHE_NAME_ACTIVE_CLIENT_CONNECTION);
final long now = Utils.getMillisecondsNow(); final long now = Utils.getMillisecondsNow();
final Consumer<ClientConnectionDataInternal> missingPingUpdate = missingPingUpdate(now); final Consumer<ClientConnectionDataInternal> missingPingUpdate = missingPingUpdate(now);
@ -518,7 +535,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
.allRunningExamIds() .allRunningExamIds()
.getOrThrow() .getOrThrow()
.stream() .stream()
.flatMap(examId -> (this.webserviceInfo.isDistributed()) .flatMap(examId -> distributed
? this.clientConnectionDAO ? this.clientConnectionDAO
.getConnectionTokensNoCache(examId) .getConnectionTokensNoCache(examId)
.getOrThrow() .getOrThrow()
@ -550,7 +567,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
final int pingNumber) { final int pingNumber) {
final ClientConnectionDataInternal activeClientConnection = final ClientConnectionDataInternal activeClientConnection =
this.examSessionCacheService.getClientConnection(connectionToken); this.examSessionService.getConnectionDataInternal(connectionToken);
if (activeClientConnection != null) { if (activeClientConnection != null) {
activeClientConnection.notifyPing(timestamp, pingNumber); activeClientConnection.notifyPing(timestamp, pingNumber);
@ -566,7 +583,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
try { try {
final ClientConnectionDataInternal activeClientConnection = final ClientConnectionDataInternal activeClientConnection =
this.examSessionCacheService.getClientConnection(connectionToken); this.examSessionService.getConnectionDataInternal(connectionToken);
if (activeClientConnection != null) { if (activeClientConnection != null) {
@ -731,7 +748,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
// evict cached ClientConnection // evict cached ClientConnection
this.examSessionCacheService.evictClientConnection(connectionToken); this.examSessionCacheService.evictClientConnection(connectionToken);
// and load updated ClientConnection into cache // and load updated ClientConnection into cache
return this.examSessionCacheService.getClientConnection(connectionToken); return this.examSessionService.getConnectionDataInternal(connectionToken);
} }
private Consumer<ClientConnectionDataInternal> missingPingUpdate(final long now) { private Consumer<ClientConnectionDataInternal> missingPingUpdate(final long now) {

View file

@ -23,6 +23,7 @@ public abstract class AbstractClientIndicator implements ClientIndicator {
protected Long connectionId; protected Long connectionId;
protected boolean cachingEnabled; protected boolean cachingEnabled;
protected boolean active = true; protected boolean active = true;
protected long persistentUpdateInterval = PERSISTENT_UPDATE_INTERVAL;
protected long lastPersistentUpdate = 0; protected long lastPersistentUpdate = 0;
protected boolean valueInitializes = false; protected boolean valueInitializes = false;
@ -72,7 +73,7 @@ public abstract class AbstractClientIndicator implements ClientIndicator {
} }
if (!this.cachingEnabled && this.active) { if (!this.cachingEnabled && this.active) {
if (now - this.lastPersistentUpdate > PERSISTENT_UPDATE_INTERVAL) { if (now - this.lastPersistentUpdate > this.persistentUpdateInterval) {
this.currentValue = computeValueAt(now); this.currentValue = computeValueAt(now);
this.lastPersistentUpdate = now; this.lastPersistentUpdate = now;
} }

View file

@ -44,6 +44,7 @@ public abstract class AbstractLogIndicator extends AbstractClientIndicator {
final boolean cachingEnabled) { final boolean cachingEnabled) {
super.init(indicatorDefinition, connectionId, active, cachingEnabled); super.init(indicatorDefinition, connectionId, active, cachingEnabled);
super.persistentUpdateInterval = 2 * Constants.SECOND_IN_MILLIS;
if (indicatorDefinition == null || StringUtils.isBlank(indicatorDefinition.tags)) { if (indicatorDefinition == null || StringUtils.isBlank(indicatorDefinition.tags)) {
this.tags = null; this.tags = null;

View file

@ -86,6 +86,7 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator {
} else { } else {
return super.currentValue; return super.currentValue;
} }
} catch (final Exception e) { } catch (final Exception e) {
log.error("Failed to get indicator number from persistent storage: {}", e.getMessage()); log.error("Failed to get indicator number from persistent storage: {}", e.getMessage());
return this.currentValue; return this.currentValue;

View file

@ -15,27 +15,31 @@ import java.util.Set;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ch.ethz.seb.sebserver.gbl.Constants; import ch.ethz.seb.sebserver.gbl.Constants;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientEventDAO;
public abstract class AbstractPingIndicator extends AbstractClientIndicator { public abstract class AbstractPingIndicator extends AbstractClientIndicator {
private static final Logger log = LoggerFactory.getLogger(AbstractPingIndicator.class);
private static final long INTERVAL_FOR_PERSISTENT_UPDATE = Constants.SECOND_IN_MILLIS; private static final long INTERVAL_FOR_PERSISTENT_UPDATE = Constants.SECOND_IN_MILLIS;
private final Set<EventType> EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class)); private final Set<EventType> EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class));
protected final ClientEventDAO clientEventDAO; protected final DistributedPingCache distributedPingCache;
private long lastUpdate = 0; private final long lastUpdate = 0;
protected ClientEventRecord pingRecord = null; protected Long pingRecord = null;
protected AbstractPingIndicator(final DistributedPingCache distributedPingCache) {
protected AbstractPingIndicator(final ClientEventDAO clientEventDAO) {
super(); super();
this.clientEventDAO = clientEventDAO; this.distributedPingCache = distributedPingCache;
} }
@Override @Override
@ -47,10 +51,12 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
super.init(indicatorDefinition, connectionId, active, cachingEnabled); super.init(indicatorDefinition, connectionId, active, cachingEnabled);
if (!this.cachingEnabled) { if (!this.cachingEnabled && this.active) {
this.pingRecord = this.clientEventDAO try {
.initPingEvent(this.connectionId) this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId);
.getOr(null); } catch (final Exception e) {
this.pingRecord = this.distributedPingCache.getPingRecordIdForConnectionId(connectionId);
}
} }
} }
@ -59,19 +65,39 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
super.currentValue = now; super.currentValue = now;
super.lastPersistentUpdate = now; super.lastPersistentUpdate = now;
if (!this.cachingEnabled && this.pingRecord != null) { if (!this.cachingEnabled) {
if (this.pingRecord == null) {
tryRecoverPingRecord();
if (this.pingRecord == null) {
return;
}
}
// Update last ping time on persistent storage // Update last ping time on persistent storage
final long millisecondsNow = DateTimeUtils.currentTimeMillis(); final long millisecondsNow = DateTimeUtils.currentTimeMillis();
if (millisecondsNow - this.lastUpdate > INTERVAL_FOR_PERSISTENT_UPDATE) { if (millisecondsNow - this.lastUpdate > INTERVAL_FOR_PERSISTENT_UPDATE) {
this.pingRecord.setClientTime(timestamp); this.distributedPingCache.updatePing(this.pingRecord, millisecondsNow);
this.pingRecord.setServerTime(millisecondsNow);
this.clientEventDAO.updatePingEvent(this.pingRecord);
this.lastUpdate = millisecondsNow;
} }
} }
} }
private void tryRecoverPingRecord() {
if (log.isWarnEnabled()) {
log.warn("*** Missing ping record for connection: {}. Try to recover...", this.connectionId);
}
try {
this.pingRecord = this.distributedPingCache.getPingRecordIdForConnectionId(this.connectionId);
if (this.pingRecord == null) {
this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId);
}
} catch (final Exception e) {
log.error("Failed to recover ping record for connection: {}", this.connectionId, e);
}
}
@Override @Override
public Set<EventType> observedEvents() { public Set<EventType> observedEvents() {
return this.EMPTY_SET; return this.EMPTY_SET;

View file

@ -0,0 +1,230 @@
/*
* Copyright (c) 2021 ETH Zürich, Educational Development and Technology (LET)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator;
import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo;
import static org.mybatis.dynamic.sql.SqlBuilder.isIn;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
import org.ehcache.impl.internal.concurrent.ConcurrentHashMap;
import org.joda.time.DateTimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.webservice.WebserviceInfo;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordMapper;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
@Lazy
@Component
@WebServiceProfile
public class DistributedPingCache implements DisposableBean {
private static final Logger log = LoggerFactory.getLogger(DistributedPingCache.class);
private final ClientEventLastPingMapper clientEventLastPingMapper;
private final ClientEventRecordMapper clientEventRecordMapper;
private ScheduledFuture<?> taskRef;
private final Map<Long, Long> pingCache = new ConcurrentHashMap<>();
public DistributedPingCache(
final ClientEventLastPingMapper clientEventLastPingMapper,
final ClientEventRecordMapper clientEventRecordMapper,
final WebserviceInfo webserviceInfo,
final TaskScheduler taskScheduler) {
this.clientEventLastPingMapper = clientEventLastPingMapper;
this.clientEventRecordMapper = clientEventRecordMapper;
if (webserviceInfo.isDistributed()) {
try {
this.taskRef = taskScheduler.scheduleAtFixedRate(this::updateCache, 1000);
} catch (final Exception e) {
log.error("Failed to initialize distributed ping cache update task");
this.taskRef = null;
}
} else {
this.taskRef = null;
}
}
@Transactional
public Long initPingForConnection(final Long connectionId) {
try {
if (log.isDebugEnabled()) {
log.trace("*** Initialize ping record for SEB connection: {}", connectionId);
}
final Long recordId = this.clientEventLastPingMapper
.pingRecordIdByConnectionId(connectionId);
if (recordId == null) {
final long millisecondsNow = DateTimeUtils.currentTimeMillis();
final ClientEventRecord clientEventRecord = new ClientEventRecord();
clientEventRecord.setClientConnectionId(connectionId);
clientEventRecord.setType(EventType.LAST_PING.id);
clientEventRecord.setClientTime(millisecondsNow);
clientEventRecord.setServerTime(millisecondsNow);
this.clientEventRecordMapper.insert(clientEventRecord);
try {
// This also double-check by trying again. If we have more then one entry here
// this will throw an exception that causes a rollback
return this.clientEventLastPingMapper
.pingRecordIdByConnectionId(connectionId);
} catch (final Exception e) {
log.warn("Detected multiple client ping entries for connection: " + connectionId
+ ". Force rollback to prevent");
// force rollback
throw new RuntimeException("Detected multiple client ping entries");
}
}
return recordId;
} catch (final Exception e) {
log.error("Failed to initialize ping for connection -> {}", connectionId, e);
// force rollback
throw new RuntimeException("Failed to initialize ping for connection -> " + connectionId, e);
}
}
@Transactional(readOnly = true)
public Long getPingRecordIdForConnectionId(final Long connectionId) {
try {
return this.clientEventLastPingMapper
.pingRecordIdByConnectionId(connectionId);
} catch (final Exception e) {
log.error("Failed to get ping record for connection id: {} cause: {}", connectionId, e.getMessage());
return null;
}
}
@Transactional
public void updatePing(final Long pingRecordId, final Long pingTime) {
try {
this.clientEventLastPingMapper
.updatePingTime(pingRecordId, pingTime);
} catch (final Exception e) {
log.error("Failed to update ping for ping record id -> {}", pingRecordId);
}
}
@Transactional
public void deletePingForConnection(final Long connectionId) {
try {
if (log.isDebugEnabled()) {
log.debug("*** Delete ping record for SEB connection: {}", connectionId);
}
this.clientEventRecordMapper
.deleteByExample()
.where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId))
.and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.LAST_PING.id))
.build()
.execute();
} catch (final Exception e) {
log.error("Failed to delete ping for connection -> {}", connectionId, e);
}
}
public Long getLastPing(final Long pingRecordId) {
try {
Long ping = this.pingCache.get(pingRecordId);
if (ping == null) {
if (log.isDebugEnabled()) {
log.debug("*** Get and cache ping time: {}", pingRecordId);
}
ping = this.clientEventLastPingMapper.selectPingTimeByPrimaryKey(pingRecordId);
if (ping != null) {
this.pingCache.put(pingRecordId, ping);
}
}
return ping;
} catch (final Exception e) {
log.error("Error while trying to get last ping from storage: {}", e.getMessage());
return 0L;
}
}
@Transactional
public void updateCache() {
if (this.pingCache.isEmpty()) {
return;
}
if (log.isDebugEnabled()) {
log.trace("*** Update distributed ping cache: {}", this.pingCache);
}
try {
final ArrayList<Long> pks = new ArrayList<>(this.pingCache.keySet());
final Map<Long, Long> mapping = this.clientEventLastPingMapper
.selectByExample()
.where(
ClientEventRecordDynamicSqlSupport.id,
isIn(pks))
.build()
.execute()
.stream()
.collect(Collectors.toMap(entry -> entry.id, entry -> entry.lastPingTime));
if (mapping != null) {
this.pingCache.clear();
this.pingCache.putAll(mapping);
}
} catch (final Exception e) {
log.error("Error while trying to update distributed ping cache: {}", this.pingCache, e);
}
}
@Override
public void destroy() throws Exception {
if (this.taskRef != null) {
try {
final boolean cancel = this.taskRef.cancel(true);
if (!cancel) {
log.warn("Failed to cancel distributed ping cache update task");
}
} catch (final Exception e) {
log.error("Failed to cancel distributed ping cache update task: ", e);
}
}
}
}

View file

@ -26,9 +26,7 @@ import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent; import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.gbl.util.Result;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientEventDAO;
@Lazy @Lazy
@Component(IndicatorType.Names.LAST_PING) @Component(IndicatorType.Names.LAST_PING)
@ -46,8 +44,8 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
private boolean missingPing = false; private boolean missingPing = false;
private boolean hidden = false; private boolean hidden = false;
public PingIntervalClientIndicator(final ClientEventDAO clientEventDAO) { public PingIntervalClientIndicator(final DistributedPingCache distributedPingCache) {
super(clientEventDAO); super(distributedPingCache);
this.cachingEnabled = true; this.cachingEnabled = true;
} }
@ -129,17 +127,10 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
// if this indicator is not missing ping // if this indicator is not missing ping
if (!this.isMissingPing()) { if (!this.isMissingPing()) {
final Long lastPing = this.distributedPingCache.getLastPing(super.pingRecord);
final Result<Long> lastPing = this.clientEventDAO if (lastPing != null) {
.getLastPing(super.pingRecord.getId()); final double doubleValue = lastPing.doubleValue();
return Math.max(Double.isNaN(this.currentValue) ? doubleValue : this.currentValue, doubleValue);
if (!lastPing.hasError()) {
if (Double.isNaN(this.currentValue)) {
return lastPing.get().doubleValue();
}
return Math.max(this.currentValue, lastPing.get().doubleValue());
} else {
log.error("Failed to get last ping from persistent: {}", lastPing.getError().getMessage());
} }
} }

View file

@ -270,7 +270,7 @@ public class ExamAPI_V1_Controller {
final String instructionConfirm = request.getParameter(API.EXAM_API_PING_INSTRUCTION_CONFIRM); final String instructionConfirm = request.getParameter(API.EXAM_API_PING_INSTRUCTION_CONFIRM);
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
log.trace("****************** SEB client connection: {} ip: ", log.trace("****************** SEB client connection: {} ip: {}",
connectionToken, connectionToken,
getClientAddress(request)); getClientAddress(request));
} }

View file

@ -1,11 +1,11 @@
server.address=localhost server.address=localhost
server.port=8080 server.port=8090
sebserver.gui.http.external.scheme=http sebserver.gui.http.external.scheme=http
sebserver.gui.entrypoint=/gui sebserver.gui.entrypoint=/gui
sebserver.gui.webservice.protocol=http sebserver.gui.webservice.protocol=http
sebserver.gui.webservice.address=localhost sebserver.gui.webservice.address=localhost
sebserver.gui.webservice.port=8080 sebserver.gui.webservice.port=8090
sebserver.gui.webservice.apipath=/admin-api/v1 sebserver.gui.webservice.apipath=/admin-api/v1
# defines the polling interval that is used to poll the webservice for client connection data on a monitored exam page # defines the polling interval that is used to poll the webservice for client connection data on a monitored exam page
sebserver.gui.webservice.poll-interval=1000 sebserver.gui.webservice.poll-interval=1000

View file

@ -22,7 +22,7 @@ sebserver.webservice.clean-db-on-startup=false
# webservice configuration # webservice configuration
sebserver.init.adminaccount.gen-on-init=false sebserver.init.adminaccount.gen-on-init=false
sebserver.webservice.distributed=false sebserver.webservice.distributed=true
sebserver.webservice.master.delay.threshold=10000 sebserver.webservice.master.delay.threshold=10000
sebserver.webservice.http.external.scheme=http sebserver.webservice.http.external.scheme=http
sebserver.webservice.http.external.servername=localhost sebserver.webservice.http.external.servername=localhost

View file

@ -33,9 +33,10 @@ public class PingIntervalClientIndicatorTest {
DateTimeUtils.setCurrentMillisProvider(() -> 1L); DateTimeUtils.setCurrentMillisProvider(() -> 1L);
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class);
final PingIntervalClientIndicator pingIntervalClientIndicator = final PingIntervalClientIndicator pingIntervalClientIndicator =
new PingIntervalClientIndicator(clientEventDAO); new PingIntervalClientIndicator(distributedPingCache);
assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue())); assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue()));
} }
@ -45,9 +46,10 @@ public class PingIntervalClientIndicatorTest {
DateTimeUtils.setCurrentMillisProvider(() -> 1L); DateTimeUtils.setCurrentMillisProvider(() -> 1L);
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class);
final PingIntervalClientIndicator pingIntervalClientIndicator = final PingIntervalClientIndicator pingIntervalClientIndicator =
new PingIntervalClientIndicator(clientEventDAO); new PingIntervalClientIndicator(distributedPingCache);
assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue())); assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue()));
DateTimeUtils.setCurrentMillisProvider(() -> 10L); DateTimeUtils.setCurrentMillisProvider(() -> 10L);
@ -60,9 +62,10 @@ public class PingIntervalClientIndicatorTest {
DateTimeUtils.setCurrentMillisProvider(() -> 1L); DateTimeUtils.setCurrentMillisProvider(() -> 1L);
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class);
final PingIntervalClientIndicator pingIntervalClientIndicator = final PingIntervalClientIndicator pingIntervalClientIndicator =
new PingIntervalClientIndicator(clientEventDAO); new PingIntervalClientIndicator(distributedPingCache);
final JSONMapper jsonMapper = new JSONMapper(); final JSONMapper jsonMapper = new JSONMapper();
final String json = jsonMapper.writeValueAsString(pingIntervalClientIndicator); final String json = jsonMapper.writeValueAsString(pingIntervalClientIndicator);
assertEquals("{\"indicatorValue\":0.0,\"indicatorType\":\"LAST_PING\"}", json); assertEquals("{\"indicatorValue\":0.0,\"indicatorType\":\"LAST_PING\"}", json);