diff --git a/src/main/java/ch/ethz/seb/sebserver/ClientHttpRequestFactoryService.java b/src/main/java/ch/ethz/seb/sebserver/ClientHttpRequestFactoryService.java index 06b16245..5272843e 100644 --- a/src/main/java/ch/ethz/seb/sebserver/ClientHttpRequestFactoryService.java +++ b/src/main/java/ch/ethz/seb/sebserver/ClientHttpRequestFactoryService.java @@ -76,7 +76,7 @@ public class ClientHttpRequestFactoryService { final ClientCredentialService clientCredentialService, @Value("${sebserver.http.client.connect-timeout:15000}") final int connectTimeout, @Value("${sebserver.http.client.connection-request-timeout:20000}") final int connectionRequestTimeout, - @Value("${sebserver.http.client.read-timeout:60000}") final int readTimeout) { + @Value("${sebserver.http.client.read-timeout:20000}") final int readTimeout) { this.environment = environment; this.clientCredentialService = clientCredentialService; diff --git a/src/main/java/ch/ethz/seb/sebserver/gbl/async/AsyncServiceSpringConfig.java b/src/main/java/ch/ethz/seb/sebserver/gbl/async/AsyncServiceSpringConfig.java index 23b8de59..dd7385cd 100644 --- a/src/main/java/ch/ethz/seb/sebserver/gbl/async/AsyncServiceSpringConfig.java +++ b/src/main/java/ch/ethz/seb/sebserver/gbl/async/AsyncServiceSpringConfig.java @@ -17,6 +17,7 @@ import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @Configuration @EnableAsync @@ -25,6 +26,7 @@ public class AsyncServiceSpringConfig implements AsyncConfigurer { public static final String EXECUTOR_BEAN_NAME = "AsyncServiceExecutorBean"; + /** This ThreadPool is used for internal long running background tasks */ @Bean(name = EXECUTOR_BEAN_NAME) public Executor threadPoolTaskExecutor() { final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); @@ -39,6 +41,9 @@ public class AsyncServiceSpringConfig implements AsyncConfigurer { public static final String EXAM_API_EXECUTOR_BEAN_NAME = "ExamAPIAsyncServiceExecutorBean"; + /** This ThreadPool is used for SEB client connection establishment and + * should be able to handle incoming bursts of SEB client connection requests (handshake) + * when up to 1000 - 2000 clients connect at nearly the same time (start of an exam) */ @Bean(name = EXAM_API_EXECUTOR_BEAN_NAME) public Executor examAPIThreadPoolTaskExecutor() { final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); @@ -51,6 +56,32 @@ public class AsyncServiceSpringConfig implements AsyncConfigurer { return executor; } + public static final String EXAM_API_PING_SERVICE_EXECUTOR_BEAN_NAME = "examAPIPingThreadPoolTaskExecutor"; + + /** This ThreadPool is used for ping handling in a distributed setup and shall reject + * incoming ping requests as fast as possible if there is to much load on the DB. + * We prefer to loose a shared ping update and respond to the client in time over a client request timeout */ + @Bean(name = EXAM_API_PING_SERVICE_EXECUTOR_BEAN_NAME) + public Executor examAPIPingThreadPoolTaskExecutor() { + final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(10); + executor.setMaxPoolSize(20); + executor.setQueueCapacity(0); + executor.setThreadNamePrefix("SEBPingService-"); + executor.initialize(); + executor.setWaitForTasksToCompleteOnShutdown(false); + return executor; + } + + @Bean + public ThreadPoolTaskScheduler threadPoolTaskScheduler() { + final ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler(); + threadPoolTaskScheduler.setPoolSize(5); + threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(false); + threadPoolTaskScheduler.setThreadNamePrefix("SEB-Server-BgTask-"); + return threadPoolTaskScheduler; + } + @Override public Executor getAsyncExecutor() { return threadPoolTaskExecutor(); diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/CacheConfig.java b/src/main/java/ch/ethz/seb/sebserver/webservice/CacheConfig.java index c93ba58a..20923778 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/CacheConfig.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/CacheConfig.java @@ -46,7 +46,6 @@ public class CacheConfig extends JCacheConfigurerSupport { final CachingProvider cachingProvider = Caching.getCachingProvider(); final javax.cache.CacheManager cacheManager = cachingProvider.getCacheManager(new URI(this.jCacheConfig), this.getClass().getClassLoader()); - System.out.println("cacheManager:" + cacheManager); final CompositeCacheManager composite = new CompositeCacheManager(); composite.setCacheManagers(Arrays.asList( diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/datalayer/batis/ClientEventExtensionMapper.java b/src/main/java/ch/ethz/seb/sebserver/webservice/datalayer/batis/ClientEventExtensionMapper.java index 655f2791..ce538fea 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/datalayer/batis/ClientEventExtensionMapper.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/datalayer/batis/ClientEventExtensionMapper.java @@ -80,7 +80,7 @@ public interface ClientEventExtensionMapper { .from(ClientEventRecordDynamicSqlSupport.clientEventRecord) - .leftJoin(ClientConnectionRecordDynamicSqlSupport.clientConnectionRecord) + .join(ClientConnectionRecordDynamicSqlSupport.clientConnectionRecord) .on( ClientEventRecordDynamicSqlSupport.clientEventRecord.clientConnectionId, equalTo(ClientConnectionRecordDynamicSqlSupport.clientConnectionRecord.id)); diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/SEBClientConnectionService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/SEBClientConnectionService.java index 790923fc..df8771b9 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/SEBClientConnectionService.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/SEBClientConnectionService.java @@ -137,8 +137,9 @@ public interface SEBClientConnectionService { * @param connectionToken the connection token * @param timestamp the ping time-stamp * @param pingNumber the ping number + * @param instructionConfirm instruction confirm sent by the SEB client or null * @return SEB instruction if available */ - String notifyPing(String connectionToken, long timestamp, int pingNumber); + String notifyPing(String connectionToken, long timestamp, int pingNumber, String instructionConfirm); /** Notify a SEB client event for live indication and storing to database. * diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java index eb2d0ad1..090d802f 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java @@ -198,7 +198,8 @@ public class ExamSessionServiceImpl implements ExamSessionService { } @Override - public /* synchronized */ Result getRunningExam(final Long examId) { + public synchronized Result getRunningExam(final Long examId) { + if (log.isTraceEnabled()) { log.trace("Running exam request for exam {}", examId); } @@ -217,6 +218,7 @@ public class ExamSessionServiceImpl implements ExamSessionService { return Result.of(exam); } else { if (exam != null) { + log.info("Exam {} is not running anymore. Flush caches", exam); flushCache(exam); } @@ -401,8 +403,8 @@ public class ExamSessionServiceImpl implements ExamSessionService { } // If we are in a distributed setup the active connection token cache get flushed - // at least every second. This allows caching over multiple monitoring requests but - // ensure an update every second for new incoming connections + // in specified time interval. This allows caching over multiple monitoring requests but + // ensure an update every now and then for new incoming connections private void updateClientConnections(final Long examId) { try { final long currentTimeMillis = System.currentTimeMillis(); diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java index 905a9953..a8322b68 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java @@ -34,7 +34,6 @@ import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile; import ch.ethz.seb.sebserver.gbl.util.Result; import ch.ethz.seb.sebserver.gbl.util.Utils; -import ch.ethz.seb.sebserver.webservice.WebserviceInfo; import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientConnectionRecord; import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientConnectionDAO; @@ -70,14 +69,13 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic private final SEBClientConfigDAO sebClientConfigDAO; private final SEBClientInstructionService sebInstructionService; private final SEBClientNotificationService sebClientNotificationService; - private final WebserviceInfo webserviceInfo; private final ExamAdminService examAdminService; private final DistributedPingCache distributedPingCache; + private final boolean isDistributedSetup; protected SEBClientConnectionServiceImpl( final ExamSessionService examSessionService, final EventHandlingStrategyFactory eventHandlingStrategyFactory, - final SEBClientConfigDAO sebClientConfigDAO, final SEBClientInstructionService sebInstructionService, final SEBClientNotificationService sebClientNotificationService, @@ -92,9 +90,9 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic this.sebClientConfigDAO = sebClientConfigDAO; this.sebInstructionService = sebInstructionService; this.sebClientNotificationService = sebClientNotificationService; - this.webserviceInfo = sebInstructionService.getWebserviceInfo(); this.examAdminService = examAdminService; this.distributedPingCache = distributedPingCache; + this.isDistributedSetup = sebInstructionService.getWebserviceInfo().isDistributed(); } @Override @@ -477,7 +475,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic } // delete stored ping if this is a distributed setup - if (this.webserviceInfo.isDistributed()) { + if (this.isDistributedSetup) { this.distributedPingCache .deletePingForConnection(updatedClientConnection.id); } @@ -531,7 +529,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic } // delete stored ping if this is a distributed setup - if (this.webserviceInfo.isDistributed()) { + if (this.isDistributedSetup) { this.distributedPingCache .deletePingForConnection(updatedClientConnection.id); } @@ -545,7 +543,6 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic public void updatePingEvents() { try { - final boolean distributed = this.webserviceInfo.isDistributed(); final Cache cache = this.cacheManager.getCache(ExamSessionCacheService.CACHE_NAME_ACTIVE_CLIENT_CONNECTION); final long now = Utils.getMillisecondsNow(); final Consumer missingPingUpdate = missingPingUpdate(now); @@ -554,8 +551,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic .allRunningExamIds() .getOrThrow() .stream() - .flatMap(examId -> distributed - // TODO fetch only the connection tokens form active connections here + .flatMap(examId -> this.isDistributedSetup ? this.clientConnectionDAO .getConnectionTokensNoCache(examId) .getOrThrow() @@ -584,16 +580,26 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic public String notifyPing( final String connectionToken, final long timestamp, - final int pingNumber) { + final int pingNumber, + final String instructionConfirm) { + + processPing(connectionToken, timestamp, pingNumber); + + if (instructionConfirm != null) { + this.sebInstructionService.confirmInstructionDone(connectionToken, instructionConfirm); + } + + return this.sebInstructionService.getInstructionJSON(connectionToken); + } + + private void processPing(final String connectionToken, final long timestamp, final int pingNumber) { final ClientConnectionDataInternal activeClientConnection = - this.examSessionService.getConnectionDataInternal(connectionToken); + this.examSessionCacheService.getClientConnection(connectionToken); if (activeClientConnection != null) { activeClientConnection.notifyPing(timestamp, pingNumber); } - - return this.sebInstructionService.getInstructionJSON(connectionToken); } @Override @@ -735,7 +741,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic } private void checkExamIntegrity(final Long examId) { - if (this.webserviceInfo.isDistributed()) { + if (this.isDistributedSetup) { // if the cached Exam is not up to date anymore, we have to update the cache first final Result updateExamCache = this.examSessionService.updateExamCache(examId); if (updateExamCache.hasError()) { @@ -782,7 +788,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic if (clientEventRecord != null) { // store event and and flush cache this.eventHandlingStrategy.accept(clientEventRecord); - if (this.webserviceInfo.isDistributed()) { + if (this.isDistributedSetup) { // mark for update and flush the cache this.clientConnectionDAO.save(connection.clientConnection); this.examSessionCacheService.evictClientConnection( diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java index c5fa6bfb..eafc5e89 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractClientIndicator.java @@ -10,21 +10,16 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator; import org.joda.time.DateTimeUtils; -import ch.ethz.seb.sebserver.gbl.Constants; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; import ch.ethz.seb.sebserver.webservice.servicelayer.session.ClientIndicator; public abstract class AbstractClientIndicator implements ClientIndicator { - private static final long PERSISTENT_UPDATE_INTERVAL = Constants.SECOND_IN_MILLIS; - protected Long indicatorId; protected Long examId; protected Long connectionId; protected boolean cachingEnabled; protected boolean active = true; - protected long persistentUpdateInterval = PERSISTENT_UPDATE_INTERVAL; - protected long lastPersistentUpdate = 0; protected boolean valueInitializes = false; protected double currentValue = Double.NaN; @@ -72,15 +67,11 @@ public abstract class AbstractClientIndicator implements ClientIndicator { final long now = DateTimeUtils.currentTimeMillis(); if (!this.valueInitializes) { this.currentValue = computeValueAt(now); - this.lastPersistentUpdate = now; this.valueInitializes = true; } if (!this.cachingEnabled && this.active) { - if (now - this.lastPersistentUpdate > this.persistentUpdateInterval) { - this.currentValue = computeValueAt(now); - this.lastPersistentUpdate = now; - } + this.currentValue = computeValueAt(now); } return this.currentValue; diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogIndicator.java index 743361e3..39edc2d5 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogIndicator.java @@ -24,11 +24,16 @@ import ch.ethz.seb.sebserver.gbl.util.Utils; public abstract class AbstractLogIndicator extends AbstractClientIndicator { + protected static final Long DISTRIBUTED_LOG_UPDATE_INTERVAL = 5 * Constants.SECOND_IN_MILLIS; + protected final Set observed; protected final List eventTypeIds; protected String[] tags; + protected long lastDistributedUpdate = 0L; + protected AbstractLogIndicator(final EventType... eventTypes) { + this.observed = Collections.unmodifiableSet(EnumSet.of(eventTypes[0], eventTypes)); this.eventTypeIds = Utils.immutableListOf(Arrays.stream(eventTypes) .map(et -> et.id) @@ -44,7 +49,6 @@ public abstract class AbstractLogIndicator extends AbstractClientIndicator { final boolean cachingEnabled) { super.init(indicatorDefinition, connectionId, active, cachingEnabled); - super.persistentUpdateInterval = 2 * Constants.SECOND_IN_MILLIS; if (indicatorDefinition == null || StringUtils.isBlank(indicatorDefinition.tags)) { this.tags = null; @@ -75,4 +79,12 @@ public abstract class AbstractLogIndicator extends AbstractClientIndicator { return this.observed; } + protected boolean loadFromPersistent(final long timestamp) { + if (!super.valueInitializes) { + return true; + } + + return timestamp - this.lastDistributedUpdate > DISTRIBUTED_LOG_UPDATE_INTERVAL; + } + } diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogLevelCountIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogLevelCountIndicator.java index 8a7fa0fd..50cc95eb 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogLevelCountIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogLevelCountIndicator.java @@ -57,6 +57,10 @@ public abstract class AbstractLogLevelCountIndicator extends AbstractLogIndicato @Override public double computeValueAt(final long timestamp) { + if (!loadFromPersistent(timestamp)) { + return super.currentValue; + } + try { final Long errors = this.clientEventRecordMapper @@ -72,9 +76,12 @@ public abstract class AbstractLogLevelCountIndicator extends AbstractLogIndicato .execute(); return errors.doubleValue(); + } catch (final Exception e) { log.error("Failed to get indicator count from persistent storage: ", e); return super.currentValue; + } finally { + super.lastDistributedUpdate = timestamp; } } diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogNumberIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogNumberIndicator.java index bb9d13dc..5e9f381c 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogNumberIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractLogNumberIndicator.java @@ -62,8 +62,15 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator { @Override public double computeValueAt(final long timestamp) { + + if (!loadFromPersistent(timestamp)) { + return super.currentValue; + } + try { + System.out.println("************** loadFromPersistent"); + final List execute = this.clientEventRecordMapper.selectByExample() .where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(this.connectionId)) .and(ClientEventRecordDynamicSqlSupport.type, isIn(this.eventTypeIds)) @@ -90,6 +97,8 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator { } catch (final Exception e) { log.error("Failed to get indicator number from persistent storage: {}", e.getMessage()); return this.currentValue; + } finally { + super.lastDistributedUpdate = timestamp; } } diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java index a67a7546..0005171b 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/AbstractPingIndicator.java @@ -11,34 +11,38 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator; import java.util.Collections; import java.util.EnumSet; import java.util.Set; +import java.util.concurrent.Executor; -import org.joda.time.DateTime; -import org.joda.time.DateTimeUtils; -import org.joda.time.DateTimeZone; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; -import ch.ethz.seb.sebserver.gbl.Constants; +import ch.ethz.seb.sebserver.gbl.async.AsyncServiceSpringConfig; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; +import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType; +import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent; import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; +import ch.ethz.seb.sebserver.gbl.util.Utils; +import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper; import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; public abstract class AbstractPingIndicator extends AbstractClientIndicator { private static final Logger log = LoggerFactory.getLogger(AbstractPingIndicator.class); - private static final long INTERVAL_FOR_PERSISTENT_UPDATE = Constants.SECOND_IN_MILLIS; - private final Set EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class)); + private final Executor executor; protected final DistributedPingCache distributedPingCache; - private final long lastUpdate = 0; - protected Long pingRecord = null; - - protected AbstractPingIndicator(final DistributedPingCache distributedPingCache) { + //protected Long pingRecord = null; + protected PingUpdate pingUpdate = null; + protected AbstractPingIndicator( + final DistributedPingCache distributedPingCache, + @Qualifier(AsyncServiceSpringConfig.EXAM_API_PING_SERVICE_EXECUTOR_BEAN_NAME) final Executor executor) { super(); + this.executor = executor; this.distributedPingCache = distributedPingCache; } @@ -53,33 +57,31 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { if (!this.cachingEnabled && this.active) { try { - this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId); + createPingUpdate(); } catch (final Exception e) { - this.pingRecord = this.distributedPingCache.getPingRecordIdForConnectionId(connectionId); + createPingUpdate(); } } } public final void notifyPing(final long timestamp, final int pingNumber) { - final long now = DateTime.now(DateTimeZone.UTC).getMillis(); - super.currentValue = now; - super.lastPersistentUpdate = now; + super.currentValue = timestamp; if (!this.cachingEnabled) { - if (this.pingRecord == null) { + if (this.pingUpdate == null) { tryRecoverPingRecord(); - if (this.pingRecord == null) { + if (this.pingUpdate == null) { return; } } - // Update last ping time on persistent storage - final long millisecondsNow = DateTimeUtils.currentTimeMillis(); - if (millisecondsNow - this.lastUpdate > INTERVAL_FOR_PERSISTENT_UPDATE) { - synchronized (this) { - this.distributedPingCache.updatePing(this.pingRecord, millisecondsNow); - } + // Update last ping time on persistent storage asynchronously within a defines thread pool with no + // waiting queue to skip further ping updates if all update threads are busy + try { + this.executor.execute(this.pingUpdate); + } catch (final Exception e) { + //log.warn("Failed to schedule ping task: {}" + e.getMessage()); } } } @@ -91,15 +93,21 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { } try { - this.pingRecord = this.distributedPingCache.getPingRecordIdForConnectionId(this.connectionId); - if (this.pingRecord == null) { - this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId); + createPingUpdate(); + if (this.pingUpdate == null) { + createPingUpdate(); } } catch (final Exception e) { log.error("Failed to recover ping record for connection: {}", this.connectionId, e); } } + private void createPingUpdate() { + this.pingUpdate = new PingUpdate( + this.distributedPingCache.getClientEventLastPingMapper(), + this.distributedPingCache.initPingForConnection(this.connectionId)); + } + @Override public Set observedEvents() { return this.EMPTY_SET; @@ -107,4 +115,46 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { public abstract ClientEventRecord updateLogEvent(final long now); + @Override + public double computeValueAt(final long timestamp) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void notifyValueChange(final ClientEvent event) { + // TODO Auto-generated method stub + + } + + @Override + public void notifyValueChange(final ClientEventRecord clientEventRecord) { + // TODO Auto-generated method stub + + } + + @Override + public IndicatorType getType() { + // TODO Auto-generated method stub + return null; + } + + static final class PingUpdate implements Runnable { + + private final ClientEventLastPingMapper clientEventLastPingMapper; + final Long pingRecord; + + public PingUpdate(final ClientEventLastPingMapper clientEventLastPingMapper, final Long pingRecord) { + this.clientEventLastPingMapper = clientEventLastPingMapper; + this.pingRecord = pingRecord; + } + + @Override + public void run() { + this.clientEventLastPingMapper + .updatePingTime(this.pingRecord, Utils.getMillisecondsNow()); + } + + } + } diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java index 3384fc89..a80221ca 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java @@ -12,11 +12,12 @@ import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo; import static org.mybatis.dynamic.sql.SqlBuilder.isIn; import java.util.ArrayList; +import java.util.Collection; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.stream.Collectors; -import org.ehcache.impl.internal.concurrent.ConcurrentHashMap; import org.joda.time.DateTimeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,7 +26,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.scheduling.TaskScheduler; import org.springframework.stereotype.Component; -import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Transactional; import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; @@ -33,6 +33,7 @@ import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile; import ch.ethz.seb.sebserver.gbl.util.Utils; import ch.ethz.seb.sebserver.webservice.WebserviceInfo; import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper; +import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper.ClientEventLastPingRecord; import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport; import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordMapper; import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; @@ -64,7 +65,7 @@ public class DistributedPingCache implements DisposableBean { this.pingUpdateTolerance = pingUpdate * 2 / 3; if (webserviceInfo.isDistributed()) { try { - this.taskRef = taskScheduler.scheduleAtFixedRate(this::updateCache, pingUpdate); + this.taskRef = taskScheduler.scheduleAtFixedRate(this::updatePings, pingUpdate); } catch (final Exception e) { log.error("Failed to initialize distributed ping cache update task"); this.taskRef = null; @@ -74,6 +75,10 @@ public class DistributedPingCache implements DisposableBean { } } + public ClientEventLastPingMapper getClientEventLastPingMapper() { + return this.clientEventLastPingMapper; + } + @Transactional public Long initPingForConnection(final Long connectionId) { try { @@ -133,17 +138,6 @@ public class DistributedPingCache implements DisposableBean { } } - public void updatePing(final Long pingRecordId, final Long pingTime) { - try { - - this.clientEventLastPingMapper - .updatePingTime(pingRecordId, pingTime); - - } catch (final Exception e) { - log.error("Failed to update ping for ping record id -> {}", pingRecordId); - } - } - @Transactional public void deletePingForConnection(final Long connectionId) { try { @@ -152,32 +146,49 @@ public class DistributedPingCache implements DisposableBean { log.debug("*** Delete ping record for SEB connection: {}", connectionId); } - this.clientEventRecordMapper - .deleteByExample() + final Collection records = this.clientEventLastPingMapper + .selectByExample() .where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId)) .and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.LAST_PING.id)) .build() .execute(); + if (records == null || records.isEmpty()) { + return; + } + + final Long id = records.iterator().next().id; + this.pingCache.remove(id); + this.clientEventRecordMapper.deleteByPrimaryKey(id); + } catch (final Exception e) { log.error("Failed to delete ping for connection -> {}", connectionId, e); - } finally { - this.pingCache.remove(connectionId); + try { + log.info( + "Because of failed ping record deletion, " + + "flushing the ping cache to ensure no dead connections pings remain in the cache"); + this.pingCache.clear(); + } catch (final Exception ee) { + log.error("Failed to force flushing the ping cache: ", e); + } } } - public Long getLastPing(final Long pingRecordId) { + public Long getLastPing(final Long pingRecordId, final boolean missing) { try { Long ping = this.pingCache.get(pingRecordId); - if (ping == null) { + if (ping == null && !missing) { + if (log.isDebugEnabled()) { log.debug("*** Get and cache ping time: {}", pingRecordId); } ping = this.clientEventLastPingMapper.selectPingTimeByPrimaryKey(pingRecordId); - if (ping != null) { - this.pingCache.put(pingRecordId, ping); - } + } + + // if we have a missing ping we need to check new ping from next update even if the cache was empty + if (ping != null || missing) { + this.pingCache.put(pingRecordId, ping); } return ping; @@ -187,8 +198,11 @@ public class DistributedPingCache implements DisposableBean { } } - @Transactional(readOnly = true, isolation = Isolation.READ_UNCOMMITTED) - public void updateCache() { + private void updatePings() { + + if (this.pingCache.isEmpty()) { + return; + } final long millisecondsNow = Utils.getMillisecondsNow(); if (millisecondsNow - this.lastUpdate < this.pingUpdateTolerance) { @@ -196,15 +210,12 @@ public class DistributedPingCache implements DisposableBean { return; } - if (this.pingCache.isEmpty()) { - return; - } - if (log.isDebugEnabled()) { log.trace("*** Update distributed ping cache: {}", this.pingCache); } try { + final ArrayList pks = new ArrayList<>(this.pingCache.keySet()); final Map mapping = this.clientEventLastPingMapper .selectByExample() @@ -225,6 +236,8 @@ public class DistributedPingCache implements DisposableBean { } catch (final Exception e) { log.error("Error while trying to update distributed ping cache: {}", this.pingCache, e); } + + this.lastUpdate = millisecondsNow; } @Override diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java index 85bb95a6..f22b7b53 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicator.java @@ -10,10 +10,12 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator; import java.math.BigDecimal; import java.util.Comparator; +import java.util.concurrent.Executor; import org.joda.time.DateTimeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Scope; @@ -22,6 +24,7 @@ import org.springframework.stereotype.Component; import com.fasterxml.jackson.annotation.JsonIgnore; import ch.ethz.seb.sebserver.gbl.Constants; +import ch.ethz.seb.sebserver.gbl.async.AsyncServiceSpringConfig; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType; import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent; @@ -44,8 +47,10 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator { private boolean missingPing = false; private boolean hidden = false; - public PingIntervalClientIndicator(final DistributedPingCache distributedPingCache) { - super(distributedPingCache); + public PingIntervalClientIndicator( + final DistributedPingCache distributedPingCache, + @Qualifier(AsyncServiceSpringConfig.EXAM_API_PING_SERVICE_EXECUTOR_BEAN_NAME) final Executor executor) { + super(distributedPingCache, executor); this.cachingEnabled = true; } @@ -122,16 +127,12 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator { @Override public final double computeValueAt(final long timestamp) { + if (!this.cachingEnabled && super.pingUpdate != null) { - if (!this.cachingEnabled && super.pingRecord != null) { - - // if this indicator is not missing ping - if (!this.isMissingPing()) { - final Long lastPing = this.distributedPingCache.getLastPing(super.pingRecord); - if (lastPing != null) { - final double doubleValue = lastPing.doubleValue(); - return Math.max(Double.isNaN(this.currentValue) ? doubleValue : this.currentValue, doubleValue); - } + final Long lastPing = this.distributedPingCache.getLastPing(super.pingUpdate.pingRecord, this.missingPing); + if (lastPing != null) { + final double doubleValue = lastPing.doubleValue(); + return Math.max(Double.isNaN(this.currentValue) ? doubleValue : this.currentValue, doubleValue); } return this.currentValue; diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ClientEventController.java b/src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ClientEventController.java index 79dc124c..7d50c201 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ClientEventController.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ClientEventController.java @@ -110,19 +110,13 @@ public class ClientEventController extends ReadonlyEntityController this.clientEventDAO.allMatchingExtended(filterMap, this::hasReadAccess)) - .getOrThrow(); - } catch (final Exception e) { - e.printStackTrace(); - throw e; - } + return this.paginationService.getPage( + pageNumber, + pageSize, + sort, + getSQLTableOfEntity().name(), + () -> this.clientEventDAO.allMatchingExtended(filterMap, this::hasReadAccess)) + .getOrThrow(); } @Override diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ExamAPI_V1_Controller.java b/src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ExamAPI_V1_Controller.java index 5aed1f6c..66369a5a 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ExamAPI_V1_Controller.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ExamAPI_V1_Controller.java @@ -270,32 +270,31 @@ public class ExamAPI_V1_Controller { final String pingNumString = request.getParameter(API.EXAM_API_PING_NUMBER); final String instructionConfirm = request.getParameter(API.EXAM_API_PING_INSTRUCTION_CONFIRM); - if (connectionToken == null) { - log.warn("Missing connection token on ping. Ignore the request"); + long pingTime; + try { + pingTime = Long.parseLong(timeStampString); + } catch (final Exception e) { + log.error("Invalid ping request: {}", connectionToken); + response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value()); return; } - if (instructionConfirm != null) { - this.sebClientConnectionService.confirmInstructionDone(connectionToken, instructionConfirm); - } - - final Long clientTime = timeStampString != null ? Long.parseLong(timeStampString) : 0L; final String instruction = this.sebClientConnectionService .notifyPing( connectionToken, - clientTime, - pingNumString != null ? Integer.parseInt(pingNumString) : -1); + pingTime, + pingNumString != null ? Integer.parseInt(pingNumString) : -1, + instructionConfirm); if (instruction == null) { response.setStatus(HttpStatus.NO_CONTENT.value()); - return; - } - - try { - response.setStatus(HttpStatus.OK.value()); - response.getOutputStream().write(instruction.getBytes()); - } catch (final IOException e) { - log.error("Failed to send instruction as response: {}", connectionToken, e); + } else { + try { + response.setStatus(HttpStatus.OK.value()); + response.getOutputStream().write(instruction.getBytes()); + } catch (final IOException e) { + log.error("Failed to send instruction as response: {}", connectionToken, e); + } } } diff --git a/src/main/resources/config/application-dev-ws.properties b/src/main/resources/config/application-dev-ws.properties index cef30a70..ecc62461 100644 --- a/src/main/resources/config/application-dev-ws.properties +++ b/src/main/resources/config/application-dev-ws.properties @@ -20,7 +20,7 @@ sebserver.http.client.connect-timeout=15000 sebserver.http.client.connection-request-timeout=10000 sebserver.http.client.read-timeout=20000 sebserver.webservice.distributed.pingUpdate=1000 -sebserver.webservice.distributed.connectionUpdate=1000 +sebserver.webservice.distributed.connectionUpdate=2000 sebserver.webservice.clean-db-on-startup=false # webservice configuration diff --git a/src/main/resources/config/application-ws.properties b/src/main/resources/config/application-ws.properties index 4325f14a..88fef4ce 100644 --- a/src/main/resources/config/application-ws.properties +++ b/src/main/resources/config/application-ws.properties @@ -10,9 +10,6 @@ sebserver.init.adminaccount.username=sebserver-admin sebserver.init.database.integrity.checks=true sebserver.init.database.integrity.try-fix=true -sebserver.webservice.distributed=false -sebserver.webservice.distributed.pingUpdate=3000 - ### webservice caching spring.cache.jcache.provider=org.ehcache.jsr107.EhcacheCachingProvider spring.cache.jcache.config=classpath:config/ehcache.xml @@ -42,6 +39,7 @@ sebserver.webservice.internalSecret=${sebserver.password} ### webservice networking sebserver.webservice.forceMaster=false sebserver.webservice.distributed=false +sebserver.webservice.distributed.pingUpdate=3000 sebserver.webservice.http.external.scheme=https sebserver.webservice.http.external.servername= sebserver.webservice.http.external.port= diff --git a/src/test/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicatorTest.java b/src/test/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicatorTest.java index 6fbb52b4..43146d4d 100644 --- a/src/test/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicatorTest.java +++ b/src/test/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/PingIntervalClientIndicatorTest.java @@ -10,6 +10,8 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator; import static org.junit.Assert.assertEquals; +import java.util.concurrent.Executor; + import org.joda.time.DateTimeUtils; import org.junit.After; import org.junit.Test; @@ -34,9 +36,10 @@ public class PingIntervalClientIndicatorTest { final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class); + final Executor executor = Mockito.mock(Executor.class); final PingIntervalClientIndicator pingIntervalClientIndicator = - new PingIntervalClientIndicator(distributedPingCache); + new PingIntervalClientIndicator(distributedPingCache, executor); assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue())); } @@ -47,9 +50,10 @@ public class PingIntervalClientIndicatorTest { final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class); + final Executor executor = Mockito.mock(Executor.class); final PingIntervalClientIndicator pingIntervalClientIndicator = - new PingIntervalClientIndicator(distributedPingCache); + new PingIntervalClientIndicator(distributedPingCache, executor); assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue())); DateTimeUtils.setCurrentMillisProvider(() -> 10L); @@ -63,9 +67,10 @@ public class PingIntervalClientIndicatorTest { final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class); + final Executor executor = Mockito.mock(Executor.class); final PingIntervalClientIndicator pingIntervalClientIndicator = - new PingIntervalClientIndicator(distributedPingCache); + new PingIntervalClientIndicator(distributedPingCache, executor); final JSONMapper jsonMapper = new JSONMapper(); final String json = jsonMapper.writeValueAsString(pingIntervalClientIndicator); assertEquals("{\"indicatorValue\":0.0,\"indicatorType\":\"LAST_PING\"}", json);