fixed instruction confirmation blocking bug
This commit is contained in:
parent
23be314a44
commit
366fc3b89b
1 changed files with 13 additions and 39 deletions
|
@ -167,14 +167,19 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
final ClientInstructionRecord clientInstruction = queue.peek();
|
// Remove the head instruction from the queue
|
||||||
|
final ClientInstructionRecord clientInstruction = queue.poll();
|
||||||
if (clientInstruction == null) {
|
if (clientInstruction == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
final boolean needsConfirm = BooleanUtils.toBoolean(clientInstruction.getNeedsConfirmation());
|
final boolean needsConfirm = BooleanUtils.toBoolean(clientInstruction.getNeedsConfirmation());
|
||||||
if (!needsConfirm) {
|
if (needsConfirm) {
|
||||||
queue.poll();
|
// add the instruction back to the queue's tail if it need a confirmation
|
||||||
|
queue.add(clientInstruction);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// otherwise remove it also from the persistent storage
|
||||||
final Result<Void> delete = this.clientInstructionDAO.delete(clientInstruction.getId());
|
final Result<Void> delete = this.clientInstructionDAO.delete(clientInstruction.getId());
|
||||||
if (delete.hasError()) {
|
if (delete.hasError()) {
|
||||||
log.error("Failed to delete SEB client instruction on persistent storage: ", delete.getError());
|
log.error("Failed to delete SEB client instruction on persistent storage: ", delete.getError());
|
||||||
|
@ -221,14 +226,12 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final ClientInstructionRecord instruction = queue.peek();
|
final Long instructionId = Long.valueOf(instructionConfirm);
|
||||||
if (String.valueOf(instruction.getId()).equals(String.valueOf(instruction.getId()))) {
|
if (queue.removeIf(instruction -> instructionId.equals(instruction.getId()))) {
|
||||||
queue.poll();
|
this.clientInstructionDAO.delete(instructionId);
|
||||||
this.clientInstructionDAO.delete(Long.valueOf(instructionConfirm));
|
|
||||||
} else {
|
} else {
|
||||||
log.warn("SEB instruction confirmation mismatch. Sent instructionConfirm: {} pending instruction: {}",
|
log.warn("SEB instruction confirmation mismatch. No pending instruction found for id: {}",
|
||||||
instructionConfirm,
|
instructionConfirm);
|
||||||
instruction.getId());
|
|
||||||
}
|
}
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
log.error(
|
log.error(
|
||||||
|
@ -261,33 +264,6 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
|
||||||
.forEach(this::putToCacheIfAbsent));
|
.forEach(this::putToCacheIfAbsent));
|
||||||
}
|
}
|
||||||
|
|
||||||
// private ClientInstructionRecord chacheInstruction(final ClientInstructionRecord instruction) {
|
|
||||||
//
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// final String connectionToken = instruction.getConnectionToken();
|
|
||||||
// if (this.instructions.containsKey(connectionToken)) {
|
|
||||||
// // check if previous instruction is still valid
|
|
||||||
// final ClientInstructionRecord clientInstructionRecord = this.instructions.get(connectionToken);
|
|
||||||
//
|
|
||||||
// System.out.println("************* previous instruction still active: " + clientInstructionRecord);
|
|
||||||
//
|
|
||||||
// if (BooleanUtils.toBoolean(BooleanUtils.toBooleanObject(clientInstructionRecord.getNeedsConfirmation()))) {
|
|
||||||
// // check if time is out
|
|
||||||
// final long now = DateTime.now(DateTimeZone.UTC).getMillis();
|
|
||||||
// final Long timestamp = clientInstructionRecord.getTimestamp();
|
|
||||||
// if (timestamp != null && now - timestamp > Constants.MINUTE_IN_MILLIS) {
|
|
||||||
// // remove old instruction and add new one
|
|
||||||
// System.out.println("************* remove old instruction and put new: ");
|
|
||||||
// this.instructions.put(connectionToken, instruction);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// } else {
|
|
||||||
// this.instructions.put(connectionToken, instruction);
|
|
||||||
// }
|
|
||||||
// return instruction;
|
|
||||||
// }
|
|
||||||
|
|
||||||
private ClientInstructionRecord putToCacheIfAbsent(final ClientInstructionRecord instruction) {
|
private ClientInstructionRecord putToCacheIfAbsent(final ClientInstructionRecord instruction) {
|
||||||
final SizedArrayNonBlockingQueue<ClientInstructionRecord> queue = this.instructions.computeIfAbsent(
|
final SizedArrayNonBlockingQueue<ClientInstructionRecord> queue = this.instructions.computeIfAbsent(
|
||||||
instruction.getConnectionToken(),
|
instruction.getConnectionToken(),
|
||||||
|
@ -302,8 +278,6 @@ public class SEBClientInstructionServiceImpl implements SEBClientInstructionServ
|
||||||
log.debug("Put SEB instruction into instruction queue: {}", instruction);
|
log.debug("Put SEB instruction into instruction queue: {}", instruction);
|
||||||
}
|
}
|
||||||
|
|
||||||
System.out.println("************* register instruction: " + instruction);
|
|
||||||
|
|
||||||
queue.add(instruction);
|
queue.add(instruction);
|
||||||
return instruction;
|
return instruction;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue