SEBSERV-194 better distributed ping handling (dedicated ThreadPool)
This commit is contained in:
		
							parent
							
								
									52b96647c1
								
							
						
					
					
						commit
						a88e308ba9
					
				
					 21 changed files with 273 additions and 139 deletions
				
			
		|  | @ -76,7 +76,7 @@ public class ClientHttpRequestFactoryService { | ||||||
|             final ClientCredentialService clientCredentialService, |             final ClientCredentialService clientCredentialService, | ||||||
|             @Value("${sebserver.http.client.connect-timeout:15000}") final int connectTimeout, |             @Value("${sebserver.http.client.connect-timeout:15000}") final int connectTimeout, | ||||||
|             @Value("${sebserver.http.client.connection-request-timeout:20000}") final int connectionRequestTimeout, |             @Value("${sebserver.http.client.connection-request-timeout:20000}") final int connectionRequestTimeout, | ||||||
|             @Value("${sebserver.http.client.read-timeout:10000}") final int readTimeout) { |             @Value("${sebserver.http.client.read-timeout:20000}") final int readTimeout) { | ||||||
| 
 | 
 | ||||||
|         this.environment = environment; |         this.environment = environment; | ||||||
|         this.clientCredentialService = clientCredentialService; |         this.clientCredentialService = clientCredentialService; | ||||||
|  |  | ||||||
|  | @ -17,6 +17,7 @@ import org.springframework.scheduling.annotation.AsyncConfigurer; | ||||||
| import org.springframework.scheduling.annotation.EnableAsync; | import org.springframework.scheduling.annotation.EnableAsync; | ||||||
| import org.springframework.scheduling.annotation.EnableScheduling; | import org.springframework.scheduling.annotation.EnableScheduling; | ||||||
| import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; | import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; | ||||||
|  | import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; | ||||||
| 
 | 
 | ||||||
| @Configuration | @Configuration | ||||||
| @EnableAsync | @EnableAsync | ||||||
|  | @ -25,6 +26,7 @@ public class AsyncServiceSpringConfig implements AsyncConfigurer { | ||||||
| 
 | 
 | ||||||
|     public static final String EXECUTOR_BEAN_NAME = "AsyncServiceExecutorBean"; |     public static final String EXECUTOR_BEAN_NAME = "AsyncServiceExecutorBean"; | ||||||
| 
 | 
 | ||||||
|  |     /** This ThreadPool is used for internal long running background tasks */ | ||||||
|     @Bean(name = EXECUTOR_BEAN_NAME) |     @Bean(name = EXECUTOR_BEAN_NAME) | ||||||
|     public Executor threadPoolTaskExecutor() { |     public Executor threadPoolTaskExecutor() { | ||||||
|         final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); |         final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); | ||||||
|  | @ -39,6 +41,9 @@ public class AsyncServiceSpringConfig implements AsyncConfigurer { | ||||||
| 
 | 
 | ||||||
|     public static final String EXAM_API_EXECUTOR_BEAN_NAME = "ExamAPIAsyncServiceExecutorBean"; |     public static final String EXAM_API_EXECUTOR_BEAN_NAME = "ExamAPIAsyncServiceExecutorBean"; | ||||||
| 
 | 
 | ||||||
|  |     /** This ThreadPool is used for SEB client connection establishment and | ||||||
|  |      * should be able to handle incoming bursts of SEB client connection requests (handshake) | ||||||
|  |      * when up to 1000 - 2000 clients connect at nearly the same time (start of an exam) */ | ||||||
|     @Bean(name = EXAM_API_EXECUTOR_BEAN_NAME) |     @Bean(name = EXAM_API_EXECUTOR_BEAN_NAME) | ||||||
|     public Executor examAPIThreadPoolTaskExecutor() { |     public Executor examAPIThreadPoolTaskExecutor() { | ||||||
|         final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); |         final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); | ||||||
|  | @ -51,6 +56,32 @@ public class AsyncServiceSpringConfig implements AsyncConfigurer { | ||||||
|         return executor; |         return executor; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     public static final String EXAM_API_PING_SERVICE_EXECUTOR_BEAN_NAME = "examAPIPingThreadPoolTaskExecutor"; | ||||||
|  | 
 | ||||||
|  |     /** This ThreadPool is used for ping handling in a distributed setup and shall reject | ||||||
|  |      * incoming ping requests as fast as possible if there is to much load on the DB. | ||||||
|  |      * We prefer to loose a shared ping update and respond to the client in time over a client request timeout */ | ||||||
|  |     @Bean(name = EXAM_API_PING_SERVICE_EXECUTOR_BEAN_NAME) | ||||||
|  |     public Executor examAPIPingThreadPoolTaskExecutor() { | ||||||
|  |         final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); | ||||||
|  |         executor.setCorePoolSize(10); | ||||||
|  |         executor.setMaxPoolSize(20); | ||||||
|  |         executor.setQueueCapacity(0); | ||||||
|  |         executor.setThreadNamePrefix("SEBPingService-"); | ||||||
|  |         executor.initialize(); | ||||||
|  |         executor.setWaitForTasksToCompleteOnShutdown(false); | ||||||
|  |         return executor; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Bean | ||||||
|  |     public ThreadPoolTaskScheduler threadPoolTaskScheduler() { | ||||||
|  |         final ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler(); | ||||||
|  |         threadPoolTaskScheduler.setPoolSize(5); | ||||||
|  |         threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(false); | ||||||
|  |         threadPoolTaskScheduler.setThreadNamePrefix("SEB-Server-BgTask-"); | ||||||
|  |         return threadPoolTaskScheduler; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     @Override |     @Override | ||||||
|     public Executor getAsyncExecutor() { |     public Executor getAsyncExecutor() { | ||||||
|         return threadPoolTaskExecutor(); |         return threadPoolTaskExecutor(); | ||||||
|  |  | ||||||
|  | @ -46,7 +46,6 @@ public class CacheConfig extends JCacheConfigurerSupport { | ||||||
|             final CachingProvider cachingProvider = Caching.getCachingProvider(); |             final CachingProvider cachingProvider = Caching.getCachingProvider(); | ||||||
|             final javax.cache.CacheManager cacheManager = |             final javax.cache.CacheManager cacheManager = | ||||||
|                     cachingProvider.getCacheManager(new URI(this.jCacheConfig), this.getClass().getClassLoader()); |                     cachingProvider.getCacheManager(new URI(this.jCacheConfig), this.getClass().getClassLoader()); | ||||||
|             System.out.println("cacheManager:" + cacheManager); |  | ||||||
| 
 | 
 | ||||||
|             final CompositeCacheManager composite = new CompositeCacheManager(); |             final CompositeCacheManager composite = new CompositeCacheManager(); | ||||||
|             composite.setCacheManagers(Arrays.asList( |             composite.setCacheManagers(Arrays.asList( | ||||||
|  |  | ||||||
|  | @ -80,7 +80,7 @@ public interface ClientEventExtensionMapper { | ||||||
| 
 | 
 | ||||||
|                 .from(ClientEventRecordDynamicSqlSupport.clientEventRecord) |                 .from(ClientEventRecordDynamicSqlSupport.clientEventRecord) | ||||||
| 
 | 
 | ||||||
|                 .leftJoin(ClientConnectionRecordDynamicSqlSupport.clientConnectionRecord) |                 .join(ClientConnectionRecordDynamicSqlSupport.clientConnectionRecord) | ||||||
|                 .on( |                 .on( | ||||||
|                         ClientEventRecordDynamicSqlSupport.clientEventRecord.clientConnectionId, |                         ClientEventRecordDynamicSqlSupport.clientEventRecord.clientConnectionId, | ||||||
|                         equalTo(ClientConnectionRecordDynamicSqlSupport.clientConnectionRecord.id)); |                         equalTo(ClientConnectionRecordDynamicSqlSupport.clientConnectionRecord.id)); | ||||||
|  |  | ||||||
|  | @ -137,8 +137,9 @@ public interface SEBClientConnectionService { | ||||||
|      * @param connectionToken the connection token |      * @param connectionToken the connection token | ||||||
|      * @param timestamp the ping time-stamp |      * @param timestamp the ping time-stamp | ||||||
|      * @param pingNumber the ping number |      * @param pingNumber the ping number | ||||||
|  |      * @param instructionConfirm instruction confirm sent by the SEB client or null | ||||||
|      * @return SEB instruction if available */ |      * @return SEB instruction if available */ | ||||||
|     String notifyPing(String connectionToken, long timestamp, int pingNumber); |     String notifyPing(String connectionToken, long timestamp, int pingNumber, String instructionConfirm); | ||||||
| 
 | 
 | ||||||
|     /** Notify a SEB client event for live indication and storing to database. |     /** Notify a SEB client event for live indication and storing to database. | ||||||
|      * |      * | ||||||
|  |  | ||||||
|  | @ -69,7 +69,7 @@ public class ExamSessionCacheService { | ||||||
|             cacheNames = CACHE_NAME_RUNNING_EXAM, |             cacheNames = CACHE_NAME_RUNNING_EXAM, | ||||||
|             key = "#examId", |             key = "#examId", | ||||||
|             unless = "#result == null") |             unless = "#result == null") | ||||||
|     public Exam getRunningExam(final Long examId) { |     public synchronized Exam getRunningExam(final Long examId) { | ||||||
| 
 | 
 | ||||||
|         if (log.isDebugEnabled()) { |         if (log.isDebugEnabled()) { | ||||||
|             log.debug("Verify running exam for id: {}", examId); |             log.debug("Verify running exam for id: {}", examId); | ||||||
|  |  | ||||||
|  | @ -193,7 +193,7 @@ public class ExamSessionServiceImpl implements ExamSessionService { | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Override |     @Override | ||||||
|     public Result<Exam> getRunningExam(final Long examId) { |     public synchronized Result<Exam> getRunningExam(final Long examId) { | ||||||
|         if (log.isTraceEnabled()) { |         if (log.isTraceEnabled()) { | ||||||
|             log.trace("Running exam request for exam {}", examId); |             log.trace("Running exam request for exam {}", examId); | ||||||
|         } |         } | ||||||
|  | @ -212,6 +212,7 @@ public class ExamSessionServiceImpl implements ExamSessionService { | ||||||
|             return Result.of(exam); |             return Result.of(exam); | ||||||
|         } else { |         } else { | ||||||
|             if (exam != null) { |             if (exam != null) { | ||||||
|  |                 log.info("Exam {} is not running anymore. Flush caches", exam); | ||||||
|                 flushCache(exam); |                 flushCache(exam); | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -34,7 +34,6 @@ import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; | ||||||
| import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile; | import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile; | ||||||
| import ch.ethz.seb.sebserver.gbl.util.Result; | import ch.ethz.seb.sebserver.gbl.util.Result; | ||||||
| import ch.ethz.seb.sebserver.gbl.util.Utils; | import ch.ethz.seb.sebserver.gbl.util.Utils; | ||||||
| import ch.ethz.seb.sebserver.webservice.WebserviceInfo; |  | ||||||
| import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientConnectionRecord; | import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientConnectionRecord; | ||||||
| import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; | import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; | ||||||
| import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientConnectionDAO; | import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientConnectionDAO; | ||||||
|  | @ -70,14 +69,13 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic | ||||||
|     private final SEBClientConfigDAO sebClientConfigDAO; |     private final SEBClientConfigDAO sebClientConfigDAO; | ||||||
|     private final SEBClientInstructionService sebInstructionService; |     private final SEBClientInstructionService sebInstructionService; | ||||||
|     private final SEBClientNotificationService sebClientNotificationService; |     private final SEBClientNotificationService sebClientNotificationService; | ||||||
|     private final WebserviceInfo webserviceInfo; |  | ||||||
|     private final ExamAdminService examAdminService; |     private final ExamAdminService examAdminService; | ||||||
|     private final DistributedPingCache distributedPingCache; |     private final DistributedPingCache distributedPingCache; | ||||||
|  |     private final boolean isDistributedSetup; | ||||||
| 
 | 
 | ||||||
|     protected SEBClientConnectionServiceImpl( |     protected SEBClientConnectionServiceImpl( | ||||||
|             final ExamSessionService examSessionService, |             final ExamSessionService examSessionService, | ||||||
|             final EventHandlingStrategyFactory eventHandlingStrategyFactory, |             final EventHandlingStrategyFactory eventHandlingStrategyFactory, | ||||||
| 
 |  | ||||||
|             final SEBClientConfigDAO sebClientConfigDAO, |             final SEBClientConfigDAO sebClientConfigDAO, | ||||||
|             final SEBClientInstructionService sebInstructionService, |             final SEBClientInstructionService sebInstructionService, | ||||||
|             final SEBClientNotificationService sebClientNotificationService, |             final SEBClientNotificationService sebClientNotificationService, | ||||||
|  | @ -92,9 +90,9 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic | ||||||
|         this.sebClientConfigDAO = sebClientConfigDAO; |         this.sebClientConfigDAO = sebClientConfigDAO; | ||||||
|         this.sebInstructionService = sebInstructionService; |         this.sebInstructionService = sebInstructionService; | ||||||
|         this.sebClientNotificationService = sebClientNotificationService; |         this.sebClientNotificationService = sebClientNotificationService; | ||||||
|         this.webserviceInfo = sebInstructionService.getWebserviceInfo(); |  | ||||||
|         this.examAdminService = examAdminService; |         this.examAdminService = examAdminService; | ||||||
|         this.distributedPingCache = distributedPingCache; |         this.distributedPingCache = distributedPingCache; | ||||||
|  |         this.isDistributedSetup = sebInstructionService.getWebserviceInfo().isDistributed(); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Override |     @Override | ||||||
|  | @ -475,7 +473,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|             // delete stored ping if this is a distributed setup |             // delete stored ping if this is a distributed setup | ||||||
|             if (this.webserviceInfo.isDistributed()) { |             if (this.isDistributedSetup) { | ||||||
|                 this.distributedPingCache |                 this.distributedPingCache | ||||||
|                         .deletePingForConnection(updatedClientConnection.id); |                         .deletePingForConnection(updatedClientConnection.id); | ||||||
|             } |             } | ||||||
|  | @ -529,7 +527,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|             // delete stored ping if this is a distributed setup |             // delete stored ping if this is a distributed setup | ||||||
|             if (this.webserviceInfo.isDistributed()) { |             if (this.isDistributedSetup) { | ||||||
|                 this.distributedPingCache |                 this.distributedPingCache | ||||||
|                         .deletePingForConnection(updatedClientConnection.id); |                         .deletePingForConnection(updatedClientConnection.id); | ||||||
|             } |             } | ||||||
|  | @ -543,7 +541,6 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic | ||||||
|     public void updatePingEvents() { |     public void updatePingEvents() { | ||||||
|         try { |         try { | ||||||
| 
 | 
 | ||||||
|             final boolean distributed = this.webserviceInfo.isDistributed(); |  | ||||||
|             final Cache cache = this.cacheManager.getCache(ExamSessionCacheService.CACHE_NAME_ACTIVE_CLIENT_CONNECTION); |             final Cache cache = this.cacheManager.getCache(ExamSessionCacheService.CACHE_NAME_ACTIVE_CLIENT_CONNECTION); | ||||||
|             final long now = Utils.getMillisecondsNow(); |             final long now = Utils.getMillisecondsNow(); | ||||||
|             final Consumer<ClientConnectionDataInternal> missingPingUpdate = missingPingUpdate(now); |             final Consumer<ClientConnectionDataInternal> missingPingUpdate = missingPingUpdate(now); | ||||||
|  | @ -552,7 +549,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic | ||||||
|                     .allRunningExamIds() |                     .allRunningExamIds() | ||||||
|                     .getOrThrow() |                     .getOrThrow() | ||||||
|                     .stream() |                     .stream() | ||||||
|                     .flatMap(examId -> distributed |                     .flatMap(examId -> this.isDistributedSetup | ||||||
|                             ? this.clientConnectionDAO |                             ? this.clientConnectionDAO | ||||||
|                                     .getConnectionTokensNoCache(examId) |                                     .getConnectionTokensNoCache(examId) | ||||||
|                                     .getOrThrow() |                                     .getOrThrow() | ||||||
|  | @ -581,16 +578,26 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic | ||||||
|     public String notifyPing( |     public String notifyPing( | ||||||
|             final String connectionToken, |             final String connectionToken, | ||||||
|             final long timestamp, |             final long timestamp, | ||||||
|             final int pingNumber) { |             final int pingNumber, | ||||||
|  |             final String instructionConfirm) { | ||||||
|  | 
 | ||||||
|  |         processPing(connectionToken, timestamp, pingNumber); | ||||||
|  | 
 | ||||||
|  |         if (instructionConfirm != null) { | ||||||
|  |             this.sebInstructionService.confirmInstructionDone(connectionToken, instructionConfirm); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         return this.sebInstructionService.getInstructionJSON(connectionToken); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     private void processPing(final String connectionToken, final long timestamp, final int pingNumber) { | ||||||
| 
 | 
 | ||||||
|         final ClientConnectionDataInternal activeClientConnection = |         final ClientConnectionDataInternal activeClientConnection = | ||||||
|                 this.examSessionService.getConnectionDataInternal(connectionToken); |                 this.examSessionCacheService.getClientConnection(connectionToken); | ||||||
| 
 | 
 | ||||||
|         if (activeClientConnection != null) { |         if (activeClientConnection != null) { | ||||||
|             activeClientConnection.notifyPing(timestamp, pingNumber); |             activeClientConnection.notifyPing(timestamp, pingNumber); | ||||||
|         } |         } | ||||||
| 
 |  | ||||||
|         return this.sebInstructionService.getInstructionJSON(connectionToken); |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Override |     @Override | ||||||
|  | @ -732,7 +739,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     private void checkExamIntegrity(final Long examId) { |     private void checkExamIntegrity(final Long examId) { | ||||||
|         if (this.webserviceInfo.isDistributed()) { |         if (this.isDistributedSetup) { | ||||||
|             // if the cached Exam is not up to date anymore, we have to update the cache first |             // if the cached Exam is not up to date anymore, we have to update the cache first | ||||||
|             final Result<Exam> updateExamCache = this.examSessionService.updateExamCache(examId); |             final Result<Exam> updateExamCache = this.examSessionService.updateExamCache(examId); | ||||||
|             if (updateExamCache.hasError()) { |             if (updateExamCache.hasError()) { | ||||||
|  | @ -779,7 +786,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic | ||||||
|             if (clientEventRecord != null) { |             if (clientEventRecord != null) { | ||||||
|                 // store event and and flush cache |                 // store event and and flush cache | ||||||
|                 this.eventHandlingStrategy.accept(clientEventRecord); |                 this.eventHandlingStrategy.accept(clientEventRecord); | ||||||
|                 if (this.webserviceInfo.isDistributed()) { |                 if (this.isDistributedSetup) { | ||||||
|                     // mark for update and flush the cache |                     // mark for update and flush the cache | ||||||
|                     this.clientConnectionDAO.save(connection.clientConnection); |                     this.clientConnectionDAO.save(connection.clientConnection); | ||||||
|                     this.examSessionCacheService.evictClientConnection( |                     this.examSessionCacheService.evictClientConnection( | ||||||
|  |  | ||||||
|  | @ -10,21 +10,16 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator; | ||||||
| 
 | 
 | ||||||
| import org.joda.time.DateTimeUtils; | import org.joda.time.DateTimeUtils; | ||||||
| 
 | 
 | ||||||
| import ch.ethz.seb.sebserver.gbl.Constants; |  | ||||||
| import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; | import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; | ||||||
| import ch.ethz.seb.sebserver.webservice.servicelayer.session.ClientIndicator; | import ch.ethz.seb.sebserver.webservice.servicelayer.session.ClientIndicator; | ||||||
| 
 | 
 | ||||||
| public abstract class AbstractClientIndicator implements ClientIndicator { | public abstract class AbstractClientIndicator implements ClientIndicator { | ||||||
| 
 | 
 | ||||||
|     private static final long PERSISTENT_UPDATE_INTERVAL = Constants.SECOND_IN_MILLIS; |  | ||||||
| 
 |  | ||||||
|     protected Long indicatorId; |     protected Long indicatorId; | ||||||
|     protected Long examId; |     protected Long examId; | ||||||
|     protected Long connectionId; |     protected Long connectionId; | ||||||
|     protected boolean cachingEnabled; |     protected boolean cachingEnabled; | ||||||
|     protected boolean active = true; |     protected boolean active = true; | ||||||
|     protected long persistentUpdateInterval = PERSISTENT_UPDATE_INTERVAL; |  | ||||||
|     protected long lastPersistentUpdate = 0; |  | ||||||
| 
 | 
 | ||||||
|     protected boolean valueInitializes = false; |     protected boolean valueInitializes = false; | ||||||
|     protected double currentValue = Double.NaN; |     protected double currentValue = Double.NaN; | ||||||
|  | @ -72,15 +67,11 @@ public abstract class AbstractClientIndicator implements ClientIndicator { | ||||||
|         final long now = DateTimeUtils.currentTimeMillis(); |         final long now = DateTimeUtils.currentTimeMillis(); | ||||||
|         if (!this.valueInitializes) { |         if (!this.valueInitializes) { | ||||||
|             this.currentValue = computeValueAt(now); |             this.currentValue = computeValueAt(now); | ||||||
|             this.lastPersistentUpdate = now; |  | ||||||
|             this.valueInitializes = true; |             this.valueInitializes = true; | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         if (!this.cachingEnabled && this.active) { |         if (!this.cachingEnabled && this.active) { | ||||||
|             if (now - this.lastPersistentUpdate > this.persistentUpdateInterval) { |  | ||||||
|             this.currentValue = computeValueAt(now); |             this.currentValue = computeValueAt(now); | ||||||
|                 this.lastPersistentUpdate = now; |  | ||||||
|             } |  | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         return this.currentValue; |         return this.currentValue; | ||||||
|  |  | ||||||
|  | @ -24,11 +24,16 @@ import ch.ethz.seb.sebserver.gbl.util.Utils; | ||||||
| 
 | 
 | ||||||
| public abstract class AbstractLogIndicator extends AbstractClientIndicator { | public abstract class AbstractLogIndicator extends AbstractClientIndicator { | ||||||
| 
 | 
 | ||||||
|  |     protected static final Long DISTRIBUTED_LOG_UPDATE_INTERVAL = 5 * Constants.SECOND_IN_MILLIS; | ||||||
|  | 
 | ||||||
|     protected final Set<EventType> observed; |     protected final Set<EventType> observed; | ||||||
|     protected final List<Integer> eventTypeIds; |     protected final List<Integer> eventTypeIds; | ||||||
|     protected String[] tags; |     protected String[] tags; | ||||||
| 
 | 
 | ||||||
|  |     protected long lastDistributedUpdate = 0L; | ||||||
|  | 
 | ||||||
|     protected AbstractLogIndicator(final EventType... eventTypes) { |     protected AbstractLogIndicator(final EventType... eventTypes) { | ||||||
|  | 
 | ||||||
|         this.observed = Collections.unmodifiableSet(EnumSet.of(eventTypes[0], eventTypes)); |         this.observed = Collections.unmodifiableSet(EnumSet.of(eventTypes[0], eventTypes)); | ||||||
|         this.eventTypeIds = Utils.immutableListOf(Arrays.stream(eventTypes) |         this.eventTypeIds = Utils.immutableListOf(Arrays.stream(eventTypes) | ||||||
|                 .map(et -> et.id) |                 .map(et -> et.id) | ||||||
|  | @ -44,7 +49,6 @@ public abstract class AbstractLogIndicator extends AbstractClientIndicator { | ||||||
|             final boolean cachingEnabled) { |             final boolean cachingEnabled) { | ||||||
| 
 | 
 | ||||||
|         super.init(indicatorDefinition, connectionId, active, cachingEnabled); |         super.init(indicatorDefinition, connectionId, active, cachingEnabled); | ||||||
|         super.persistentUpdateInterval = 2 * Constants.SECOND_IN_MILLIS; |  | ||||||
| 
 | 
 | ||||||
|         if (indicatorDefinition == null || StringUtils.isBlank(indicatorDefinition.tags)) { |         if (indicatorDefinition == null || StringUtils.isBlank(indicatorDefinition.tags)) { | ||||||
|             this.tags = null; |             this.tags = null; | ||||||
|  | @ -75,4 +79,12 @@ public abstract class AbstractLogIndicator extends AbstractClientIndicator { | ||||||
|         return this.observed; |         return this.observed; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     protected boolean loadFromPersistent(final long timestamp) { | ||||||
|  |         if (!super.valueInitializes) { | ||||||
|  |             return true; | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         return timestamp - this.lastDistributedUpdate > DISTRIBUTED_LOG_UPDATE_INTERVAL; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -57,6 +57,10 @@ public abstract class AbstractLogLevelCountIndicator extends AbstractLogIndicato | ||||||
|     @Override |     @Override | ||||||
|     public double computeValueAt(final long timestamp) { |     public double computeValueAt(final long timestamp) { | ||||||
| 
 | 
 | ||||||
|  |         if (!loadFromPersistent(timestamp)) { | ||||||
|  |             return super.currentValue; | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|         try { |         try { | ||||||
| 
 | 
 | ||||||
|             final Long errors = this.clientEventRecordMapper |             final Long errors = this.clientEventRecordMapper | ||||||
|  | @ -72,9 +76,12 @@ public abstract class AbstractLogLevelCountIndicator extends AbstractLogIndicato | ||||||
|                     .execute(); |                     .execute(); | ||||||
| 
 | 
 | ||||||
|             return errors.doubleValue(); |             return errors.doubleValue(); | ||||||
|  | 
 | ||||||
|         } catch (final Exception e) { |         } catch (final Exception e) { | ||||||
|             log.error("Failed to get indicator count from persistent storage: ", e); |             log.error("Failed to get indicator count from persistent storage: ", e); | ||||||
|             return super.currentValue; |             return super.currentValue; | ||||||
|  |         } finally { | ||||||
|  |             super.lastDistributedUpdate = timestamp; | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -62,8 +62,15 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator { | ||||||
| 
 | 
 | ||||||
|     @Override |     @Override | ||||||
|     public double computeValueAt(final long timestamp) { |     public double computeValueAt(final long timestamp) { | ||||||
|  | 
 | ||||||
|  |         if (!loadFromPersistent(timestamp)) { | ||||||
|  |             return super.currentValue; | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|         try { |         try { | ||||||
| 
 | 
 | ||||||
|  |             System.out.println("************** loadFromPersistent"); | ||||||
|  | 
 | ||||||
|             final List<ClientEventRecord> execute = this.clientEventRecordMapper.selectByExample() |             final List<ClientEventRecord> execute = this.clientEventRecordMapper.selectByExample() | ||||||
|                     .where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(this.connectionId)) |                     .where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(this.connectionId)) | ||||||
|                     .and(ClientEventRecordDynamicSqlSupport.type, isIn(this.eventTypeIds)) |                     .and(ClientEventRecordDynamicSqlSupport.type, isIn(this.eventTypeIds)) | ||||||
|  | @ -90,6 +97,8 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator { | ||||||
|         } catch (final Exception e) { |         } catch (final Exception e) { | ||||||
|             log.error("Failed to get indicator number from persistent storage: {}", e.getMessage()); |             log.error("Failed to get indicator number from persistent storage: {}", e.getMessage()); | ||||||
|             return this.currentValue; |             return this.currentValue; | ||||||
|  |         } finally { | ||||||
|  |             super.lastDistributedUpdate = timestamp; | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -11,34 +11,38 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator; | ||||||
| import java.util.Collections; | import java.util.Collections; | ||||||
| import java.util.EnumSet; | import java.util.EnumSet; | ||||||
| import java.util.Set; | import java.util.Set; | ||||||
|  | import java.util.concurrent.Executor; | ||||||
| 
 | 
 | ||||||
| import org.joda.time.DateTime; |  | ||||||
| import org.joda.time.DateTimeUtils; |  | ||||||
| import org.joda.time.DateTimeZone; |  | ||||||
| import org.slf4j.Logger; | import org.slf4j.Logger; | ||||||
| import org.slf4j.LoggerFactory; | import org.slf4j.LoggerFactory; | ||||||
|  | import org.springframework.beans.factory.annotation.Qualifier; | ||||||
| 
 | 
 | ||||||
| import ch.ethz.seb.sebserver.gbl.Constants; | import ch.ethz.seb.sebserver.gbl.async.AsyncServiceSpringConfig; | ||||||
| import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; | import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; | ||||||
|  | import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType; | ||||||
|  | import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent; | ||||||
| import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; | import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; | ||||||
|  | import ch.ethz.seb.sebserver.gbl.util.Utils; | ||||||
|  | import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper; | ||||||
| import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; | import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; | ||||||
| 
 | 
 | ||||||
| public abstract class AbstractPingIndicator extends AbstractClientIndicator { | public abstract class AbstractPingIndicator extends AbstractClientIndicator { | ||||||
| 
 | 
 | ||||||
|     private static final Logger log = LoggerFactory.getLogger(AbstractPingIndicator.class); |     private static final Logger log = LoggerFactory.getLogger(AbstractPingIndicator.class); | ||||||
| 
 | 
 | ||||||
|     private static final long INTERVAL_FOR_PERSISTENT_UPDATE = Constants.SECOND_IN_MILLIS; |  | ||||||
| 
 |  | ||||||
|     private final Set<EventType> EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class)); |     private final Set<EventType> EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class)); | ||||||
| 
 | 
 | ||||||
|  |     private final Executor executor; | ||||||
|     protected final DistributedPingCache distributedPingCache; |     protected final DistributedPingCache distributedPingCache; | ||||||
| 
 | 
 | ||||||
|     private final long lastUpdate = 0; |     //protected Long pingRecord = null; | ||||||
|     protected Long pingRecord = null; |     protected PingUpdate pingUpdate = null; | ||||||
| 
 |  | ||||||
|     protected AbstractPingIndicator(final DistributedPingCache distributedPingCache) { |  | ||||||
| 
 | 
 | ||||||
|  |     protected AbstractPingIndicator( | ||||||
|  |             final DistributedPingCache distributedPingCache, | ||||||
|  |             @Qualifier(AsyncServiceSpringConfig.EXAM_API_PING_SERVICE_EXECUTOR_BEAN_NAME) final Executor executor) { | ||||||
|         super(); |         super(); | ||||||
|  |         this.executor = executor; | ||||||
|         this.distributedPingCache = distributedPingCache; |         this.distributedPingCache = distributedPingCache; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | @ -53,33 +57,31 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { | ||||||
| 
 | 
 | ||||||
|         if (!this.cachingEnabled && this.active) { |         if (!this.cachingEnabled && this.active) { | ||||||
|             try { |             try { | ||||||
|                 this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId); |                 createPingUpdate(); | ||||||
|             } catch (final Exception e) { |             } catch (final Exception e) { | ||||||
|                 this.pingRecord = this.distributedPingCache.getPingRecordIdForConnectionId(connectionId); |                 createPingUpdate(); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     public final void notifyPing(final long timestamp, final int pingNumber) { |     public final void notifyPing(final long timestamp, final int pingNumber) { | ||||||
|         final long now = DateTime.now(DateTimeZone.UTC).getMillis(); |         super.currentValue = timestamp; | ||||||
|         super.currentValue = now; |  | ||||||
|         super.lastPersistentUpdate = now; |  | ||||||
| 
 | 
 | ||||||
|         if (!this.cachingEnabled) { |         if (!this.cachingEnabled) { | ||||||
| 
 | 
 | ||||||
|             if (this.pingRecord == null) { |             if (this.pingUpdate == null) { | ||||||
|                 tryRecoverPingRecord(); |                 tryRecoverPingRecord(); | ||||||
|                 if (this.pingRecord == null) { |                 if (this.pingUpdate == null) { | ||||||
|                     return; |                     return; | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|             // Update last ping time on persistent storage |             // Update last ping time on persistent storage asynchronously within a defines thread pool with no | ||||||
|             final long millisecondsNow = DateTimeUtils.currentTimeMillis(); |             // waiting queue to skip further ping updates if all update threads are busy | ||||||
|             if (millisecondsNow - this.lastUpdate > INTERVAL_FOR_PERSISTENT_UPDATE) { |             try { | ||||||
|                 synchronized (this) { |                 this.executor.execute(this.pingUpdate); | ||||||
|                     this.distributedPingCache.updatePing(this.pingRecord, millisecondsNow); |             } catch (final Exception e) { | ||||||
|                 } |                 //log.warn("Failed to schedule ping task: {}" + e.getMessage()); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  | @ -91,15 +93,21 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         try { |         try { | ||||||
|             this.pingRecord = this.distributedPingCache.getPingRecordIdForConnectionId(this.connectionId); |             createPingUpdate(); | ||||||
|             if (this.pingRecord == null) { |             if (this.pingUpdate == null) { | ||||||
|                 this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId); |                 createPingUpdate(); | ||||||
|             } |             } | ||||||
|         } catch (final Exception e) { |         } catch (final Exception e) { | ||||||
|             log.error("Failed to recover ping record for connection: {}", this.connectionId, e); |             log.error("Failed to recover ping record for connection: {}", this.connectionId, e); | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     private void createPingUpdate() { | ||||||
|  |         this.pingUpdate = new PingUpdate( | ||||||
|  |                 this.distributedPingCache.getClientEventLastPingMapper(), | ||||||
|  |                 this.distributedPingCache.initPingForConnection(this.connectionId)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     @Override |     @Override | ||||||
|     public Set<EventType> observedEvents() { |     public Set<EventType> observedEvents() { | ||||||
|         return this.EMPTY_SET; |         return this.EMPTY_SET; | ||||||
|  | @ -107,4 +115,46 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator { | ||||||
| 
 | 
 | ||||||
|     public abstract ClientEventRecord updateLogEvent(final long now); |     public abstract ClientEventRecord updateLogEvent(final long now); | ||||||
| 
 | 
 | ||||||
|  |     @Override | ||||||
|  |     public double computeValueAt(final long timestamp) { | ||||||
|  |         // TODO Auto-generated method stub | ||||||
|  |         return 0; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void notifyValueChange(final ClientEvent event) { | ||||||
|  |         // TODO Auto-generated method stub | ||||||
|  | 
 | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public void notifyValueChange(final ClientEventRecord clientEventRecord) { | ||||||
|  |         // TODO Auto-generated method stub | ||||||
|  | 
 | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     @Override | ||||||
|  |     public IndicatorType getType() { | ||||||
|  |         // TODO Auto-generated method stub | ||||||
|  |         return null; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     static final class PingUpdate implements Runnable { | ||||||
|  | 
 | ||||||
|  |         private final ClientEventLastPingMapper clientEventLastPingMapper; | ||||||
|  |         final Long pingRecord; | ||||||
|  | 
 | ||||||
|  |         public PingUpdate(final ClientEventLastPingMapper clientEventLastPingMapper, final Long pingRecord) { | ||||||
|  |             this.clientEventLastPingMapper = clientEventLastPingMapper; | ||||||
|  |             this.pingRecord = pingRecord; | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         @Override | ||||||
|  |         public void run() { | ||||||
|  |             this.clientEventLastPingMapper | ||||||
|  |                     .updatePingTime(this.pingRecord, Utils.getMillisecondsNow()); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |     } | ||||||
|  | 
 | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -12,11 +12,12 @@ import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo; | ||||||
| import static org.mybatis.dynamic.sql.SqlBuilder.isIn; | import static org.mybatis.dynamic.sql.SqlBuilder.isIn; | ||||||
| 
 | 
 | ||||||
| import java.util.ArrayList; | import java.util.ArrayList; | ||||||
|  | import java.util.Collection; | ||||||
| import java.util.Map; | import java.util.Map; | ||||||
|  | import java.util.concurrent.ConcurrentHashMap; | ||||||
| import java.util.concurrent.ScheduledFuture; | import java.util.concurrent.ScheduledFuture; | ||||||
| import java.util.stream.Collectors; | import java.util.stream.Collectors; | ||||||
| 
 | 
 | ||||||
| import org.ehcache.impl.internal.concurrent.ConcurrentHashMap; |  | ||||||
| import org.joda.time.DateTimeUtils; | import org.joda.time.DateTimeUtils; | ||||||
| import org.slf4j.Logger; | import org.slf4j.Logger; | ||||||
| import org.slf4j.LoggerFactory; | import org.slf4j.LoggerFactory; | ||||||
|  | @ -25,13 +26,14 @@ import org.springframework.beans.factory.annotation.Value; | ||||||
| import org.springframework.context.annotation.Lazy; | import org.springframework.context.annotation.Lazy; | ||||||
| import org.springframework.scheduling.TaskScheduler; | import org.springframework.scheduling.TaskScheduler; | ||||||
| import org.springframework.stereotype.Component; | import org.springframework.stereotype.Component; | ||||||
| import org.springframework.transaction.annotation.Isolation; |  | ||||||
| import org.springframework.transaction.annotation.Transactional; | import org.springframework.transaction.annotation.Transactional; | ||||||
| 
 | 
 | ||||||
| import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; | import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; | ||||||
| import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile; | import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile; | ||||||
|  | import ch.ethz.seb.sebserver.gbl.util.Utils; | ||||||
| import ch.ethz.seb.sebserver.webservice.WebserviceInfo; | import ch.ethz.seb.sebserver.webservice.WebserviceInfo; | ||||||
| import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper; | import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper; | ||||||
|  | import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper.ClientEventLastPingRecord; | ||||||
| import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport; | import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport; | ||||||
| import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordMapper; | import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordMapper; | ||||||
| import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; | import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; | ||||||
|  | @ -45,9 +47,11 @@ public class DistributedPingCache implements DisposableBean { | ||||||
| 
 | 
 | ||||||
|     private final ClientEventLastPingMapper clientEventLastPingMapper; |     private final ClientEventLastPingMapper clientEventLastPingMapper; | ||||||
|     private final ClientEventRecordMapper clientEventRecordMapper; |     private final ClientEventRecordMapper clientEventRecordMapper; | ||||||
|     private ScheduledFuture<?> taskRef; |     private final long pingUpdateTolerance; | ||||||
| 
 | 
 | ||||||
|  |     private ScheduledFuture<?> taskRef; | ||||||
|     private final Map<Long, Long> pingCache = new ConcurrentHashMap<>(); |     private final Map<Long, Long> pingCache = new ConcurrentHashMap<>(); | ||||||
|  |     private long lastUpdate = 0L; | ||||||
| 
 | 
 | ||||||
|     public DistributedPingCache( |     public DistributedPingCache( | ||||||
|             final ClientEventLastPingMapper clientEventLastPingMapper, |             final ClientEventLastPingMapper clientEventLastPingMapper, | ||||||
|  | @ -58,9 +62,10 @@ public class DistributedPingCache implements DisposableBean { | ||||||
| 
 | 
 | ||||||
|         this.clientEventLastPingMapper = clientEventLastPingMapper; |         this.clientEventLastPingMapper = clientEventLastPingMapper; | ||||||
|         this.clientEventRecordMapper = clientEventRecordMapper; |         this.clientEventRecordMapper = clientEventRecordMapper; | ||||||
|  |         this.pingUpdateTolerance = pingUpdate * 2 / 3; | ||||||
|         if (webserviceInfo.isDistributed()) { |         if (webserviceInfo.isDistributed()) { | ||||||
|             try { |             try { | ||||||
|                 this.taskRef = taskScheduler.scheduleAtFixedRate(this::updateCache, pingUpdate); |                 this.taskRef = taskScheduler.scheduleAtFixedRate(this::updatePings, pingUpdate); | ||||||
|             } catch (final Exception e) { |             } catch (final Exception e) { | ||||||
|                 log.error("Failed to initialize distributed ping cache update task"); |                 log.error("Failed to initialize distributed ping cache update task"); | ||||||
|                 this.taskRef = null; |                 this.taskRef = null; | ||||||
|  | @ -70,6 +75,10 @@ public class DistributedPingCache implements DisposableBean { | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     public ClientEventLastPingMapper getClientEventLastPingMapper() { | ||||||
|  |         return this.clientEventLastPingMapper; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     @Transactional |     @Transactional | ||||||
|     public Long initPingForConnection(final Long connectionId) { |     public Long initPingForConnection(final Long connectionId) { | ||||||
|         try { |         try { | ||||||
|  | @ -129,17 +138,6 @@ public class DistributedPingCache implements DisposableBean { | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     public void updatePing(final Long pingRecordId, final Long pingTime) { |  | ||||||
|         try { |  | ||||||
| 
 |  | ||||||
|             this.clientEventLastPingMapper |  | ||||||
|                     .updatePingTime(pingRecordId, pingTime); |  | ||||||
| 
 |  | ||||||
|         } catch (final Exception e) { |  | ||||||
|             log.error("Failed to update ping for ping record id -> {}", pingRecordId); |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     @Transactional |     @Transactional | ||||||
|     public void deletePingForConnection(final Long connectionId) { |     public void deletePingForConnection(final Long connectionId) { | ||||||
|         try { |         try { | ||||||
|  | @ -148,33 +146,49 @@ public class DistributedPingCache implements DisposableBean { | ||||||
|                 log.debug("*** Delete ping record for SEB connection: {}", connectionId); |                 log.debug("*** Delete ping record for SEB connection: {}", connectionId); | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|             this.clientEventRecordMapper |             final Collection<ClientEventLastPingRecord> records = this.clientEventLastPingMapper | ||||||
|                     .deleteByExample() |                     .selectByExample() | ||||||
|                     .where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId)) |                     .where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId)) | ||||||
|                     .and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.LAST_PING.id)) |                     .and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.LAST_PING.id)) | ||||||
|                     .build() |                     .build() | ||||||
|                     .execute(); |                     .execute(); | ||||||
| 
 | 
 | ||||||
|  |             if (records == null || records.isEmpty()) { | ||||||
|  |                 return; | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             final Long id = records.iterator().next().id; | ||||||
|  |             this.pingCache.remove(id); | ||||||
|  |             this.clientEventRecordMapper.deleteByPrimaryKey(id); | ||||||
|  | 
 | ||||||
|         } catch (final Exception e) { |         } catch (final Exception e) { | ||||||
|             log.error("Failed to delete ping for connection -> {}", connectionId, e); |             log.error("Failed to delete ping for connection -> {}", connectionId, e); | ||||||
|         } finally { |             try { | ||||||
|             this.pingCache.remove(connectionId); |                 log.info( | ||||||
|  |                         "Because of failed ping record deletion, " | ||||||
|  |                                 + "flushing the ping cache to ensure no dead connections pings remain in the cache"); | ||||||
|  |                 this.pingCache.clear(); | ||||||
|  |             } catch (final Exception ee) { | ||||||
|  |                 log.error("Failed to force flushing the ping cache: ", e); | ||||||
|  |             } | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     public Long getLastPing(final Long pingRecordId) { |     public Long getLastPing(final Long pingRecordId, final boolean missing) { | ||||||
|         try { |         try { | ||||||
|             Long ping = this.pingCache.get(pingRecordId); |             Long ping = this.pingCache.get(pingRecordId); | ||||||
|             if (ping == null) { |             if (ping == null && !missing) { | ||||||
| 
 | 
 | ||||||
|                 if (log.isDebugEnabled()) { |                 if (log.isDebugEnabled()) { | ||||||
|                     log.debug("*** Get and cache ping time: {}", pingRecordId); |                     log.debug("*** Get and cache ping time: {}", pingRecordId); | ||||||
|                 } |                 } | ||||||
| 
 | 
 | ||||||
|                 ping = this.clientEventLastPingMapper.selectPingTimeByPrimaryKey(pingRecordId); |                 ping = this.clientEventLastPingMapper.selectPingTimeByPrimaryKey(pingRecordId); | ||||||
|                 if (ping != null) { |  | ||||||
|                     this.pingCache.put(pingRecordId, ping); |  | ||||||
|             } |             } | ||||||
|  | 
 | ||||||
|  |             // if we have a missing ping we need to check new ping from next update even if the cache was empty | ||||||
|  |             if (ping != null || missing) { | ||||||
|  |                 this.pingCache.put(pingRecordId, ping); | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|             return ping; |             return ping; | ||||||
|  | @ -184,18 +198,24 @@ public class DistributedPingCache implements DisposableBean { | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Transactional(readOnly = true, isolation = Isolation.READ_UNCOMMITTED) |     private void updatePings() { | ||||||
|     public void updateCache() { |  | ||||||
| 
 | 
 | ||||||
|         if (this.pingCache.isEmpty()) { |         if (this.pingCache.isEmpty()) { | ||||||
|             return; |             return; | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|  |         final long millisecondsNow = Utils.getMillisecondsNow(); | ||||||
|  |         if (millisecondsNow - this.lastUpdate < this.pingUpdateTolerance) { | ||||||
|  |             log.warn("Skip ping update schedule because the last one was less then 2 seconds ago"); | ||||||
|  |             return; | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|         if (log.isDebugEnabled()) { |         if (log.isDebugEnabled()) { | ||||||
|             log.trace("*** Update distributed ping cache: {}", this.pingCache); |             log.trace("*** Update distributed ping cache: {}", this.pingCache); | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         try { |         try { | ||||||
|  | 
 | ||||||
|             final ArrayList<Long> pks = new ArrayList<>(this.pingCache.keySet()); |             final ArrayList<Long> pks = new ArrayList<>(this.pingCache.keySet()); | ||||||
|             final Map<Long, Long> mapping = this.clientEventLastPingMapper |             final Map<Long, Long> mapping = this.clientEventLastPingMapper | ||||||
|                     .selectByExample() |                     .selectByExample() | ||||||
|  | @ -215,6 +235,8 @@ public class DistributedPingCache implements DisposableBean { | ||||||
|         } catch (final Exception e) { |         } catch (final Exception e) { | ||||||
|             log.error("Error while trying to update distributed ping cache: {}", this.pingCache, e); |             log.error("Error while trying to update distributed ping cache: {}", this.pingCache, e); | ||||||
|         } |         } | ||||||
|  | 
 | ||||||
|  |         this.lastUpdate = millisecondsNow; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Override |     @Override | ||||||
|  |  | ||||||
|  | @ -10,10 +10,12 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator; | ||||||
| 
 | 
 | ||||||
| import java.math.BigDecimal; | import java.math.BigDecimal; | ||||||
| import java.util.Comparator; | import java.util.Comparator; | ||||||
|  | import java.util.concurrent.Executor; | ||||||
| 
 | 
 | ||||||
| import org.joda.time.DateTimeUtils; | import org.joda.time.DateTimeUtils; | ||||||
| import org.slf4j.Logger; | import org.slf4j.Logger; | ||||||
| import org.slf4j.LoggerFactory; | import org.slf4j.LoggerFactory; | ||||||
|  | import org.springframework.beans.factory.annotation.Qualifier; | ||||||
| import org.springframework.beans.factory.config.ConfigurableBeanFactory; | import org.springframework.beans.factory.config.ConfigurableBeanFactory; | ||||||
| import org.springframework.context.annotation.Lazy; | import org.springframework.context.annotation.Lazy; | ||||||
| import org.springframework.context.annotation.Scope; | import org.springframework.context.annotation.Scope; | ||||||
|  | @ -22,6 +24,7 @@ import org.springframework.stereotype.Component; | ||||||
| import com.fasterxml.jackson.annotation.JsonIgnore; | import com.fasterxml.jackson.annotation.JsonIgnore; | ||||||
| 
 | 
 | ||||||
| import ch.ethz.seb.sebserver.gbl.Constants; | import ch.ethz.seb.sebserver.gbl.Constants; | ||||||
|  | import ch.ethz.seb.sebserver.gbl.async.AsyncServiceSpringConfig; | ||||||
| import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; | import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; | ||||||
| import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType; | import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType; | ||||||
| import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent; | import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent; | ||||||
|  | @ -44,8 +47,10 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator { | ||||||
|     private boolean missingPing = false; |     private boolean missingPing = false; | ||||||
|     private boolean hidden = false; |     private boolean hidden = false; | ||||||
| 
 | 
 | ||||||
|     public PingIntervalClientIndicator(final DistributedPingCache distributedPingCache) { |     public PingIntervalClientIndicator( | ||||||
|         super(distributedPingCache); |             final DistributedPingCache distributedPingCache, | ||||||
|  |             @Qualifier(AsyncServiceSpringConfig.EXAM_API_PING_SERVICE_EXECUTOR_BEAN_NAME) final Executor executor) { | ||||||
|  |         super(distributedPingCache, executor); | ||||||
|         this.cachingEnabled = true; |         this.cachingEnabled = true; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | @ -122,17 +127,13 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator { | ||||||
| 
 | 
 | ||||||
|     @Override |     @Override | ||||||
|     public final double computeValueAt(final long timestamp) { |     public final double computeValueAt(final long timestamp) { | ||||||
|  |         if (!this.cachingEnabled && super.pingUpdate != null) { | ||||||
| 
 | 
 | ||||||
|         if (!this.cachingEnabled && super.pingRecord != null) { |             final Long lastPing = this.distributedPingCache.getLastPing(super.pingUpdate.pingRecord, this.missingPing); | ||||||
| 
 |  | ||||||
|             // if this indicator is not missing ping |  | ||||||
|             if (!this.isMissingPing()) { |  | ||||||
|                 final Long lastPing = this.distributedPingCache.getLastPing(super.pingRecord); |  | ||||||
|             if (lastPing != null) { |             if (lastPing != null) { | ||||||
|                 final double doubleValue = lastPing.doubleValue(); |                 final double doubleValue = lastPing.doubleValue(); | ||||||
|                 return Math.max(Double.isNaN(this.currentValue) ? doubleValue : this.currentValue, doubleValue); |                 return Math.max(Double.isNaN(this.currentValue) ? doubleValue : this.currentValue, doubleValue); | ||||||
|             } |             } | ||||||
|             } |  | ||||||
| 
 | 
 | ||||||
|             return this.currentValue; |             return this.currentValue; | ||||||
|         } |         } | ||||||
|  |  | ||||||
|  | @ -105,8 +105,6 @@ public class ClientEventController extends ReadonlyEntityController<ClientEvent, | ||||||
|         final FilterMap filterMap = new FilterMap(allRequestParams, request.getQueryString()); |         final FilterMap filterMap = new FilterMap(allRequestParams, request.getQueryString()); | ||||||
|         populateFilterMap(filterMap, institutionId, sort); |         populateFilterMap(filterMap, institutionId, sort); | ||||||
| 
 | 
 | ||||||
|         try { |  | ||||||
| 
 |  | ||||||
|         return this.paginationService.getPage( |         return this.paginationService.getPage( | ||||||
|                 pageNumber, |                 pageNumber, | ||||||
|                 pageSize, |                 pageSize, | ||||||
|  | @ -114,10 +112,6 @@ public class ClientEventController extends ReadonlyEntityController<ClientEvent, | ||||||
|                 getSQLTableOfEntity().name(), |                 getSQLTableOfEntity().name(), | ||||||
|                 () -> this.clientEventDAO.allMatchingExtended(filterMap, this::hasReadAccess)) |                 () -> this.clientEventDAO.allMatchingExtended(filterMap, this::hasReadAccess)) | ||||||
|                 .getOrThrow(); |                 .getOrThrow(); | ||||||
|         } catch (final Exception e) { |  | ||||||
|             e.printStackTrace(); |  | ||||||
|             throw e; |  | ||||||
|         } |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     @Override |     @Override | ||||||
|  |  | ||||||
|  | @ -269,27 +269,25 @@ public class ExamAPI_V1_Controller { | ||||||
|         final String pingNumString = request.getParameter(API.EXAM_API_PING_NUMBER); |         final String pingNumString = request.getParameter(API.EXAM_API_PING_NUMBER); | ||||||
|         final String instructionConfirm = request.getParameter(API.EXAM_API_PING_INSTRUCTION_CONFIRM); |         final String instructionConfirm = request.getParameter(API.EXAM_API_PING_INSTRUCTION_CONFIRM); | ||||||
| 
 | 
 | ||||||
|         if (log.isTraceEnabled()) { |         long pingTime; | ||||||
|             log.trace("****************** SEB client connection: {} ip: {}", |         try { | ||||||
|                     connectionToken, |             pingTime = Long.parseLong(timeStampString); | ||||||
|                     getClientAddress(request)); |         } catch (final Exception e) { | ||||||
|         } |             log.error("Invalid ping request: {}", connectionToken); | ||||||
| 
 |             response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value()); | ||||||
|         if (instructionConfirm != null) { |             return; | ||||||
|             this.sebClientConnectionService.confirmInstructionDone(connectionToken, instructionConfirm); |  | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         final String instruction = this.sebClientConnectionService |         final String instruction = this.sebClientConnectionService | ||||||
|                 .notifyPing( |                 .notifyPing( | ||||||
|                         connectionToken, |                         connectionToken, | ||||||
|                         Long.parseLong(timeStampString), |                         pingTime, | ||||||
|                         pingNumString != null ? Integer.parseInt(pingNumString) : -1); |                         pingNumString != null ? Integer.parseInt(pingNumString) : -1, | ||||||
|  |                         instructionConfirm); | ||||||
| 
 | 
 | ||||||
|         if (instruction == null) { |         if (instruction == null) { | ||||||
|             response.setStatus(HttpStatus.NO_CONTENT.value()); |             response.setStatus(HttpStatus.NO_CONTENT.value()); | ||||||
|             return; |         } else { | ||||||
|         } |  | ||||||
| 
 |  | ||||||
|             try { |             try { | ||||||
|                 response.setStatus(HttpStatus.OK.value()); |                 response.setStatus(HttpStatus.OK.value()); | ||||||
|                 response.getOutputStream().write(instruction.getBytes()); |                 response.getOutputStream().write(instruction.getBytes()); | ||||||
|  | @ -297,6 +295,7 @@ public class ExamAPI_V1_Controller { | ||||||
|                 log.error("Failed to send instruction as response: {}", connectionToken, e); |                 log.error("Failed to send instruction as response: {}", connectionToken, e); | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  |     } | ||||||
| 
 | 
 | ||||||
|     @RequestMapping( |     @RequestMapping( | ||||||
|             path = API.EXAM_API_EVENT_ENDPOINT, |             path = API.EXAM_API_EVENT_ENDPOINT, | ||||||
|  |  | ||||||
|  | @ -13,12 +13,13 @@ spring.datasource.hikari.initializationFailTimeout=30000 | ||||||
| spring.datasource.hikari.connectionTimeout=30000 | spring.datasource.hikari.connectionTimeout=30000 | ||||||
| spring.datasource.hikari.idleTimeout=600000 | spring.datasource.hikari.idleTimeout=600000 | ||||||
| spring.datasource.hikari.maxLifetime=1800000 | spring.datasource.hikari.maxLifetime=1800000 | ||||||
| spring.datasource.hikari.maximumPoolSize=500 | spring.datasource.hikari.maximumPoolSize=10 | ||||||
|  | spring.datasource.hikari.leakDetectionThreshold=2000 | ||||||
| 
 | 
 | ||||||
| sebserver.http.client.connect-timeout=15000 | sebserver.http.client.connect-timeout=15000 | ||||||
| sebserver.http.client.connection-request-timeout=10000 | sebserver.http.client.connection-request-timeout=10000 | ||||||
| sebserver.http.client.read-timeout=20000 | sebserver.http.client.read-timeout=20000 | ||||||
| 
 | sebserver.webservice.distributed.pingUpdate=2000 | ||||||
| sebserver.webservice.clean-db-on-startup=false | sebserver.webservice.clean-db-on-startup=false | ||||||
| 
 | 
 | ||||||
| # webservice configuration | # webservice configuration | ||||||
|  |  | ||||||
|  | @ -10,11 +10,16 @@ server.tomcat.uri-encoding=UTF-8 | ||||||
| logging.level.ch=INFO | logging.level.ch=INFO | ||||||
| logging.level.ch.ethz.seb.sebserver.webservice.datalayer=INFO | logging.level.ch.ethz.seb.sebserver.webservice.datalayer=INFO | ||||||
| logging.level.org.springframework.cache=INFO | logging.level.org.springframework.cache=INFO | ||||||
| logging.level.ch.ethz.seb.sebserver.webservice.servicelayer.lms.impl=DEBUG | logging.level.ch.ethz.seb.sebserver.webservice.servicelayer.lms.impl=INFO | ||||||
| logging.level.ch.ethz.seb.sebserver.webservice.servicelayer.session=DEBUG | logging.level.ch.ethz.seb.sebserver.webservice.servicelayer.session=INFO | ||||||
|  | logging.level.ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.proctoring=INFO | ||||||
|  | logging.level.ch.ethz.seb.sebserver.webservice.servicelayer.dao.impl=DEBUG | ||||||
|  | #logging.level.ch.ethz.seb.sebserver.webservice.datalayer.batis=DEBUG | ||||||
|  | #logging.level.ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper=DEBUG | ||||||
| #logging.level.ch.ethz.seb.sebserver.webservice.weblayer.api.ExamAPI_V1_Controller=TRACE | #logging.level.ch.ethz.seb.sebserver.webservice.weblayer.api.ExamAPI_V1_Controller=TRACE | ||||||
|  | logging.level.com.zaxxer.hikari=DEBUG | ||||||
| 
 | 
 | ||||||
| sebserver.http.client.connect-timeout=150000 | sebserver.http.client.connect-timeout=15000 | ||||||
| sebserver.http.client.connection-request-timeout=100000 | sebserver.http.client.connection-request-timeout=10000 | ||||||
| sebserver.http.client.read-timeout=200000 | sebserver.http.client.read-timeout=60000 | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -10,9 +10,6 @@ sebserver.init.adminaccount.username=sebserver-admin | ||||||
| sebserver.init.database.integrity.checks=true | sebserver.init.database.integrity.checks=true | ||||||
| sebserver.init.database.integrity.try-fix=true | sebserver.init.database.integrity.try-fix=true | ||||||
| 
 | 
 | ||||||
| sebserver.webservice.distributed=false |  | ||||||
| sebserver.webservice.distributed.pingUpdate=3000 |  | ||||||
| 
 |  | ||||||
| ### webservice caching | ### webservice caching | ||||||
| spring.cache.jcache.provider=org.ehcache.jsr107.EhcacheCachingProvider | spring.cache.jcache.provider=org.ehcache.jsr107.EhcacheCachingProvider | ||||||
| spring.cache.jcache.config=classpath:config/ehcache.xml | spring.cache.jcache.config=classpath:config/ehcache.xml | ||||||
|  | @ -31,7 +28,8 @@ spring.datasource.hikari.initializationFailTimeout=3000 | ||||||
| spring.datasource.hikari.connectionTimeout=30000 | spring.datasource.hikari.connectionTimeout=30000 | ||||||
| spring.datasource.hikari.idleTimeout=600000 | spring.datasource.hikari.idleTimeout=600000 | ||||||
| spring.datasource.hikari.maxLifetime=1800000 | spring.datasource.hikari.maxLifetime=1800000 | ||||||
| spring.datasource.hikari.maximumPoolSize=500 | spring.datasource.hikari.maximumPoolSize=100 | ||||||
|  | spring.datasource.hikari.leakDetectionThreshold=10000 | ||||||
| 
 | 
 | ||||||
| ### webservice security | ### webservice security | ||||||
| spring.datasource.password=${sebserver.mariadb.password} | spring.datasource.password=${sebserver.mariadb.password} | ||||||
|  | @ -41,6 +39,7 @@ sebserver.webservice.internalSecret=${sebserver.password} | ||||||
| ### webservice networking | ### webservice networking | ||||||
| sebserver.webservice.forceMaster=false | sebserver.webservice.forceMaster=false | ||||||
| sebserver.webservice.distributed=false | sebserver.webservice.distributed=false | ||||||
|  | sebserver.webservice.distributed.pingUpdate=3000 | ||||||
| sebserver.webservice.http.external.scheme=https | sebserver.webservice.http.external.scheme=https | ||||||
| sebserver.webservice.http.external.servername= | sebserver.webservice.http.external.servername= | ||||||
| sebserver.webservice.http.external.port= | sebserver.webservice.http.external.port= | ||||||
|  |  | ||||||
|  | @ -10,6 +10,8 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator; | ||||||
| 
 | 
 | ||||||
| import static org.junit.Assert.assertEquals; | import static org.junit.Assert.assertEquals; | ||||||
| 
 | 
 | ||||||
|  | import java.util.concurrent.Executor; | ||||||
|  | 
 | ||||||
| import org.joda.time.DateTimeUtils; | import org.joda.time.DateTimeUtils; | ||||||
| import org.junit.After; | import org.junit.After; | ||||||
| import org.junit.Test; | import org.junit.Test; | ||||||
|  | @ -34,9 +36,10 @@ public class PingIntervalClientIndicatorTest { | ||||||
| 
 | 
 | ||||||
|         final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); |         final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); | ||||||
|         final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class); |         final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class); | ||||||
|  |         final Executor executor = Mockito.mock(Executor.class); | ||||||
| 
 | 
 | ||||||
|         final PingIntervalClientIndicator pingIntervalClientIndicator = |         final PingIntervalClientIndicator pingIntervalClientIndicator = | ||||||
|                 new PingIntervalClientIndicator(distributedPingCache); |                 new PingIntervalClientIndicator(distributedPingCache, executor); | ||||||
|         assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue())); |         assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue())); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | @ -47,9 +50,10 @@ public class PingIntervalClientIndicatorTest { | ||||||
| 
 | 
 | ||||||
|         final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); |         final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); | ||||||
|         final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class); |         final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class); | ||||||
|  |         final Executor executor = Mockito.mock(Executor.class); | ||||||
| 
 | 
 | ||||||
|         final PingIntervalClientIndicator pingIntervalClientIndicator = |         final PingIntervalClientIndicator pingIntervalClientIndicator = | ||||||
|                 new PingIntervalClientIndicator(distributedPingCache); |                 new PingIntervalClientIndicator(distributedPingCache, executor); | ||||||
|         assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue())); |         assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue())); | ||||||
| 
 | 
 | ||||||
|         DateTimeUtils.setCurrentMillisProvider(() -> 10L); |         DateTimeUtils.setCurrentMillisProvider(() -> 10L); | ||||||
|  | @ -63,9 +67,10 @@ public class PingIntervalClientIndicatorTest { | ||||||
| 
 | 
 | ||||||
|         final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); |         final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); | ||||||
|         final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class); |         final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class); | ||||||
|  |         final Executor executor = Mockito.mock(Executor.class); | ||||||
| 
 | 
 | ||||||
|         final PingIntervalClientIndicator pingIntervalClientIndicator = |         final PingIntervalClientIndicator pingIntervalClientIndicator = | ||||||
|                 new PingIntervalClientIndicator(distributedPingCache); |                 new PingIntervalClientIndicator(distributedPingCache, executor); | ||||||
|         final JSONMapper jsonMapper = new JSONMapper(); |         final JSONMapper jsonMapper = new JSONMapper(); | ||||||
|         final String json = jsonMapper.writeValueAsString(pingIntervalClientIndicator); |         final String json = jsonMapper.writeValueAsString(pingIntervalClientIndicator); | ||||||
|         assertEquals("{\"indicatorValue\":0.0,\"indicatorType\":\"LAST_PING\"}", json); |         assertEquals("{\"indicatorValue\":0.0,\"indicatorType\":\"LAST_PING\"}", json); | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		
		Reference in a new issue
	
	 anhefti
						anhefti