SEBSERV-314 partially fixed
This commit is contained in:
parent
bfe15f794a
commit
0ba33c66e0
9 changed files with 94 additions and 83 deletions
|
@ -8,8 +8,10 @@
|
|||
|
||||
package ch.ethz.seb.sebserver.gbl.model.session;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
|
@ -46,6 +48,11 @@ public final class ClientConnection implements GrantEntity {
|
|||
}
|
||||
}
|
||||
|
||||
public final static List<String> ACTIVE_STATES = Arrays.asList(
|
||||
ConnectionStatus.ACTIVE.name(),
|
||||
ConnectionStatus.AUTHENTICATED.name(),
|
||||
ConnectionStatus.CONNECTION_REQUESTED.name());
|
||||
|
||||
public static final ClientConnection EMPTY_CLIENT_CONNECTION = new ClientConnection(
|
||||
-1L, -1L, -1L,
|
||||
ConnectionStatus.UNDEFINED,
|
||||
|
|
|
@ -25,20 +25,19 @@ import org.mybatis.dynamic.sql.util.SqlProviderAdapter;
|
|||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientConnectionRecordDynamicSqlSupport;
|
||||
|
||||
@Mapper
|
||||
public interface ClientConnectionMinMapper {
|
||||
public interface ClientConnectionTokenMapper {
|
||||
|
||||
@SelectProvider(type = SqlProviderAdapter.class, method = "select")
|
||||
Long num(SelectStatementProvider selectStatement);
|
||||
|
||||
@SelectProvider(type = SqlProviderAdapter.class, method = "select")
|
||||
@ResultType(ClientConnectionMinRecord.class)
|
||||
@ResultType(ClientConnectionTokenRecord.class)
|
||||
@ConstructorArgs({
|
||||
@Arg(column = "connection_token", javaType = String.class, jdbcType = JdbcType.VARCHAR, id = true),
|
||||
@Arg(column = "update_time", javaType = Long.class, jdbcType = JdbcType.BIGINT),
|
||||
@Arg(column = "connection_token", javaType = String.class, jdbcType = JdbcType.VARCHAR, id = true)
|
||||
})
|
||||
Collection<ClientConnectionMinRecord> selectMany(SelectStatementProvider select);
|
||||
Collection<ClientConnectionTokenRecord> selectMany(SelectStatementProvider select);
|
||||
|
||||
default QueryExpressionDSL<MyBatis3SelectModelAdapter<Collection<ClientConnectionMinRecord>>> selectByExample() {
|
||||
default QueryExpressionDSL<MyBatis3SelectModelAdapter<Collection<ClientConnectionTokenRecord>>> selectByExample() {
|
||||
return SelectDSL.selectWithMapper(
|
||||
this::selectMany,
|
||||
|
||||
|
@ -48,17 +47,12 @@ public interface ClientConnectionMinMapper {
|
|||
.from(ClientConnectionRecordDynamicSqlSupport.clientConnectionRecord);
|
||||
}
|
||||
|
||||
final class ClientConnectionMinRecord {
|
||||
final class ClientConnectionTokenRecord {
|
||||
|
||||
public final String connection_token;
|
||||
public final Long update_time;
|
||||
|
||||
public ClientConnectionMinRecord(
|
||||
final String connection_token,
|
||||
final Long update_time) {
|
||||
|
||||
public ClientConnectionTokenRecord(final String connection_token) {
|
||||
this.connection_token = connection_token;
|
||||
this.update_time = update_time;
|
||||
}
|
||||
}
|
||||
|
|
@ -48,22 +48,19 @@ public interface ClientConnectionDAO extends
|
|||
|
||||
}
|
||||
|
||||
/** Get a list of all connection tokens of all connections (no matter what state)
|
||||
* of an exam.
|
||||
*
|
||||
* @param examId The exam identifier
|
||||
* @return list of all connection tokens of all connections (no matter what state)
|
||||
* of an exam */
|
||||
default Result<Collection<String>> getConnectionTokensNoCache(final Long examId) {
|
||||
return getConnectionTokens(examId);
|
||||
}
|
||||
|
||||
/** Get a list of all connection tokens of all connections of an exam
|
||||
* that are in state active
|
||||
* that are in state <code>ConnectionStatus.ACTIVE</code>
|
||||
*
|
||||
* @param examId The exam identifier
|
||||
* @return Result refer to the collection of connection tokens or to an error when happened */
|
||||
Result<Collection<String>> getActiveConnctionTokens(Long examId);
|
||||
Result<Collection<String>> getActiveConnectionTokens(Long examId);
|
||||
|
||||
/** Get a list of all connection tokens of all connections of an exam
|
||||
* that are in state an active state. See <code>ClientConnection</code>
|
||||
*
|
||||
* @param examId The exam identifier
|
||||
* @return Result refer to the collection of connection tokens or to an error when happened */
|
||||
Result<Collection<String>> getAllActiveConnectionTokens(Long examId);
|
||||
|
||||
/** Get all inactive connection tokens from the set of given tokens.
|
||||
* This is usually used for cleanup purposes to filter a bunch of connection tokens
|
||||
|
|
|
@ -38,6 +38,7 @@ import ch.ethz.seb.sebserver.gbl.model.session.ClientConnection.ConnectionStatus
|
|||
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
|
||||
import ch.ethz.seb.sebserver.gbl.util.Result;
|
||||
import ch.ethz.seb.sebserver.gbl.util.Utils;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.ClientConnectionTokenMapper;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientConnectionRecordDynamicSqlSupport;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientConnectionRecordMapper;
|
||||
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.ClientEventRecordDynamicSqlSupport;
|
||||
|
@ -70,19 +71,22 @@ public class ClientConnectionDAOImpl implements ClientConnectionDAO {
|
|||
private final ClientInstructionRecordMapper clientInstructionRecordMapper;
|
||||
private final ClientIndicatorRecordMapper clientIndicatorRecordMapper;
|
||||
private final ClientNotificationRecordMapper clientNotificationRecordMapper;
|
||||
private final ClientConnectionTokenMapper clientConnectionMinMapper;
|
||||
|
||||
protected ClientConnectionDAOImpl(
|
||||
final ClientConnectionRecordMapper clientConnectionRecordMapper,
|
||||
final ClientEventRecordMapper clientEventRecordMapper,
|
||||
final ClientInstructionRecordMapper clientInstructionRecordMapper,
|
||||
final ClientIndicatorRecordMapper clientIndicatorRecordMapper,
|
||||
final ClientNotificationRecordMapper clientNotificationRecordMapper) {
|
||||
final ClientNotificationRecordMapper clientNotificationRecordMapper,
|
||||
final ClientConnectionTokenMapper clientConnectionMinMapper) {
|
||||
|
||||
this.clientConnectionRecordMapper = clientConnectionRecordMapper;
|
||||
this.clientEventRecordMapper = clientEventRecordMapper;
|
||||
this.clientInstructionRecordMapper = clientInstructionRecordMapper;
|
||||
this.clientIndicatorRecordMapper = clientIndicatorRecordMapper;
|
||||
this.clientNotificationRecordMapper = clientNotificationRecordMapper;
|
||||
this.clientConnectionMinMapper = clientConnectionMinMapper;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -162,7 +166,7 @@ public class ClientConnectionDAOImpl implements ClientConnectionDAO {
|
|||
@Override
|
||||
@Transactional(readOnly = true)
|
||||
public Result<Collection<String>> getConnectionTokens(final Long examId) {
|
||||
return Result.tryCatch(() -> this.clientConnectionRecordMapper
|
||||
return Result.tryCatch(() -> this.clientConnectionMinMapper
|
||||
.selectByExample()
|
||||
.where(
|
||||
ClientConnectionRecordDynamicSqlSupport.examId,
|
||||
|
@ -170,15 +174,15 @@ public class ClientConnectionDAOImpl implements ClientConnectionDAO {
|
|||
.build()
|
||||
.execute()
|
||||
.stream()
|
||||
.map(ClientConnectionRecord::getConnectionToken)
|
||||
.map(rec -> rec.connection_token)
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(readOnly = true)
|
||||
public Result<Collection<String>> getActiveConnctionTokens(final Long examId) {
|
||||
return Result.tryCatch(() -> this.clientConnectionRecordMapper
|
||||
public Result<Collection<String>> getActiveConnectionTokens(final Long examId) {
|
||||
return Result.tryCatch(() -> this.clientConnectionMinMapper
|
||||
.selectByExample()
|
||||
.where(
|
||||
ClientConnectionRecordDynamicSqlSupport.examId,
|
||||
|
@ -189,7 +193,26 @@ public class ClientConnectionDAOImpl implements ClientConnectionDAO {
|
|||
.build()
|
||||
.execute()
|
||||
.stream()
|
||||
.map(ClientConnectionRecord::getConnectionToken)
|
||||
.map(rec -> rec.connection_token)
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(readOnly = true)
|
||||
public Result<Collection<String>> getAllActiveConnectionTokens(final Long examId) {
|
||||
return Result.tryCatch(() -> this.clientConnectionMinMapper
|
||||
.selectByExample()
|
||||
.where(
|
||||
ClientConnectionRecordDynamicSqlSupport.examId,
|
||||
SqlBuilder.isEqualTo(examId))
|
||||
.and(
|
||||
ClientConnectionRecordDynamicSqlSupport.status,
|
||||
SqlBuilder.isIn(ClientConnection.ACTIVE_STATES))
|
||||
.build()
|
||||
.execute()
|
||||
.stream()
|
||||
.map(rec -> rec.connection_token)
|
||||
.filter(StringUtils::isNotBlank)
|
||||
.collect(Collectors.toList()));
|
||||
}
|
||||
|
|
|
@ -210,7 +210,7 @@ public interface ExamSessionService {
|
|||
* @return Result with reference to the given Exam or to an error if happened */
|
||||
Result<Exam> flushCache(final Exam exam);
|
||||
|
||||
/** Is is supposed to be the single access point to internally get client connection
|
||||
/** This is supposed to be the single access point to internally get client connection
|
||||
* data for a specified connection token.
|
||||
* This uses the client connection data cache for lookup and also synchronizes asynchronous
|
||||
* cache calls to prevent parallel creation of ClientConnectionDataInternal
|
||||
|
|
|
@ -424,7 +424,7 @@ public class ExamSessionServiceImpl implements ExamSessionService {
|
|||
@Override
|
||||
public Result<Collection<String>> getActiveConnectionTokens(final Long examId) {
|
||||
return this.clientConnectionDAO
|
||||
.getActiveConnctionTokens(examId);
|
||||
.getActiveConnectionTokens(examId);
|
||||
}
|
||||
|
||||
@EventListener
|
||||
|
|
|
@ -11,9 +11,9 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl;
|
|||
import java.math.BigDecimal;
|
||||
import java.security.Principal;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
@ -22,8 +22,6 @@ import org.apache.commons.lang3.BooleanUtils;
|
|||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.cache.Cache;
|
||||
import org.springframework.cache.CacheManager;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.security.access.AccessDeniedException;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
@ -69,7 +67,6 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
|||
|
||||
private final ExamSessionService examSessionService;
|
||||
private final ExamSessionCacheService examSessionCacheService;
|
||||
private final CacheManager cacheManager;
|
||||
private final EventHandlingStrategy eventHandlingStrategy;
|
||||
private final ClientConnectionDAO clientConnectionDAO;
|
||||
private final SEBClientConfigDAO sebClientConfigDAO;
|
||||
|
@ -90,7 +87,6 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
|||
|
||||
this.examSessionService = examSessionService;
|
||||
this.examSessionCacheService = examSessionService.getExamSessionCacheService();
|
||||
this.cacheManager = examSessionService.getCacheManager();
|
||||
this.clientConnectionDAO = examSessionService.getClientConnectionDAO();
|
||||
this.eventHandlingStrategy = eventHandlingStrategyFactory.get();
|
||||
this.sebClientConfigDAO = sebClientConfigDAO;
|
||||
|
@ -645,28 +641,19 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
|||
public void updatePingEvents() {
|
||||
try {
|
||||
|
||||
final Cache cache = this.cacheManager.getCache(ExamSessionCacheService.CACHE_NAME_ACTIVE_CLIENT_CONNECTION);
|
||||
final long now = Utils.getMillisecondsNow();
|
||||
final Consumer<ClientConnectionDataInternal> missingPingUpdate = missingPingUpdate(now);
|
||||
this.examSessionService
|
||||
.getExamDAO()
|
||||
.allRunningExamIds()
|
||||
.getOrThrow()
|
||||
.stream()
|
||||
.flatMap(examId -> this.isDistributedSetup
|
||||
? this.clientConnectionDAO
|
||||
.getConnectionTokensNoCache(examId)
|
||||
.getOrThrow()
|
||||
.stream()
|
||||
: this.clientConnectionDAO
|
||||
.getConnectionTokens(examId)
|
||||
.getOrThrow()
|
||||
.stream())
|
||||
.map(token -> cache.get(token, ClientConnectionDataInternal.class))
|
||||
.flatMap(examId -> this.clientConnectionDAO
|
||||
.getAllActiveConnectionTokens(examId)
|
||||
.getOr(Collections.emptyList())
|
||||
.stream())
|
||||
.map(this.examSessionService::getConnectionDataInternal)
|
||||
.filter(Objects::nonNull)
|
||||
.filter(connection -> connection.pingIndicator != null &&
|
||||
connection.clientConnection.status.clientActiveStatus)
|
||||
.forEach(connection -> missingPingUpdate.accept(connection));
|
||||
.filter(connection -> connection.pingIndicator != null)
|
||||
.forEach(this::missingPingUpdate);
|
||||
|
||||
} catch (final Exception e) {
|
||||
log.error("Failed to update ping events: ", e);
|
||||
|
@ -905,31 +892,29 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
|||
return this.examSessionService.getConnectionDataInternal(connectionToken);
|
||||
}
|
||||
|
||||
private Consumer<ClientConnectionDataInternal> missingPingUpdate(final long now) {
|
||||
private void missingPingUpdate(final ClientConnectionDataInternal connection) {
|
||||
if (connection.pingIndicator.changeOnIncident()) {
|
||||
|
||||
return connection -> {
|
||||
final boolean missingPing = connection.getMissingPing();
|
||||
final long millisecondsNow = Utils.getMillisecondsNow();
|
||||
final ClientEventRecord clientEventRecord = new ClientEventRecord(
|
||||
null,
|
||||
connection.getConnectionId(),
|
||||
(missingPing) ? EventType.ERROR_LOG.id : EventType.INFO_LOG.id,
|
||||
millisecondsNow,
|
||||
millisecondsNow,
|
||||
new BigDecimal(connection.pingIndicator.getValue()),
|
||||
(missingPing) ? "Missing Client Ping" : "Client Ping Back To Normal");
|
||||
|
||||
if (connection.pingIndicator.missingPingUpdate(now)) {
|
||||
final boolean missingPing = connection.getMissingPing();
|
||||
final ClientEventRecord clientEventRecord = new ClientEventRecord(
|
||||
null,
|
||||
connection.getConnectionId(),
|
||||
(missingPing) ? EventType.ERROR_LOG.id : EventType.INFO_LOG.id,
|
||||
now,
|
||||
now,
|
||||
new BigDecimal(connection.pingIndicator.getValue()),
|
||||
(missingPing) ? "Missing Client Ping" : "Client Ping Back To Normal");
|
||||
// store event and and flush cache
|
||||
this.eventHandlingStrategy.accept(clientEventRecord);
|
||||
|
||||
// store event and and flush cache
|
||||
this.eventHandlingStrategy.accept(clientEventRecord);
|
||||
|
||||
// update indicators
|
||||
if (clientEventRecord.getType() != null && EventType.ERROR_LOG.id == clientEventRecord.getType()) {
|
||||
connection.getIndicatorMapping(EventType.ERROR_LOG)
|
||||
.forEach(indicator -> indicator.notifyValueChange(clientEventRecord));
|
||||
}
|
||||
// update indicators
|
||||
if (clientEventRecord.getType() != null && EventType.ERROR_LOG.id == clientEventRecord.getType()) {
|
||||
connection.getIndicatorMapping(EventType.ERROR_LOG)
|
||||
.forEach(indicator -> indicator.notifyValueChange(clientEventRecord));
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -118,22 +118,27 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
|
|||
|
||||
@Override
|
||||
public final boolean hasIncident() {
|
||||
if (!this.active) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return getValue() >= super.incidentThreshold;
|
||||
}
|
||||
|
||||
private double lastCheckVal = 0;
|
||||
|
||||
public final boolean missingPingUpdate(final long now) {
|
||||
if (this.currentValue <= 0) {
|
||||
public final boolean changeOnIncident() {
|
||||
if (!this.active || this.currentValue <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final double val = now - this.currentValue;
|
||||
// check if incidentThreshold was passed (up or down) since last update
|
||||
final boolean result = (this.lastCheckVal < this.incidentThreshold && val >= this.incidentThreshold) ||
|
||||
final double val = getValue();
|
||||
// check if incident threshold has passed (up or down) since last update
|
||||
final boolean changed = (this.lastCheckVal < this.incidentThreshold && val >= this.incidentThreshold) ||
|
||||
(this.lastCheckVal >= this.incidentThreshold && val < this.incidentThreshold);
|
||||
|
||||
this.lastCheckVal = val;
|
||||
return result;
|
||||
return changed;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
server.address=localhost
|
||||
server.port=8080
|
||||
server.port=8090
|
||||
|
||||
sebserver.gui.http.external.scheme=http
|
||||
sebserver.gui.entrypoint=/gui
|
||||
sebserver.gui.webservice.protocol=http
|
||||
sebserver.gui.webservice.address=localhost
|
||||
sebserver.gui.webservice.port=8080
|
||||
sebserver.gui.webservice.port=8090
|
||||
sebserver.gui.webservice.apipath=/admin-api/v1
|
||||
# defines the polling interval that is used to poll the webservice for client connection data on a monitored exam page
|
||||
sebserver.gui.webservice.poll-interval=1000
|
||||
|
|
Loading…
Reference in a new issue