distributed ping cache
This commit is contained in:
parent
d96816311b
commit
64536fd909
10 changed files with 288 additions and 56 deletions
2
pom.xml
2
pom.xml
|
@ -18,7 +18,7 @@
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<sebserver-version>1.2-rc1</sebserver-version>
|
<sebserver-version>1.2.1-SNAPSHOT</sebserver-version>
|
||||||
<build-version>${sebserver-version}</build-version>
|
<build-version>${sebserver-version}</build-version>
|
||||||
<revision>${sebserver-version}</revision>
|
<revision>${sebserver-version}</revision>
|
||||||
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
||||||
|
|
|
@ -8,6 +8,9 @@
|
||||||
|
|
||||||
package ch.ethz.seb.sebserver.webservice.datalayer.batis;
|
package ch.ethz.seb.sebserver.webservice.datalayer.batis;
|
||||||
|
|
||||||
|
import static ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport.*;
|
||||||
|
import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
import org.apache.ibatis.annotations.Arg;
|
import org.apache.ibatis.annotations.Arg;
|
||||||
|
@ -15,39 +18,85 @@ import org.apache.ibatis.annotations.ConstructorArgs;
|
||||||
import org.apache.ibatis.annotations.Mapper;
|
import org.apache.ibatis.annotations.Mapper;
|
||||||
import org.apache.ibatis.annotations.ResultType;
|
import org.apache.ibatis.annotations.ResultType;
|
||||||
import org.apache.ibatis.annotations.SelectProvider;
|
import org.apache.ibatis.annotations.SelectProvider;
|
||||||
|
import org.apache.ibatis.annotations.UpdateProvider;
|
||||||
import org.apache.ibatis.type.JdbcType;
|
import org.apache.ibatis.type.JdbcType;
|
||||||
import org.mybatis.dynamic.sql.select.MyBatis3SelectModelAdapter;
|
import org.mybatis.dynamic.sql.select.MyBatis3SelectModelAdapter;
|
||||||
import org.mybatis.dynamic.sql.select.QueryExpressionDSL;
|
import org.mybatis.dynamic.sql.select.QueryExpressionDSL;
|
||||||
import org.mybatis.dynamic.sql.select.SelectDSL;
|
import org.mybatis.dynamic.sql.select.SelectDSL;
|
||||||
import org.mybatis.dynamic.sql.select.render.SelectStatementProvider;
|
import org.mybatis.dynamic.sql.select.render.SelectStatementProvider;
|
||||||
|
import org.mybatis.dynamic.sql.update.UpdateDSL;
|
||||||
|
import org.mybatis.dynamic.sql.update.render.UpdateStatementProvider;
|
||||||
import org.mybatis.dynamic.sql.util.SqlProviderAdapter;
|
import org.mybatis.dynamic.sql.util.SqlProviderAdapter;
|
||||||
|
|
||||||
|
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
|
||||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport;
|
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport;
|
||||||
|
|
||||||
@Mapper
|
@Mapper
|
||||||
public interface ClientEventLastPingMapper {
|
public interface ClientEventLastPingMapper {
|
||||||
|
|
||||||
@SelectProvider(type = SqlProviderAdapter.class, method = "select")
|
@SelectProvider(type = SqlProviderAdapter.class, method = "select")
|
||||||
Long num(SelectStatementProvider selectStatement);
|
@ConstructorArgs({ @Arg(column = "server_time", javaType = Long.class, jdbcType = JdbcType.BIGINT) })
|
||||||
|
Collection<Long> selectPingTimes(SelectStatementProvider selectStatement);
|
||||||
|
|
||||||
|
@SelectProvider(type = SqlProviderAdapter.class, method = "select")
|
||||||
|
@ConstructorArgs({ @Arg(column = "server_time", javaType = Long.class, jdbcType = JdbcType.BIGINT) })
|
||||||
|
Long selectPingTime(SelectStatementProvider selectStatement);
|
||||||
|
|
||||||
|
@SelectProvider(type = SqlProviderAdapter.class, method = "select")
|
||||||
|
@ConstructorArgs({ @Arg(column = "id", javaType = Long.class, jdbcType = JdbcType.BIGINT, id = true) })
|
||||||
|
Long selectPK(SelectStatementProvider selectStatement);
|
||||||
|
|
||||||
|
@UpdateProvider(type = SqlProviderAdapter.class, method = "update")
|
||||||
|
int update(UpdateStatementProvider updateStatement);
|
||||||
|
|
||||||
@SelectProvider(type = SqlProviderAdapter.class, method = "select")
|
@SelectProvider(type = SqlProviderAdapter.class, method = "select")
|
||||||
@ResultType(ClientEventLastPingRecord.class)
|
@ResultType(ClientEventLastPingRecord.class)
|
||||||
@ConstructorArgs({
|
@ConstructorArgs({
|
||||||
@Arg(column = "id", javaType = Long.class, jdbcType = JdbcType.BIGINT),
|
@Arg(column = "id", javaType = Long.class, jdbcType = JdbcType.BIGINT),
|
||||||
@Arg(column = "server_time", javaType = Long.class, jdbcType = JdbcType.BIGINT),
|
@Arg(column = "server_time", javaType = Long.class, jdbcType = JdbcType.BIGINT)
|
||||||
})
|
})
|
||||||
Collection<ClientEventLastPingRecord> selectMany(SelectStatementProvider select);
|
Collection<ClientEventLastPingRecord> selectMany(SelectStatementProvider select);
|
||||||
|
|
||||||
|
default Long selectPingTimeByPrimaryKey(final Long id_) {
|
||||||
|
return SelectDSL.selectWithMapper(
|
||||||
|
this::selectPingTime,
|
||||||
|
ClientEventRecordDynamicSqlSupport.serverTime.as("server_time"))
|
||||||
|
.from(ClientEventRecordDynamicSqlSupport.clientEventRecord)
|
||||||
|
.where(ClientEventRecordDynamicSqlSupport.id, isEqualTo(id_))
|
||||||
|
.build()
|
||||||
|
.execute();
|
||||||
|
}
|
||||||
|
|
||||||
|
default Long pingRecordIdByConnectionId(final Long connectionId) {
|
||||||
|
return SelectDSL.selectDistinctWithMapper(
|
||||||
|
this::selectPK,
|
||||||
|
ClientEventRecordDynamicSqlSupport.id.as("id"))
|
||||||
|
.from(ClientEventRecordDynamicSqlSupport.clientEventRecord)
|
||||||
|
.where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId))
|
||||||
|
.and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.LAST_PING.id))
|
||||||
|
.build()
|
||||||
|
.execute();
|
||||||
|
}
|
||||||
|
|
||||||
default QueryExpressionDSL<MyBatis3SelectModelAdapter<Collection<ClientEventLastPingRecord>>> selectByExample() {
|
default QueryExpressionDSL<MyBatis3SelectModelAdapter<Collection<ClientEventLastPingRecord>>> selectByExample() {
|
||||||
|
|
||||||
return SelectDSL.selectWithMapper(
|
return SelectDSL.selectWithMapper(
|
||||||
this::selectMany,
|
this::selectMany,
|
||||||
|
|
||||||
ClientEventRecordDynamicSqlSupport.clientConnectionId.as("id"),
|
ClientEventRecordDynamicSqlSupport.id.as("id"),
|
||||||
ClientEventRecordDynamicSqlSupport.serverTime.as("server_time"))
|
ClientEventRecordDynamicSqlSupport.serverTime.as("server_time"))
|
||||||
|
|
||||||
.from(ClientEventRecordDynamicSqlSupport.clientEventRecord);
|
.from(ClientEventRecordDynamicSqlSupport.clientEventRecord);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
default int updatePingTime(final Long _id, final Long pingTime) {
|
||||||
|
return UpdateDSL.updateWithMapper(this::update, clientEventRecord)
|
||||||
|
.set(serverTime).equalTo(pingTime)
|
||||||
|
.where(id, isEqualTo(_id))
|
||||||
|
.build()
|
||||||
|
.execute();
|
||||||
|
}
|
||||||
|
|
||||||
final class ClientEventLastPingRecord {
|
final class ClientEventLastPingRecord {
|
||||||
|
|
||||||
public final Long id;
|
public final Long id;
|
||||||
|
|
|
@ -18,6 +18,7 @@ import java.util.Set;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.joda.time.DateTimeUtils;
|
||||||
import org.mybatis.dynamic.sql.SqlBuilder;
|
import org.mybatis.dynamic.sql.SqlBuilder;
|
||||||
import org.springframework.context.annotation.Lazy;
|
import org.springframework.context.annotation.Lazy;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
@ -312,21 +313,6 @@ public class ClientEventDAOImpl implements ClientEventDAO {
|
||||||
.stream()
|
.stream()
|
||||||
.map(pk -> new EntityKey(String.valueOf(pk), EntityType.CLIENT_EVENT))
|
.map(pk -> new EntityKey(String.valueOf(pk), EntityType.CLIENT_EVENT))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
// return all
|
|
||||||
// .stream()
|
|
||||||
// .map(EntityKey::getModelId)
|
|
||||||
// .map(Long::parseLong)
|
|
||||||
// .map(pk -> {
|
|
||||||
// final int deleted = this.clientEventRecordMapper.deleteByPrimaryKey(pk);
|
|
||||||
// if (deleted == 1) {
|
|
||||||
// return new EntityKey(String.valueOf(pk), EntityType.CLIENT_EVENT);
|
|
||||||
// } else {
|
|
||||||
// return null;
|
|
||||||
// }
|
|
||||||
// })
|
|
||||||
// .filter(Objects::nonNull)
|
|
||||||
// .collect(Collectors.toList());
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,7 +331,7 @@ public class ClientEventDAOImpl implements ClientEventDAO {
|
||||||
return lastPingRec.get(0);
|
return lastPingRec.get(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
final long millisecondsNow = Utils.getMillisecondsNow();
|
final long millisecondsNow = DateTimeUtils.currentTimeMillis();
|
||||||
final ClientEventRecord clientEventRecord = new ClientEventRecord();
|
final ClientEventRecord clientEventRecord = new ClientEventRecord();
|
||||||
clientEventRecord.setClientConnectionId(connectionId);
|
clientEventRecord.setClientConnectionId(connectionId);
|
||||||
clientEventRecord.setType(EventType.LAST_PING.id);
|
clientEventRecord.setType(EventType.LAST_PING.id);
|
||||||
|
|
|
@ -45,6 +45,7 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.session.ExamSessionService;
|
||||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientConnectionService;
|
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientConnectionService;
|
||||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientInstructionService;
|
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientInstructionService;
|
||||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientNotificationService;
|
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientNotificationService;
|
||||||
|
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.DistributedPingCache;
|
||||||
import ch.ethz.seb.sebserver.webservice.weblayer.api.APIConstraintViolationException;
|
import ch.ethz.seb.sebserver.webservice.weblayer.api.APIConstraintViolationException;
|
||||||
|
|
||||||
@Lazy
|
@Lazy
|
||||||
|
@ -71,6 +72,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
||||||
private final SEBClientNotificationService sebClientNotificationService;
|
private final SEBClientNotificationService sebClientNotificationService;
|
||||||
private final WebserviceInfo webserviceInfo;
|
private final WebserviceInfo webserviceInfo;
|
||||||
private final ExamAdminService examAdminService;
|
private final ExamAdminService examAdminService;
|
||||||
|
private final DistributedPingCache distributedPingCache;
|
||||||
|
|
||||||
protected SEBClientConnectionServiceImpl(
|
protected SEBClientConnectionServiceImpl(
|
||||||
final ExamSessionService examSessionService,
|
final ExamSessionService examSessionService,
|
||||||
|
@ -79,7 +81,8 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
||||||
final SEBClientConfigDAO sebClientConfigDAO,
|
final SEBClientConfigDAO sebClientConfigDAO,
|
||||||
final SEBClientInstructionService sebInstructionService,
|
final SEBClientInstructionService sebInstructionService,
|
||||||
final SEBClientNotificationService sebClientNotificationService,
|
final SEBClientNotificationService sebClientNotificationService,
|
||||||
final ExamAdminService examAdminService) {
|
final ExamAdminService examAdminService,
|
||||||
|
final DistributedPingCache distributedPingCache) {
|
||||||
|
|
||||||
this.examSessionService = examSessionService;
|
this.examSessionService = examSessionService;
|
||||||
this.examSessionCacheService = examSessionService.getExamSessionCacheService();
|
this.examSessionCacheService = examSessionService.getExamSessionCacheService();
|
||||||
|
@ -91,6 +94,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
||||||
this.sebClientNotificationService = sebClientNotificationService;
|
this.sebClientNotificationService = sebClientNotificationService;
|
||||||
this.webserviceInfo = sebInstructionService.getWebserviceInfo();
|
this.webserviceInfo = sebInstructionService.getWebserviceInfo();
|
||||||
this.examAdminService = examAdminService;
|
this.examAdminService = examAdminService;
|
||||||
|
this.distributedPingCache = distributedPingCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -453,6 +457,12 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
||||||
updatedClientConnection = clientConnection;
|
updatedClientConnection = clientConnection;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// delete stored ping if this is a distributed setup
|
||||||
|
if (this.webserviceInfo.isDistributed()) {
|
||||||
|
this.distributedPingCache
|
||||||
|
.deletePingForConnection(updatedClientConnection.id);
|
||||||
|
}
|
||||||
|
|
||||||
reloadConnectionCache(connectionToken);
|
reloadConnectionCache(connectionToken);
|
||||||
return updatedClientConnection;
|
return updatedClientConnection;
|
||||||
});
|
});
|
||||||
|
@ -501,6 +511,12 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
||||||
updatedClientConnection = clientConnection;
|
updatedClientConnection = clientConnection;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// delete stored ping if this is a distributed setup
|
||||||
|
if (this.webserviceInfo.isDistributed()) {
|
||||||
|
this.distributedPingCache
|
||||||
|
.deletePingForConnection(updatedClientConnection.id);
|
||||||
|
}
|
||||||
|
|
||||||
reloadConnectionCache(connectionToken);
|
reloadConnectionCache(connectionToken);
|
||||||
return updatedClientConnection;
|
return updatedClientConnection;
|
||||||
});
|
});
|
||||||
|
@ -510,6 +526,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
||||||
public void updatePingEvents() {
|
public void updatePingEvents() {
|
||||||
try {
|
try {
|
||||||
|
|
||||||
|
final boolean distributed = this.webserviceInfo.isDistributed();
|
||||||
final Cache cache = this.cacheManager.getCache(ExamSessionCacheService.CACHE_NAME_ACTIVE_CLIENT_CONNECTION);
|
final Cache cache = this.cacheManager.getCache(ExamSessionCacheService.CACHE_NAME_ACTIVE_CLIENT_CONNECTION);
|
||||||
final long now = Utils.getMillisecondsNow();
|
final long now = Utils.getMillisecondsNow();
|
||||||
final Consumer<ClientConnectionDataInternal> missingPingUpdate = missingPingUpdate(now);
|
final Consumer<ClientConnectionDataInternal> missingPingUpdate = missingPingUpdate(now);
|
||||||
|
@ -518,7 +535,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
||||||
.allRunningExamIds()
|
.allRunningExamIds()
|
||||||
.getOrThrow()
|
.getOrThrow()
|
||||||
.stream()
|
.stream()
|
||||||
.flatMap(examId -> (this.webserviceInfo.isDistributed())
|
.flatMap(examId -> distributed
|
||||||
? this.clientConnectionDAO
|
? this.clientConnectionDAO
|
||||||
.getConnectionTokensNoCache(examId)
|
.getConnectionTokensNoCache(examId)
|
||||||
.getOrThrow()
|
.getOrThrow()
|
||||||
|
|
|
@ -20,7 +20,6 @@ import ch.ethz.seb.sebserver.gbl.Constants;
|
||||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
|
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
|
||||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
|
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
|
||||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
|
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
|
||||||
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientEventDAO;
|
|
||||||
|
|
||||||
public abstract class AbstractPingIndicator extends AbstractClientIndicator {
|
public abstract class AbstractPingIndicator extends AbstractClientIndicator {
|
||||||
|
|
||||||
|
@ -28,14 +27,15 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
|
||||||
|
|
||||||
private final Set<EventType> EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class));
|
private final Set<EventType> EMPTY_SET = Collections.unmodifiableSet(EnumSet.noneOf(EventType.class));
|
||||||
|
|
||||||
protected final ClientEventDAO clientEventDAO;
|
protected final DistributedPingCache distributedPingCache;
|
||||||
|
|
||||||
private long lastUpdate = 0;
|
private final long lastUpdate = 0;
|
||||||
protected ClientEventRecord pingRecord = null;
|
protected Long pingRecord = null;
|
||||||
|
|
||||||
|
protected AbstractPingIndicator(final DistributedPingCache distributedPingCache) {
|
||||||
|
|
||||||
protected AbstractPingIndicator(final ClientEventDAO clientEventDAO) {
|
|
||||||
super();
|
super();
|
||||||
this.clientEventDAO = clientEventDAO;
|
this.distributedPingCache = distributedPingCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -47,10 +47,12 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
|
||||||
|
|
||||||
super.init(indicatorDefinition, connectionId, active, cachingEnabled);
|
super.init(indicatorDefinition, connectionId, active, cachingEnabled);
|
||||||
|
|
||||||
if (!this.cachingEnabled) {
|
if (!this.cachingEnabled && this.active) {
|
||||||
this.pingRecord = this.clientEventDAO
|
this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId);
|
||||||
.initPingEvent(this.connectionId)
|
if (this.pingRecord == null) {
|
||||||
.getOr(null);
|
// try once again
|
||||||
|
this.pingRecord = this.distributedPingCache.initPingForConnection(this.connectionId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,10 +66,7 @@ public abstract class AbstractPingIndicator extends AbstractClientIndicator {
|
||||||
// Update last ping time on persistent storage
|
// Update last ping time on persistent storage
|
||||||
final long millisecondsNow = DateTimeUtils.currentTimeMillis();
|
final long millisecondsNow = DateTimeUtils.currentTimeMillis();
|
||||||
if (millisecondsNow - this.lastUpdate > INTERVAL_FOR_PERSISTENT_UPDATE) {
|
if (millisecondsNow - this.lastUpdate > INTERVAL_FOR_PERSISTENT_UPDATE) {
|
||||||
this.pingRecord.setClientTime(timestamp);
|
this.distributedPingCache.updatePing(this.pingRecord, millisecondsNow);
|
||||||
this.pingRecord.setServerTime(millisecondsNow);
|
|
||||||
this.clientEventDAO.updatePingEvent(this.pingRecord);
|
|
||||||
this.lastUpdate = millisecondsNow;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,187 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2021 ETH Zürich, Educational Development and Technology (LET)
|
||||||
|
*
|
||||||
|
* This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator;
|
||||||
|
|
||||||
|
import static org.mybatis.dynamic.sql.SqlBuilder.isEqualTo;
|
||||||
|
import static org.mybatis.dynamic.sql.SqlBuilder.isIn;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.ehcache.impl.internal.concurrent.ConcurrentHashMap;
|
||||||
|
import org.joda.time.DateTimeUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.DisposableBean;
|
||||||
|
import org.springframework.context.annotation.Lazy;
|
||||||
|
import org.springframework.scheduling.TaskScheduler;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
|
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
|
||||||
|
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
|
||||||
|
import ch.ethz.seb.sebserver.webservice.WebserviceInfo;
|
||||||
|
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientEventLastPingMapper;
|
||||||
|
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport;
|
||||||
|
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordMapper;
|
||||||
|
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
|
||||||
|
|
||||||
|
@Lazy
|
||||||
|
@Component
|
||||||
|
@WebServiceProfile
|
||||||
|
public class DistributedPingCache implements DisposableBean {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(DistributedPingCache.class);
|
||||||
|
|
||||||
|
private final ClientEventLastPingMapper clientEventLastPingMapper;
|
||||||
|
private final ClientEventRecordMapper clientEventRecordMapper;
|
||||||
|
private ScheduledFuture<?> taskRef;
|
||||||
|
|
||||||
|
private final Map<Long, Long> pingCache = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
public DistributedPingCache(
|
||||||
|
final ClientEventLastPingMapper clientEventLastPingMapper,
|
||||||
|
final ClientEventRecordMapper clientEventRecordMapper,
|
||||||
|
final WebserviceInfo webserviceInfo,
|
||||||
|
final TaskScheduler taskScheduler) {
|
||||||
|
|
||||||
|
this.clientEventLastPingMapper = clientEventLastPingMapper;
|
||||||
|
this.clientEventRecordMapper = clientEventRecordMapper;
|
||||||
|
if (webserviceInfo.isDistributed()) {
|
||||||
|
try {
|
||||||
|
this.taskRef = taskScheduler.scheduleAtFixedRate(this::updateCache, 1000);
|
||||||
|
} catch (final Exception e) {
|
||||||
|
log.error("Failed to initialize distributed ping cache update task");
|
||||||
|
this.taskRef = null;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
this.taskRef = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transactional
|
||||||
|
public Long initPingForConnection(final Long connectionId) {
|
||||||
|
try {
|
||||||
|
Long recordId = this.clientEventLastPingMapper
|
||||||
|
.pingRecordIdByConnectionId(connectionId);
|
||||||
|
|
||||||
|
if (recordId == null) {
|
||||||
|
final long millisecondsNow = DateTimeUtils.currentTimeMillis();
|
||||||
|
final ClientEventRecord clientEventRecord = new ClientEventRecord();
|
||||||
|
clientEventRecord.setClientConnectionId(connectionId);
|
||||||
|
clientEventRecord.setType(EventType.LAST_PING.id);
|
||||||
|
clientEventRecord.setClientTime(millisecondsNow);
|
||||||
|
clientEventRecord.setServerTime(millisecondsNow);
|
||||||
|
this.clientEventRecordMapper.insert(clientEventRecord);
|
||||||
|
|
||||||
|
recordId = this.clientEventLastPingMapper.pingRecordIdByConnectionId(connectionId);
|
||||||
|
}
|
||||||
|
|
||||||
|
return recordId;
|
||||||
|
} catch (final Exception e) {
|
||||||
|
log.error("Failed to initialize ping for connection -> {}", connectionId, e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transactional
|
||||||
|
public void updatePing(final Long pingRecordId, final Long pingTime) {
|
||||||
|
try {
|
||||||
|
|
||||||
|
this.clientEventLastPingMapper
|
||||||
|
.updatePingTime(pingRecordId, pingTime);
|
||||||
|
|
||||||
|
} catch (final Exception e) {
|
||||||
|
log.error("Failed to update ping for ping record id -> {}", pingRecordId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transactional
|
||||||
|
public void deletePingForConnection(final Long connectionId) {
|
||||||
|
try {
|
||||||
|
|
||||||
|
this.clientEventRecordMapper
|
||||||
|
.deleteByExample()
|
||||||
|
.where(ClientEventRecordDynamicSqlSupport.clientConnectionId, isEqualTo(connectionId))
|
||||||
|
.and(ClientEventRecordDynamicSqlSupport.type, isEqualTo(EventType.LAST_PING.id))
|
||||||
|
.build()
|
||||||
|
.execute();
|
||||||
|
|
||||||
|
} catch (final Exception e) {
|
||||||
|
log.error("Failed to delete ping for connection -> {}", connectionId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getLastPing(final Long pingRecordId) {
|
||||||
|
try {
|
||||||
|
Long ping = this.pingCache.get(pingRecordId);
|
||||||
|
if (ping == null) {
|
||||||
|
log.debug("******* Get and cache ping time: {}", pingRecordId);
|
||||||
|
ping = this.clientEventLastPingMapper.selectPingTimeByPrimaryKey(pingRecordId);
|
||||||
|
if (ping != null) {
|
||||||
|
this.pingCache.put(pingRecordId, ping);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ping;
|
||||||
|
} catch (final Exception e) {
|
||||||
|
log.error("Error while trying to get last ping from storage: {}", e.getMessage());
|
||||||
|
return 0L;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Transactional
|
||||||
|
public void updateCache() {
|
||||||
|
|
||||||
|
if (this.pingCache.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
log.debug("****** Update distributed ping cache: {}", this.pingCache);
|
||||||
|
|
||||||
|
try {
|
||||||
|
final ArrayList<Long> pks = new ArrayList<>(this.pingCache.keySet());
|
||||||
|
final Map<Long, Long> mapping = this.clientEventLastPingMapper
|
||||||
|
.selectByExample()
|
||||||
|
.where(
|
||||||
|
ClientEventRecordDynamicSqlSupport.id,
|
||||||
|
isIn(pks))
|
||||||
|
.build()
|
||||||
|
.execute()
|
||||||
|
.stream()
|
||||||
|
.collect(Collectors.toMap(entry -> entry.id, entry -> entry.lastPingTime));
|
||||||
|
|
||||||
|
if (mapping != null) {
|
||||||
|
this.pingCache.clear();
|
||||||
|
this.pingCache.putAll(mapping);
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (final Exception e) {
|
||||||
|
log.error("Error while trying to update distributed ping cache: {}", this.pingCache, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroy() throws Exception {
|
||||||
|
if (this.taskRef != null) {
|
||||||
|
try {
|
||||||
|
final boolean cancel = this.taskRef.cancel(true);
|
||||||
|
if (!cancel) {
|
||||||
|
log.warn("Failed to cancel distributed ping cache update task");
|
||||||
|
}
|
||||||
|
} catch (final Exception e) {
|
||||||
|
log.error("Failed to cancel distributed ping cache update task: ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -26,9 +26,7 @@ import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
|
||||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
|
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
|
||||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
|
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
|
||||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
|
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent.EventType;
|
||||||
import ch.ethz.seb.sebserver.gbl.util.Result;
|
|
||||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
|
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
|
||||||
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientEventDAO;
|
|
||||||
|
|
||||||
@Lazy
|
@Lazy
|
||||||
@Component(IndicatorType.Names.LAST_PING)
|
@Component(IndicatorType.Names.LAST_PING)
|
||||||
|
@ -46,8 +44,8 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
|
||||||
private boolean missingPing = false;
|
private boolean missingPing = false;
|
||||||
private boolean hidden = false;
|
private boolean hidden = false;
|
||||||
|
|
||||||
public PingIntervalClientIndicator(final ClientEventDAO clientEventDAO) {
|
public PingIntervalClientIndicator(final DistributedPingCache distributedPingCache) {
|
||||||
super(clientEventDAO);
|
super(distributedPingCache);
|
||||||
this.cachingEnabled = true;
|
this.cachingEnabled = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -129,17 +127,10 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
|
||||||
|
|
||||||
// if this indicator is not missing ping
|
// if this indicator is not missing ping
|
||||||
if (!this.isMissingPing()) {
|
if (!this.isMissingPing()) {
|
||||||
|
final Long lastPing = this.distributedPingCache.getLastPing(super.pingRecord);
|
||||||
final Result<Long> lastPing = this.clientEventDAO
|
if (lastPing != null) {
|
||||||
.getLastPing(super.pingRecord.getId());
|
final double doubleValue = lastPing.doubleValue();
|
||||||
|
return Math.max(Double.isNaN(this.currentValue) ? doubleValue : this.currentValue, doubleValue);
|
||||||
if (!lastPing.hasError()) {
|
|
||||||
if (Double.isNaN(this.currentValue)) {
|
|
||||||
return lastPing.get().doubleValue();
|
|
||||||
}
|
|
||||||
return Math.max(this.currentValue, lastPing.get().doubleValue());
|
|
||||||
} else {
|
|
||||||
log.error("Failed to get last ping from persistent: {}", lastPing.getError().getMessage());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -270,7 +270,7 @@ public class ExamAPI_V1_Controller {
|
||||||
final String instructionConfirm = request.getParameter(API.EXAM_API_PING_INSTRUCTION_CONFIRM);
|
final String instructionConfirm = request.getParameter(API.EXAM_API_PING_INSTRUCTION_CONFIRM);
|
||||||
|
|
||||||
if (log.isTraceEnabled()) {
|
if (log.isTraceEnabled()) {
|
||||||
log.trace("****************** SEB client connection: {} ip: ",
|
log.trace("****************** SEB client connection: {} ip: {}",
|
||||||
connectionToken,
|
connectionToken,
|
||||||
getClientAddress(request));
|
getClientAddress(request));
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ sebserver.webservice.clean-db-on-startup=false
|
||||||
|
|
||||||
# webservice configuration
|
# webservice configuration
|
||||||
sebserver.init.adminaccount.gen-on-init=false
|
sebserver.init.adminaccount.gen-on-init=false
|
||||||
sebserver.webservice.distributed=false
|
sebserver.webservice.distributed=true
|
||||||
sebserver.webservice.master.delay.threshold=10000
|
sebserver.webservice.master.delay.threshold=10000
|
||||||
sebserver.webservice.http.external.scheme=http
|
sebserver.webservice.http.external.scheme=http
|
||||||
sebserver.webservice.http.external.servername=localhost
|
sebserver.webservice.http.external.servername=localhost
|
||||||
|
|
|
@ -33,9 +33,10 @@ public class PingIntervalClientIndicatorTest {
|
||||||
DateTimeUtils.setCurrentMillisProvider(() -> 1L);
|
DateTimeUtils.setCurrentMillisProvider(() -> 1L);
|
||||||
|
|
||||||
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
|
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
|
||||||
|
final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class);
|
||||||
|
|
||||||
final PingIntervalClientIndicator pingIntervalClientIndicator =
|
final PingIntervalClientIndicator pingIntervalClientIndicator =
|
||||||
new PingIntervalClientIndicator(clientEventDAO);
|
new PingIntervalClientIndicator(distributedPingCache);
|
||||||
assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue()));
|
assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,9 +46,10 @@ public class PingIntervalClientIndicatorTest {
|
||||||
DateTimeUtils.setCurrentMillisProvider(() -> 1L);
|
DateTimeUtils.setCurrentMillisProvider(() -> 1L);
|
||||||
|
|
||||||
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
|
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
|
||||||
|
final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class);
|
||||||
|
|
||||||
final PingIntervalClientIndicator pingIntervalClientIndicator =
|
final PingIntervalClientIndicator pingIntervalClientIndicator =
|
||||||
new PingIntervalClientIndicator(clientEventDAO);
|
new PingIntervalClientIndicator(distributedPingCache);
|
||||||
assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue()));
|
assertEquals("0.0", String.valueOf(pingIntervalClientIndicator.getValue()));
|
||||||
|
|
||||||
DateTimeUtils.setCurrentMillisProvider(() -> 10L);
|
DateTimeUtils.setCurrentMillisProvider(() -> 10L);
|
||||||
|
@ -60,9 +62,10 @@ public class PingIntervalClientIndicatorTest {
|
||||||
DateTimeUtils.setCurrentMillisProvider(() -> 1L);
|
DateTimeUtils.setCurrentMillisProvider(() -> 1L);
|
||||||
|
|
||||||
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
|
final ClientEventDAO clientEventDAO = Mockito.mock(ClientEventDAO.class);
|
||||||
|
final DistributedPingCache distributedPingCache = Mockito.mock(DistributedPingCache.class);
|
||||||
|
|
||||||
final PingIntervalClientIndicator pingIntervalClientIndicator =
|
final PingIntervalClientIndicator pingIntervalClientIndicator =
|
||||||
new PingIntervalClientIndicator(clientEventDAO);
|
new PingIntervalClientIndicator(distributedPingCache);
|
||||||
final JSONMapper jsonMapper = new JSONMapper();
|
final JSONMapper jsonMapper = new JSONMapper();
|
||||||
final String json = jsonMapper.writeValueAsString(pingIntervalClientIndicator);
|
final String json = jsonMapper.writeValueAsString(pingIntervalClientIndicator);
|
||||||
assertEquals("{\"indicatorValue\":0.0,\"indicatorType\":\"LAST_PING\"}", json);
|
assertEquals("{\"indicatorValue\":0.0,\"indicatorType\":\"LAST_PING\"}", json);
|
||||||
|
|
Loading…
Reference in a new issue