Merge remote-tracking branch 'origin/dev-1.2-pingCache' into dev-1.2

Conflicts:
	src/main/java/ch/ethz/seb/sebserver/ClientHttpRequestFactoryService.java
	src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionServiceImpl.java
	src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java
	src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/indicator/DistributedPingCache.java
	src/main/java/ch/ethz/seb/sebserver/webservice/weblayer/api/ExamAPI_V1_Controller.java
	src/main/resources/config/application-dev-ws.properties
This commit is contained in:
anhefti 2021-11-24 10:45:39 +01:00
commit 46786af1d6
19 changed files with 254 additions and 136 deletions

View file

@ -76,7 +76,7 @@ public class ClientHttpRequestFactoryService {
final ClientCredentialService clientCredentialService, final ClientCredentialService clientCredentialService,
@Value("${sebserver.http.client.connect-timeout:15000}") final int connectTimeout, @Value("${sebserver.http.client.connect-timeout:15000}") final int connectTimeout,
@Value("${sebserver.http.client.connection-request-timeout:20000}") final int connectionRequestTimeout, @Value("${sebserver.http.client.connection-request-timeout:20000}") final int connectionRequestTimeout,
@Value("${sebserver.http.client.read-timeout:60000}") final int readTimeout) { @Value("${sebserver.http.client.read-timeout:20000}") final int readTimeout) {
this.environment = environment; this.environment = environment;
this.clientCredentialService = clientCredentialService; this.clientCredentialService = clientCredentialService;

View file

@ -17,6 +17,7 @@ import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration @Configuration
@EnableAsync @EnableAsync
@ -25,6 +26,7 @@ public class AsyncServiceSpringConfig implements AsyncConfigurer {
public static final String EXECUTOR_BEAN_NAME = "AsyncServiceExecutorBean"; public static final String EXECUTOR_BEAN_NAME = "AsyncServiceExecutorBean";
/** This ThreadPool is used for internal long running background tasks */
@Bean(name = EXECUTOR_BEAN_NAME) @Bean(name = EXECUTOR_BEAN_NAME)
public Executor threadPoolTaskExecutor() { public Executor threadPoolTaskExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
@ -39,6 +41,9 @@ public class AsyncServiceSpringConfig implements AsyncConfigurer {
public static final String EXAM_API_EXECUTOR_BEAN_NAME = "ExamAPIAsyncServiceExecutorBean"; public static final String EXAM_API_EXECUTOR_BEAN_NAME = "ExamAPIAsyncServiceExecutorBean";
/** This ThreadPool is used for SEB client connection establishment and
* should be able to handle incoming bursts of SEB client connection requests (handshake)
* when up to 1000 - 2000 clients connect at nearly the same time (start of an exam) */
@Bean(name = EXAM_API_EXECUTOR_BEAN_NAME) @Bean(name = EXAM_API_EXECUTOR_BEAN_NAME)
public Executor examAPIThreadPoolTaskExecutor() { public Executor examAPIThreadPoolTaskExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
@ -51,6 +56,32 @@ public class AsyncServiceSpringConfig implements AsyncConfigurer {
return executor; return executor;
} }
public static final String EXAM_API_PING_SERVICE_EXECUTOR_BEAN_NAME = "examAPIPingThreadPoolTaskExecutor";
/** This ThreadPool is used for ping handling in a distributed setup and shall reject
* incoming ping requests as fast as possible if there is to much load on the DB.
* We prefer to loose a shared ping update and respond to the client in time over a client request timeout */
@Bean(name = EXAM_API_PING_SERVICE_EXECUTOR_BEAN_NAME)
public Executor examAPIPingThreadPoolTaskExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(0);
executor.setThreadNamePrefix("SEBPingService-");
executor.initialize();
executor.setWaitForTasksToCompleteOnShutdown(false);
return executor;
}
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
final ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(5);
threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(false);
threadPoolTaskScheduler.setThreadNamePrefix("SEB-Server-BgTask-");
return threadPoolTaskScheduler;
}
@Override @Override
public Executor getAsyncExecutor() { public Executor getAsyncExecutor() {
return threadPoolTaskExecutor(); return threadPoolTaskExecutor();

View file

@ -46,7 +46,6 @@ public class CacheConfig extends JCacheConfigurerSupport {
final CachingProvider cachingProvider = Caching.getCachingProvider(); final CachingProvider cachingProvider = Caching.getCachingProvider();
final javax.cache.CacheManager cacheManager = final javax.cache.CacheManager cacheManager =
cachingProvider.getCacheManager(new URI(this.jCacheConfig), this.getClass().getClassLoader()); cachingProvider.getCacheManager(new URI(this.jCacheConfig), this.getClass().getClassLoader());
System.out.println("cacheManager:" + cacheManager);
final CompositeCacheManager composite = new CompositeCacheManager(); final CompositeCacheManager composite = new CompositeCacheManager();
composite.setCacheManagers(Arrays.asList( composite.setCacheManagers(Arrays.asList(

View file

@ -80,7 +80,7 @@ public interface ClientEventExtensionMapper {
.from(ClientEventRecordDynamicSqlSupport.clientEventRecord) .from(ClientEventRecordDynamicSqlSupport.clientEventRecord)
.leftJoin(ClientConnectionRecordDynamicSqlSupport.clientConnectionRecord) .join(ClientConnectionRecordDynamicSqlSupport.clientConnectionRecord)
.on( .on(
ClientEventRecordDynamicSqlSupport.clientEventRecord.clientConnectionId, ClientEventRecordDynamicSqlSupport.clientEventRecord.clientConnectionId,
equalTo(ClientConnectionRecordDynamicSqlSupport.clientConnectionRecord.id)); equalTo(ClientConnectionRecordDynamicSqlSupport.clientConnectionRecord.id));

View file

@ -137,8 +137,9 @@ public interface SEBClientConnectionService {
* @param connectionToken the connection token * @param connectionToken the connection token
* @param timestamp the ping time-stamp * @param timestamp the ping time-stamp
* @param pingNumber the ping number * @param pingNumber the ping number
* @param instructionConfirm instruction confirm sent by the SEB client or null
* @return SEB instruction if available */ * @return SEB instruction if available */
String notifyPing(String connectionToken, long timestamp, int pingNumber); String notifyPing(String connectionToken, long timestamp, int pingNumber, String instructionConfirm);
/** Notify a SEB client event for live indication and storing to database. /** Notify a SEB client event for live indication and storing to database.
* *

View file

@ -198,7 +198,8 @@ public class ExamSessionServiceImpl implements ExamSessionService {
} }
@Override @Override
public /* synchronized */ Result<Exam> getRunningExam(final Long examId) { public synchronized Result<Exam> getRunningExam(final Long examId) {
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
log.trace("Running exam request for exam {}", examId); log.trace("Running exam request for exam {}", examId);
} }
@ -217,6 +218,7 @@ public class ExamSessionServiceImpl implements ExamSessionService {
return Result.of(exam); return Result.of(exam);
} else { } else {
if (exam != null) { if (exam != null) {
log.info("Exam {} is not running anymore. Flush caches", exam);
flushCache(exam); flushCache(exam);
} }
@ -401,8 +403,8 @@ public class ExamSessionServiceImpl implements ExamSessionService {
} }
// If we are in a distributed setup the active connection token cache get flushed // If we are in a distributed setup the active connection token cache get flushed
// at least every second. This allows caching over multiple monitoring requests but // in specified time interval. This allows caching over multiple monitoring requests but
// ensure an update every second for new incoming connections // ensure an update every now and then for new incoming connections
private void updateClientConnections(final Long examId) { private void updateClientConnections(final Long examId) {
try { try {
final long currentTimeMillis = System.currentTimeMillis(); final long currentTimeMillis = System.currentTimeMillis();

View file

@ -34,7 +34,6 @@ import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile; import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.gbl.util.Result; import ch.ethz.seb.sebserver.gbl.util.Result;
import ch.ethz.seb.sebserver.gbl.util.Utils; import ch.ethz.seb.sebserver.gbl.util.Utils;
import ch.ethz.seb.sebserver.webservice.WebserviceInfo;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientConnectionRecord; import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientConnectionRecord;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientConnectionDAO; import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientConnectionDAO;
@ -70,14 +69,13 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
private final SEBClientConfigDAO sebClientConfigDAO; private final SEBClientConfigDAO sebClientConfigDAO;
private final SEBClientInstructionService sebInstructionService; private final SEBClientInstructionService sebInstructionService;
private final SEBClientNotificationService sebClientNotificationService; private final SEBClientNotificationService sebClientNotificationService;
private final WebserviceInfo webserviceInfo;
private final ExamAdminService examAdminService; private final ExamAdminService examAdminService;
private final DistributedPingCache distributedPingCache; private final DistributedPingCache distributedPingCache;
private final boolean isDistributedSetup;
protected SEBClientConnectionServiceImpl( protected SEBClientConnectionServiceImpl(
final ExamSessionService examSessionService, final ExamSessionService examSessionService,
final EventHandlingStrategyFactory eventHandlingStrategyFactory, final EventHandlingStrategyFactory eventHandlingStrategyFactory,
final SEBClientConfigDAO sebClientConfigDAO, final SEBClientConfigDAO sebClientConfigDAO,
final SEBClientInstructionService sebInstructionService, final SEBClientInstructionService sebInstructionService,
final SEBClientNotificationService sebClientNotificationService, final SEBClientNotificationService sebClientNotificationService,
@ -92,9 +90,9 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
this.sebClientConfigDAO = sebClientConfigDAO; this.sebClientConfigDAO = sebClientConfigDAO;
this.sebInstructionService = sebInstructionService; this.sebInstructionService = sebInstructionService;
this.sebClientNotificationService = sebClientNotificationService; this.sebClientNotificationService = sebClientNotificationService;
this.webserviceInfo = sebInstructionService.getWebserviceInfo();
this.examAdminService = examAdminService; this.examAdminService = examAdminService;
this.distributedPingCache = distributedPingCache; this.distributedPingCache = distributedPingCache;
this.isDistributedSetup = sebInstructionService.getWebserviceInfo().isDistributed();
} }
@Override @Override
@ -477,7 +475,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
} }
// delete stored ping if this is a distributed setup // delete stored ping if this is a distributed setup
if (this.webserviceInfo.isDistributed()) { if (this.isDistributedSetup) {
this.distributedPingCache this.distributedPingCache
.deletePingForConnection(updatedClientConnection.id); .deletePingForConnection(updatedClientConnection.id);
} }
@ -531,7 +529,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
} }
// delete stored ping if this is a distributed setup // delete stored ping if this is a distributed setup
if (this.webserviceInfo.isDistributed()) { if (this.isDistributedSetup) {
this.distributedPingCache this.distributedPingCache
.deletePingForConnection(updatedClientConnection.id); .deletePingForConnection(updatedClientConnection.id);
} }
@ -545,7 +543,6 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
public void updatePingEvents() { public void updatePingEvents() {
try { try {
final boolean distributed = this.webserviceInfo.isDistributed();
final Cache cache = this.cacheManager.getCache(ExamSessionCacheService.CACHE_NAME_ACTIVE_CLIENT_CONNECTION); final Cache cache = this.cacheManager.getCache(ExamSessionCacheService.CACHE_NAME_ACTIVE_CLIENT_CONNECTION);
final long now = Utils.getMillisecondsNow(); final long now = Utils.getMillisecondsNow();
final Consumer<ClientConnectionDataInternal> missingPingUpdate = missingPingUpdate(now); final Consumer<ClientConnectionDataInternal> missingPingUpdate = missingPingUpdate(now);
@ -554,8 +551,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
.allRunningExamIds() .allRunningExamIds()
.getOrThrow() .getOrThrow()
.stream() .stream()
.flatMap(examId -> distributed .flatMap(examId -> this.isDistributedSetup
// TODO fetch only the connection tokens form active connections here
? this.clientConnectionDAO ? this.clientConnectionDAO
.getConnectionTokensNoCache(examId) .getConnectionTokensNoCache(examId)
.getOrThrow() .getOrThrow()
@ -584,16 +580,26 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
public String notifyPing( public String notifyPing(
final String connectionToken, final String connectionToken,
final long timestamp, final long timestamp,
final int pingNumber) { final int pingNumber,
final String instructionConfirm) {
processPing(connectionToken, timestamp, pingNumber);
if (instructionConfirm != null) {
this.sebInstructionService.confirmInstructionDone(connectionToken, instructionConfirm);
}
return this.sebInstructionService.getInstructionJSON(connectionToken);
}
private void processPing(final String connectionToken, final long timestamp, final int pingNumber) {
final ClientConnectionDataInternal activeClientConnection = final ClientConnectionDataInternal activeClientConnection =
this.examSessionService.getConnectionDataInternal(connectionToken); this.examSessionCacheService.getClientConnection(connectionToken);
if (activeClientConnection != null) { if (activeClientConnection != null) {
activeClientConnection.notifyPing(timestamp, pingNumber); activeClientConnection.notifyPing(timestamp, pingNumber);
} }
return this.sebInstructionService.getInstructionJSON(connectionToken);
} }
@Override @Override
@ -735,7 +741,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
} }
private void checkExamIntegrity(final Long examId) { private void checkExamIntegrity(final Long examId) {
if (this.webserviceInfo.isDistributed()) { if (this.isDistributedSetup) {
// if the cached Exam is not up to date anymore, we have to update the cache first // if the cached Exam is not up to date anymore, we have to update the cache first
final Result<Exam> updateExamCache = this.examSessionService.updateExamCache(examId); final Result<Exam> updateExamCache = this.examSessionService.updateExamCache(examId);
if (updateExamCache.hasError()) { if (updateExamCache.hasError()) {
@ -782,7 +788,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
if (clientEventRecord != null) { if (clientEventRecord != null) {
// store event and and flush cache // store event and and flush cache
this.eventHandlingStrategy.accept(clientEventRecord); this.eventHandlingStrategy.accept(clientEventRecord);
if (this.webserviceInfo.isDistributed()) { if (this.isDistributedSetup) {
// mark for update and flush the cache // mark for update and flush the cache
this.clientConnectionDAO.save(connection.clientConnection); this.clientConnectionDAO.save(connection.clientConnection);
this.examSessionCacheService.evictClientConnection( this.examSessionCacheService.evictClientConnection(

View file

@ -10,21 +10,16 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator;
import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeUtils;
import ch.ethz.seb.sebserver.gbl.Constants;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.ClientIndicator; import ch.ethz.seb.sebserver.webservice.servicelayer.session.ClientIndicator;
public abstract class AbstractClientIndicator implements ClientIndicator { public abstract class AbstractClientIndicator implements ClientIndicator {
private static final long PERSISTENT_UPDATE_INTERVAL = Constants.SECOND_IN_MILLIS;
protected Long indicatorId; protected Long indicatorId;
protected Long examId; protected Long examId;
protected Long connectionId; protected Long connectionId;
protected boolean cachingEnabled; protected boolean cachingEnabled;
protected boolean active = true; protected boolean active = true;
protected long persistentUpdateInterval = PERSISTENT_UPDATE_INTERVAL;
protected long lastPersistentUpdate = 0;
protected boolean valueInitializes = false; protected boolean valueInitializes = false;
protected double currentValue = Double.NaN; protected double currentValue = Double.NaN;
@ -72,15 +67,11 @@ public abstract class AbstractClientIndicator implements ClientIndicator {
final long now = DateTimeUtils.currentTimeMillis(); final long now = DateTimeUtils.currentTimeMillis();
if (!this.valueInitializes) { if (!this.valueInitializes) {
this.currentValue = computeValueAt(now); this.currentValue = computeValueAt(now);
this.lastPersistentUpdate = now;
this.valueInitializes = true; this.valueInitializes = true;
} }
if (!this.cachingEnabled && this.active) { if (!this.cachingEnabled && this.active) {
if (now - this.lastPersistentUpdate > this.persistentUpdateInterval) { this.currentValue = computeValueAt(now);
this.currentValue = computeValueAt(now);
this.lastPersistentUpdate = now;
}
} }
return this.currentValue; return this.currentValue;

View file

@ -24,11 +24,16 @@ import ch.ethz.seb.sebserver.gbl.util.Utils;
public abstract class AbstractLogIndicator extends AbstractClientIndicator { public abstract class AbstractLogIndicator extends AbstractClientIndicator {
protected static final Long DISTRIBUTED_LOG_UPDATE_INTERVAL = 5 * Constants.SECOND_IN_MILLIS;
protected final Set<EventType> observed; protected final Set<EventType> observed;
protected final List<Integer> eventTypeIds; protected final List<Integer> eventTypeIds;
protected String[] tags; protected String[] tags;
protected long lastDistributedUpdate = 0L;
protected AbstractLogIndicator(final EventType... eventTypes) { protected AbstractLogIndicator(final EventType... eventTypes) {
this.observed = Collections.unmodifiableSet(EnumSet.of(eventTypes[0], eventTypes)); this.observed = Collections.unmodifiableSet(EnumSet.of(eventTypes[0], eventTypes));
this.eventTypeIds = Utils.immutableListOf(Arrays.stream(eventTypes) this.eventTypeIds = Utils.immutableListOf(Arrays.stream(eventTypes)
.map(et -> et.id) .map(et -> et.id)
@ -44,7 +49,6 @@ public abstract class AbstractLogIndicator extends AbstractClientIndicator {
final boolean cachingEnabled) { final boolean cachingEnabled) {
super.init(indicatorDefinition, connectionId, active, cachingEnabled); super.init(indicatorDefinition, connectionId, active, cachingEnabled);
super.persistentUpdateInterval = 2 * Constants.SECOND_IN_MILLIS;
if (indicatorDefinition == null || StringUtils.isBlank(indicatorDefinition.tags)) { if (indicatorDefinition == null || StringUtils.isBlank(indicatorDefinition.tags)) {
this.tags = null; this.tags = null;
@ -75,4 +79,12 @@ public abstract class AbstractLogIndicator extends AbstractClientIndicator {
return this.observed; return this.observed;
} }
protected boolean loadFromPersistent(final long timestamp) {
if (!super.valueInitializes) {
return true;
}
return timestamp - this.lastDistributedUpdate > DISTRIBUTED_LOG_UPDATE_INTERVAL;
}
} }

View file

@ -57,6 +57,10 @@ public abstract class AbstractLogLevelCountIndicator extends AbstractLogIndicato
@Override @Override
public double computeValueAt(final long timestamp) { public double computeValueAt(final long timestamp) {
if (!loadFromPersistent(timestamp)) {
return super.currentValue;
}
try { try {
final Long errors = this.clientEventRecordMapper final Long errors = this.clientEventRecordMapper
@ -72,9 +76,12 @@ public abstract class AbstractLogLevelCountIndicator extends AbstractLogIndicato
.execute(); .execute();
return errors.doubleValue(); return errors.doubleValue();
} catch (final Exception e) { } catch (final Exception e) {
log.error("Failed to get indicator count from persistent storage: ", e); log.error("Failed to get indicator count from persistent storage: ", e);
return super.currentValue; return super.currentValue;
} finally {
super.lastDistributedUpdate = timestamp;
} }
} }

View file

@ -62,8 +62,15 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator {
@Override @Override
public double computeValueAt(final long timestamp) { public double computeValueAt(final long timestamp) {
if (!loadFromPersistent(timestamp)) {
return super.currentValue;
}
try { try {
System.out.println("************** loadFromPersistent");
final List<ClientEventRecord> execute = this.clientEventRecordMapper.selectByExample() final List<ClientEventRecord> execute = this.clientEventRecordMapper.selectByExample()
.where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(this.connectionId)) .where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(this.connectionId))
.and(ClientEventRecordDynamicSqlSupport.type, isIn(this.eventTypeIds)) .and(ClientEventRecordDynamicSqlSupport.type, isIn(this.eventTypeIds))
@ -90,6 +97,8 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator {
} catch (final Exception e) { } catch (final Exception e) {
log.error("Failed to get indicator number from persistent storage: {}", e.getMessage()); log.error("Failed to get indicator number from persistent storage: {}", e.getMessage());
return this.currentValue; return this.currentValue;
} finally {
super.lastDistributedUpdate = timestamp;
} }
} }

View file

@ -11,34 +11,38 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import ch.ethz.seb.sebserver.gbl.Constants; import ch.ethz.seb.sebserver.gbl.async.AsyncServiceSpringConfig;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
import ch.ethz.seb.sebserver.gbl.util.Utils;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
public abstract class AbstractPingIndicator extends AbstractClientIndicator { public abstract class AbstractPingIndicator extends AbstractClientIndicator {
private static final Logger log = LoggerFactory.getLogger(AbstractPingIndicator.class); private static final Logger log = LoggerFactory.getLogger(AbstractPingIndicator.class);
private static final long INTERVAL_FOR_PERSISTENT_UPDATE = Constants.SECOND_IN_MILLIS;
private final Set<EventType> EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class)); private final Set<EventType> EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class));
private final Executor executor;
protected final DistributedPingCache distributedPingCache; protected final DistributedPingCache distributedPingCache;
private final long lastUpdate = 0; //protected Long pingRecord = null;
protected Long pingRecord = null; protected PingUpdate pingUpdate = null;
protected AbstractPingIndicator(final DistributedPingCache distributedPingCache) {
protected AbstractPingIndicator(
final DistributedPingCache distributedPingCache,
@Qualifier(AsyncServiceSpringConfig.EXAM_API_PING_SERVICE_EXECUTOR_BEAN_NAME) final Executor executor) {
super(); super();
this.executor = executor;
this.distributedPingCache = distributedPingCache; this.distributedPingCache = distributedPingCache;
} }
@ -53,33 +57,31 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
if (!this.cachingEnabled && this.active) { if (!this.cachingEnabled && this.active) {
try { try {
this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId); createPingUpdate();
} catch (final Exception e) { } catch (final Exception e) {
this.pingRecord = this.distributedPingCache.getPingRecordIdForConnectionId(connectionId); createPingUpdate();
} }
} }
} }
public final void notifyPing(final long timestamp, final int pingNumber) { public final void notifyPing(final long timestamp, final int pingNumber) {
final long now = DateTime.now(DateTimeZone.UTC).getMillis(); super.currentValue = timestamp;
super.currentValue = now;
super.lastPersistentUpdate = now;
if (!this.cachingEnabled) { if (!this.cachingEnabled) {
if (this.pingRecord == null) { if (this.pingUpdate == null) {
tryRecoverPingRecord(); tryRecoverPingRecord();
if (this.pingRecord == null) { if (this.pingUpdate == null) {
return; return;
} }
} }
// Update last ping time on persistent storage // Update last ping time on persistent storage asynchronously within a defines thread pool with no
final long millisecondsNow = DateTimeUtils.currentTimeMillis(); // waiting queue to skip further ping updates if all update threads are busy
if (millisecondsNow - this.lastUpdate > INTERVAL_FOR_PERSISTENT_UPDATE) { try {
synchronized (this) { this.executor.execute(this.pingUpdate);
this.distributedPingCache.updatePing(this.pingRecord, millisecondsNow); } catch (final Exception e) {
} //log.warn("Failed to schedule ping task: {}" + e.getMessage());
} }
} }
} }
@ -91,15 +93,21 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
} }
try { try {
this.pingRecord = this.distributedPingCache.getPingRecordIdForConnectionId(this.connectionId); createPingUpdate();
if (this.pingRecord == null) { if (this.pingUpdate == null) {
this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId); createPingUpdate();
} }
} catch (final Exception e) { } catch (final Exception e) {
log.error("Failed to recover ping record for connection: {}", this.connectionId, e); log.error("Failed to recover ping record for connection: {}", this.connectionId, e);
} }
} }
private void createPingUpdate() {
this.pingUpdate = new PingUpdate(
this.distributedPingCache.getClientEventLastPingMapper(),
this.distributedPingCache.initPingForConnection(this.connectionId));
}
@Override @Override
public Set<EventType> observedEvents() { public Set<EventType> observedEvents() {
return this.EMPTY_SET; return this.EMPTY_SET;
@ -107,4 +115,46 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
public abstract ClientEventRecord updateLogEvent(final long now); public abstract ClientEventRecord updateLogEvent(final long now);
@Override
public double computeValueAt(final long timestamp) {
// TODO Auto-generated method stub
return 0;
}
@Override
public void notifyValueChange(final ClientEvent event) {
// TODO Auto-generated method stub
}
@Override
public void notifyValueChange(final ClientEventRecord clientEventRecord) {
// TODO Auto-generated method stub
}
@Override
public IndicatorType getType() {
// TODO Auto-generated method stub
return null;
}
static final class PingUpdate implements Runnable {
private final ClientEventLastPingMapper clientEventLastPingMapper;
final Long pingRecord;
public PingUpdate(final ClientEventLastPingMapper clientEventLastPingMapper, final Long pingRecord) {
this.clientEventLastPingMapper = clientEventLastPingMapper;
this.pingRecord = pingRecord;
}
@Override
public void run() {
this.clientEventLastPingMapper
.updatePingTime(this.pingRecord, Utils.getMillisecondsNow());
}
}
} }

View file

@ -12,11 +12,12 @@ import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo;
import static org.mybatis.dynamic.sql.SqlBuilder.isIn; import static org.mybatis.dynamic.sql.SqlBuilder.isIn;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.ehcache.impl.internal.concurrent.ConcurrentHashMap;
import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -25,7 +26,6 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType; import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
@ -33,6 +33,7 @@ import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.gbl.util.Utils; import ch.ethz.seb.sebserver.gbl.util.Utils;
import ch.ethz.seb.sebserver.webservice.WebserviceInfo; import ch.ethz.seb.sebserver.webservice.WebserviceInfo;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper; import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper.ClientEventLastPingRecord;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport; import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordMapper; import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordMapper;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord; import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
@ -64,7 +65,7 @@ public class DistributedPingCache implements DisposableBean {
this.pingUpdateTolerance = pingUpdate * 2 / 3; this.pingUpdateTolerance = pingUpdate * 2 / 3;
if (webserviceInfo.isDistributed()) { if (webserviceInfo.isDistributed()) {
try { try {
this.taskRef = taskScheduler.scheduleAtFixedRate(this::updateCache, pingUpdate); this.taskRef = taskScheduler.scheduleAtFixedRate(this::updatePings, pingUpdate);
} catch (final Exception e) { } catch (final Exception e) {
log.error("Failed to initialize distributed ping cache update task"); log.error("Failed to initialize distributed ping cache update task");
this.taskRef = null; this.taskRef = null;
@ -74,6 +75,10 @@ public class DistributedPingCache implements DisposableBean {
} }
} }
public ClientEventLastPingMapper getClientEventLastPingMapper() {
return this.clientEventLastPingMapper;
}
@Transactional @Transactional
public Long initPingForConnection(final Long connectionId) { public Long initPingForConnection(final Long connectionId) {
try { try {
@ -133,17 +138,6 @@ public class DistributedPingCache implements DisposableBean {
} }
} }
public void updatePing(final Long pingRecordId, final Long pingTime) {
try {
this.clientEventLastPingMapper
.updatePingTime(pingRecordId, pingTime);
} catch (final Exception e) {
log.error("Failed to update ping for ping record id -> {}", pingRecordId);
}
}
@Transactional @Transactional
public void deletePingForConnection(final Long connectionId) { public void deletePingForConnection(final Long connectionId) {
try { try {
@ -152,32 +146,49 @@ public class DistributedPingCache implements DisposableBean {
log.debug("*** Delete ping record for SEB connection: {}", connectionId); log.debug("*** Delete ping record for SEB connection: {}", connectionId);
} }
this.clientEventRecordMapper final Collection<ClientEventLastPingRecord> records = this.clientEventLastPingMapper
.deleteByExample() .selectByExample()
.where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId)) .where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId))
.and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.LAST_PING.id)) .and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.LAST_PING.id))
.build() .build()
.execute(); .execute();
if (records == null || records.isEmpty()) {
return;
}
final Long id = records.iterator().next().id;
this.pingCache.remove(id);
this.clientEventRecordMapper.deleteByPrimaryKey(id);
} catch (final Exception e) { } catch (final Exception e) {
log.error("Failed to delete ping for connection -> {}", connectionId, e); log.error("Failed to delete ping for connection -> {}", connectionId, e);
} finally { try {
this.pingCache.remove(connectionId); log.info(
"Because of failed ping record deletion, "
+ "flushing the ping cache to ensure no dead connections pings remain in the cache");
this.pingCache.clear();
} catch (final Exception ee) {
log.error("Failed to force flushing the ping cache: ", e);
}
} }
} }
public Long getLastPing(final Long pingRecordId) { public Long getLastPing(final Long pingRecordId, final boolean missing) {
try { try {
Long ping = this.pingCache.get(pingRecordId); Long ping = this.pingCache.get(pingRecordId);
if (ping == null) { if (ping == null && !missing) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("*** Get and cache ping time: {}", pingRecordId); log.debug("*** Get and cache ping time: {}", pingRecordId);
} }
ping = this.clientEventLastPingMapper.selectPingTimeByPrimaryKey(pingRecordId); ping = this.clientEventLastPingMapper.selectPingTimeByPrimaryKey(pingRecordId);
if (ping != null) { }
this.pingCache.put(pingRecordId, ping);
} // if we have a missing ping we need to check new ping from next update even if the cache was empty
if (ping != null || missing) {
this.pingCache.put(pingRecordId, ping);
} }
return ping; return ping;
@ -187,8 +198,11 @@ public class DistributedPingCache implements DisposableBean {
} }
} }
@Transactional(readOnly = true, isolation = Isolation.READ_UNCOMMITTED) private void updatePings() {
public void updateCache() {
if (this.pingCache.isEmpty()) {
return;
}
final long millisecondsNow = Utils.getMillisecondsNow(); final long millisecondsNow = Utils.getMillisecondsNow();
if (millisecondsNow - this.lastUpdate < this.pingUpdateTolerance) { if (millisecondsNow - this.lastUpdate < this.pingUpdateTolerance) {
@ -196,15 +210,12 @@ public class DistributedPingCache implements DisposableBean {
return; return;
} }
if (this.pingCache.isEmpty()) {
return;
}
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.trace("*** Update distributed ping cache: {}", this.pingCache); log.trace("*** Update distributed ping cache: {}", this.pingCache);
} }
try { try {
final ArrayList<Long> pks = new ArrayList<>(this.pingCache.keySet()); final ArrayList<Long> pks = new ArrayList<>(this.pingCache.keySet());
final Map<Long, Long> mapping = this.clientEventLastPingMapper final Map<Long, Long> mapping = this.clientEventLastPingMapper
.selectByExample() .selectByExample()
@ -225,6 +236,8 @@ public class DistributedPingCache implements DisposableBean {
} catch (final Exception e) { } catch (final Exception e) {
log.error("Error while trying to update distributed ping cache: {}", this.pingCache, e); log.error("Error while trying to update distributed ping cache: {}", this.pingCache, e);
} }
this.lastUpdate = millisecondsNow;
} }
@Override @Override

View file

@ -10,10 +10,12 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.Comparator; import java.util.Comparator;
import java.util.concurrent.Executor;
import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Scope; import org.springframework.context.annotation.Scope;
@ -22,6 +24,7 @@ import org.springframework.stereotype.Component;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import ch.ethz.seb.sebserver.gbl.Constants; import ch.ethz.seb.sebserver.gbl.Constants;
import ch.ethz.seb.sebserver.gbl.async.AsyncServiceSpringConfig;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType; import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent; import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
@ -44,8 +47,10 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
private boolean missingPing = false; private boolean missingPing = false;
private boolean hidden = false; private boolean hidden = false;
public PingIntervalClientIndicator(final DistributedPingCache distributedPingCache) { public PingIntervalClientIndicator(
super(distributedPingCache); final DistributedPingCache distributedPingCache,
@Qualifier(AsyncServiceSpringConfig.EXAM_API_PING_SERVICE_EXECUTOR_BEAN_NAME) final Executor executor) {
super(distributedPingCache, executor);
this.cachingEnabled = true; this.cachingEnabled = true;
} }
@ -122,16 +127,12 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
@Override @Override
public final double computeValueAt(final long timestamp) { public final double computeValueAt(final long timestamp) {
if (!this.cachingEnabled && super.pingUpdate != null) {
if (!this.cachingEnabled && super.pingRecord != null) { final Long lastPing = this.distributedPingCache.getLastPing(super.pingUpdate.pingRecord, this.missingPing);
if (lastPing != null) {
// if this indicator is not missing ping final double doubleValue = lastPing.doubleValue();
if (!this.isMissingPing()) { return Math.max(Double.isNaN(this.currentValue) ? doubleValue : this.currentValue, doubleValue);
final Long lastPing = this.distributedPingCache.getLastPing(super.pingRecord);
if (lastPing != null) {
final double doubleValue = lastPing.doubleValue();
return Math.max(Double.isNaN(this.currentValue) ? doubleValue : this.currentValue, doubleValue);
}
} }
return this.currentValue; return this.currentValue;

View file

@ -110,19 +110,13 @@ public class ClientEventController extends ReadonlyEntityController<ClientEvent,
final FilterMap filterMap = new FilterMap(allRequestParams, request.getQueryString()); final FilterMap filterMap = new FilterMap(allRequestParams, request.getQueryString());
populateFilterMap(filterMap, institutionId, sort); populateFilterMap(filterMap, institutionId, sort);
try { return this.paginationService.getPage(
pageNumber,
return this.paginationService.getPage( pageSize,
pageNumber, sort,
pageSize, getSQLTableOfEntity().name(),
sort, () -> this.clientEventDAO.allMatchingExtended(filterMap, this::hasReadAccess))
getSQLTableOfEntity().name(), .getOrThrow();
() -> this.clientEventDAO.allMatchingExtended(filterMap, this::hasReadAccess))
.getOrThrow();
} catch (final Exception e) {
e.printStackTrace();
throw e;
}
} }
@Override @Override

View file

@ -270,32 +270,31 @@ public class ExamAPI_V1_Controller {
final String pingNumString = request.getParameter(API.EXAM_API_PING_NUMBER); final String pingNumString = request.getParameter(API.EXAM_API_PING_NUMBER);
final String instructionConfirm = request.getParameter(API.EXAM_API_PING_INSTRUCTION_CONFIRM); final String instructionConfirm = request.getParameter(API.EXAM_API_PING_INSTRUCTION_CONFIRM);
if (connectionToken == null) { long pingTime;
log.warn("Missing connection token on ping. Ignore the request"); try {
pingTime = Long.parseLong(timeStampString);
} catch (final Exception e) {
log.error("Invalid ping request: {}", connectionToken);
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value());
return; return;
} }
if (instructionConfirm != null) {
this.sebClientConnectionService.confirmInstructionDone(connectionToken, instructionConfirm);
}
final Long clientTime = timeStampString != null ? Long.parseLong(timeStampString) : 0L;
final String instruction = this.sebClientConnectionService final String instruction = this.sebClientConnectionService
.notifyPing( .notifyPing(
connectionToken, connectionToken,
clientTime, pingTime,
pingNumString != null ? Integer.parseInt(pingNumString) : -1); pingNumString != null ? Integer.parseInt(pingNumString) : -1,
instructionConfirm);
if (instruction == null) { if (instruction == null) {
response.setStatus(HttpStatus.NO_CONTENT.value()); response.setStatus(HttpStatus.NO_CONTENT.value());
return; } else {
} try {
response.setStatus(HttpStatus.OK.value());
try { response.getOutputStream().write(instruction.getBytes());
response.setStatus(HttpStatus.OK.value()); } catch (final IOException e) {
response.getOutputStream().write(instruction.getBytes()); log.error("Failed to send instruction as response: {}", connectionToken, e);
} catch (final IOException e) { }
log.error("Failed to send instruction as response: {}", connectionToken, e);
} }
} }

View file

@ -20,7 +20,7 @@ sebserver.http.client.connect-timeout=15000
sebserver.http.client.connection-request-timeout=10000 sebserver.http.client.connection-request-timeout=10000
sebserver.http.client.read-timeout=20000 sebserver.http.client.read-timeout=20000
sebserver.webservice.distributed.pingUpdate=1000 sebserver.webservice.distributed.pingUpdate=1000
sebserver.webservice.distributed.connectionUpdate=1000 sebserver.webservice.distributed.connectionUpdate=2000
sebserver.webservice.clean-db-on-startup=false sebserver.webservice.clean-db-on-startup=false
# webservice configuration # webservice configuration

View file

@ -10,9 +10,6 @@ sebserver.init.adminaccount.username=sebserver-admin
sebserver.init.database.integrity.checks=true sebserver.init.database.integrity.checks=true
sebserver.init.database.integrity.try-fix=true sebserver.init.database.integrity.try-fix=true
sebserver.webservice.distributed=false
sebserver.webservice.distributed.pingUpdate=3000
### webservice caching ### webservice caching
spring.cache.jcache.provider=org.ehcache.jsr107.EhcacheCachingProvider spring.cache.jcache.provider=org.ehcache.jsr107.EhcacheCachingProvider
spring.cache.jcache.config=classpath:config/ehcache.xml spring.cache.jcache.config=classpath:config/ehcache.xml
@ -42,6 +39,7 @@ sebserver.webservice.internalSecret=${sebserver.password}
### webservice networking ### webservice networking
sebserver.webservice.forceMaster=false sebserver.webservice.forceMaster=false
sebserver.webservice.distributed=false sebserver.webservice.distributed=false
sebserver.webservice.distributed.pingUpdate=3000
sebserver.webservice.http.external.scheme=https sebserver.webservice.http.external.scheme=https
sebserver.webservice.http.external.servername= sebserver.webservice.http.external.servername=
sebserver.webservice.http.external.port= sebserver.webservice.http.external.port=

View file

@ -10,6 +10,8 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.util.concurrent.Executor;
import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeUtils;
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
@ -34,9 +36,10 @@ public class PingIntervalClientIndicatorTest {
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class); final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class);
final Executor executor = Mockito.mock(Executor.class);
final PingIntervalClientIndicator pingIntervalClientIndicator = final PingIntervalClientIndicator pingIntervalClientIndicator =
new PingIntervalClientIndicator(distributedPingCache); new PingIntervalClientIndicator(distributedPingCache, executor);
assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue())); assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue()));
} }
@ -47,9 +50,10 @@ public class PingIntervalClientIndicatorTest {
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class); final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class);
final Executor executor = Mockito.mock(Executor.class);
final PingIntervalClientIndicator pingIntervalClientIndicator = final PingIntervalClientIndicator pingIntervalClientIndicator =
new PingIntervalClientIndicator(distributedPingCache); new PingIntervalClientIndicator(distributedPingCache, executor);
assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue())); assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue()));
DateTimeUtils.setCurrentMillisProvider(() -> 10L); DateTimeUtils.setCurrentMillisProvider(() -> 10L);
@ -63,9 +67,10 @@ public class PingIntervalClientIndicatorTest {
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class); final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class); final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class);
final Executor executor = Mockito.mock(Executor.class);
final PingIntervalClientIndicator pingIntervalClientIndicator = final PingIntervalClientIndicator pingIntervalClientIndicator =
new PingIntervalClientIndicator(distributedPingCache); new PingIntervalClientIndicator(distributedPingCache, executor);
final JSONMapper jsonMapper = new JSONMapper(); final JSONMapper jsonMapper = new JSONMapper();
final String json = jsonMapper.writeValueAsString(pingIntervalClientIndicator); final String json = jsonMapper.writeValueAsString(pingIntervalClientIndicator);
assertEquals("{\"indicatorValue\":0.0,\"indicatorType\":\"LAST_PING\"}", json); assertEquals("{\"indicatorValue\":0.0,\"indicatorType\":\"LAST_PING\"}", json);