client instruction service improvements

This commit is contained in:
anhefti 2021-05-19 13:19:16 +02:00
parent 39e8846f64
commit 51078d11bb
9 changed files with 184 additions and 28 deletions

View file

@ -59,6 +59,14 @@ public interface ClientConnectionDAO extends
* @return Result refer to the collection of connection tokens or to an error when happened */
Result<Collection<String>> getActiveConnctionTokens(Long examId);
/** Get all inactive connection tokens from the set of given tokens.
* This is usually used for cleanup purposes to filter a bunch of connection tokens
* by activity. Inactive connections are in state CLOSED or DISABLED
*
* @param connectionTokens The set of connection tokens to filter
* @return Result refer to all inactive connection tokens from the given set */
Result<Collection<String>> getInactiveConnctionTokens(Set<String> connectionTokens);
/** Get a collection of all client connections records that needs a room update
* and that are in the status ACTIVE.
* This also flags the involved connections for no update needed within the

View file

@ -10,6 +10,7 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.dao;
import java.util.Collection;
import ch.ethz.seb.sebserver.gbl.model.EntityKey;
import ch.ethz.seb.sebserver.gbl.model.session.ClientInstruction.InstructionType;
import ch.ethz.seb.sebserver.gbl.util.Result;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientInstructionRecord;
@ -44,6 +45,14 @@ public interface ClientInstructionDAO {
* @return Collection of all active instructions for specified connection token */
Result<Collection<ClientInstructionRecord>> getAllActive(String connectionToken);
/** Deletes all old instructions form the persistent storage to clean-up.
* Old in this case means the timestamp is older then one minute or a configured time interval
*
* @param timestamp the time-stamp (milliseconds) of the time in the past from that earlier instructions are
* considered inactive
* @return Result collection of keys of deleted entities or refer to an error when happened */
Result<Collection<EntityKey>> deleteAllInactive(long timestamp);
/** Deletes the specified instruction form the data base
*
* @param id the identifier (PK) if the ClientInstruction to delete

View file

@ -138,7 +138,7 @@ public class ClientConnectionDAOImpl implements ClientConnectionDAO {
return Result.ofRuntimeError("Null or empty set reference");
}
return Result.tryCatch(() -> this.clientConnectionRecordMapper.selectByExample()
.where(ClientConnectionRecordDynamicSqlSupport.id, isIn(new ArrayList<>(pks)))
.where(ClientConnectionRecordDynamicSqlSupport.id, SqlBuilder.isIn(new ArrayList<>(pks)))
.build()
.execute()
.stream()
@ -220,11 +220,38 @@ public class ClientConnectionDAOImpl implements ClientConnectionDAO {
return updateRecord;
}
@Override
@Transactional(readOnly = true)
public Result<Collection<String>> getInactiveConnctionTokens(final Set<String> connectionTokens) {
return Result.tryCatch(() -> {
if (connectionTokens == null || connectionTokens.isEmpty()) {
return Collections.emptyList();
}
return this.clientConnectionRecordMapper
.selectByExample()
.where(
ClientConnectionRecordDynamicSqlSupport.connectionToken,
SqlBuilder.isIn(new ArrayList<>(connectionTokens)))
.and(ClientConnectionRecordDynamicSqlSupport.status, isNotEqualTo(ConnectionStatus.ACTIVE.name()))
.and(
ClientConnectionRecordDynamicSqlSupport.status,
isNotEqualTo(ConnectionStatus.AUTHENTICATED.name()))
.and(ClientConnectionRecordDynamicSqlSupport.status,
isNotEqualTo(ConnectionStatus.CONNECTION_REQUESTED.name()))
.build()
.execute()
.stream()
.map(r -> r.getConnectionToken())
.collect(Collectors.toList());
});
}
@Override
@Transactional
public Result<Collection<ClientConnectionRecord>> getAllConnectionIdsForRoomUpdateInactive() {
return Result.tryCatch(() -> {
final Collection<ClientConnectionRecord> records = this.clientConnectionRecordMapper.selectByExample()
final Collection<ClientConnectionRecord> records = this.clientConnectionRecordMapper
.selectByExample()
.where(ClientConnectionRecordDynamicSqlSupport.remoteProctoringRoomUpdate, isNotEqualTo(0))
.and(ClientConnectionRecordDynamicSqlSupport.status, isNotEqualTo(ConnectionStatus.ACTIVE.name()))
.build()

View file

@ -9,7 +9,10 @@
package ch.ethz.seb.sebserver.webservice.servicelayer.dao.impl;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -23,7 +26,9 @@ import org.springframework.transaction.annotation.Transactional;
import com.fasterxml.jackson.core.type.TypeReference;
import ch.ethz.seb.sebserver.gbl.api.API;
import ch.ethz.seb.sebserver.gbl.api.EntityType;
import ch.ethz.seb.sebserver.gbl.api.JSONMapper;
import ch.ethz.seb.sebserver.gbl.model.EntityKey;
import ch.ethz.seb.sebserver.gbl.model.session.ClientInstruction.InstructionType;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.gbl.util.Result;
@ -65,6 +70,7 @@ public class ClientInstructionDAOImpl implements ClientInstructionDAO {
}
@Override
@Transactional(readOnly = true)
public Result<Collection<ClientInstructionRecord>> getAllActive(final String connectionToken) {
return Result.tryCatch(() -> {
final long millisNowMinusOneMinute = DateTime.now(DateTimeZone.UTC).minusMinutes(1).getMillis();
@ -80,6 +86,37 @@ public class ClientInstructionDAOImpl implements ClientInstructionDAO {
});
}
@Override
@Transactional
public Result<Collection<EntityKey>> deleteAllInactive(final long timestamp) {
return Result.tryCatch(() -> {
final List<ClientInstructionRecord> inactive = this.clientInstructionRecordMapper
.selectByExample()
.where(ClientInstructionRecordDynamicSqlSupport.timestamp,
SqlBuilder.isLessThanOrEqualTo(timestamp))
.build()
.execute();
if (inactive != null && !inactive.isEmpty()) {
this.clientInstructionRecordMapper
.deleteByExample()
.where(ClientInstructionRecordDynamicSqlSupport.timestamp,
SqlBuilder.isLessThanOrEqualTo(timestamp))
.build()
.execute();
return inactive.stream()
.map(r -> new EntityKey(r.getId(), EntityType.CLIENT_INSTRUCTION))
.collect(Collectors.toList());
}
return Collections.emptyList();
});
}
@Override
@Transactional
public Result<Void> delete(final Long id) {

View file

@ -129,6 +129,9 @@ public interface SEBClientConnectionService {
* overflowed ping is back to normal, a ping back to normal event. */
void updatePingEvents();
/** Used to cleanup old instructions from the persistent storage */
void cleanupInstructions();
/** Notify a ping for a certain client connection.
*
* @param connectionToken the connection token

View file

@ -105,4 +105,7 @@ public interface SEBClientInstructionService {
* @param instructionConfirm the instruction confirm identifier */
void confirmInstructionDone(String connectionToken, String instructionConfirm);
/** Used to cleanup out-dated instructions on the persistent storage */
void cleanupInstructions();
}

