diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/WebserviceInfo.java b/src/main/java/ch/ethz/seb/sebserver/webservice/WebserviceInfo.java index d4721202..f1dbf615 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/WebserviceInfo.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/WebserviceInfo.java @@ -26,6 +26,7 @@ import org.springframework.web.util.UriComponentsBuilder; import ch.ethz.seb.sebserver.gbl.Constants; import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile; +import ch.ethz.seb.sebserver.webservice.servicelayer.dao.WebserviceInfoDAO; @Lazy @Service @@ -64,7 +65,14 @@ public class WebserviceInfo { private Map lmsExternalAddressAlias; - public WebserviceInfo(final Environment environment) { + private final WebserviceInfoDAO webserviceInfoDAO; + private boolean isMaster = false; + + public WebserviceInfo( + final WebserviceInfoDAO webserviceInfoDAO, + final Environment environment) { + + this.webserviceInfoDAO = webserviceInfoDAO; this.webserviceUUID = UUID.randomUUID().toString(); this.sebServerVersion = environment.getRequiredProperty(VERSION_KEY); this.testProperty = environment.getProperty(WEB_SERVICE_TEST_PROPERTY, "NOT_AVAILABLE"); @@ -123,6 +131,14 @@ public class WebserviceInfo { } } + public boolean isMaster() { + return this.isMaster; + } + + public void updateMaster() { + this.isMaster = this.webserviceInfoDAO.isMaster(this.getWebserviceUUID()); + } + public String getWebserviceUUID() { return this.webserviceUUID; } diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/datalayer/batis/ClientIndicatorValueMapper.java b/src/main/java/ch/ethz/seb/sebserver/webservice/datalayer/batis/ClientIndicatorValueMapper.java index 657a82d9..dad3a01b 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/datalayer/batis/ClientIndicatorValueMapper.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/datalayer/batis/ClientIndicatorValueMapper.java @@ -79,17 +79,6 @@ public interface ClientIndicatorValueMapper { .execute(); } - default Long pingRecordIdByConnectionId(final Long connectionId) { - return SelectDSL.selectDistinctWithMapper( - this::selectPK, - id.as("id")) - .from(clientIndicatorRecord) - .where(clientConnectionId, isEqualTo(connectionId)) - .and(type, isEqualTo(IndicatorType.LAST_PING.id)) - .build() - .execute(); - } - default QueryExpressionDSL>> selectByExample() { return SelectDSL.selectWithMapper( diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientEventDAOImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientEventDAOImpl.java index e073bab0..d58b9e64 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientEventDAOImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientEventDAOImpl.java @@ -137,6 +137,7 @@ public class ClientEventDAOImpl implements ClientEventDAO { } @Override + @Transactional(readOnly = true) public Result> allMatchingExtended( final FilterMap filterMap, final Predicate predicate) { diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionControlTask.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionControlTask.java index df94c743..6d939cac 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionControlTask.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionControlTask.java @@ -28,13 +28,12 @@ import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile; import ch.ethz.seb.sebserver.gbl.util.Result; import ch.ethz.seb.sebserver.webservice.WebserviceInfo; import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ExamDAO; -import ch.ethz.seb.sebserver.webservice.servicelayer.dao.WebserviceInfoDAO; import ch.ethz.seb.sebserver.webservice.servicelayer.session.ExamProctoringRoomService; import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientConnectionService; @Service @WebServiceProfile -class ExamSessionControlTask implements DisposableBean { +public class ExamSessionControlTask implements DisposableBean { private static final Logger log = LoggerFactory.getLogger(ExamSessionControlTask.class); @@ -43,7 +42,6 @@ class ExamSessionControlTask implements DisposableBean { private final ExamUpdateHandler examUpdateHandler; private final ExamProctoringRoomService examProcotringRoomService; private final WebserviceInfo webserviceInfo; - private final WebserviceInfoDAO webserviceInfoDAO; private final Long examTimePrefix; private final Long examTimeSuffix; @@ -56,7 +54,6 @@ class ExamSessionControlTask implements DisposableBean { final ExamUpdateHandler examUpdateHandler, final ExamProctoringRoomService examProcotringRoomService, final WebserviceInfo webserviceInfo, - final WebserviceInfoDAO webserviceInfoDAO, @Value("${sebserver.webservice.api.exam.time-prefix:3600000}") final Long examTimePrefix, @Value("${sebserver.webservice.api.exam.time-suffix:3600000}") final Long examTimeSuffix, @Value("${sebserver.webservice.api.exam.update-interval:1 * * * * *}") final String examTaskCron, @@ -66,7 +63,6 @@ class ExamSessionControlTask implements DisposableBean { this.sebClientConnectionService = sebClientConnectionService; this.examUpdateHandler = examUpdateHandler; this.webserviceInfo = webserviceInfo; - this.webserviceInfoDAO = webserviceInfoDAO; this.examTimePrefix = examTimePrefix; this.examTimeSuffix = examTimeSuffix; this.examTaskCron = examTaskCron; @@ -84,7 +80,7 @@ class ExamSessionControlTask implements DisposableBean { this.examTimePrefix, this.examTimeSuffix); - this.webserviceInfoDAO.isMaster(this.webserviceInfo.getWebserviceUUID()); + this.updateMaster(); SEBServerInit.INIT_LOGGER.info("------>"); SEBServerInit.INIT_LOGGER.info( @@ -97,7 +93,7 @@ class ExamSessionControlTask implements DisposableBean { initialDelay = 10000) public void examRunUpdateTask() { - if (!this.webserviceInfoDAO.isMaster(this.webserviceInfo.getWebserviceUUID())) { + if (!this.webserviceInfo.isMaster()) { return; } @@ -117,9 +113,11 @@ class ExamSessionControlTask implements DisposableBean { initialDelay = 5000) public void examSessionUpdateTask() { + updateMaster(); + this.sebClientConnectionService.updatePingEvents(); - if (!this.webserviceInfoDAO.isMaster(this.webserviceInfo.getWebserviceUUID())) { + if (!this.webserviceInfo.isMaster()) { return; } @@ -136,7 +134,7 @@ class ExamSessionControlTask implements DisposableBean { initialDelay = 30000) public void examSessionCleanupTask() { - if (!this.webserviceInfoDAO.isMaster(this.webserviceInfo.getWebserviceUUID())) { + if (!this.webserviceInfo.isMaster()) { return; } @@ -198,6 +196,10 @@ class ExamSessionControlTask implements DisposableBean { } } + private void updateMaster() { + this.webserviceInfo.updateMaster(); + } + @Override public void destroy() { // TODO try to reset master diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java index 7aff1d9f..06656d71 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java @@ -36,6 +36,8 @@ public abstract class AbstractClientIndicator implements ClientIndicator { protected double incidentThreshold = 0.0; + protected long lastUpdate = 0; + public AbstractClientIndicator(final DistributedIndicatorValueService distributedPingCache) { super(); this.distributedPingCache = distributedPingCache; @@ -122,6 +124,22 @@ public abstract class AbstractClientIndicator implements ClientIndicator { @Override public double getValue() { + + if (this.initialized && !this.cachingEnabled && this.active + && this.lastUpdate != this.distributedPingCache.lastUpdate()) { + + if (this.ditributedIndicatorValueRecordId == null) { + this.tryRecoverIndicatorRecord(); + } + + final Long indicatorValue = this.distributedPingCache + .getIndicatorValue(this.ditributedIndicatorValueRecordId); + if (indicatorValue != null) { + this.currentValue = indicatorValue.doubleValue(); + } + this.lastUpdate = this.distributedPingCache.lastUpdate(); + } + return this.currentValue; } diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedIndicatorValueService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedIndicatorValueService.java index 9e61c72c..428b3e65 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedIndicatorValueService.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedIndicatorValueService.java @@ -29,6 +29,7 @@ import org.springframework.context.event.EventListener; import org.springframework.scheduling.TaskScheduler; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.interceptor.TransactionInterceptor; import ch.ethz.seb.sebserver.SEBServerInit; import ch.ethz.seb.sebserver.SEBServerInitEvent; @@ -65,6 +66,8 @@ public class DistributedIndicatorValueService implements DisposableBean { private final Executor indicatorValueUpdateExecutor; private final ClientIndicatorRecordMapper clientIndicatorRecordMapper; private final ClientIndicatorValueMapper clientIndicatorValueMapper; + private final WebserviceInfo webserviceInfo; + long distributedUpdateInterval = 2000; private long updateTolerance; private ScheduledFuture taskRef; @@ -74,11 +77,17 @@ public class DistributedIndicatorValueService implements DisposableBean { public DistributedIndicatorValueService( @Qualifier(AsyncServiceSpringConfig.EXAM_API_PING_SERVICE_EXECUTOR_BEAN_NAME) final Executor pingUpdateExecutor, final ClientIndicatorRecordMapper clientIndicatorRecordMapper, - final ClientIndicatorValueMapper clientIndicatorValueMapper) { + final ClientIndicatorValueMapper clientIndicatorValueMapper, + final WebserviceInfo webserviceInfo) { this.indicatorValueUpdateExecutor = pingUpdateExecutor; this.clientIndicatorRecordMapper = clientIndicatorRecordMapper; this.clientIndicatorValueMapper = clientIndicatorValueMapper; + this.webserviceInfo = webserviceInfo; + } + + long lastUpdate() { + return this.lastUpdate; } /** Initializes the service by attaching it to the scheduler for periodical update. @@ -95,18 +104,18 @@ public class DistributedIndicatorValueService implements DisposableBean { SEBServerInit.INIT_LOGGER.info("------> Activate distributed indicator value service:"); final TaskScheduler taskScheduler = applicationContext.getBean(TaskScheduler.class); - final long distributedUpdateInterval = webserviceInfo.getDistributedUpdateInterval(); - this.updateTolerance = distributedUpdateInterval * 2 / 3; + this.distributedUpdateInterval = webserviceInfo.getDistributedUpdateInterval(); + this.updateTolerance = this.distributedUpdateInterval * 2 / 3; SEBServerInit.INIT_LOGGER.info("------> with distributedUpdateInterval: {}", - distributedUpdateInterval); + this.distributedUpdateInterval); SEBServerInit.INIT_LOGGER.info("------> with taskScheduler: {}", taskScheduler); try { this.taskRef = taskScheduler.scheduleAtFixedRate( this::updateIndicatorValueCache, - distributedUpdateInterval); + this.distributedUpdateInterval); SEBServerInit.INIT_LOGGER.info("------> distributed indicator value service successfully initialized!"); @@ -141,38 +150,78 @@ public class DistributedIndicatorValueService implements DisposableBean { log.trace("*** Initialize indicator value record for SEB connection: {}", connectionId); } - final Long recordId = this.clientIndicatorValueMapper - .indicatorRecordIdByConnectionId(connectionId, type); + synchronized (this) { - if (recordId == null) { - final ClientIndicatorRecord clientEventRecord = new ClientIndicatorRecord( - null, connectionId, type.id, value); - - this.clientIndicatorRecordMapper.insert(clientEventRecord); + Long recordId = null; try { - // This also double-check by trying again. If we have more then one entry here - // this will throw an exception that causes a rollback - return this.clientIndicatorValueMapper + recordId = this.clientIndicatorValueMapper .indicatorRecordIdByConnectionId(connectionId, type); - } catch (final Exception e) { + // There is already more then one indicator record entry!!! + // delete the second one and work on with the first one - log.warn( - "Detected multiple client indicator entries for connection: {} and type: {}. Force rollback to prevent", + log.warn("Duplicate indicator entry detected for connectionId: {}, type: {} --> try to recover", connectionId, type); - // force rollback - throw new RuntimeException("Detected multiple client indicator value entries"); - } - } + try { + final List records = this.clientIndicatorRecordMapper.selectByExample() + .where(ClientIndicatorRecordDynamicSqlSupport.clientConnectionId, + isEqualTo(connectionId)) + .and(ClientIndicatorRecordDynamicSqlSupport.type, isEqualTo(type.id)) + .build() + .execute(); + if (records.size() > 1) { + this.clientIndicatorRecordMapper.deleteByPrimaryKey(records.get(1).getId()); + } - return recordId; + return records.get(0).getId(); + } catch (final Exception ee) { + log.error("Failed to recover from duplicate indicator entry: ", ee); + return null; + } + } + + if (recordId == null) { + if (!this.webserviceInfo.isMaster()) { + if (log.isDebugEnabled()) { + log.debug("Skip indicator record init because this is no master instance"); + } + return null; + } + + final ClientIndicatorRecord clientEventRecord = new ClientIndicatorRecord( + null, connectionId, type.id, value); + + this.clientIndicatorRecordMapper.insert(clientEventRecord); + + try { + // This also double-check by trying again. If we have more then one entry here + // this will throw an exception that causes a rollback + return this.clientIndicatorValueMapper + .indicatorRecordIdByConnectionId(connectionId, type); + + } catch (final Exception e) { + + log.warn( + "Detected multiple client indicator entries for connection: {} and type: {}. Force rollback to prevent", + connectionId, type); + + // force rollback + TransactionInterceptor.currentTransactionStatus().setRollbackOnly(); + throw new RuntimeException("Detected multiple client indicator value entries"); + } + } + + return recordId; + + } } catch (final Exception e) { log.error("Failed to initialize indicator value for connection -> {}", connectionId, e); // force rollback + TransactionInterceptor.currentTransactionStatus().setRollbackOnly(); throw new RuntimeException("Failed to initialize indicator value for connection -> " + connectionId, e); } } @@ -230,23 +279,29 @@ public class DistributedIndicatorValueService implements DisposableBean { * @param indicatorPK The indicator record id (PK). * @return The actual (last) indicator value. */ public Long getIndicatorValue(final Long indicatorPK) { - try { + if (indicatorPK == null) { + return null; + } - Long value = this.indicatorValueCache.get(indicatorPK); - if (value == null) { + Long value = this.indicatorValueCache.get(indicatorPK); + if (value == null) { + try { if (log.isDebugEnabled()) { log.debug("*** Get and cache ping time: {}", indicatorPK); } value = this.clientIndicatorValueMapper.selectValueByPrimaryKey(indicatorPK); - } + if (value != null) { + this.indicatorValueCache.put(indicatorPK, value); + } - return value; - } catch (final Exception e) { - log.error("Error while trying to get last indicator value from storage: {}", e.getMessage()); - return 0L; + } catch (final Exception e) { + log.error("Error while trying to get last indicator value from storage: {}", e.getMessage()); + return -1L; + } } + return value; } /** Updates the internal indicator value cache by loading all actual SEB client indicators from persistent storage @@ -284,6 +339,8 @@ public class DistributedIndicatorValueService implements DisposableBean { this.lastUpdate = millisecondsNow; } + // System.out.println("*** Update distributed indicator value cache: " + this.indicatorValueCache); + } catch (final Exception e) { log.error("Error while trying to update distributed indicator value cache: {}", this.indicatorValueCache, e); @@ -293,7 +350,10 @@ public class DistributedIndicatorValueService implements DisposableBean { } /** Update last ping time on persistent storage asynchronously within a defines thread pool with no - * waiting queue to skip further ping updates if all update threads are busy **/ + * waiting queue to skip further ping updates if all update threads are busy + * + * TODO: we need a better handling strategy here. + * Try to apply a batch update and store the pings in a concurrent hash map **/ void updatePingAsync(final Long pingRecord) { try { this.indicatorValueUpdateExecutor diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java index cf89f6d1..d79c9dae 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java @@ -81,7 +81,12 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator { if (!this.initialized) { return Double.NaN; } - return DateTimeUtils.currentTimeMillis() - this.currentValue; + final long currentTimeMillis = DateTimeUtils.currentTimeMillis(); + if (this.initialized && !this.cachingEnabled && this.active + && this.lastUpdate != this.distributedPingCache.lastUpdate()) { + this.currentValue = computeValueAt(currentTimeMillis); + } + return currentTimeMillis - this.currentValue; } @Override @@ -96,7 +101,6 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator { @Override public final double computeValueAt(final long timestamp) { - if (!this.cachingEnabled && super.ditributedIndicatorValueRecordId != null) { final Long lastPing = this.distributedPingCache