added configurable ping strategies

This commit is contained in:
anhefti 2023-10-31 09:24:07 +01:00
parent 0e1ee3330a
commit 99ea2472f4
10 changed files with 249 additions and 25 deletions

View file

@ -78,7 +78,7 @@ public class AsyncServiceSpringConfig implements AsyncConfigurer {
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
final ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(5);
threadPoolTaskScheduler.setPoolSize(10);
threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(false);
threadPoolTaskScheduler.setThreadNamePrefix("SEB-Server-BgTask-");
return threadPoolTaskScheduler;

View file

@ -23,6 +23,7 @@ import ch.ethz.seb.sebserver.SEBServerInit;
import ch.ethz.seb.sebserver.SEBServerInitEvent;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.webservice.servicelayer.dao.WebserviceInfoDAO;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.SEBClientPingServiceFactory;
@Component
@WebServiceProfile
@ -38,6 +39,7 @@ public class WebserviceInit implements ApplicationListener<ApplicationReadyEvent
private final WebserviceInfoDAO webserviceInfoDAO;
private final DBIntegrityChecker dbIntegrityChecker;
private final SEBServerMigrationStrategy sebServerMigrationStrategy;
private final SEBClientPingServiceFactory sebClientPingServiceFactory;
protected WebserviceInit(
final SEBServerInit sebServerInit,
@ -47,7 +49,8 @@ public class WebserviceInit implements ApplicationListener<ApplicationReadyEvent
final WebserviceInfoDAO webserviceInfoDAO,
final DBIntegrityChecker dbIntegrityChecker,
final ApplicationContext applicationContext,
final SEBServerMigrationStrategy sebServerMigrationStrategy) {
final SEBServerMigrationStrategy sebServerMigrationStrategy,
final SEBClientPingServiceFactory sebClientPingServiceFactory) {
this.applicationContext = applicationContext;
this.sebServerInit = sebServerInit;
@ -58,6 +61,7 @@ public class WebserviceInit implements ApplicationListener<ApplicationReadyEvent
this.webserviceInfoDAO = webserviceInfoDAO;
this.dbIntegrityChecker = dbIntegrityChecker;
this.sebServerMigrationStrategy = sebServerMigrationStrategy;
this.sebClientPingServiceFactory = sebClientPingServiceFactory;
}
public ApplicationContext getApplicationContext() {
@ -120,6 +124,10 @@ public class WebserviceInit implements ApplicationListener<ApplicationReadyEvent
this.environment.getProperty("sebserver.webservice.distributed.connectionUpdate", "2000"));
}
SEBServerInit.INIT_LOGGER.info("----> ");
SEBServerInit.INIT_LOGGER.info("----> Working with ping service: {}",
this.sebClientPingServiceFactory.getWorkingServiceType());
SEBServerInit.INIT_LOGGER.info("----> ");
SEBServerInit.INIT_LOGGER.info("----> Server address: {}", this.environment.getProperty("server.address"));
SEBServerInit.INIT_LOGGER.info("----> Server port: {}", this.environment.getProperty("server.port"));

View file

@ -530,13 +530,15 @@ public class ClientConnectionDAOImpl implements ClientConnectionDAO {
return Collections.emptyList();
}
return this.clientConnectionRecordMapper
final List<ClientConnectionRecord> execute = this.clientConnectionRecordMapper
.selectByExample()
.where(ClientConnectionRecordDynamicSqlSupport.screenProctoringGroupId, isNull())
.and(ClientConnectionRecordDynamicSqlSupport.examId, isIn(examIds))
.and(ClientConnectionRecordDynamicSqlSupport.status, isEqualTo(ConnectionStatus.ACTIVE.name()))
.build()
.execute();
return execute;
});
}

View file

