From 749cbea287a4115faf6d696bdec71ae0be20e3cd Mon Sep 17 00:00:00 2001 From: anhefti Date: Thu, 1 Apr 2021 09:37:46 +0200 Subject: [PATCH] Minor improvement in instruction handling (distributed setup) --- .../dao/ClientInstructionDAO.java | 6 ++++ .../dao/impl/ClientInstructionDAOImpl.java | 16 +++++++++ .../impl/SEBClientInstructionServiceImpl.java | 33 ++++++++++++++----- 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/ClientInstructionDAO.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/ClientInstructionDAO.java index 6e7c0414..c6314363 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/ClientInstructionDAO.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/ClientInstructionDAO.java @@ -38,6 +38,12 @@ public interface ClientInstructionDAO { * @return Result refer to all instructions that are younger then one minute or to an error when happened */ Result> getAllActive(); + /** Get all active instructions for a specified connection token + * + * @param connectionToken the connection token + * @return Collection of all active instructions for specified connection token */ + Result> getAllActive(String connectionToken); + /** Deletes the specified instruction form the data base * * @param id the identifier (PK) if the ClientInstruction to delete diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientInstructionDAOImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientInstructionDAOImpl.java index d2bdfbb4..f87c57ca 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientInstructionDAOImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/ClientInstructionDAOImpl.java @@ -64,6 +64,22 @@ public class ClientInstructionDAOImpl implements ClientInstructionDAO { }); } + @Override + public Result> getAllActive(final String connectionToken) { + return Result.tryCatch(() -> { + final long millisNowMinusOneMinute = DateTime.now(DateTimeZone.UTC).minusMinutes(1).getMillis(); + return this.clientInstructionRecordMapper + .selectByExample() + .where(ClientInstructionRecordDynamicSqlSupport.timestamp, + SqlBuilder.isGreaterThanOrEqualTo(millisNowMinusOneMinute)) + .and( + ClientInstructionRecordDynamicSqlSupport.connectionToken, + SqlBuilder.isEqualTo(connectionToken)) + .build() + .execute(); + }); + } + @Override @Transactional public Result delete(final Long id) { diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientInstructionServiceImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientInstructionServiceImpl.java index 5ec6fae1..c1d25300 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientInstructionServiceImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/session/impl/SEBClientInstructionServiceImpl.java @@ -113,7 +113,7 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ final String attributesString = this.jsonMapper.writeValueAsString(attributes); this.clientInstructionDAO .insert(examId, type, attributesString, connectionToken, needsConfirm) - .map(this::putToCacheIfAbsent) + .map(this::putToCache) .onError(error -> log.error("Failed to register instruction: {}", error.getMessage())) .getOrThrow(); } catch (final Exception e) { @@ -141,19 +141,20 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ connectionTokens .stream() .filter(activeConnections::contains) - .map(token -> this.clientInstructionDAO.insert(examId, type, attributesString, token, needsConfirm)) + .map(token -> this.clientInstructionDAO + .insert(examId, type, attributesString, token, needsConfirm)) .map(result -> result.get( error -> log.error("Failed to register instruction: {}", error.getMessage()), () -> null)) .filter(Objects::nonNull) - .forEach(this::putToCacheIfAbsent); + .forEach(this::putToCache); }); } @Override public String getInstructionJSON(final String connectionToken) { - refreshCache(); + refreshCache(connectionToken); if (this.instructions.isEmpty()) { return null; } @@ -242,35 +243,49 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ } } - private void refreshCache() { - if (!this.webserviceInfo.isDistributed()) { + private void refreshCache(final String connectionToken) { + if (!this.webserviceInfo.isDistributed() && !this.instructions.isEmpty()) { return; } final long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis - this.lastRefresh > Constants.SECOND_IN_MILLIS) { this.lastRefresh = currentTimeMillis; - - loadInstructions() + loadInstructions(connectionToken) .onError(error -> log.error( "Failed load instructions from persistent storage and to refresh cache: ", error)); } } + private Result loadInstructions(final String connectionToken) { + return Result.tryCatch(() -> this.clientInstructionDAO.getAllActive(connectionToken) + .getOrThrow() + .forEach(this::putToCacheIfAbsent)); + } + private Result loadInstructions() { return Result.tryCatch(() -> this.clientInstructionDAO.getAllActive() .getOrThrow() .forEach(this::putToCacheIfAbsent)); } + // NOTE: In a distributed setup we only fill the cache from persistent storage + // whereas in a none distributed setup we can put the instruction directly in the cache + // and store the instruction into persistent only for recovering reasons. + private ClientInstructionRecord putToCache(final ClientInstructionRecord instruction) { + if (!this.webserviceInfo.isDistributed()) { + return putToCacheIfAbsent(instruction); + } + return instruction; + } + private ClientInstructionRecord putToCacheIfAbsent(final ClientInstructionRecord instruction) { final SizedArrayNonBlockingQueue queue = this.instructions.computeIfAbsent( instruction.getConnectionToken(), key -> new SizedArrayNonBlockingQueue<>(INSTRUCTION_QUEUE_MAX_SIZE)); if (queue.contains(instruction)) { - log.warn("Instruction alread in the queue: {}", instruction); return instruction; }