diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/ClientConnectionDAO.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/ClientConnectionDAO.java index f2109fdf..0f37e322 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/ClientConnectionDAO.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/ClientConnectionDAO.java @@ -59,6 +59,14 @@ public interface ClientConnectionDAO extends * @return Result refer to the collection of connection tokens or to an error when happened */ Result> getActiveConnctionTokens(Long examId); + /** Get all inactive connection tokens from the set of given tokens. + * This is usually used for cleanup purposes to filter a bunch of connection tokens + * by activity. Inactive connections are in state CLOSED or DISABLED + * + * @param connectionTokens The set of connection tokens to filter + * @return Result refer to all inactive connection tokens from the given set */ + Result> getInactiveConnctionTokens(Set connectionTokens); + /** Get a collection of all client connections records that needs a room update * and that are in the status ACTIVE. * This also flags the involved connections for no update needed within the diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/ClientInstructionDAO.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/ClientInstructionDAO.java index c6314363..795a801f 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/ClientInstructionDAO.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/ClientInstructionDAO.java @@ -10,6 +10,7 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.dao; import java.util.Collection; +import ch.ethz.seb.sebserver.gbl.model.EntityKey; import ch.ethz.seb.sebserver.gbl.model.session.ClientInstruction.InstructionType; import ch.ethz.seb.sebserver.gbl.util.Result; import ch.ethz.seb.sebserver.webservice.datalayer.batis.model.ClientInstructionRecord; @@ -44,6 +45,14 @@ public interface ClientInstructionDAO { * @return Collection of all active instructions for specified connection token */ Result> getAllActive(String connectionToken); + /** Deletes all old instructions form the persistent storage to clean-up. + * Old in this case means the timestamp is older then one minute or a configured time interval + * + * @param timestamp the time-stamp (milliseconds) of the time in the past from that earlier instructions are + * considered inactive + * @return Result collection of keys of deleted entities or refer to an error when happened */ + Result> deleteAllInactive(long timestamp); + /** Deletes the specified instruction form the data base * * @param id the identifier (PK) if the ClientInstruction to delete diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientConnectionDAOImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientConnectionDAOImpl.java index c2c60e33..404d9f05 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientConnectionDAOImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientConnectionDAOImpl.java @@ -138,7 +138,7 @@ public class ClientConnectionDAOImpl implements ClientConnectionDAO { return Result.ofRuntimeError("Null or empty set reference"); } return Result.tryCatch(() -> this.clientConnectionRecordMapper.selectByExample() - .where(ClientConnectionRecordDynamicSqlSupport.id, isIn(new ArrayList<>(pks))) + .where(ClientConnectionRecordDynamicSqlSupport.id, SqlBuilder.isIn(new ArrayList<>(pks))) .build() .execute() .stream() @@ -220,11 +220,38 @@ public class ClientConnectionDAOImpl implements ClientConnectionDAO { return updateRecord; } + @Override + @Transactional(readOnly = true) + public Result> getInactiveConnctionTokens(final Set connectionTokens) { + return Result.tryCatch(() -> { + if (connectionTokens == null || connectionTokens.isEmpty()) { + return Collections.emptyList(); + } + return this.clientConnectionRecordMapper + .selectByExample() + .where( + ClientConnectionRecordDynamicSqlSupport.connectionToken, + SqlBuilder.isIn(new ArrayList<>(connectionTokens))) + .and(ClientConnectionRecordDynamicSqlSupport.status, isNotEqualTo(ConnectionStatus.ACTIVE.name())) + .and( + ClientConnectionRecordDynamicSqlSupport.status, + isNotEqualTo(ConnectionStatus.AUTHENTICATED.name())) + .and(ClientConnectionRecordDynamicSqlSupport.status, + isNotEqualTo(ConnectionStatus.CONNECTION_REQUESTED.name())) + .build() + .execute() + .stream() + .map(r -> r.getConnectionToken()) + .collect(Collectors.toList()); + }); + } + @Override @Transactional public Result> getAllConnectionIdsForRoomUpdateInactive() { return Result.tryCatch(() -> { - final Collection records = this.clientConnectionRecordMapper.selectByExample() + final Collection records = this.clientConnectionRecordMapper + .selectByExample() .where(ClientConnectionRecordDynamicSqlSupport.remoteProctoringRoomUpdate, isNotEqualTo(0)) .and(ClientConnectionRecordDynamicSqlSupport.status, isNotEqualTo(ConnectionStatus.ACTIVE.name())) .build() diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientInstructionDAOImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientInstructionDAOImpl.java index f87c57ca..a72871d1 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientInstructionDAOImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientInstructionDAOImpl.java @@ -9,7 +9,10 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.dao.impl; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -23,7 +26,9 @@ import org.springframework.transaction.annotation.Transactional; import com.fasterxml.jackson.core.type.TypeReference; import ch.ethz.seb.sebserver.gbl.api.API; +import ch.ethz.seb.sebserver.gbl.api.EntityType; import ch.ethz.seb.sebserver.gbl.api.JSONMapper; +import ch.ethz.seb.sebserver.gbl.model.EntityKey; import ch.ethz.seb.sebserver.gbl.model.session.ClientInstruction.InstructionType; import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile; import ch.ethz.seb.sebserver.gbl.util.Result; @@ -65,6 +70,7 @@ public class ClientInstructionDAOImpl implements ClientInstructionDAO { } @Override + @Transactional(readOnly = true) public Result> getAllActive(final String connectionToken) { return Result.tryCatch(() -> { final long millisNowMinusOneMinute = DateTime.now(DateTimeZone.UTC).minusMinutes(1).getMillis(); @@ -80,6 +86,37 @@ public class ClientInstructionDAOImpl implements ClientInstructionDAO { }); } + @Override + @Transactional + public Result> deleteAllInactive(final long timestamp) { + return Result.tryCatch(() -> { + + final List inactive = this.clientInstructionRecordMapper + .selectByExample() + .where(ClientInstructionRecordDynamicSqlSupport.timestamp, + SqlBuilder.isLessThanOrEqualTo(timestamp)) + .build() + .execute(); + + if (inactive != null && !inactive.isEmpty()) { + + this.clientInstructionRecordMapper + .deleteByExample() + .where(ClientInstructionRecordDynamicSqlSupport.timestamp, + SqlBuilder.isLessThanOrEqualTo(timestamp)) + .build() + .execute(); + + return inactive.stream() + .map(r -> new EntityKey(r.getId(), EntityType.CLIENT_INSTRUCTION)) + .collect(Collectors.toList()); + + } + + return Collections.emptyList(); + }); + } + @Override @Transactional public Result delete(final Long id) { diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/SEBClientConnectionService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/SEBClientConnectionService.java index b737543c..790923fc 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/SEBClientConnectionService.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/SEBClientConnectionService.java @@ -129,6 +129,9 @@ public interface SEBClientConnectionService { * overflowed ping is back to normal, a ping back to normal event. */ void updatePingEvents(); + /** Used to cleanup old instructions from the persistent storage */ + void cleanupInstructions(); + /** Notify a ping for a certain client connection. * * @param connectionToken the connection token diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/SEBClientInstructionService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/SEBClientInstructionService.java index 4db83f4c..e06ba41a 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/SEBClientInstructionService.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/SEBClientInstructionService.java @@ -105,4 +105,7 @@ public interface SEBClientInstructionService { * @param instructionConfirm the instruction confirm identifier */ void confirmInstructionDone(String connectionToken, String instructionConfirm); + /** Used to cleanup out-dated instructions on the persistent storage */ + void cleanupInstructions(); + } diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionControlTask.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionControlTask.java index 5f091e1e..49af6ff9 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionControlTask.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/ExamSessionControlTask.java @@ -116,6 +116,7 @@ class ExamSessionControlTask implements DisposableBean { } this.sebClientConnectionService.updatePingEvents(); + this.sebClientConnectionService.cleanupInstructions(); this.examProcotringRoomService.updateProctoringCollectingRooms(); } diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java index d9618586..b7c1458b 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientConnectionServiceImpl.java @@ -546,6 +546,11 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic } } + @Override + public void cleanupInstructions() { + this.sebInstructionService.cleanupInstructions(); + } + @Override public String notifyPing( final String connectionToken, diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientInstructionServiceImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientInstructionServiceImpl.java index c1d25300..b3044978 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientInstructionServiceImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientInstructionServiceImpl.java @@ -8,6 +8,7 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.session.impl; +import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -16,6 +17,8 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Lazy; @@ -26,6 +29,7 @@ import ch.ethz.seb.sebserver.SEBServerInit; import ch.ethz.seb.sebserver.SEBServerInitEvent; import ch.ethz.seb.sebserver.gbl.Constants; import ch.ethz.seb.sebserver.gbl.api.JSONMapper; +import ch.ethz.seb.sebserver.gbl.model.EntityKey; import ch.ethz.seb.sebserver.gbl.model.session.ClientInstruction.InstructionType; import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile; import ch.ethz.seb.sebserver.gbl.util.Result; @@ -56,6 +60,7 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ private final Map> instructions; private long lastRefresh = 0; + private long lastClean = 0; public SEBClientInstructionServiceImpl( final WebserviceInfo webserviceInfo, @@ -154,22 +159,8 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ @Override public String getInstructionJSON(final String connectionToken) { - refreshCache(connectionToken); - if (this.instructions.isEmpty()) { - return null; - } - if (!this.instructions.containsKey(connectionToken)) { - return null; - } - - final SizedArrayNonBlockingQueue queue = this.instructions.get(connectionToken); - if (queue.isEmpty()) { - return null; - } - - // Remove the head instruction from the queue - final ClientInstructionRecord clientInstruction = queue.poll(); + final ClientInstructionRecord clientInstruction = getNextInstruction(connectionToken); if (clientInstruction == null) { return null; } @@ -177,7 +168,10 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ final boolean needsConfirm = BooleanUtils.toBoolean(clientInstruction.getNeedsConfirmation()); if (needsConfirm) { // add the instruction back to the queue's tail if it need a confirmation - queue.add(clientInstruction); + final SizedArrayNonBlockingQueue queue = this.instructions.get(connectionToken); + if (queue != null) { + queue.add(clientInstruction); + } } else { // otherwise remove it also from the persistent storage @@ -222,18 +216,16 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ @Override public void confirmInstructionDone(final String connectionToken, final String instructionConfirm) { try { + final SizedArrayNonBlockingQueue queue = this.instructions.get(connectionToken); + final Long instructionId = Long.valueOf(instructionConfirm); + this.clientInstructionDAO.delete(instructionId); if (queue.isEmpty()) { return; } - final Long instructionId = Long.valueOf(instructionConfirm); - if (queue.removeIf(instruction -> instructionId.equals(instruction.getId()))) { - this.clientInstructionDAO.delete(instructionId); - } else { - log.warn("SEB instruction confirmation mismatch. No pending instruction found for id: {}", - instructionConfirm); - } + queue.removeIf(instruction -> instructionId.equals(instruction.getId())); + } catch (final Exception e) { log.error( "Failed to remove SEB instruction after confirmation: connectionToken: {} instructionConfirm: {} connectionToken: {}", @@ -243,11 +235,47 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ } } - private void refreshCache(final String connectionToken) { - if (!this.webserviceInfo.isDistributed() && !this.instructions.isEmpty()) { - return; + @Override + public void cleanupInstructions() { + try { + + final long millisNowMinusOneMinute = DateTime + .now(DateTimeZone.UTC) + .minusMinutes(1) + .getMillis(); + + if (this.lastClean < millisNowMinusOneMinute) { + + final Collection deleted = this.clientInstructionDAO + .deleteAllInactive(millisNowMinusOneMinute) + .getOrThrow(); + + if (!deleted.isEmpty()) { + log.info("Deleted out-dated instructions from persistent storage: {}", deleted); + } + + cleanupCache(); + + this.lastClean = System.currentTimeMillis(); + } + + } catch (final Exception e) { + log.error("Unexpected error while trying to cleanup instructions in persistent storage", e); + } + } + + private ClientInstructionRecord getNextInstruction(final String connectionToken) { + // if we still have instruction for given connectionToken, process them first + final long activeTime = DateTime.now(DateTimeZone.UTC).minusMinutes(1).getMillis(); + final SizedArrayNonBlockingQueue queue = this.instructions.computeIfAbsent( + connectionToken, + key -> new SizedArrayNonBlockingQueue<>(INSTRUCTION_QUEUE_MAX_SIZE)); + final ClientInstructionRecord nextActive = getNextActive(activeTime, queue); + if (nextActive != null) { + return nextActive; } + // Since the queue is empty check periodically if there are active instructions on the persistent storage final long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis - this.lastRefresh > Constants.SECOND_IN_MILLIS) { this.lastRefresh = currentTimeMillis; @@ -255,6 +283,41 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ .onError(error -> log.error( "Failed load instructions from persistent storage and to refresh cache: ", error)); + + if (!queue.isEmpty()) { + return getNextInstruction(connectionToken); + } + } + + return null; + } + + private void cleanupCache() { + // check if there are still queues in the cache, whether they are empty or not, + // for closed or disposed client connections and remove them from cache + synchronized (this.instructions) { + final Result> result = this.clientConnectionDAO + .getInactiveConnctionTokens(this.instructions.keySet()); + if (result.hasValue()) { + result.get().stream().forEach(token -> this.instructions.remove(token)); + } + } + } + + // Go recursively through the given queue to find the next active instruction + private ClientInstructionRecord getNextActive( + final long activeTime, + final SizedArrayNonBlockingQueue queue) { + + if (queue != null && !queue.isEmpty()) { + final ClientInstructionRecord rec = queue.poll(); + if (rec.getTimestamp().longValue() < activeTime) { + return getNextActive(activeTime, queue); + } else { + return rec; + } + } else { + return null; } }