@ -0,0 +1,22 @@
/*
* Copyright (c) 2023 ETH Zürich, Educational Development and Technology (LET)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package ch.ethz.seb.sebserver.webservice.servicelayer.session;
public interface SEBClientPingService {
static enum PingServiceType {
BLOCKING,
BATCH
}
PingServiceType pingServiceType();
String notifyPing(String connectionToken, String instructionConfirm);
}

View file

@ -533,7 +533,7 @@ public class ExamSessionServiceImpl implements ExamSessionService {
public Result<Exam> updateExamCache(final Long examId) {
// TODO check how often this is called in distributed environments
System.out.println("************** performance check: updateExamCache");
//System.out.println("************** performance check: updateExamCache");
try {
final Cache cache = this.cacheManager.getCache(ExamSessionCacheService.CACHE_NAME_RUNNING_EXAM);

View file

@ -11,41 +11,67 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.StringUtils;
import org.ehcache.impl.internal.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.gbl.util.Utils;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientInstructionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientPingService;
@Lazy
@Component
@WebServiceProfile
public class SEBClientPingBatchService {
public class SEBClientPingBatchService implements SEBClientPingService {
private static final Logger log = LoggerFactory.getLogger(SEBClientPingBatchService.class);
private final ExamSessionCacheService examSessionCacheService;
private final SEBClientInstructionService sebClientInstructionService;
private final ThreadPoolTaskScheduler threadPoolTaskScheduler;
private final long schendulerInterval;
private final Map<String, String> pings = new ConcurrentHashMap<>();
private final Map<String, String> instructions = new ConcurrentHashMap<>();
private ScheduledFuture<?> scheduleAtFixedRate = null;
public SEBClientPingBatchService(
final ExamSessionCacheService examSessionCacheService,
final SEBClientInstructionService sebClientInstructionService) {
final SEBClientInstructionService sebClientInstructionService,
final ThreadPoolTaskScheduler threadPoolTaskScheduler,
@Value("${sebserver.webservice.api.exam.session.ping.batch.interval:500}") final long schendulerInterval) {
this.examSessionCacheService = examSessionCacheService;
this.sebClientInstructionService = sebClientInstructionService;
this.threadPoolTaskScheduler = threadPoolTaskScheduler;
this.schendulerInterval = schendulerInterval;
}
@Scheduled(fixedDelayString = "${sebserver.webservice.api.exam.session.ping.batch.interval:500}")
void init() {
if (this.scheduleAtFixedRate == null) {
log.info(
"Initialize SEBClientPingBatchService for schedule batch update at a rate of {} milliseconds",
this.schendulerInterval);
this.scheduleAtFixedRate = this.threadPoolTaskScheduler.scheduleAtFixedRate(
() -> processPings(),
this.schendulerInterval);
}
}
//@Scheduled(fixedDelayString = "${sebserver.webservice.api.exam.session.ping.batch.interval:500}")
public void processPings() {
if (this.pings.isEmpty()) {
return;
@ -69,17 +95,34 @@ public class SEBClientPingBatchService {
}
}
@Override
public PingServiceType pingServiceType() {
return PingServiceType.BATCH;
}
@Override
public final String notifyPing(
final String connectionToken,
final String instructionConfirm) {
final String instruction = this.instructions.remove(connectionToken);
if (instructionConfirm != null) {
System.out.println("************ put instructionConfirm: " + instructionConfirm + " instructions: "
+ this.instructions);
this.pings.put(connectionToken, instructionConfirm);
// TODO is this a good idea or is there another better way to deal with instruction confirm synchronization?
if (instruction != null && instruction.contains("\"instruction-confirm\":\"" + instructionConfirm + "\"")) {
return null;
}
} else if (!this.pings.containsKey(connectionToken)) {
this.pings.put(connectionToken, StringUtils.EMPTY);
}
return this.instructions.remove(connectionToken);
// System.out.println(
// "**************** notifyPing instructionConfirm: " + instructionConfirm + " pings: " + this.pings);
return instruction;
}
private void processPing(
@ -110,4 +153,11 @@ public class SEBClientPingBatchService {
}
}
@PreDestroy
protected void shutdown() {
if (this.scheduleAtFixedRate != null) {
this.scheduleAtFixedRate.cancel(true);
}
}
}

View file

@ -0,0 +1,67 @@
/*
* Copyright (c) 2023 ETH Zürich, Educational Development and Technology (LET)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.gbl.util.Utils;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientInstructionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientPingService;
@Lazy
@Component
@WebServiceProfile
public class SEBClientPingBlockingService implements SEBClientPingService {
private static final Logger log = LoggerFactory.getLogger(SEBClientPingBlockingService.class);
private final ExamSessionCacheService examSessionCacheService;
private final SEBClientInstructionService sebClientInstructionService;
public SEBClientPingBlockingService(
final ExamSessionCacheService examSessionCacheService,
final SEBClientInstructionService sebClientInstructionService) {
this.examSessionCacheService = examSessionCacheService;
this.sebClientInstructionService = sebClientInstructionService;
}
@Override
public PingServiceType pingServiceType() {
return PingServiceType.BLOCKING;
}
@Override
public String notifyPing(final String connectionToken, final String instructionConfirm) {
if (connectionToken == null) {
return null;
}
final ClientConnectionDataInternal activeClientConnection = this.examSessionCacheService
.getClientConnection(connectionToken);
if (activeClientConnection != null) {
activeClientConnection.notifyPing(Utils.getMillisecondsNow());
} else {
log.error("Failed to get ClientConnectionDataInternal for: {}", connectionToken);
}
if (instructionConfirm != StringUtils.EMPTY) {
this.sebClientInstructionService.confirmInstructionDone(connectionToken, instructionConfirm);
}
return this.sebClientInstructionService.getInstructionJSON(connectionToken);
}
}

View file

@ -0,0 +1,73 @@
/*
* Copyright (c) 2023 ETH Zürich, Educational Development and Technology (LET)
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl;
import java.util.Collection;
import java.util.EnumMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientPingService;
@Lazy
@Component
@WebServiceProfile
public class SEBClientPingServiceFactory {
private static final Logger log = LoggerFactory.getLogger(SEBClientPingServiceFactory.class);
private final EnumMap<SEBClientPingService.PingServiceType, SEBClientPingService> serviceMapping =
new EnumMap<>(SEBClientPingService.PingServiceType.class);
private final SEBClientPingService.PingServiceType workingServiceType;
public SEBClientPingServiceFactory(
final Collection<SEBClientPingService> serviceBeans,
@Value("${sebserver.webservice.api.exam.session.ping.service.type:BLOCKING}") final String serviceType) {
SEBClientPingService.PingServiceType serviceTypeToSet = SEBClientPingService.PingServiceType.BLOCKING;
try {
serviceTypeToSet = SEBClientPingService.PingServiceType.valueOf(serviceType);
} catch (final Exception e) {
serviceTypeToSet = SEBClientPingService.PingServiceType.BLOCKING;
}
this.workingServiceType = serviceTypeToSet;
serviceBeans.stream().forEach(service -> this.serviceMapping.putIfAbsent(service.pingServiceType(), service));
}
public SEBClientPingService.PingServiceType getWorkingServiceType() {
return this.workingServiceType;
}
public SEBClientPingService getSEBClientPingService() {
log.info("Work with SEBClientPingService of type: {}", this.workingServiceType);
switch (this.workingServiceType) {
case BATCH: {
final SEBClientPingService service =
this.serviceMapping.get(SEBClientPingService.PingServiceType.BATCH);
if (service != null) {
((SEBClientPingBatchService) service).init();
return service;
} else {
return this.serviceMapping.get(SEBClientPingService.PingServiceType.BLOCKING);
}
}
default:
return this.serviceMapping.get(SEBClientPingService.PingServiceType.BLOCKING);
}
}
}

View file

@ -29,6 +29,7 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ClientConnectionDAO;
import ch.ethz.seb.sebserver.webservice.servicelayer.institution.SecurityKeyService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.ExamSessionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientInstructionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientPingService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientSessionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.SEBClientVersionService;
import ch.ethz.seb.sebserver.webservice.servicelayer.session.impl.SEBClientEventBatchService.EventData;
@ -48,7 +49,7 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService {
private final InternalClientConnectionDataFactory internalClientConnectionDataFactory;
private final SecurityKeyService securityKeyService;
private final SEBClientVersionService sebClientVersionService;
private final SEBClientPingBatchService sebClientPingService;
private final SEBClientPingService sebClientPingService;
public SEBClientSessionServiceImpl(
final ClientConnectionDAO clientConnectionDAO,
@ -59,7 +60,7 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService {
final InternalClientConnectionDataFactory internalClientConnectionDataFactory,
final SecurityKeyService securityKeyService,
final SEBClientVersionService sebClientVersionService,
final SEBClientPingBatchService sebClientPingService) {
final SEBClientPingServiceFactory sebClientPingServiceFactory) {
this.clientConnectionDAO = clientConnectionDAO;
this.examSessionService = examSessionService;
@ -69,7 +70,7 @@ public class SEBClientSessionServiceImpl implements SEBClientSessionService {
this.internalClientConnectionDataFactory = internalClientConnectionDataFactory;
this.securityKeyService = securityKeyService;
this.sebClientVersionService = sebClientVersionService;
this.sebClientPingService = sebClientPingService;
this.sebClientPingService = sebClientPingServiceFactory.getSEBClientPingService();
}
@Override

View file

@ -319,14 +319,14 @@ public class ScreenProctoringServiceImpl implements ScreenProctoringService {
} catch (final Exception e) {
log.error("Failed to apply screen proctoring session to SEB with connection: ", ccRecord, e);
// if (placeReservedInGroup != null) {
// // release reserved place in group
// this.screenProctoringGroupDAO.releasePlaceInCollectingGroup(
// ccRecord.getExamId(),
// placeReservedInGroup)
// .onError(
// error -> log.warn("Failed to release reserved place in group: {}", error.getMessage()));
// }
if (placeReservedInGroup != null) {
// release reserved place in group
this.screenProctoringGroupDAO.releasePlaceInCollectingGroup(
ccRecord.getExamId(),
placeReservedInGroup)
.onError(
error -> log.warn("Failed to release reserved place in group: {}", error.getMessage()));
}
}
}
@ -336,7 +336,7 @@ public class ScreenProctoringServiceImpl implements ScreenProctoringService {
if (!exam.additionalAttributes.containsKey(ScreenProctoringSettings.ATTR_COLLECTING_STRATEGY)) {
log.warn("Can't verify collecting strategy for exam: {} use default group assignment.", exam.id);
return applyToDefaultGroup(ccRecord, exam);
return applyToDefaultGroup(ccRecord.getId(), ccRecord.getConnectionToken(), exam);
}
final CollectingStrategy strategy = CollectingStrategy.valueOf(exam.additionalAttributes
@ -350,20 +350,21 @@ public class ScreenProctoringServiceImpl implements ScreenProctoringService {
case EXAM:
case FIX_SIZE:
default: {
return applyToDefaultGroup(ccRecord, exam);
return applyToDefaultGroup(ccRecord.getId(), ccRecord.getConnectionToken(), exam);
}
}
}
private ScreenProctoringGroup applyToDefaultGroup(
final ClientConnectionRecord ccRecord,
final Long connectioId,
final String connectionToken,
final Exam exam) {
final ScreenProctoringGroup screenProctoringGroup = reservePlaceOnProctoringGroup(exam);
this.clientConnectionDAO.assignToScreenProctoringGroup(
exam.id,
ccRecord.getConnectionToken(),
connectioId,
connectionToken,
screenProctoringGroup.id)
.getOrThrow();