fixed distributed setup with distributed indicator value init
This commit is contained in:
parent
11f879ba2c
commit
c3a4e78218
7 changed files with 145 additions and 55 deletions
|
@ -26,6 +26,7 @@ import org.springframework.web.util.UriComponentsBuilder;
|
|||
|
||||
import ch.ethz.seb.sebserver.gbl.Constants;
|
||||
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.WebserviceInfoDAO;
|
||||
|
||||
@Lazy
|
||||
@Service
|
||||
|
@ -64,7 +65,14 @@ public class WebserviceInfo {
|
|||
|
||||
private Map<String, String> lmsExternalAddressAlias;
|
||||
|
||||
public WebserviceInfo(final Environment environment) {
|
||||
private final WebserviceInfoDAO webserviceInfoDAO;
|
||||
private boolean isMaster = false;
|
||||
|
||||
public WebserviceInfo(
|
||||
final WebserviceInfoDAO webserviceInfoDAO,
|
||||
final Environment environment) {
|
||||
|
||||
this.webserviceInfoDAO = webserviceInfoDAO;
|
||||
this.webserviceUUID = UUID.randomUUID().toString();
|
||||
this.sebServerVersion = environment.getRequiredProperty(VERSION_KEY);
|
||||
this.testProperty = environment.getProperty(WEB_SERVICE_TEST_PROPERTY, "NOT_AVAILABLE");
|
||||
|
@ -123,6 +131,14 @@ public class WebserviceInfo {
|
|||
}
|
||||
}
|
||||
|
||||
public boolean isMaster() {
|
||||
return this.isMaster;
|
||||
}
|
||||
|
||||
public void updateMaster() {
|
||||
this.isMaster = this.webserviceInfoDAO.isMaster(this.getWebserviceUUID());
|
||||
}
|
||||
|
||||
public String getWebserviceUUID() {
|
||||
return this.webserviceUUID;
|
||||
}
|
||||
|
|
|
@ -79,17 +79,6 @@ public interface ClientIndicatorValueMapper {
|
|||
.execute();
|
||||
}
|
||||
|
||||
default Long pingRecordIdByConnectionId(final Long connectionId) {
|
||||
return SelectDSL.selectDistinctWithMapper(
|
||||
this::selectPK,
|
||||
id.as("id"))
|
||||
.from(clientIndicatorRecord)
|
||||
.where(clientConnectionId, isEqualTo(connectionId))
|
||||
.and(type, isEqualTo(IndicatorType.LAST_PING.id))
|
||||
.build()
|
||||
.execute();
|
||||
}
|
||||
|
||||
default QueryExpressionDSL<MyBatis3SelectModelAdapter<Collection<ClientIndicatorValueRecord>>> selectByExample() {
|
||||
|
||||
return SelectDSL.selectWithMapper(
|
||||
|
|
|
@ -137,6 +137,7 @@ public class ClientEventDAOImpl implements ClientEventDAO {
|
|||
}
|
||||
|
||||
@Override
|
||||
@Transactional(readOnly = true)
|
||||
public Result<Collection<ExtendedClientEvent>> allMatchingExtended(
|
||||
final FilterMap filterMap,
|
||||
final Predicate<ExtendedClientEvent> predicate) {
|
||||
|
|
|
@ -28,13 +28,12 @@ import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
|
|||
import ch.ethz.seb.sebserver.gbl.util.Result;
|
||||
import ch.ethz.seb.sebserver.webservice.WebserviceInfo;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ExamDAO;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.WebserviceInfoDAO;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.ExamProctoringRoomService;
|
||||
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientConnectionService;
|
||||
|
||||
@Service
|
||||
@WebServiceProfile
|
||||
class ExamSessionControlTask implements DisposableBean {
|
||||
public class ExamSessionControlTask implements DisposableBean {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(ExamSessionControlTask.class);
|
||||
|
||||
|
@ -43,7 +42,6 @@ class ExamSessionControlTask implements DisposableBean {
|
|||
private final ExamUpdateHandler examUpdateHandler;
|
||||
private final ExamProctoringRoomService examProcotringRoomService;
|
||||
private final WebserviceInfo webserviceInfo;
|
||||
private final WebserviceInfoDAO webserviceInfoDAO;
|
||||
|
||||
private final Long examTimePrefix;
|
||||
private final Long examTimeSuffix;
|
||||
|
@ -56,7 +54,6 @@ class ExamSessionControlTask implements DisposableBean {
|
|||
final ExamUpdateHandler examUpdateHandler,
|
||||
final ExamProctoringRoomService examProcotringRoomService,
|
||||
final WebserviceInfo webserviceInfo,
|
||||
final WebserviceInfoDAO webserviceInfoDAO,
|
||||
@Value("${sebserver.webservice.api.exam.time-prefix:3600000}") final Long examTimePrefix,
|
||||
@Value("${sebserver.webservice.api.exam.time-suffix:3600000}") final Long examTimeSuffix,
|
||||
@Value("${sebserver.webservice.api.exam.update-interval:1 * * * * *}") final String examTaskCron,
|
||||
|
@ -66,7 +63,6 @@ class ExamSessionControlTask implements DisposableBean {
|
|||
this.sebClientConnectionService = sebClientConnectionService;
|
||||
this.examUpdateHandler = examUpdateHandler;
|
||||
this.webserviceInfo = webserviceInfo;
|
||||
this.webserviceInfoDAO = webserviceInfoDAO;
|
||||
this.examTimePrefix = examTimePrefix;
|
||||
this.examTimeSuffix = examTimeSuffix;
|
||||
this.examTaskCron = examTaskCron;
|
||||
|
@ -84,7 +80,7 @@ class ExamSessionControlTask implements DisposableBean {
|
|||
this.examTimePrefix,
|
||||
this.examTimeSuffix);
|
||||
|
||||
this.webserviceInfoDAO.isMaster(this.webserviceInfo.getWebserviceUUID());
|
||||
this.updateMaster();
|
||||
|
||||
SEBServerInit.INIT_LOGGER.info("------>");
|
||||
SEBServerInit.INIT_LOGGER.info(
|
||||
|
@ -97,7 +93,7 @@ class ExamSessionControlTask implements DisposableBean {
|
|||
initialDelay = 10000)
|
||||
public void examRunUpdateTask() {
|
||||
|
||||
if (!this.webserviceInfoDAO.isMaster(this.webserviceInfo.getWebserviceUUID())) {
|
||||
if (!this.webserviceInfo.isMaster()) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -117,9 +113,11 @@ class ExamSessionControlTask implements DisposableBean {
|
|||
initialDelay = 5000)
|
||||
public void examSessionUpdateTask() {
|
||||
|
||||
updateMaster();
|
||||
|
||||
this.sebClientConnectionService.updatePingEvents();
|
||||
|
||||
if (!this.webserviceInfoDAO.isMaster(this.webserviceInfo.getWebserviceUUID())) {
|
||||
if (!this.webserviceInfo.isMaster()) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -136,7 +134,7 @@ class ExamSessionControlTask implements DisposableBean {
|
|||
initialDelay = 30000)
|
||||
public void examSessionCleanupTask() {
|
||||
|
||||
if (!this.webserviceInfoDAO.isMaster(this.webserviceInfo.getWebserviceUUID())) {
|
||||
if (!this.webserviceInfo.isMaster()) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -198,6 +196,10 @@ class ExamSessionControlTask implements DisposableBean {
|
|||
}
|
||||
}
|
||||
|
||||
private void updateMaster() {
|
||||
this.webserviceInfo.updateMaster();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
// TODO try to reset master
|
||||
|
|
|
@ -36,6 +36,8 @@ public abstract class AbstractClientIndicator implements ClientIndicator {
|
|||
|
||||
protected double incidentThreshold = 0.0;
|
||||
|
||||
protected long lastUpdate = 0;
|
||||
|
||||
public AbstractClientIndicator(final DistributedIndicatorValueService distributedPingCache) {
|
||||
super();
|
||||
this.distributedPingCache = distributedPingCache;
|
||||
|
@ -122,6 +124,22 @@ public abstract class AbstractClientIndicator implements ClientIndicator {
|
|||
|
||||
@Override
|
||||
public double getValue() {
|
||||
|
||||
if (this.initialized && !this.cachingEnabled && this.active
|
||||
&& this.lastUpdate != this.distributedPingCache.lastUpdate()) {
|
||||
|
||||
if (this.ditributedIndicatorValueRecordId == null) {
|
||||
this.tryRecoverIndicatorRecord();
|
||||
}
|
||||
|
||||
final Long indicatorValue = this.distributedPingCache
|
||||
.getIndicatorValue(this.ditributedIndicatorValueRecordId);
|
||||
if (indicatorValue != null) {
|
||||
this.currentValue = indicatorValue.doubleValue();
|
||||
}
|
||||
this.lastUpdate = this.distributedPingCache.lastUpdate();
|
||||
}
|
||||
|
||||
return this.currentValue;
|
||||
}
|
||||
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.springframework.context.event.EventListener;
|
|||
import org.springframework.scheduling.TaskScheduler;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.transaction.interceptor.TransactionInterceptor;
|
||||
|
||||
import ch.ethz.seb.sebserver.SEBServerInit;
|
||||
import ch.ethz.seb.sebserver.SEBServerInitEvent;
|
||||
|
@ -65,6 +66,8 @@ public class DistributedIndicatorValueService implements DisposableBean {
|
|||
private final Executor indicatorValueUpdateExecutor;
|
||||
private final ClientIndicatorRecordMapper clientIndicatorRecordMapper;
|
||||
private final ClientIndicatorValueMapper clientIndicatorValueMapper;
|
||||
private final WebserviceInfo webserviceInfo;
|
||||
long distributedUpdateInterval = 2000;
|
||||
private long updateTolerance;
|
||||
|
||||
private ScheduledFuture<?> taskRef;
|
||||
|
@ -74,11 +77,17 @@ public class DistributedIndicatorValueService implements DisposableBean {
|
|||
public DistributedIndicatorValueService(
|
||||
@Qualifier(AsyncServiceSpringConfig.EXAM_API_PING_SERVICE_EXECUTOR_BEAN_NAME) final Executor pingUpdateExecutor,
|
||||
final ClientIndicatorRecordMapper clientIndicatorRecordMapper,
|
||||
final ClientIndicatorValueMapper clientIndicatorValueMapper) {
|
||||
final ClientIndicatorValueMapper clientIndicatorValueMapper,
|
||||
final WebserviceInfo webserviceInfo) {
|
||||
|
||||
this.indicatorValueUpdateExecutor = pingUpdateExecutor;
|
||||
this.clientIndicatorRecordMapper = clientIndicatorRecordMapper;
|
||||
this.clientIndicatorValueMapper = clientIndicatorValueMapper;
|
||||
this.webserviceInfo = webserviceInfo;
|
||||
}
|
||||
|
||||
long lastUpdate() {
|
||||
return this.lastUpdate;
|
||||
}
|
||||
|
||||
/** Initializes the service by attaching it to the scheduler for periodical update.
|
||||
|
@ -95,18 +104,18 @@ public class DistributedIndicatorValueService implements DisposableBean {
|
|||
SEBServerInit.INIT_LOGGER.info("------> Activate distributed indicator value service:");
|
||||
|
||||
final TaskScheduler taskScheduler = applicationContext.getBean(TaskScheduler.class);
|
||||
final long distributedUpdateInterval = webserviceInfo.getDistributedUpdateInterval();
|
||||
this.updateTolerance = distributedUpdateInterval * 2 / 3;
|
||||
this.distributedUpdateInterval = webserviceInfo.getDistributedUpdateInterval();
|
||||
this.updateTolerance = this.distributedUpdateInterval * 2 / 3;
|
||||
|
||||
SEBServerInit.INIT_LOGGER.info("------> with distributedUpdateInterval: {}",
|
||||
distributedUpdateInterval);
|
||||
this.distributedUpdateInterval);
|
||||
SEBServerInit.INIT_LOGGER.info("------> with taskScheduler: {}", taskScheduler);
|
||||
|
||||
try {
|
||||
|
||||
this.taskRef = taskScheduler.scheduleAtFixedRate(
|
||||
this::updateIndicatorValueCache,
|
||||
distributedUpdateInterval);
|
||||
this.distributedUpdateInterval);
|
||||
|
||||
SEBServerInit.INIT_LOGGER.info("------> distributed indicator value service successfully initialized!");
|
||||
|
||||
|
@ -141,38 +150,78 @@ public class DistributedIndicatorValueService implements DisposableBean {
|
|||
log.trace("*** Initialize indicator value record for SEB connection: {}", connectionId);
|
||||
}
|
||||
|
||||
final Long recordId = this.clientIndicatorValueMapper
|
||||
.indicatorRecordIdByConnectionId(connectionId, type);
|
||||
synchronized (this) {
|
||||
|
||||
if (recordId == null) {
|
||||
final ClientIndicatorRecord clientEventRecord = new ClientIndicatorRecord(
|
||||
null, connectionId, type.id, value);
|
||||
|
||||
this.clientIndicatorRecordMapper.insert(clientEventRecord);
|
||||
Long recordId = null;
|
||||
|
||||
try {
|
||||
// This also double-check by trying again. If we have more then one entry here
|
||||
// this will throw an exception that causes a rollback
|
||||
return this.clientIndicatorValueMapper
|
||||
recordId = this.clientIndicatorValueMapper
|
||||
.indicatorRecordIdByConnectionId(connectionId, type);
|
||||
|
||||
} catch (final Exception e) {
|
||||
// There is already more then one indicator record entry!!!
|
||||
// delete the second one and work on with the first one
|
||||
|
||||
log.warn(
|
||||
"Detected multiple client indicator entries for connection: {} and type: {}. Force rollback to prevent",
|
||||
log.warn("Duplicate indicator entry detected for connectionId: {}, type: {} --> try to recover",
|
||||
connectionId, type);
|
||||
|
||||
// force rollback
|
||||
throw new RuntimeException("Detected multiple client indicator value entries");
|
||||
}
|
||||
}
|
||||
try {
|
||||
final List<ClientIndicatorRecord> records = this.clientIndicatorRecordMapper.selectByExample()
|
||||
.where(ClientIndicatorRecordDynamicSqlSupport.clientConnectionId,
|
||||
isEqualTo(connectionId))
|
||||
.and(ClientIndicatorRecordDynamicSqlSupport.type, isEqualTo(type.id))
|
||||
.build()
|
||||
.execute();
|
||||
if (records.size() > 1) {
|
||||
this.clientIndicatorRecordMapper.deleteByPrimaryKey(records.get(1).getId());
|
||||
}
|
||||
|
||||
return recordId;
|
||||
return records.get(0).getId();
|
||||
} catch (final Exception ee) {
|
||||
log.error("Failed to recover from duplicate indicator entry: ", ee);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
if (recordId == null) {
|
||||
if (!this.webserviceInfo.isMaster()) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Skip indicator record init because this is no master instance");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
final ClientIndicatorRecord clientEventRecord = new ClientIndicatorRecord(
|
||||
null, connectionId, type.id, value);
|
||||
|
||||
this.clientIndicatorRecordMapper.insert(clientEventRecord);
|
||||
|
||||
try {
|
||||
// This also double-check by trying again. If we have more then one entry here
|
||||
// this will throw an exception that causes a rollback
|
||||
return this.clientIndicatorValueMapper
|
||||
.indicatorRecordIdByConnectionId(connectionId, type);
|
||||
|
||||
} catch (final Exception e) {
|
||||
|
||||
log.warn(
|
||||
"Detected multiple client indicator entries for connection: {} and type: {}. Force rollback to prevent",
|
||||
connectionId, type);
|
||||
|
||||
// force rollback
|
||||
TransactionInterceptor.currentTransactionStatus().setRollbackOnly();
|
||||
throw new RuntimeException("Detected multiple client indicator value entries");
|
||||
}
|
||||
}
|
||||
|
||||
return recordId;
|
||||
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
|
||||
log.error("Failed to initialize indicator value for connection -> {}", connectionId, e);
|
||||
|
||||
// force rollback
|
||||
TransactionInterceptor.currentTransactionStatus().setRollbackOnly();
|
||||
throw new RuntimeException("Failed to initialize indicator value for connection -> " + connectionId, e);
|
||||
}
|
||||
}
|
||||
|
@ -230,23 +279,29 @@ public class DistributedIndicatorValueService implements DisposableBean {
|
|||
* @param indicatorPK The indicator record id (PK).
|
||||
* @return The actual (last) indicator value. */
|
||||
public Long getIndicatorValue(final Long indicatorPK) {
|
||||
try {
|
||||
if (indicatorPK == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Long value = this.indicatorValueCache.get(indicatorPK);
|
||||
if (value == null) {
|
||||
Long value = this.indicatorValueCache.get(indicatorPK);
|
||||
if (value == null) {
|
||||
try {
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("*** Get and cache ping time: {}", indicatorPK);
|
||||
}
|
||||
|
||||
value = this.clientIndicatorValueMapper.selectValueByPrimaryKey(indicatorPK);
|
||||
}
|
||||
if (value != null) {
|
||||
this.indicatorValueCache.put(indicatorPK, value);
|
||||
}
|
||||
|
||||
return value;
|
||||
} catch (final Exception e) {
|
||||
log.error("Error while trying to get last indicator value from storage: {}", e.getMessage());
|
||||
return 0L;
|
||||
} catch (final Exception e) {
|
||||
log.error("Error while trying to get last indicator value from storage: {}", e.getMessage());
|
||||
return -1L;
|
||||
}
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
/** Updates the internal indicator value cache by loading all actual SEB client indicators from persistent storage
|
||||
|
@ -284,6 +339,8 @@ public class DistributedIndicatorValueService implements DisposableBean {
|
|||
this.lastUpdate = millisecondsNow;
|
||||
}
|
||||
|
||||
// System.out.println("*** Update distributed indicator value cache: " + this.indicatorValueCache);
|
||||
|
||||
} catch (final Exception e) {
|
||||
log.error("Error while trying to update distributed indicator value cache: {}", this.indicatorValueCache,
|
||||
e);
|
||||
|
@ -293,7 +350,10 @@ public class DistributedIndicatorValueService implements DisposableBean {
|
|||
}
|
||||
|
||||
/** Update last ping time on persistent storage asynchronously within a defines thread pool with no
|
||||
* waiting queue to skip further ping updates if all update threads are busy **/
|
||||
* waiting queue to skip further ping updates if all update threads are busy
|
||||
*
|
||||
* TODO: we need a better handling strategy here.
|
||||
* Try to apply a batch update and store the pings in a concurrent hash map **/
|
||||
void updatePingAsync(final Long pingRecord) {
|
||||
try {
|
||||
this.indicatorValueUpdateExecutor
|
||||
|
|
|
@ -81,7 +81,12 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
|
|||
if (!this.initialized) {
|
||||
return Double.NaN;
|
||||
}
|
||||
return DateTimeUtils.currentTimeMillis() - this.currentValue;
|
||||
final long currentTimeMillis = DateTimeUtils.currentTimeMillis();
|
||||
if (this.initialized && !this.cachingEnabled && this.active
|
||||
&& this.lastUpdate != this.distributedPingCache.lastUpdate()) {
|
||||
this.currentValue = computeValueAt(currentTimeMillis);
|
||||
}
|
||||
return currentTimeMillis - this.currentValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -96,7 +101,6 @@ public final class PingIntervalClientIndicator extends AbstractPingIndicator {
|
|||
|
||||
@Override
|
||||
public final double computeValueAt(final long timestamp) {
|
||||
|
||||
if (!this.cachingEnabled && super.ditributedIndicatorValueRecordId != null) {
|
||||
|
||||
final Long lastPing = this.distributedPingCache
|
||||
|
|
Loading…
Reference in a new issue