fix concurrency error

This commit is contained in:
anhefti 2023-06-06 09:01:18 +02:00
parent f29aa52dd0
commit 4924c7f125

View file

@ -79,7 +79,7 @@ public class SEBClientEventBatchService {
} }
private final BlockingDeque<EventData> eventDataQueue = new LinkedBlockingDeque<>(); private final BlockingDeque<EventData> eventDataQueue = new LinkedBlockingDeque<>();
private final Collection<EventData> events = new ArrayList<>(); //private final Collection<EventData> events = new ArrayList<>();
public void accept(final String connectionToken, final String jsonBody) { public void accept(final String connectionToken, final String jsonBody) {
this.eventDataQueue.add(new EventData( this.eventDataQueue.add(new EventData(
@ -92,39 +92,47 @@ public class SEBClientEventBatchService {
this.eventDataQueue.add(eventData); this.eventDataQueue.add(eventData);
} }
private final Collection<EventData> events1 = new ArrayList<>();
@Scheduled( @Scheduled(
fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:500}", fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}",
initialDelay = 100) initialDelay = 100)
public void worker1() { public void worker1() {
processEvents("worker1"); processEvents("worker1", this.events1);
} }
// @Scheduled( private final Collection<EventData> events2 = new ArrayList<>();
// fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}",
// initialDelay = 300) @Scheduled(
// public void worker2() { fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}",
// processEvents("worker2"); initialDelay = 300)
// } public void worker2() {
// processEvents("worker2", this.events2);
// @Scheduled( }
// fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}",
// initialDelay = 600) private final Collection<EventData> events3 = new ArrayList<>();
// public void worker3() {
// processEvents("worker3"); @Scheduled(
// } fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}",
// initialDelay = 600)
// @Scheduled( public void worker3() {
// fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}", processEvents("worker3", this.events3);
// initialDelay = 900) }
// public void worker4() {
// processEvents("worker4"); private final Collection<EventData> events4 = new ArrayList<>();
// }
@Scheduled(
fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}",
initialDelay = 900)
public void worker4() {
processEvents("worker4", this.events4);
}
public void processOneTime() { public void processOneTime() {
processEvents("One Time Call"); processEvents("One Time Call", new ArrayList<>());
} }
private void processEvents(final String workerName) { private void processEvents(final String workerName, final Collection<EventData> events) {
long start = 0L; long start = 0L;
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
@ -144,14 +152,14 @@ public class SEBClientEventBatchService {
try { try {
this.events.clear(); events.clear();
this.eventDataQueue.drainTo(this.events); this.eventDataQueue.drainTo(events);
if (this.events.isEmpty()) { if (events.isEmpty()) {
return; return;
} }
final List<ClientEventRecord> events = this.events final List<ClientEventRecord> records = events
.stream() .stream()
.map(this::convertData) .map(this::convertData)
.map(this::storeNotifications) .map(this::storeNotifications)
@ -162,7 +170,7 @@ public class SEBClientEventBatchService {
this.transactionTemplate this.transactionTemplate
.execute(status -> { .execute(status -> {
events.stream().forEach(this.clientEventMapper::insert); records.stream().forEach(this.clientEventMapper::insert);
return null; return null;
}); });