From 64536fd9099f36fa12a75c27339f7d6b044209a3 Mon Sep 17 00:00:00 2001 From: anhefti Date: Thu, 22 Jul 2021 13:27:44 +0200 Subject: [PATCH 1/2] distributed ping cache --- pom.xml | 2 +- .../batis/ClientEventLastPingMapper.java | 55 +++++- .../dao/impl/ClientEventDAOImpl.java | 18 +- .../impl/SEBClientConnectionServiceImpl.java | 21 +- .../impl/indicator/AbstractPingIndicator.java | 27 ++- .../impl/indicator/DistributedPingCache.java | 187 ++++++++++++++++++ .../PingIntervalClientIndicator.java | 21 +- .../weblayer/api/ExamAPI_V1_Controller.java | 2 +- .../config/application-dev-ws.properties | 2 +- .../PingIntervalClientIndicatorTest.java | 9 +- 10 files changed, 288 insertions(+), 56 deletions(-) create mode 100644 src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java diff --git a/pom.xml b/pom.xml index f143fd31..819389e3 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ jar - 1.2-rc1 + 1.2.1-SNAPSHOT ${sebserver-version} ${sebserver-version} UTF-8 diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/datalayer/batis/ClientEventLastPingMapper.java b/src/main/java/ch/ethz/seb/sebserver/webservice/datalayer/batis/ClientEventLastPingMapper.java index cf233e83..6fdaadb8 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/datalayer/batis/ClientEventLastPingMapper.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/datalayer/batis/ClientEventLastPingMapper.java @@ -8,6 +8,9 @@ package ch.ethz.seb.sebserver.webservice.datalayer.batis; +import static ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport.*; +import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo; + import java.util.Collection; import org.apache.ibatis.annotations.Arg; @@ -15,39 +18,85 @@ import org.apache.ibatis.annotations.ConstructorArgs; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.ResultType; import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.UpdateProvider; import org.apache.ibatis.type.JdbcType; import org.mybatis.dynamic.sql.select.MyBatis3SelectModelAdapter; import org.mybatis.dynamic.sql.select.QueryExpressionDSL; import org.mybatis.dynamic.sql.select.SelectDSL; import org.mybatis.dynamic.sql.select.render.SelectStatementProvider; +import org.mybatis.dynamic.sql.update.UpdateDSL; +import org.mybatis.dynamic.sql.update.render.UpdateStatementProvider; import org.mybatis.dynamic.sql.util.SqlProviderAdapter; +import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport; @Mapper public interface ClientEventLastPingMapper { @SelectProvider(type = SqlProviderAdapter.class, method = "select") - Long num(SelectStatementProvider selectStatement); + @ConstructorArgs({ @Arg(column = "server_time", javaType = Long.class, jdbcType = JdbcType.BIGINT) }) + Collection selectPingTimes(SelectStatementProvider selectStatement); + + @SelectProvider(type = SqlProviderAdapter.class, method = "select") + @ConstructorArgs({ @Arg(column = "server_time", javaType = Long.class, jdbcType = JdbcType.BIGINT) }) + Long selectPingTime(SelectStatementProvider selectStatement); + + @SelectProvider(type = SqlProviderAdapter.class, method = "select") + @ConstructorArgs({ @Arg(column = "id", javaType = Long.class, jdbcType = JdbcType.BIGINT, id = true) }) + Long selectPK(SelectStatementProvider selectStatement); + + @UpdateProvider(type = SqlProviderAdapter.class, method = "update") + int update(UpdateStatementProvider updateStatement); @SelectProvider(type = SqlProviderAdapter.class, method = "select") @ResultType(ClientEventLastPingRecord.class) @ConstructorArgs({ @Arg(column = "id", javaType = Long.class, jdbcType = JdbcType.BIGINT), - @Arg(column = "server_time", javaType = Long.class, jdbcType = JdbcType.BIGINT), + @Arg(column = "server_time", javaType = Long.class, jdbcType = JdbcType.BIGINT) }) Collection selectMany(SelectStatementProvider select); + default Long selectPingTimeByPrimaryKey(final Long id_) { + return SelectDSL.selectWithMapper( + this::selectPingTime, + ClientEventRecordDynamicSqlSupport.serverTime.as("server_time")) + .from(ClientEventRecordDynamicSqlSupport.clientEventRecord) + .where(ClientEventRecordDynamicSqlSupport.id, isEqualTo(id_)) + .build() + .execute(); + } + + default Long pingRecordIdByConnectionId(final Long connectionId) { + return SelectDSL.selectDistinctWithMapper( + this::selectPK, + ClientEventRecordDynamicSqlSupport.id.as("id")) + .from(ClientEventRecordDynamicSqlSupport.clientEventRecord) + .where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId)) + .and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.LAST_PING.id)) + .build() + .execute(); + } + default QueryExpressionDSL>> selectByExample() { + return SelectDSL.selectWithMapper( this::selectMany, - ClientEventRecordDynamicSqlSupport.clientConnectionId.as("id"), + ClientEventRecordDynamicSqlSupport.id.as("id"), ClientEventRecordDynamicSqlSupport.serverTime.as("server_time")) .from(ClientEventRecordDynamicSqlSupport.clientEventRecord); } + default int updatePingTime(final Long _id, final Long pingTime) { + return UpdateDSL.updateWithMapper(this::update, clientEventRecord) + .set(serverTime).equalTo(pingTime) + .where(id, isEqualTo(_id)) + .build() + .execute(); + } + final class ClientEventLastPingRecord { public final Long id; diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientEventDAOImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientEventDAOImpl.java index 1d07897b..7c4b583a 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientEventDAOImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientEventDAOImpl.java @@ -18,6 +18,7 @@ import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; +import org.joda.time.DateTimeUtils; import org.mybatis.dynamic.sql.SqlBuilder; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; @@ -312,21 +313,6 @@ public class ClientEventDAOImpl implements ClientEventDAO { .stream() .map(pk -> new EntityKey(String.valueOf(pk), EntityType.CLIENT_EVENT)) .collect(Collectors.toList()); - -// return all -// .stream() -// .map(EntityKey::getModelId) -// .map(Long::parseLong) -// .map(pk -> { -// final int deleted = this.clientEventRecordMapper.deleteByPrimaryKey(pk); -// if (deleted == 1) { -// return new EntityKey(String.valueOf(pk), EntityType.CLIENT_EVENT); -// } else { -// return null; -// } -// }) -// .filter(Objects::nonNull) -// .collect(Collectors.toList()); }); } @@ -345,7 +331,7 @@ public class ClientEventDAOImpl implements ClientEventDAO { return lastPingRec.get(0); } - final long millisecondsNow = Utils.getMillisecondsNow(); + final long millisecondsNow = DateTimeUtils.currentTimeMillis(); final ClientEventRecord clientEventRecord = new ClientEventRecord(); clientEventRecord.setClientConnectionId(connectionId); clientEventRecord.setType(EventType.LAST_PING.id); diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java index c996e4a0..a056b7d9 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java @@ -45,6 +45,7 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.session.ExamSessionService; import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientConnectionService; import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientInstructionService; import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientNotificationService; +import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.DistributedPingCache; import ch.ethz.seb.sebserver.webservice.weblayer.api.APIConstraintViolationException; @Lazy @@ -71,6 +72,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic private final SEBClientNotificationService sebClientNotificationService; private final WebserviceInfo webserviceInfo; private final ExamAdminService examAdminService; + private final DistributedPingCache distributedPingCache; protected SEBClientConnectionServiceImpl( final ExamSessionService examSessionService, @@ -79,7 +81,8 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic final SEBClientConfigDAO sebClientConfigDAO, final SEBClientInstructionService sebInstructionService, final SEBClientNotificationService sebClientNotificationService, - final ExamAdminService examAdminService) { + final ExamAdminService examAdminService, + final DistributedPingCache distributedPingCache) { this.examSessionService = examSessionService; this.examSessionCacheService = examSessionService.getExamSessionCacheService(); @@ -91,6 +94,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic this.sebClientNotificationService = sebClientNotificationService; this.webserviceInfo = sebInstructionService.getWebserviceInfo(); this.examAdminService = examAdminService; + this.distributedPingCache = distributedPingCache; } @Override @@ -453,6 +457,12 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic updatedClientConnection = clientConnection; } + // delete stored ping if this is a distributed setup + if (this.webserviceInfo.isDistributed()) { + this.distributedPingCache + .deletePingForConnection(updatedClientConnection.id); + } + reloadConnectionCache(connectionToken); return updatedClientConnection; }); @@ -501,6 +511,12 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic updatedClientConnection = clientConnection; } + // delete stored ping if this is a distributed setup + if (this.webserviceInfo.isDistributed()) { + this.distributedPingCache + .deletePingForConnection(updatedClientConnection.id); + } + reloadConnectionCache(connectionToken); return updatedClientConnection; }); @@ -510,6 +526,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic public void updatePingEvents() { try { + final boolean distributed = this.webserviceInfo.isDistributed(); final Cache cache = this.cacheManager.getCache(ExamSessionCacheService.CACHE_NAME_ACTIVE_CLIENT_CONNECTION); final long now = Utils.getMillisecondsNow(); final Consumer missingPingUpdate = missingPingUpdate(now); @@ -518,7 +535,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic .allRunningExamIds() .getOrThrow() .stream() - .flatMap(examId -> (this.webserviceInfo.isDistributed()) + .flatMap(examId -> distributed ? this.clientConnectionDAO .getConnectionTokensNoCache(examId) .getOrThrow() diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java index 18a5bff0..754232ff 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java @@ -20,7 +20,6 @@ import ch.ethz.seb.sebserver.gbl.Constants; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; -import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientEventDAO; public abstract class AbstractPingIndicator extends AbstractClientIndicator { @@ -28,14 +27,15 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { private final Set EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class)); - protected final ClientEventDAO clientEventDAO; + protected final DistributedPingCache distributedPingCache; - private long lastUpdate = 0; - protected ClientEventRecord pingRecord = null; + private final long lastUpdate = 0; + protected Long pingRecord = null; + + protected AbstractPingIndicator(final DistributedPingCache distributedPingCache) { - protected AbstractPingIndicator(final ClientEventDAO clientEventDAO) { super(); - this.clientEventDAO = clientEventDAO; + this.distributedPingCache = distributedPingCache; } @Override @@ -47,10 +47,12 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { super.init(indicatorDefinition, connectionId, active, cachingEnabled); - if (!this.cachingEnabled) { - this.pingRecord = this.clientEventDAO - .initPingEvent(this.connectionId) - .getOr(null); + if (!this.cachingEnabled && this.active) { + this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId); + if (this.pingRecord == null) { + // try once again + this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId); + } } } @@ -64,10 +66,7 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { // Update last ping time on persistent storage final long millisecondsNow = DateTimeUtils.currentTimeMillis(); if (millisecondsNow - this.lastUpdate > INTERVAL_FOR_PERSISTENT_UPDATE) { - this.pingRecord.setClientTime(timestamp); - this.pingRecord.setServerTime(millisecondsNow); - this.clientEventDAO.updatePingEvent(this.pingRecord); - this.lastUpdate = millisecondsNow; + this.distributedPingCache.updatePing(this.pingRecord, millisecondsNow); } } } diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java new file mode 100644 index 00000000..c60d99bc --- /dev/null +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2021 ETH Zürich, Educational Development and Technology (LET) + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator; + +import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo; +import static org.mybatis.dynamic.sql.SqlBuilder.isIn; + +import java.util.ArrayList; +import java.util.Map; +import java.util.concurrent.ScheduledFuture; +import java.util.stream.Collectors; + +import org.ehcache.impl.internal.concurrent.ConcurrentHashMap; +import org.joda.time.DateTimeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.context.annotation.Lazy; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; +import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile; +import ch.ethz.seb.sebserver.webservice.WebserviceInfo; +import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper; +import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport; +import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordMapper; +import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; + +@Lazy +@Component +@WebServiceProfile +public class DistributedPingCache implements DisposableBean { + + private static final Logger log = LoggerFactory.getLogger(DistributedPingCache.class); + + private final ClientEventLastPingMapper clientEventLastPingMapper; + private final ClientEventRecordMapper clientEventRecordMapper; + private ScheduledFuture taskRef; + + private final Map pingCache = new ConcurrentHashMap<>(); + + public DistributedPingCache( + final ClientEventLastPingMapper clientEventLastPingMapper, + final ClientEventRecordMapper clientEventRecordMapper, + final WebserviceInfo webserviceInfo, + final TaskScheduler taskScheduler) { + + this.clientEventLastPingMapper = clientEventLastPingMapper; + this.clientEventRecordMapper = clientEventRecordMapper; + if (webserviceInfo.isDistributed()) { + try { + this.taskRef = taskScheduler.scheduleAtFixedRate(this::updateCache, 1000); + } catch (final Exception e) { + log.error("Failed to initialize distributed ping cache update task"); + this.taskRef = null; + } + } else { + this.taskRef = null; + } + } + + @Transactional + public Long initPingForConnection(final Long connectionId) { + try { + Long recordId = this.clientEventLastPingMapper + .pingRecordIdByConnectionId(connectionId); + + if (recordId == null) { + final long millisecondsNow = DateTimeUtils.currentTimeMillis(); + final ClientEventRecord clientEventRecord = new ClientEventRecord(); + clientEventRecord.setClientConnectionId(connectionId); + clientEventRecord.setType(EventType.LAST_PING.id); + clientEventRecord.setClientTime(millisecondsNow); + clientEventRecord.setServerTime(millisecondsNow); + this.clientEventRecordMapper.insert(clientEventRecord); + + recordId = this.clientEventLastPingMapper.pingRecordIdByConnectionId(connectionId); + } + + return recordId; + } catch (final Exception e) { + log.error("Failed to initialize ping for connection -> {}", connectionId, e); + return null; + } + } + + @Transactional + public void updatePing(final Long pingRecordId, final Long pingTime) { + try { + + this.clientEventLastPingMapper + .updatePingTime(pingRecordId, pingTime); + + } catch (final Exception e) { + log.error("Failed to update ping for ping record id -> {}", pingRecordId); + } + } + + @Transactional + public void deletePingForConnection(final Long connectionId) { + try { + + this.clientEventRecordMapper + .deleteByExample() + .where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId)) + .and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.LAST_PING.id)) + .build() + .execute(); + + } catch (final Exception e) { + log.error("Failed to delete ping for connection -> {}", connectionId, e); + } + } + + public Long getLastPing(final Long pingRecordId) { + try { + Long ping = this.pingCache.get(pingRecordId); + if (ping == null) { + log.debug("******* Get and cache ping time: {}", pingRecordId); + ping = this.clientEventLastPingMapper.selectPingTimeByPrimaryKey(pingRecordId); + if (ping != null) { + this.pingCache.put(pingRecordId, ping); + } + } + + return ping; + } catch (final Exception e) { + log.error("Error while trying to get last ping from storage: {}", e.getMessage()); + return 0L; + } + } + + @Transactional + public void updateCache() { + + if (this.pingCache.isEmpty()) { + return; + } + + log.debug("****** Update distributed ping cache: {}", this.pingCache); + + try { + final ArrayList pks = new ArrayList<>(this.pingCache.keySet()); + final Map mapping = this.clientEventLastPingMapper + .selectByExample() + .where( + ClientEventRecordDynamicSqlSupport.id, + isIn(pks)) + .build() + .execute() + .stream() + .collect(Collectors.toMap(entry -> entry.id, entry -> entry.lastPingTime)); + + if (mapping != null) { + this.pingCache.clear(); + this.pingCache.putAll(mapping); + } + + } catch (final Exception e) { + log.error("Error while trying to update distributed ping cache: {}", this.pingCache, e); + } + } + + @Override + public void destroy() throws Exception { + if (this.taskRef != null) { + try { + final boolean cancel = this.taskRef.cancel(true); + if (!cancel) { + log.warn("Failed to cancel distributed ping cache update task"); + } + } catch (final Exception e) { + log.error("Failed to cancel distributed ping cache update task: ", e); + } + } + + } + +} diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java index c4954a74..85bb95a6 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java @@ -26,9 +26,7 @@ import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType; import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent; import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; -import ch.ethz.seb.sebserver.gbl.util.Result; import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; -import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientEventDAO; @Lazy @Component(IndicatorType.Names.LAST_PING) @@ -46,8 +44,8 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator { private boolean missingPing = false; private boolean hidden = false; - public PingIntervalClientIndicator(final ClientEventDAO clientEventDAO) { - super(clientEventDAO); + public PingIntervalClientIndicator(final DistributedPingCache distributedPingCache) { + super(distributedPingCache); this.cachingEnabled = true; } @@ -129,17 +127,10 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator { // if this indicator is not missing ping if (!this.isMissingPing()) { - - final Result lastPing = this.clientEventDAO - .getLastPing(super.pingRecord.getId()); - - if (!lastPing.hasError()) { - if (Double.isNaN(this.currentValue)) { - return lastPing.get().doubleValue(); - } - return Math.max(this.currentValue, lastPing.get().doubleValue()); - } else { - log.error("Failed to get last ping from persistent: {}", lastPing.getError().getMessage()); + final Long lastPing = this.distributedPingCache.getLastPing(super.pingRecord); + if (lastPing != null) { + final double doubleValue = lastPing.doubleValue(); + return Math.max(Double.isNaN(this.currentValue) ? doubleValue : this.currentValue, doubleValue); } } diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ExamAPI_V1_Controller.java b/src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ExamAPI_V1_Controller.java index bd54c09a..4120f6b1 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ExamAPI_V1_Controller.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ExamAPI_V1_Controller.java @@ -270,7 +270,7 @@ public class ExamAPI_V1_Controller { final String instructionConfirm = request.getParameter(API.EXAM_API_PING_INSTRUCTION_CONFIRM); if (log.isTraceEnabled()) { - log.trace("****************** SEB client connection: {} ip: ", + log.trace("****************** SEB client connection: {} ip: {}", connectionToken, getClientAddress(request)); } diff --git a/src/main/resources/config/application-dev-ws.properties b/src/main/resources/config/application-dev-ws.properties index 81ac2bc0..277c15de 100644 --- a/src/main/resources/config/application-dev-ws.properties +++ b/src/main/resources/config/application-dev-ws.properties @@ -22,7 +22,7 @@ sebserver.webservice.clean-db-on-startup=false # webservice configuration sebserver.init.adminaccount.gen-on-init=false -sebserver.webservice.distributed=false +sebserver.webservice.distributed=true sebserver.webservice.master.delay.threshold=10000 sebserver.webservice.http.external.scheme=http sebserver.webservice.http.external.servername=localhost diff --git a/src/test/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicatorTest.java b/src/test/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicatorTest.java index 1d1b6143..6fbb52b4 100644 --- a/src/test/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicatorTest.java +++ b/src/test/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicatorTest.java @@ -33,9 +33,10 @@ public class PingIntervalClientIndicatorTest { DateTimeUtils.setCurrentMillisProvider(() -> 1L); final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); + final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class); final PingIntervalClientIndicator pingIntervalClientIndicator = - new PingIntervalClientIndicator(clientEventDAO); + new PingIntervalClientIndicator(distributedPingCache); assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue())); } @@ -45,9 +46,10 @@ public class PingIntervalClientIndicatorTest { DateTimeUtils.setCurrentMillisProvider(() -> 1L); final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); + final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class); final PingIntervalClientIndicator pingIntervalClientIndicator = - new PingIntervalClientIndicator(clientEventDAO); + new PingIntervalClientIndicator(distributedPingCache); assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue())); DateTimeUtils.setCurrentMillisProvider(() -> 10L); @@ -60,9 +62,10 @@ public class PingIntervalClientIndicatorTest { DateTimeUtils.setCurrentMillisProvider(() -> 1L); final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); + final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class); final PingIntervalClientIndicator pingIntervalClientIndicator = - new PingIntervalClientIndicator(clientEventDAO); + new PingIntervalClientIndicator(distributedPingCache); final JSONMapper jsonMapper = new JSONMapper(); final String json = jsonMapper.writeValueAsString(pingIntervalClientIndicator); assertEquals("{\"indicatorValue\":0.0,\"indicatorType\":\"LAST_PING\"}", json); From ed7ae28a0d37d48dc11d033ae033dadc07f9e344 Mon Sep 17 00:00:00 2001 From: anhefti Date: Wed, 28 Jul 2021 14:29:21 +0200 Subject: [PATCH 2/2] fixed distributed ping cache --- .../session/ExamSessionService.java | 10 ++++ .../session/impl/ExamSessionServiceImpl.java | 13 +++-- .../impl/SEBClientConnectionServiceImpl.java | 10 ++-- .../indicator/AbstractClientIndicator.java | 3 +- .../impl/indicator/AbstractLogIndicator.java | 1 + .../indicator/AbstractLogNumberIndicator.java | 1 + .../impl/indicator/AbstractPingIndicator.java | 35 ++++++++++-- .../impl/indicator/DistributedPingCache.java | 53 +++++++++++++++++-- .../config/application-dev-gui.properties | 4 +- 9 files changed, 110 insertions(+), 20 deletions(-) diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/ExamSessionService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/ExamSessionService.java index 14b362ce..10395008 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/ExamSessionService.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/ExamSessionService.java @@ -24,6 +24,7 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientConnectionDAO; import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ExamDAO; import ch.ethz.seb.sebserver.webservice.servicelayer.dao.FilterMap; import ch.ethz.seb.sebserver.webservice.servicelayer.lms.LmsAPIService; +import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.ClientConnectionDataInternal; import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.ExamSessionCacheService; /** A Service to handle running exam sessions */ @@ -178,6 +179,15 @@ public interface ExamSessionService { * @return Result with reference to the given Exam or to an error if happened */ Result flushCache(final Exam exam); + /** Is is supposed to be the single access point to internally get client connection + * data for a specified connection token. + * This uses the client connection data cache for lookup and also synchronizes asynchronous + * cache calls to prevent parallel creation of ClientConnectionDataInternal + * + * @param connectionToken the connection token of the active SEB client connection + * @return ClientConnectionDataInternal by synchronized cache lookup or null if not available */ + ClientConnectionDataInternal getConnectionDataInternal(String connectionToken); + /** Checks if the given ClientConnectionData is an active SEB client connection. * * @param connection ClientConnectionData instance diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java index e09969a3..67a62448 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java @@ -308,13 +308,20 @@ public class ExamSessionServiceImpl implements ExamSessionService { } } + @Override + public ClientConnectionDataInternal getConnectionDataInternal(final String connectionToken) { + synchronized (this.examSessionCacheService) { + return this.examSessionCacheService.getClientConnection(connectionToken); + } + } + @Override public Result getConnectionData(final String connectionToken) { return Result.tryCatch(() -> { - final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService - .getClientConnection(connectionToken); + final ClientConnectionDataInternal activeClientConnection = + getConnectionDataInternal(connectionToken); if (activeClientConnection == null) { throw new NoSuchElementException("Client Connection with token: " + connectionToken); @@ -403,7 +410,7 @@ public class ExamSessionServiceImpl implements ExamSessionService { .getConnectionTokens(examId) .getOrThrow() .stream() - .map(this.examSessionCacheService::getClientConnection) + .map(this::getConnectionDataInternal) .filter(Objects::nonNull) .map(cc -> cc.getClientConnection().updateTime) .collect(Collectors.toSet()); diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java index a056b7d9..d498b69d 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java @@ -159,8 +159,8 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic .getOrThrow(); // load client connection data into cache - final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService - .getClientConnection(connectionToken); + final ClientConnectionDataInternal activeClientConnection = this.examSessionService + .getConnectionDataInternal(connectionToken); if (activeClientConnection == null) { log.warn("Failed to load ClientConnectionDataInternal into cache on update"); @@ -567,7 +567,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic final int pingNumber) { final ClientConnectionDataInternal activeClientConnection = - this.examSessionCacheService.getClientConnection(connectionToken); + this.examSessionService.getConnectionDataInternal(connectionToken); if (activeClientConnection != null) { activeClientConnection.notifyPing(timestamp, pingNumber); @@ -583,7 +583,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic try { final ClientConnectionDataInternal activeClientConnection = - this.examSessionCacheService.getClientConnection(connectionToken); + this.examSessionService.getConnectionDataInternal(connectionToken); if (activeClientConnection != null) { @@ -748,7 +748,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic // evict cached ClientConnection this.examSessionCacheService.evictClientConnection(connectionToken); // and load updated ClientConnection into cache - return this.examSessionCacheService.getClientConnection(connectionToken); + return this.examSessionService.getConnectionDataInternal(connectionToken); } private Consumer missingPingUpdate(final long now) { diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java index db1d2301..9790c05a 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java @@ -23,6 +23,7 @@ public abstract class AbstractClientIndicator implements ClientIndicator { protected Long connectionId; protected boolean cachingEnabled; protected boolean active = true; + protected long persistentUpdateInterval = PERSISTENT_UPDATE_INTERVAL; protected long lastPersistentUpdate = 0; protected boolean valueInitializes = false; @@ -72,7 +73,7 @@ public abstract class AbstractClientIndicator implements ClientIndicator { } if (!this.cachingEnabled && this.active) { - if (now - this.lastPersistentUpdate > PERSISTENT_UPDATE_INTERVAL) { + if (now - this.lastPersistentUpdate > this.persistentUpdateInterval) { this.currentValue = computeValueAt(now); this.lastPersistentUpdate = now; } diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogIndicator.java index 605db6ab..743361e3 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogIndicator.java @@ -44,6 +44,7 @@ public abstract class AbstractLogIndicator extends AbstractClientIndicator { final boolean cachingEnabled) { super.init(indicatorDefinition, connectionId, active, cachingEnabled); + super.persistentUpdateInterval = 2 * Constants.SECOND_IN_MILLIS; if (indicatorDefinition == null || StringUtils.isBlank(indicatorDefinition.tags)) { this.tags = null; diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogNumberIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogNumberIndicator.java index a92ed3d9..bb9d13dc 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogNumberIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogNumberIndicator.java @@ -86,6 +86,7 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator { } else { return super.currentValue; } + } catch (final Exception e) { log.error("Failed to get indicator number from persistent storage: {}", e.getMessage()); return this.currentValue; diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java index 754232ff..553d972d 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java @@ -15,6 +15,8 @@ import java.util.Set; import org.joda.time.DateTime; import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import ch.ethz.seb.sebserver.gbl.Constants; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; @@ -23,6 +25,8 @@ import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; public abstract class AbstractPingIndicator extends AbstractClientIndicator { + private static final Logger log = LoggerFactory.getLogger(AbstractPingIndicator.class); + private static final long INTERVAL_FOR_PERSISTENT_UPDATE = Constants.SECOND_IN_MILLIS; private final Set EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class)); @@ -48,10 +52,10 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { super.init(indicatorDefinition, connectionId, active, cachingEnabled); if (!this.cachingEnabled && this.active) { - this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId); - if (this.pingRecord == null) { - // try once again + try { this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId); + } catch (final Exception e) { + this.pingRecord = this.distributedPingCache.getPingRecordIdForConnectionId(connectionId); } } } @@ -61,7 +65,14 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { super.currentValue = now; super.lastPersistentUpdate = now; - if (!this.cachingEnabled && this.pingRecord != null) { + if (!this.cachingEnabled) { + + if (this.pingRecord == null) { + tryRecoverPingRecord(); + if (this.pingRecord == null) { + return; + } + } // Update last ping time on persistent storage final long millisecondsNow = DateTimeUtils.currentTimeMillis(); @@ -71,6 +82,22 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { } } + private void tryRecoverPingRecord() { + + if (log.isWarnEnabled()) { + log.warn("*** Missing ping record for connection: {}. Try to recover...", this.connectionId); + } + + try { + this.pingRecord = this.distributedPingCache.getPingRecordIdForConnectionId(this.connectionId); + if (this.pingRecord == null) { + this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId); + } + } catch (final Exception e) { + log.error("Failed to recover ping record for connection: {}", this.connectionId, e); + } + } + @Override public Set observedEvents() { return this.EMPTY_SET; diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java index c60d99bc..e35bbdf5 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java @@ -70,7 +70,12 @@ public class DistributedPingCache implements DisposableBean { @Transactional public Long initPingForConnection(final Long connectionId) { try { - Long recordId = this.clientEventLastPingMapper + + if (log.isDebugEnabled()) { + log.trace("*** Initialize ping record for SEB connection: {}", connectionId); + } + + final Long recordId = this.clientEventLastPingMapper .pingRecordIdByConnectionId(connectionId); if (recordId == null) { @@ -82,12 +87,41 @@ public class DistributedPingCache implements DisposableBean { clientEventRecord.setServerTime(millisecondsNow); this.clientEventRecordMapper.insert(clientEventRecord); - recordId = this.clientEventLastPingMapper.pingRecordIdByConnectionId(connectionId); + try { + // This also double-check by trying again. If we have more then one entry here + // this will throw an exception that causes a rollback + return this.clientEventLastPingMapper + .pingRecordIdByConnectionId(connectionId); + + } catch (final Exception e) { + + log.warn("Detected multiple client ping entries for connection: " + connectionId + + ". Force rollback to prevent"); + + // force rollback + throw new RuntimeException("Detected multiple client ping entries"); + } } return recordId; } catch (final Exception e) { + log.error("Failed to initialize ping for connection -> {}", connectionId, e); + + // force rollback + throw new RuntimeException("Failed to initialize ping for connection -> " + connectionId, e); + } + } + + @Transactional(readOnly = true) + public Long getPingRecordIdForConnectionId(final Long connectionId) { + try { + + return this.clientEventLastPingMapper + .pingRecordIdByConnectionId(connectionId); + + } catch (final Exception e) { + log.error("Failed to get ping record for connection id: {} cause: {}", connectionId, e.getMessage()); return null; } } @@ -108,6 +142,10 @@ public class DistributedPingCache implements DisposableBean { public void deletePingForConnection(final Long connectionId) { try { + if (log.isDebugEnabled()) { + log.debug("*** Delete ping record for SEB connection: {}", connectionId); + } + this.clientEventRecordMapper .deleteByExample() .where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId)) @@ -124,7 +162,11 @@ public class DistributedPingCache implements DisposableBean { try { Long ping = this.pingCache.get(pingRecordId); if (ping == null) { - log.debug("******* Get and cache ping time: {}", pingRecordId); + + if (log.isDebugEnabled()) { + log.debug("*** Get and cache ping time: {}", pingRecordId); + } + ping = this.clientEventLastPingMapper.selectPingTimeByPrimaryKey(pingRecordId); if (ping != null) { this.pingCache.put(pingRecordId, ping); @@ -145,7 +187,9 @@ public class DistributedPingCache implements DisposableBean { return; } - log.debug("****** Update distributed ping cache: {}", this.pingCache); + if (log.isDebugEnabled()) { + log.trace("*** Update distributed ping cache: {}", this.pingCache); + } try { final ArrayList pks = new ArrayList<>(this.pingCache.keySet()); @@ -181,7 +225,6 @@ public class DistributedPingCache implements DisposableBean { log.error("Failed to cancel distributed ping cache update task: ", e); } } - } } diff --git a/src/main/resources/config/application-dev-gui.properties b/src/main/resources/config/application-dev-gui.properties index 8069b264..a22c98bc 100644 --- a/src/main/resources/config/application-dev-gui.properties +++ b/src/main/resources/config/application-dev-gui.properties @@ -1,11 +1,11 @@ server.address=localhost -server.port=8080 +server.port=8090 sebserver.gui.http.external.scheme=http sebserver.gui.entrypoint=/gui sebserver.gui.webservice.protocol=http sebserver.gui.webservice.address=localhost -sebserver.gui.webservice.port=8080 +sebserver.gui.webservice.port=8090 sebserver.gui.webservice.apipath=/admin-api/v1 # defines the polling interval that is used to poll the webservice for client connection data on a monitored exam page sebserver.gui.webservice.poll-interval=1000