fixed distributed indicator value cache handling (patch-fix 1.3.2)

This commit is contained in:
anhefti 2022-03-30 10:10:47 +02:00
parent a7b587a8f2
commit dc330c0eed
10 changed files with 167 additions and 147 deletions

View file

@ -26,8 +26,10 @@ import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
import ch.ethz.seb.sebserver.gbl.model.session.ClientConnection;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.gbl.util.Utils;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.IndicatorDAO;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.ClientIndicator;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.DistributedIndicatorValueService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.indicator.PingIntervalClientIndicator;
@Lazy
@ -39,27 +41,66 @@ public class ClientIndicatorFactory {
private final ApplicationContext applicationContext;
private final IndicatorDAO indicatorDAO;
private final DistributedIndicatorValueService distributedPingCache;
private final boolean distributedSetup;
private final boolean enableCaching;
@Autowired
public ClientIndicatorFactory(
final ApplicationContext applicationContext,
final IndicatorDAO indicatorDAO,
final DistributedIndicatorValueService distributedPingCache,
@Value("${sebserver.webservice.distributed:false}") final boolean distributedSetup,
@Value("${sebserver.webservice.api.exam.enable-indicator-cache:true}") final boolean enableCaching) {
this.applicationContext = applicationContext;
this.indicatorDAO = indicatorDAO;
this.distributedPingCache = distributedPingCache;
this.distributedSetup = distributedSetup;
this.enableCaching = distributedSetup ? false : enableCaching;
}
public List<ClientIndicator> createFor(final ClientConnection clientConnection) {
return createFor(clientConnection, false);
public void initializeDistributedCaches(final ClientConnection clientConnection) {
try {
if (!this.distributedSetup || clientConnection.examId == null) {
return;
}
final Collection<Indicator> examIndicators = this.indicatorDAO
.allForExam(clientConnection.examId)
.getOrThrow();
boolean pingIndicatorAvailable = false;
for (final Indicator indicatorDef : examIndicators) {
this.distributedPingCache.createIndicatorForConnection(
clientConnection.id,
indicatorDef.type,
indicatorDef.type == IndicatorType.LAST_PING ? Utils.getMillisecondsNow() : 0L);
if (!pingIndicatorAvailable) {
pingIndicatorAvailable = indicatorDef.type == IndicatorType.LAST_PING;
}
}
// If there is no ping interval indicator set from the exam, we add a hidden one
// to at least create missing ping events and track missing state
if (!pingIndicatorAvailable) {
this.distributedPingCache.createIndicatorForConnection(
clientConnection.id,
IndicatorType.LAST_PING,
Utils.getMillisecondsNow());
}
} catch (final Exception e) {
log.error("Unexpected error while trying to initialize distributed indicator value cache for: {}",
clientConnection,
e);
}
}
public List<ClientIndicator> createFor(
final ClientConnection clientConnection,
final boolean enableCachingOverride) {
public List<ClientIndicator> createFor(final ClientConnection clientConnection) {
final List<ClientIndicator> result = new ArrayList<>();
if (clientConnection.examId == null) {
@ -73,7 +114,6 @@ public class ClientIndicatorFactory {
.getOrThrow();
boolean pingIndicatorAvailable = false;
for (final Indicator indicatorDef : examIndicators) {
try {
@ -88,11 +128,12 @@ public class ClientIndicatorFactory {
indicatorDef,
clientConnection.id,
clientConnection.status.clientActiveStatus,
this.enableCaching || enableCachingOverride);
this.enableCaching);
result.add(indicator);
} catch (final Exception e) {
log.warn("No Indicator with type: {} found as registered bean. Ignore this one.", indicatorDef.type,
log.warn("No Indicator with type: {} found as registered bean. Ignore this one.",
indicatorDef.type,
e);
}
}
@ -117,7 +158,7 @@ public class ClientIndicatorFactory {
indicator,
clientConnection.id,
clientConnection.status.clientActiveStatus,
this.enableCaching || enableCachingOverride);
this.enableCaching);
result.add(pingIndicator);
}

View file

@ -148,9 +148,7 @@ public class ExamSessionCacheService {
if (clientConnection == null) {
return null;
} else {
return this.internalClientConnectionDataFactory.createClientConnectionData(
clientConnection,
this.getRunningExam(clientConnection.examId) != null);
return this.internalClientConnectionDataFactory.createClientConnectionData(clientConnection);
}
}

View file

@ -281,8 +281,8 @@ public class ExamSessionServiceImpl implements ExamSessionService {
throw new IllegalStateException("Missing exam identifier or requested exam is not running");
}
if (log.isDebugEnabled()) {
log.debug("Trying to get exam from InMemorySEBConfig");
if (log.isTraceEnabled()) {
log.trace("Trying to get exam from InMemorySEBConfig");
}
final InMemorySEBConfig sebConfigForExam = this.examSessionCacheService
@ -295,14 +295,14 @@ public class ExamSessionServiceImpl implements ExamSessionService {
try {
if (log.isDebugEnabled()) {
log.debug("SEB exam configuration download request, start writing SEB exam configuration");
if (log.isTraceEnabled()) {
log.trace("SEB exam configuration download request, start writing SEB exam configuration");
}
out.write(sebConfigForExam.getData());
if (log.isDebugEnabled()) {
log.debug("SEB exam configuration download request, finished writing SEB exam configuration");
if (log.isTraceEnabled()) {
log.trace("SEB exam configuration download request, finished writing SEB exam configuration");
}
} catch (final IOException e) {

View file

@ -32,16 +32,7 @@ public class InternalClientConnectionDataFactory {
this.sebClientNotificationService = sebClientNotificationService;
}
public ClientConnectionDataInternal createClientConnectionData(
final ClientConnection clientConnection,
final boolean examRunning) {
if (!examRunning) {
return new ClientConnectionDataInternal(
clientConnection,
() -> false,
this.clientIndicatorFactory.createFor(clientConnection, true));
}
public ClientConnectionDataInternal createClientConnectionData(final ClientConnection clientConnection) {
if (clientConnection.status == ConnectionStatus.CLOSED
|| clientConnection.status == ConnectionStatus.DISABLED) {

View file

@ -74,8 +74,8 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
private final SEBClientConfigDAO sebClientConfigDAO;
private final SEBClientInstructionService sebInstructionService;
private final ExamAdminService examAdminService;
// TODO get rid of this dependency and use application events for signaling client connection state changes
private final DistributedIndicatorValueService distributedPingCache;
private final ClientIndicatorFactory clientIndicatorFactory;
private final boolean isDistributedSetup;
protected SEBClientConnectionServiceImpl(
@ -84,7 +84,8 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
final SEBClientConfigDAO sebClientConfigDAO,
final SEBClientInstructionService sebInstructionService,
final ExamAdminService examAdminService,
final DistributedIndicatorValueService distributedPingCache) {
final DistributedIndicatorValueService distributedPingCache,
final ClientIndicatorFactory clientIndicatorFactory) {
this.examSessionService = examSessionService;
this.examSessionCacheService = examSessionService.getExamSessionCacheService();
@ -96,6 +97,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
this.examAdminService = examAdminService;
this.distributedPingCache = distributedPingCache;
this.isDistributedSetup = sebInstructionService.getWebserviceInfo().isDistributed();
this.clientIndicatorFactory = clientIndicatorFactory;
}
@Override
@ -165,6 +167,11 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
null))
.getOrThrow();
// initialize distributed indicator value caches if possible and needed
if (clientConnection.examId != null && this.isDistributedSetup) {
this.clientIndicatorFactory.initializeDistributedCaches(clientConnection);
}
// load client connection data into cache
final ClientConnectionDataInternal activeClientConnection = this.examSessionService
.getConnectionDataInternal(connectionToken);
@ -262,6 +269,11 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
null))
.getOrThrow();
// initialize distributed indicator value caches if possible and needed
if (examId != null && this.isDistributedSetup) {
this.clientIndicatorFactory.initializeDistributedCaches(clientConnection);
}
final ClientConnectionDataInternal activeClientConnection =
reloadConnectionCache(connectionToken);
@ -402,6 +414,11 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
// check exam integrity for established connection
checkExamIntegrity(establishedClientConnection.examId);
// initialize distributed indicator value caches if possible and needed
if (examId != null && this.isDistributedSetup) {
this.clientIndicatorFactory.initializeDistributedCaches(clientConnection);
}
// if proctoring is enabled for exam, mark for room update
if (proctoringEnabled) {
this.clientConnectionDAO.markForProctoringUpdate(updatedClientConnection.id);
@ -869,13 +886,6 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
connection.getIndicatorMapping(EventType.ERROR_LOG)
.forEach(indicator -> indicator.notifyValueChange(clientEventRecord));
}
if (this.isDistributedSetup) {
// mark for update and flush the cache
this.clientConnectionDAO.save(connection.clientConnection);
this.examSessionCacheService.evictClientConnection(
connection.clientConnection.connectionToken);
}
}
};
}

View file

@ -69,37 +69,34 @@ public abstract class AbstractClientIndicator implements ClientIndicator {
this.cachingEnabled = cachingEnabled;
if (!this.cachingEnabled && this.active) {
try {
this.ditributedIndicatorValueRecordId = this.distributedPingCache.initIndicatorForConnection(
connectionId,
getType(),
initValue());
} catch (final Exception e) {
tryRecoverIndicatorRecord();
}
this.ditributedIndicatorValueRecordId = this.distributedPingCache
.getIndicatorForConnection(connectionId, getType());
// if (this.ditributedIndicatorValueRecordId == null) {
// tryRecoverIndicatorRecord();
// }
// try {
// this.ditributedIndicatorValueRecordId = this.distributedPingCache.initIndicatorForConnection(
// connectionId,
// getType(),
// initValue());
// } catch (final Exception e) {
// tryRecoverIndicatorRecord();
// }
}
this.currentValue = computeValueAt(Utils.getMillisecondsNow());
this.initialized = true;
}
protected long initValue() {
return 0;
}
protected void tryRecoverIndicatorRecord() {
this.ditributedIndicatorValueRecordId = this.distributedPingCache.getIndicatorForConnection(
this.connectionId,
getType());
if (log.isWarnEnabled()) {
log.warn("*** Missing indicator value record for connection: {}. Try to recover...", this.connectionId);
}
try {
this.ditributedIndicatorValueRecordId = this.distributedPingCache.initIndicatorForConnection(
if (this.ditributedIndicatorValueRecordId == null) {
log.warn("Failed to recover from missing indicator value cache record: {} type: {}",
this.connectionId,
getType(),
initValue());
} catch (final Exception e) {
log.error("Failed to recover indicator value record for connection: {}", this.connectionId, e);
getType());
}
}

View file

@ -55,8 +55,8 @@ public abstract class AbstractLogLevelCountIndicator extends AbstractLogIndicato
@Override
public double computeValueAt(final long timestamp) {
if (log.isDebugEnabled()) {
log.debug("computeValueAt: {}", timestamp);
if (log.isTraceEnabled()) {
log.trace("computeValueAt: {}", timestamp);
}
try {

View file

@ -73,8 +73,8 @@ public abstract class AbstractLogNumberIndicator extends AbstractLogIndicator {
@Override
public double computeValueAt(final long timestamp) {
if (log.isDebugEnabled()) {
log.debug("computeValueAt: {}", timestamp);
if (log.isTraceEnabled()) {
log.trace("computeValueAt: {}", timestamp);
}
try {

View file

@ -129,100 +129,89 @@ public class DistributedIndicatorValueService implements DisposableBean {
}
}
/** This initializes a SEB client indicator on the persistent storage for a given SEB client
* connection identifier and of given IndicatorType.
* If there is already such an indicator for the specified SEB client connection identifier and type,
* this returns the id of the existing one.
/** This creates a distributed indicator value cache record for a given SEB connection and indicator
* if it not already exists and returns the PK for the specified distributed indicator value cache record
*
* @param connectionId SEB client connection identifier
* @param type indicator type
* @param value the initial indicator value
* @return SEB client indicator value identifier (PK) */
* @param connectionId the client connection identifier
* @param type the indicator type
* @param value the initialization value
* @return the PK of the created or existing distributed indicator value cache record or null when a unexpected
* error happened */
@Transactional
public Long initIndicatorForConnection(
public Long createIndicatorForConnection(
final Long connectionId,
final IndicatorType type,
final Long value) {
final long initValue) {
if (!this.webserviceInfo.isDistributed()) {
log.warn("No distributed setup, skip createIndicatorForConnection");
return null;
}
try {
if (log.isDebugEnabled()) {
log.trace("*** Initialize indicator value record for SEB connection: {}", connectionId);
// first check if the record already exists
final Long recId = this.clientIndicatorValueMapper.indicatorRecordIdByConnectionId(
connectionId,
type);
if (recId != null) {
log.debug("Distributed indicator value cache already exists for: {}, {}", connectionId, type);
return recId;
}
synchronized (this) {
// if not, create new one and return PK
final ClientIndicatorRecord clientEventRecord = new ClientIndicatorRecord(
null, connectionId, type.id, initValue);
this.clientIndicatorRecordMapper.insert(clientEventRecord);
Long recordId = null;
try {
// This also double-check by trying again. If we have more then one entry here
// this will throw an exception that causes a rollback
return this.clientIndicatorValueMapper
.indicatorRecordIdByConnectionId(connectionId, type);
try {
recordId = this.clientIndicatorValueMapper
.indicatorRecordIdByConnectionId(connectionId, type);
} catch (final Exception e) {
// There is already more then one indicator record entry!!!
// delete the second one and work on with the first one
} catch (final Exception e) {
log.warn("Duplicate indicator entry detected for connectionId: {}, type: {} --> try to recover",
connectionId, type);
try {
final List<ClientIndicatorRecord> records = this.clientIndicatorRecordMapper.selectByExample()
.where(ClientIndicatorRecordDynamicSqlSupport.clientConnectionId,
isEqualTo(connectionId))
.and(ClientIndicatorRecordDynamicSqlSupport.type, isEqualTo(type.id))
.build()
.execute();
if (records.size() > 1) {
this.clientIndicatorRecordMapper.deleteByPrimaryKey(records.get(1).getId());
}
return records.get(0).getId();
} catch (final Exception ee) {
log.error("Failed to recover from duplicate indicator entry: ", ee);
return null;
}
}
if (recordId == null) {
if (!this.webserviceInfo.isMaster()) {
if (log.isDebugEnabled()) {
log.debug("Skip indicator record init because this is no master instance");
}
return null;
}
final ClientIndicatorRecord clientEventRecord = new ClientIndicatorRecord(
null, connectionId, type.id, value);
this.clientIndicatorRecordMapper.insert(clientEventRecord);
try {
// This also double-check by trying again. If we have more then one entry here
// this will throw an exception that causes a rollback
return this.clientIndicatorValueMapper
.indicatorRecordIdByConnectionId(connectionId, type);
} catch (final Exception e) {
log.warn(
"Detected multiple client indicator entries for connection: {} and type: {}. Force rollback to prevent",
connectionId, type);
// force rollback
TransactionInterceptor.currentTransactionStatus().setRollbackOnly();
throw new RuntimeException("Detected multiple client indicator value entries");
}
}
return recordId;
log.warn(
"Detected multiple client indicator entries for connection: {} and type: {}. Force rollback to prevent",
connectionId, type);
// force rollback
TransactionInterceptor.currentTransactionStatus().setRollbackOnly();
throw new RuntimeException("Detected multiple client indicator value entries");
}
} catch (final Exception e) {
log.error(
"Failed to initialize distributed indicator value cache in persistent store. connectionId: {} type: {}",
connectionId, type, e);
log.error("Failed to initialize indicator value for connection -> {}", connectionId, e);
return null;
}
}
// force rollback
TransactionInterceptor.currentTransactionStatus().setRollbackOnly();
throw new RuntimeException("Failed to initialize indicator value for connection -> " + connectionId, e);
/** Get the distributed indicator value cache record PK for a given SEB connection and indicator if available.
* If not existing for the specified connection and indicator this return null
*
* @param connectionId the client connection identifier
* @param type the indicator type
* @return the indicator value cache record PK or null of not defined */
@Transactional(readOnly = true)
public Long getIndicatorForConnection(final Long connectionId, final IndicatorType type) {
try {
return this.clientIndicatorValueMapper
.indicatorRecordIdByConnectionId(connectionId, type);
} catch (final Exception e) {
if (log.isDebugEnabled()) {
log.debug("Failed to get indicator PK for connection: {} type: {} cause: {}",
connectionId,
type,
e.getMessage());
}
return null;
}
}
@ -235,7 +224,7 @@ public class DistributedIndicatorValueService implements DisposableBean {
try {
if (log.isDebugEnabled()) {
log.debug("*** Delete indicator value record for SEB connection: {}", connectionId);
log.debug("Delete indicator value record for SEB connection: {}", connectionId);
}
final Collection<ClientIndicatorValueRecord> records = this.clientIndicatorValueMapper

View file

@ -20,7 +20,6 @@ import ch.ethz.seb.sebserver.gbl.Constants;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator;
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
import ch.ethz.seb.sebserver.gbl.model.session.ClientEvent;
import ch.ethz.seb.sebserver.gbl.util.Utils;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientEventRecord;
@Lazy
@ -40,11 +39,6 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
this.cachingEnabled = true;
}
@Override
protected long initValue() {
return Utils.getMillisecondsNow();
}
@Override
public void init(
final Indicator indicatorDefinition,