Minor improvement in instruction handling (distributed setup)

This commit is contained in:
anhefti 2021-04-01 09:37:46 +02:00
parent 804911e3b8
commit 749cbea287
3 changed files with 46 additions and 9 deletions

View file

@ -38,6 +38,12 @@ public interface ClientInstructionDAO {
* @return Result refer to all instructions that are younger then one minute or to an error when happened */
Result<Collection<ClientInstructionRecord>> getAllActive();
/** Get all active instructions for a specified connection token
*
* @param connectionToken the connection token
* @return Collection of all active instructions for specified connection token */
Result<Collection<ClientInstructionRecord>> getAllActive(String connectionToken);
/** Deletes the specified instruction form the data base
*
* @param id the identifier (PK) if the ClientInstruction to delete

View file

@ -64,6 +64,22 @@ public class ClientInstructionDAOImpl implements ClientInstructionDAO {
});
}
@Override
public Result<Collection<ClientInstructionRecord>> getAllActive(final String connectionToken) {
return Result.tryCatch(() -> {
final long millisNowMinusOneMinute = DateTime.now(DateTimeZone.UTC).minusMinutes(1).getMillis();
return this.clientInstructionRecordMapper
.selectByExample()
.where(ClientInstructionRecordDynamicSqlSupport.timestamp,
SqlBuilder.isGreaterThanOrEqualTo(millisNowMinusOneMinute))
.and(
ClientInstructionRecordDynamicSqlSupport.connectionToken,
SqlBuilder.isEqualTo(connectionToken))
.build()
.execute();
});
}
@Override
@Transactional
public Result<Void> delete(final Long id) {

View file

@ -113,7 +113,7 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
final String attributesString = this.jsonMapper.writeValueAsString(attributes);
this.clientInstructionDAO
.insert(examId, type, attributesString, connectionToken, needsConfirm)
.map(this::putToCacheIfAbsent)
.map(this::putToCache)
.onError(error -> log.error("Failed to register instruction: {}", error.getMessage()))
.getOrThrow();
} catch (final Exception e) {
@ -141,19 +141,20 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
connectionTokens
.stream()
.filter(activeConnections::contains)
.map(token -> this.clientInstructionDAO.insert(examId, type, attributesString, token, needsConfirm))
.map(token -> this.clientInstructionDAO
.insert(examId, type, attributesString, token, needsConfirm))
.map(result -> result.get(
error -> log.error("Failed to register instruction: {}", error.getMessage()),
() -> null))
.filter(Objects::nonNull)
.forEach(this::putToCacheIfAbsent);
.forEach(this::putToCache);
});
}
@Override
public String getInstructionJSON(final String connectionToken) {
refreshCache();
refreshCache(connectionToken);
if (this.instructions.isEmpty()) {
return null;
}
@ -242,35 +243,49 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
}
}
private void refreshCache() {
if (!this.webserviceInfo.isDistributed()) {
private void refreshCache(final String connectionToken) {
if (!this.webserviceInfo.isDistributed() && !this.instructions.isEmpty()) {
return;
}
final long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis - this.lastRefresh > Constants.SECOND_IN_MILLIS) {
this.lastRefresh = currentTimeMillis;
loadInstructions()
loadInstructions(connectionToken)
.onError(error -> log.error(
"Failed load instructions from persistent storage and to refresh cache: ",
error));
}
}
private Result<Void> loadInstructions(final String connectionToken) {
return Result.tryCatch(() -> this.clientInstructionDAO.getAllActive(connectionToken)
.getOrThrow()
.forEach(this::putToCacheIfAbsent));
}
private Result<Void> loadInstructions() {
return Result.tryCatch(() -> this.clientInstructionDAO.getAllActive()
.getOrThrow()
.forEach(this::putToCacheIfAbsent));
}
// NOTE: In a distributed setup we only fill the cache from persistent storage
// whereas in a none distributed setup we can put the instruction directly in the cache
// and store the instruction into persistent only for recovering reasons.
private ClientInstructionRecord putToCache(final ClientInstructionRecord instruction) {
if (!this.webserviceInfo.isDistributed()) {
return putToCacheIfAbsent(instruction);
}
return instruction;
}
private ClientInstructionRecord putToCacheIfAbsent(final ClientInstructionRecord instruction) {
final SizedArrayNonBlockingQueue<ClientInstructionRecord> queue = this.instructions.computeIfAbsent(
instruction.getConnectionToken(),
key -> new SizedArrayNonBlockingQueue<>(INSTRUCTION_QUEUE_MAX_SIZE));
if (queue.contains(instruction)) {
log.warn("Instruction alread in the queue: {}", instruction);
return instruction;
}