View file

@ -116,6 +116,7 @@ class ExamSessionControlTask implements DisposableBean {
}
this.sebClientConnectionService.updatePingEvents();
this.sebClientConnectionService.cleanupInstructions();
this.examProcotringRoomService.updateProctoringCollectingRooms();
}

View file

@ -546,6 +546,11 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
}
}
@Override
public void cleanupInstructions() {
this.sebInstructionService.cleanupInstructions();
}
@Override
public String notifyPing(
final String connectionToken,

View file

@ -8,6 +8,7 @@
package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
@ -16,6 +17,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
@ -26,6 +29,7 @@ import ch.ethz.seb.sebserver.SEBServerInit;
import ch.ethz.seb.sebserver.SEBServerInitEvent;
import ch.ethz.seb.sebserver.gbl.Constants;
import ch.ethz.seb.sebserver.gbl.api.JSONMapper;
import ch.ethz.seb.sebserver.gbl.model.EntityKey;
import ch.ethz.seb.sebserver.gbl.model.session.ClientInstruction.InstructionType;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.gbl.util.Result;
@ -56,6 +60,7 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
private final Map<String, SizedArrayNonBlockingQueue<ClientInstructionRecord>> instructions;
private long lastRefresh = 0;
private long lastClean = 0;
public SEBClientInstructionServiceImpl(
final WebserviceInfo webserviceInfo,
@ -154,22 +159,8 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
@Override
public String getInstructionJSON(final String connectionToken) {
refreshCache(connectionToken);
if (this.instructions.isEmpty()) {
return null;
}
if (!this.instructions.containsKey(connectionToken)) {
return null;
}
final SizedArrayNonBlockingQueue<ClientInstructionRecord> queue = this.instructions.get(connectionToken);
if (queue.isEmpty()) {
return null;
}
// Remove the head instruction from the queue
final ClientInstructionRecord clientInstruction = queue.poll();
final ClientInstructionRecord clientInstruction = getNextInstruction(connectionToken);
if (clientInstruction == null) {
return null;
}
@ -177,7 +168,10 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
final boolean needsConfirm = BooleanUtils.toBoolean(clientInstruction.getNeedsConfirmation());
if (needsConfirm) {
// add the instruction back to the queue's tail if it need a confirmation
queue.add(clientInstruction);
final SizedArrayNonBlockingQueue<ClientInstructionRecord> queue = this.instructions.get(connectionToken);
if (queue != null) {
queue.add(clientInstruction);
}
} else {
// otherwise remove it also from the persistent storage
@ -222,18 +216,16 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
@Override
public void confirmInstructionDone(final String connectionToken, final String instructionConfirm) {
try {
final SizedArrayNonBlockingQueue<ClientInstructionRecord> queue = this.instructions.get(connectionToken);
final Long instructionId = Long.valueOf(instructionConfirm);
this.clientInstructionDAO.delete(instructionId);
if (queue.isEmpty()) {
return;
}
final Long instructionId = Long.valueOf(instructionConfirm);
if (queue.removeIf(instruction -> instructionId.equals(instruction.getId()))) {
this.clientInstructionDAO.delete(instructionId);
} else {
log.warn("SEB instruction confirmation mismatch. No pending instruction found for id: {}",
instructionConfirm);
}
queue.removeIf(instruction -> instructionId.equals(instruction.getId()));
} catch (final Exception e) {
log.error(
"Failed to remove SEB instruction after confirmation: connectionToken: {} instructionConfirm: {} connectionToken: {}",
@ -243,11 +235,47 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
}
}
private void refreshCache(final String connectionToken) {
if (!this.webserviceInfo.isDistributed() && !this.instructions.isEmpty()) {
return;
@Override
public void cleanupInstructions() {
try {
final long millisNowMinusOneMinute = DateTime
.now(DateTimeZone.UTC)
.minusMinutes(1)
.getMillis();
if (this.lastClean < millisNowMinusOneMinute) {
final Collection<EntityKey> deleted = this.clientInstructionDAO
.deleteAllInactive(millisNowMinusOneMinute)
.getOrThrow();
if (!deleted.isEmpty()) {
log.info("Deleted out-dated instructions from persistent storage: {}", deleted);
}
cleanupCache();
this.lastClean = System.currentTimeMillis();
}
} catch (final Exception e) {
log.error("Unexpected error while trying to cleanup instructions in persistent storage", e);
}
}
private ClientInstructionRecord getNextInstruction(final String connectionToken) {
// if we still have instruction for given connectionToken, process them first
final long activeTime = DateTime.now(DateTimeZone.UTC).minusMinutes(1).getMillis();
final SizedArrayNonBlockingQueue<ClientInstructionRecord> queue = this.instructions.computeIfAbsent(
connectionToken,
key -> new SizedArrayNonBlockingQueue<>(INSTRUCTION_QUEUE_MAX_SIZE));
final ClientInstructionRecord nextActive = getNextActive(activeTime, queue);
if (nextActive != null) {
return nextActive;
}
// Since the queue is empty check periodically if there are active instructions on the persistent storage
final long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis - this.lastRefresh > Constants.SECOND_IN_MILLIS) {
this.lastRefresh = currentTimeMillis;
@ -255,6 +283,41 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
.onError(error -> log.error(
"Failed load instructions from persistent storage and to refresh cache: ",
error));
if (!queue.isEmpty()) {
return getNextInstruction(connectionToken);
}
}
return null;
}
private void cleanupCache() {
// check if there are still queues in the cache, whether they are empty or not,
// for closed or disposed client connections and remove them from cache
synchronized (this.instructions) {
final Result<Collection<String>> result = this.clientConnectionDAO
.getInactiveConnctionTokens(this.instructions.keySet());
if (result.hasValue()) {
result.get().stream().forEach(token -> this.instructions.remove(token));
}
}
}
// Go recursively through the given queue to find the next active instruction
private ClientInstructionRecord getNextActive(
final long activeTime,
final SizedArrayNonBlockingQueue<ClientInstructionRecord> queue) {
if (queue != null && !queue.isEmpty()) {
final ClientInstructionRecord rec = queue.poll();
if (rec.getTimestamp().longValue() < activeTime) {
return getNextActive(activeTime, queue);
} else {
return rec;
}
} else {
return null;
}
}