diff --git a/pom.xml b/pom.xml
index c44d404b..819389e3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,7 +18,7 @@
jar
- 1.2.0
+ 1.2.1-SNAPSHOT
${sebserver-version}
${sebserver-version}
UTF-8
diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/datalayer/batis/ClientEventLastPingMapper.java b/src/main/java/ch/ethz/seb/sebserver/webservice/datalayer/batis/ClientEventLastPingMapper.java
index cf233e83..6fdaadb8 100644
--- a/src/main/java/ch/ethz/seb/sebserver/webservice/datalayer/batis/ClientEventLastPingMapper.java
+++ b/src/main/java/ch/ethz/seb/sebserver/webservice/datalayer/batis/ClientEventLastPingMapper.java
@@ -8,6 +8,9 @@
package ch.ethz.seb.sebserver.webservice.datalayer.batis;
+import static ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport.*;
+import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo;
+
import java.util.Collection;
import org.apache.ibatis.annotations.Arg;
@@ -15,39 +18,85 @@ import org.apache.ibatis.annotations.ConstructorArgs;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.ResultType;
import org.apache.ibatis.annotations.SelectProvider;
+import org.apache.ibatis.annotations.UpdateProvider;
import org.apache.ibatis.type.JdbcType;
import org.mybatis.dynamic.sql.select.MyBatis3SelectModelAdapter;
import org.mybatis.dynamic.sql.select.QueryExpressionDSL;
import org.mybatis.dynamic.sql.select.SelectDSL;
import org.mybatis.dynamic.sql.select.render.SelectStatementProvider;
+import org.mybatis.dynamic.sql.update.UpdateDSL;
+import org.mybatis.dynamic.sql.update.render.UpdateStatementProvider;
import org.mybatis.dynamic.sql.util.SqlProviderAdapter;
+import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport;
@Mapper
public interface ClientEventLastPingMapper {
@SelectProvider(type = SqlProviderAdapter.class, method = "select")
- Long num(SelectStatementProvider selectStatement);
+ @ConstructorArgs({ @Arg(column = "server_time", javaType = Long.class, jdbcType = JdbcType.BIGINT) })
+ Collection selectPingTimes(SelectStatementProvider selectStatement);
+
+ @SelectProvider(type = SqlProviderAdapter.class, method = "select")
+ @ConstructorArgs({ @Arg(column = "server_time", javaType = Long.class, jdbcType = JdbcType.BIGINT) })
+ Long selectPingTime(SelectStatementProvider selectStatement);
+
+ @SelectProvider(type = SqlProviderAdapter.class, method = "select")
+ @ConstructorArgs({ @Arg(column = "id", javaType = Long.class, jdbcType = JdbcType.BIGINT, id = true) })
+ Long selectPK(SelectStatementProvider selectStatement);
+
+ @UpdateProvider(type = SqlProviderAdapter.class, method = "update")
+ int update(UpdateStatementProvider updateStatement);
@SelectProvider(type = SqlProviderAdapter.class, method = "select")
@ResultType(ClientEventLastPingRecord.class)
@ConstructorArgs({
@Arg(column = "id", javaType = Long.class, jdbcType = JdbcType.BIGINT),
- @Arg(column = "server_time", javaType = Long.class, jdbcType = JdbcType.BIGINT),
+ @Arg(column = "server_time", javaType = Long.class, jdbcType = JdbcType.BIGINT)
})
Collection selectMany(SelectStatementProvider select);
+ default Long selectPingTimeByPrimaryKey(final Long id_) {
+ return SelectDSL.selectWithMapper(
+ this::selectPingTime,
+ ClientEventRecordDynamicSqlSupport.serverTime.as("server_time"))
+ .from(ClientEventRecordDynamicSqlSupport.clientEventRecord)
+ .where(ClientEventRecordDynamicSqlSupport.id, isEqualTo(id_))
+ .build()
+ .execute();
+ }
+
+ default Long pingRecordIdByConnectionId(final Long connectionId) {
+ return SelectDSL.selectDistinctWithMapper(
+ this::selectPK,
+ ClientEventRecordDynamicSqlSupport.id.as("id"))
+ .from(ClientEventRecordDynamicSqlSupport.clientEventRecord)
+ .where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId))
+ .and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.LAST_PING.id))
+ .build()
+ .execute();
+ }
+
default QueryExpressionDSL>> selectByExample() {
+
return SelectDSL.selectWithMapper(
this::selectMany,
- ClientEventRecordDynamicSqlSupport.clientConnectionId.as("id"),
+ ClientEventRecordDynamicSqlSupport.id.as("id"),
ClientEventRecordDynamicSqlSupport.serverTime.as("server_time"))
.from(ClientEventRecordDynamicSqlSupport.clientEventRecord);
}
+ default int updatePingTime(final Long _id, final Long pingTime) {
+ return UpdateDSL.updateWithMapper(this::update, clientEventRecord)
+ .set(serverTime).equalTo(pingTime)
+ .where(id, isEqualTo(_id))
+ .build()
+ .execute();
+ }
+
final class ClientEventLastPingRecord {
public final Long id;
diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientEventDAOImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientEventDAOImpl.java
index 1d07897b..7c4b583a 100644
--- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientEventDAOImpl.java
+++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientEventDAOImpl.java
@@ -18,6 +18,7 @@ import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import org.joda.time.DateTimeUtils;
import org.mybatis.dynamic.sql.SqlBuilder;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@@ -312,21 +313,6 @@ public class ClientEventDAOImpl implements ClientEventDAO {
.stream()
.map(pk -> new EntityKey(String.valueOf(pk), EntityType.CLIENT_EVENT))
.collect(Collectors.toList());
-
-// return all
-// .stream()
-// .map(EntityKey::getModelId)
-// .map(Long::parseLong)
-// .map(pk -> {
-// final int deleted = this.clientEventRecordMapper.deleteByPrimaryKey(pk);
-// if (deleted == 1) {
-// return new EntityKey(String.valueOf(pk), EntityType.CLIENT_EVENT);
-// } else {
-// return null;
-// }
-// })
-// .filter(Objects::nonNull)
-// .collect(Collectors.toList());
});
}
@@ -345,7 +331,7 @@ public class ClientEventDAOImpl implements ClientEventDAO {
return lastPingRec.get(0);
}
- final long millisecondsNow = Utils.getMillisecondsNow();
+ final long millisecondsNow = DateTimeUtils.currentTimeMillis();
final ClientEventRecord clientEventRecord = new ClientEventRecord();
clientEventRecord.setClientConnectionId(connectionId);
clientEventRecord.setType(EventType.LAST_PING.id);
diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/ExamSessionService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/ExamSessionService.java
index 14b362ce..10395008 100644
--- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/ExamSessionService.java
+++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/ExamSessionService.java
@@ -24,6 +24,7 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientConnectionDAO;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ExamDAO;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.FilterMap;
import ch.ethz.seb.sebserver.webservice.servicelayer.lms.LmsAPIService;
+import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.ClientConnectionDataInternal;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.ExamSessionCacheService;
/** A Service to handle running exam sessions */
@@ -178,6 +179,15 @@ public interface ExamSessionService {
* @return Result with reference to the given Exam or to an error if happened */
Result flushCache(final Exam exam);
+ /** Is is supposed to be the single access point to internally get client connection
+ * data for a specified connection token.
+ * This uses the client connection data cache for lookup and also synchronizes asynchronous
+ * cache calls to prevent parallel creation of ClientConnectionDataInternal
+ *
+ * @param connectionToken the connection token of the active SEB client connection
+ * @return ClientConnectionDataInternal by synchronized cache lookup or null if not available */
+ ClientConnectionDataInternal getConnectionDataInternal(String connectionToken);
+
/** Checks if the given ClientConnectionData is an active SEB client connection.
*
* @param connection ClientConnectionData instance
diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java
index e09969a3..67a62448 100644
--- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java
+++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java
@@ -308,13 +308,20 @@ public class ExamSessionServiceImpl implements ExamSessionService {
}
}
+ @Override
+ public ClientConnectionDataInternal getConnectionDataInternal(final String connectionToken) {
+ synchronized (this.examSessionCacheService) {
+ return this.examSessionCacheService.getClientConnection(connectionToken);
+ }
+ }
+
@Override
public Result getConnectionData(final String connectionToken) {
return Result.tryCatch(() -> {
- final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService
- .getClientConnection(connectionToken);
+ final ClientConnectionDataInternal activeClientConnection =
+ getConnectionDataInternal(connectionToken);
if (activeClientConnection == null) {
throw new NoSuchElementException("Client Connection with token: " + connectionToken);
@@ -403,7 +410,7 @@ public class ExamSessionServiceImpl implements ExamSessionService {
.getConnectionTokens(examId)
.getOrThrow()
.stream()
- .map(this.examSessionCacheService::getClientConnection)
+ .map(this::getConnectionDataInternal)
.filter(Objects::nonNull)
.map(cc -> cc.getClientConnection().updateTime)
.collect(Collectors.toSet());
diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java
index c996e4a0..d498b69d 100644
--- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java
+++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java
@@ -45,6 +45,7 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.session.ExamSessionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientConnectionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientInstructionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientNotificationService;
+import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.DistributedPingCache;
import ch.ethz.seb.sebserver.webservice.weblayer.api.APIConstraintViolationException;
@Lazy
@@ -71,6 +72,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
private final SEBClientNotificationService sebClientNotificationService;
private final WebserviceInfo webserviceInfo;
private final ExamAdminService examAdminService;
+ private final DistributedPingCache distributedPingCache;
protected SEBClientConnectionServiceImpl(
final ExamSessionService examSessionService,
@@ -79,7 +81,8 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
final SEBClientConfigDAO sebClientConfigDAO,
final SEBClientInstructionService sebInstructionService,
final SEBClientNotificationService sebClientNotificationService,
- final ExamAdminService examAdminService) {
+ final ExamAdminService examAdminService,
+ final DistributedPingCache distributedPingCache) {
this.examSessionService = examSessionService;
this.examSessionCacheService = examSessionService.getExamSessionCacheService();
@@ -91,6 +94,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
this.sebClientNotificationService = sebClientNotificationService;
this.webserviceInfo = sebInstructionService.getWebserviceInfo();
this.examAdminService = examAdminService;
+ this.distributedPingCache = distributedPingCache;
}
@Override
@@ -155,8 +159,8 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
.getOrThrow();
// load client connection data into cache
- final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService
- .getClientConnection(connectionToken);
+ final ClientConnectionDataInternal activeClientConnection = this.examSessionService
+ .getConnectionDataInternal(connectionToken);
if (activeClientConnection == null) {
log.warn("Failed to load ClientConnectionDataInternal into cache on update");
@@ -453,6 +457,12 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
updatedClientConnection = clientConnection;
}
+ // delete stored ping if this is a distributed setup
+ if (this.webserviceInfo.isDistributed()) {
+ this.distributedPingCache
+ .deletePingForConnection(updatedClientConnection.id);
+ }
+
reloadConnectionCache(connectionToken);
return updatedClientConnection;
});
@@ -501,6 +511,12 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
updatedClientConnection = clientConnection;
}
+ // delete stored ping if this is a distributed setup
+ if (this.webserviceInfo.isDistributed()) {
+ this.distributedPingCache
+ .deletePingForConnection(updatedClientConnection.id);
+ }
+
reloadConnectionCache(connectionToken);
return updatedClientConnection;
});
@@ -510,6 +526,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
public void updatePingEvents() {
try {
+ final boolean distributed = this.webserviceInfo.isDistributed();
final Cache cache = this.cacheManager.getCache(ExamSessionCacheService.CACHE_NAME_ACTIVE_CLIENT_CONNECTION);
final long now = Utils.getMillisecondsNow();
final Consumer missingPingUpdate = missingPingUpdate(now);
@@ -518,7 +535,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
.allRunningExamIds()
.getOrThrow()
.stream()
- .flatMap(examId -> (this.webserviceInfo.isDistributed())
+ .flatMap(examId -> distributed
? this.clientConnectionDAO
.getConnectionTokensNoCache(examId)
.getOrThrow()
@@ -550,7 +567,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
final int pingNumber) {
final ClientConnectionDataInternal activeClientConnection =
- this.examSessionCacheService.getClientConnection(connectionToken);
+ this.examSessionService.getConnectionDataInternal(connectionToken);
if (activeClientConnection != null) {
activeClientConnection.notifyPing(timestamp, pingNumber);
@@ -566,7 +583,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
try {
final ClientConnectionDataInternal activeClientConnection =
- this.examSessionCacheService.getClientConnection(connectionToken);
+ this.examSessionService.getConnectionDataInternal(connectionToken);
if (activeClientConnection != null) {
@@ -731,7 +748,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
// evict cached ClientConnection
this.examSessionCacheService.evictClientConnection(connectionToken);
// and load updated ClientConnection into cache
- return this.examSessionCacheService.getClientConnection(connectionToken);
+ return this.examSessionService.getConnectionDataInternal(connectionToken);
}
private Consumer missingPingUpdate(final long now) {
diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java
index db1d2301..9790c05a 100644
--- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java
+++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java
@@ -23,6 +23,7 @@ public abstract class AbstractClientIndicator implements ClientIndicator {
protected Long connectionId;
protected boolean cachingEnabled;
protected boolean active = true;
+ protected long persistentUpdateInterval = PERSISTENT_UPDATE_INTERVAL;
protected long lastPersistentUpdate = 0;
protected boolean valueInitializes = false;
@@ -72,7 +73,7 @@ public abstract class AbstractClientIndicator implements ClientIndicator {
}
if (!this.cachingEnabled && this.active) {
- if (now - this.lastPersistentUpdate > PERSISTENT_UPDATE_INTERVAL) {
+ if (now - this.lastPersistentUpdate > this.persistentUpdateInterval) {
this.currentValue = computeValueAt(now);
this.lastPersistentUpdate = now;
}
diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogIndicator.java
index 605db6ab..743361e3 100644
--- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogIndicator.java
+++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogIndicator.java
@@ -44,6 +44,7 @@ public abstract class AbstractLogIndicator extends AbstractClientIndicator {
final boolean cachingEnabled) {
super.init(indicatorDefinition, connectionId, active, cachingEnabled);
+ super.persistentUpdateInterval = 2 * Constants.SECOND_IN_MILLIS;
if (indicatorDefinition == null || StringUtils.isBlank(indicatorDefinition.tags)) {
this.tags = null;
diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogNumberIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogNumberIndicator.java
index a92ed3d9..bb9d13dc 100644
--- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogNumberIndicator.java
+++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogNumberIndicator.java
@@ -86,6 +86,7 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator {
} else {
return super.currentValue;
}
+
} catch (final Exception e) {
log.error("Failed to get indicator number from persistent storage: {}", e.getMessage());
return this.currentValue;
diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java
index 18a5bff0..553d972d 100644
--- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java
+++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java
@@ -15,27 +15,31 @@ import java.util.Set;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import ch.ethz.seb.sebserver.gbl.Constants;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
-import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientEventDAO;
public abstract class AbstractPingIndicator extends AbstractClientIndicator {
+ private static final Logger log = LoggerFactory.getLogger(AbstractPingIndicator.class);
+
private static final long INTERVAL_FOR_PERSISTENT_UPDATE = Constants.SECOND_IN_MILLIS;
private final Set EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class));
- protected final ClientEventDAO clientEventDAO;
+ protected final DistributedPingCache distributedPingCache;
- private long lastUpdate = 0;
- protected ClientEventRecord pingRecord = null;
+ private final long lastUpdate = 0;
+ protected Long pingRecord = null;
+
+ protected AbstractPingIndicator(final DistributedPingCache distributedPingCache) {
- protected AbstractPingIndicator(final ClientEventDAO clientEventDAO) {
super();
- this.clientEventDAO = clientEventDAO;
+ this.distributedPingCache = distributedPingCache;
}
@Override
@@ -47,10 +51,12 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
super.init(indicatorDefinition, connectionId, active, cachingEnabled);
- if (!this.cachingEnabled) {
- this.pingRecord = this.clientEventDAO
- .initPingEvent(this.connectionId)
- .getOr(null);
+ if (!this.cachingEnabled && this.active) {
+ try {
+ this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId);
+ } catch (final Exception e) {
+ this.pingRecord = this.distributedPingCache.getPingRecordIdForConnectionId(connectionId);
+ }
}
}
@@ -59,19 +65,39 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
super.currentValue = now;
super.lastPersistentUpdate = now;
- if (!this.cachingEnabled && this.pingRecord != null) {
+ if (!this.cachingEnabled) {
+
+ if (this.pingRecord == null) {
+ tryRecoverPingRecord();
+ if (this.pingRecord == null) {
+ return;
+ }
+ }
// Update last ping time on persistent storage
final long millisecondsNow = DateTimeUtils.currentTimeMillis();
if (millisecondsNow - this.lastUpdate > INTERVAL_FOR_PERSISTENT_UPDATE) {
- this.pingRecord.setClientTime(timestamp);
- this.pingRecord.setServerTime(millisecondsNow);
- this.clientEventDAO.updatePingEvent(this.pingRecord);
- this.lastUpdate = millisecondsNow;
+ this.distributedPingCache.updatePing(this.pingRecord, millisecondsNow);
}
}
}
+ private void tryRecoverPingRecord() {
+
+ if (log.isWarnEnabled()) {
+ log.warn("*** Missing ping record for connection: {}. Try to recover...", this.connectionId);
+ }
+
+ try {
+ this.pingRecord = this.distributedPingCache.getPingRecordIdForConnectionId(this.connectionId);
+ if (this.pingRecord == null) {
+ this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId);
+ }
+ } catch (final Exception e) {
+ log.error("Failed to recover ping record for connection: {}", this.connectionId, e);
+ }
+ }
+
@Override
public Set observedEvents() {
return this.EMPTY_SET;
diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java
new file mode 100644
index 00000000..e35bbdf5
--- /dev/null
+++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java
@@ -0,0 +1,230 @@
+/*
+ * Copyright (c) 2021 ETH Zürich, Educational Development and Technology (LET)
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator;
+
+import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo;
+import static org.mybatis.dynamic.sql.SqlBuilder.isIn;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+import java.util.stream.Collectors;
+
+import org.ehcache.impl.internal.concurrent.ConcurrentHashMap;
+import org.joda.time.DateTimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
+import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
+import ch.ethz.seb.sebserver.webservice.WebserviceInfo;
+import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper;
+import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport;
+import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordMapper;
+import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
+
+@Lazy
+@Component
+@WebServiceProfile
+public class DistributedPingCache implements DisposableBean {
+
+ private static final Logger log = LoggerFactory.getLogger(DistributedPingCache.class);
+
+ private final ClientEventLastPingMapper clientEventLastPingMapper;
+ private final ClientEventRecordMapper clientEventRecordMapper;
+ private ScheduledFuture> taskRef;
+
+ private final Map pingCache = new ConcurrentHashMap<>();
+
+ public DistributedPingCache(
+ final ClientEventLastPingMapper clientEventLastPingMapper,
+ final ClientEventRecordMapper clientEventRecordMapper,
+ final WebserviceInfo webserviceInfo,
+ final TaskScheduler taskScheduler) {
+
+ this.clientEventLastPingMapper = clientEventLastPingMapper;
+ this.clientEventRecordMapper = clientEventRecordMapper;
+ if (webserviceInfo.isDistributed()) {
+ try {
+ this.taskRef = taskScheduler.scheduleAtFixedRate(this::updateCache, 1000);
+ } catch (final Exception e) {
+ log.error("Failed to initialize distributed ping cache update task");
+ this.taskRef = null;
+ }
+ } else {
+ this.taskRef = null;
+ }
+ }
+
+ @Transactional
+ public Long initPingForConnection(final Long connectionId) {
+ try {
+
+ if (log.isDebugEnabled()) {
+ log.trace("*** Initialize ping record for SEB connection: {}", connectionId);
+ }
+
+ final Long recordId = this.clientEventLastPingMapper
+ .pingRecordIdByConnectionId(connectionId);
+
+ if (recordId == null) {
+ final long millisecondsNow = DateTimeUtils.currentTimeMillis();
+ final ClientEventRecord clientEventRecord = new ClientEventRecord();
+ clientEventRecord.setClientConnectionId(connectionId);
+ clientEventRecord.setType(EventType.LAST_PING.id);
+ clientEventRecord.setClientTime(millisecondsNow);
+ clientEventRecord.setServerTime(millisecondsNow);
+ this.clientEventRecordMapper.insert(clientEventRecord);
+
+ try {
+ // This also double-check by trying again. If we have more then one entry here
+ // this will throw an exception that causes a rollback
+ return this.clientEventLastPingMapper
+ .pingRecordIdByConnectionId(connectionId);
+
+ } catch (final Exception e) {
+
+ log.warn("Detected multiple client ping entries for connection: " + connectionId
+ + ". Force rollback to prevent");
+
+ // force rollback
+ throw new RuntimeException("Detected multiple client ping entries");
+ }
+ }
+
+ return recordId;
+ } catch (final Exception e) {
+
+ log.error("Failed to initialize ping for connection -> {}", connectionId, e);
+
+ // force rollback
+ throw new RuntimeException("Failed to initialize ping for connection -> " + connectionId, e);
+ }
+ }
+
+ @Transactional(readOnly = true)
+ public Long getPingRecordIdForConnectionId(final Long connectionId) {
+ try {
+
+ return this.clientEventLastPingMapper
+ .pingRecordIdByConnectionId(connectionId);
+
+ } catch (final Exception e) {
+ log.error("Failed to get ping record for connection id: {} cause: {}", connectionId, e.getMessage());
+ return null;
+ }
+ }
+
+ @Transactional
+ public void updatePing(final Long pingRecordId, final Long pingTime) {
+ try {
+
+ this.clientEventLastPingMapper
+ .updatePingTime(pingRecordId, pingTime);
+
+ } catch (final Exception e) {
+ log.error("Failed to update ping for ping record id -> {}", pingRecordId);
+ }
+ }
+
+ @Transactional
+ public void deletePingForConnection(final Long connectionId) {
+ try {
+
+ if (log.isDebugEnabled()) {
+ log.debug("*** Delete ping record for SEB connection: {}", connectionId);
+ }
+
+ this.clientEventRecordMapper
+ .deleteByExample()
+ .where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId))
+ .and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.LAST_PING.id))
+ .build()
+ .execute();
+
+ } catch (final Exception e) {
+ log.error("Failed to delete ping for connection -> {}", connectionId, e);
+ }
+ }
+
+ public Long getLastPing(final Long pingRecordId) {
+ try {
+ Long ping = this.pingCache.get(pingRecordId);
+ if (ping == null) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("*** Get and cache ping time: {}", pingRecordId);
+ }
+
+ ping = this.clientEventLastPingMapper.selectPingTimeByPrimaryKey(pingRecordId);
+ if (ping != null) {
+ this.pingCache.put(pingRecordId, ping);
+ }
+ }
+
+ return ping;
+ } catch (final Exception e) {
+ log.error("Error while trying to get last ping from storage: {}", e.getMessage());
+ return 0L;
+ }
+ }
+
+ @Transactional
+ public void updateCache() {
+
+ if (this.pingCache.isEmpty()) {
+ return;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.trace("*** Update distributed ping cache: {}", this.pingCache);
+ }
+
+ try {
+ final ArrayList pks = new ArrayList<>(this.pingCache.keySet());
+ final Map mapping = this.clientEventLastPingMapper
+ .selectByExample()
+ .where(
+ ClientEventRecordDynamicSqlSupport.id,
+ isIn(pks))
+ .build()
+ .execute()
+ .stream()
+ .collect(Collectors.toMap(entry -> entry.id, entry -> entry.lastPingTime));
+
+ if (mapping != null) {
+ this.pingCache.clear();
+ this.pingCache.putAll(mapping);
+ }
+
+ } catch (final Exception e) {
+ log.error("Error while trying to update distributed ping cache: {}", this.pingCache, e);
+ }
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ if (this.taskRef != null) {
+ try {
+ final boolean cancel = this.taskRef.cancel(true);
+ if (!cancel) {
+ log.warn("Failed to cancel distributed ping cache update task");
+ }
+ } catch (final Exception e) {
+ log.error("Failed to cancel distributed ping cache update task: ", e);
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java
index c4954a74..85bb95a6 100644
--- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java
+++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java
@@ -26,9 +26,7 @@ import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
-import ch.ethz.seb.sebserver.gbl.util.Result;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
-import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientEventDAO;
@Lazy
@Component(IndicatorType.Names.LAST_PING)
@@ -46,8 +44,8 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
private boolean missingPing = false;
private boolean hidden = false;
- public PingIntervalClientIndicator(final ClientEventDAO clientEventDAO) {
- super(clientEventDAO);
+ public PingIntervalClientIndicator(final DistributedPingCache distributedPingCache) {
+ super(distributedPingCache);
this.cachingEnabled = true;
}
@@ -129,17 +127,10 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
// if this indicator is not missing ping
if (!this.isMissingPing()) {
-
- final Result lastPing = this.clientEventDAO
- .getLastPing(super.pingRecord.getId());
-
- if (!lastPing.hasError()) {
- if (Double.isNaN(this.currentValue)) {
- return lastPing.get().doubleValue();
- }
- return Math.max(this.currentValue, lastPing.get().doubleValue());
- } else {
- log.error("Failed to get last ping from persistent: {}", lastPing.getError().getMessage());
+ final Long lastPing = this.distributedPingCache.getLastPing(super.pingRecord);
+ if (lastPing != null) {
+ final double doubleValue = lastPing.doubleValue();
+ return Math.max(Double.isNaN(this.currentValue) ? doubleValue : this.currentValue, doubleValue);
}
}
diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ExamAPI_V1_Controller.java b/src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ExamAPI_V1_Controller.java
index bd54c09a..4120f6b1 100644
--- a/src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ExamAPI_V1_Controller.java
+++ b/src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ExamAPI_V1_Controller.java
@@ -270,7 +270,7 @@ public class ExamAPI_V1_Controller {
final String instructionConfirm = request.getParameter(API.EXAM_API_PING_INSTRUCTION_CONFIRM);
if (log.isTraceEnabled()) {
- log.trace("****************** SEB client connection: {} ip: ",
+ log.trace("****************** SEB client connection: {} ip: {}",
connectionToken,
getClientAddress(request));
}
diff --git a/src/main/resources/config/application-dev-gui.properties b/src/main/resources/config/application-dev-gui.properties
index 8069b264..a22c98bc 100644
--- a/src/main/resources/config/application-dev-gui.properties
+++ b/src/main/resources/config/application-dev-gui.properties
@@ -1,11 +1,11 @@
server.address=localhost
-server.port=8080
+server.port=8090
sebserver.gui.http.external.scheme=http
sebserver.gui.entrypoint=/gui
sebserver.gui.webservice.protocol=http
sebserver.gui.webservice.address=localhost
-sebserver.gui.webservice.port=8080
+sebserver.gui.webservice.port=8090
sebserver.gui.webservice.apipath=/admin-api/v1
# defines the polling interval that is used to poll the webservice for client connection data on a monitored exam page
sebserver.gui.webservice.poll-interval=1000
diff --git a/src/main/resources/config/application-dev-ws.properties b/src/main/resources/config/application-dev-ws.properties
index 81ac2bc0..277c15de 100644
--- a/src/main/resources/config/application-dev-ws.properties
+++ b/src/main/resources/config/application-dev-ws.properties
@@ -22,7 +22,7 @@ sebserver.webservice.clean-db-on-startup=false
# webservice configuration
sebserver.init.adminaccount.gen-on-init=false
-sebserver.webservice.distributed=false
+sebserver.webservice.distributed=true
sebserver.webservice.master.delay.threshold=10000
sebserver.webservice.http.external.scheme=http
sebserver.webservice.http.external.servername=localhost
diff --git a/src/test/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicatorTest.java b/src/test/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicatorTest.java
index 1d1b6143..6fbb52b4 100644
--- a/src/test/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicatorTest.java
+++ b/src/test/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicatorTest.java
@@ -33,9 +33,10 @@ public class PingIntervalClientIndicatorTest {
DateTimeUtils.setCurrentMillisProvider(() -> 1L);
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
+ final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class);
final PingIntervalClientIndicator pingIntervalClientIndicator =
- new PingIntervalClientIndicator(clientEventDAO);
+ new PingIntervalClientIndicator(distributedPingCache);
assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue()));
}
@@ -45,9 +46,10 @@ public class PingIntervalClientIndicatorTest {
DateTimeUtils.setCurrentMillisProvider(() -> 1L);
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
+ final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class);
final PingIntervalClientIndicator pingIntervalClientIndicator =
- new PingIntervalClientIndicator(clientEventDAO);
+ new PingIntervalClientIndicator(distributedPingCache);
assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue()));
DateTimeUtils.setCurrentMillisProvider(() -> 10L);
@@ -60,9 +62,10 @@ public class PingIntervalClientIndicatorTest {
DateTimeUtils.setCurrentMillisProvider(() -> 1L);
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
+ final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class);
final PingIntervalClientIndicator pingIntervalClientIndicator =
- new PingIntervalClientIndicator(clientEventDAO);
+ new PingIntervalClientIndicator(distributedPingCache);
final JSONMapper jsonMapper = new JSONMapper();
final String json = jsonMapper.writeValueAsString(pingIntervalClientIndicator);
assertEquals("{\"indicatorValue\":0.0,\"indicatorType\":\"LAST_PING\"}", json);