Merge remote-tracking branch 'origin/newSEBEventStoreStrategy' into development
This commit is contained in:
commit
2ffa17b839
3 changed files with 64 additions and 13 deletions
|
@ -79,7 +79,7 @@ public class SEBClientEventBatchService {
|
||||||
}
|
}
|
||||||
|
|
||||||
private final BlockingDeque<EventData> eventDataQueue = new LinkedBlockingDeque<>();
|
private final BlockingDeque<EventData> eventDataQueue = new LinkedBlockingDeque<>();
|
||||||
private final Collection<EventData> events = new ArrayList<>();
|
//private final Collection<EventData> events = new ArrayList<>();
|
||||||
|
|
||||||
public void accept(final String connectionToken, final String jsonBody) {
|
public void accept(final String connectionToken, final String jsonBody) {
|
||||||
this.eventDataQueue.add(new EventData(
|
this.eventDataQueue.add(new EventData(
|
||||||
|
@ -92,14 +92,58 @@ public class SEBClientEventBatchService {
|
||||||
this.eventDataQueue.add(eventData);
|
this.eventDataQueue.add(eventData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final Collection<EventData> events1 = new ArrayList<>();
|
||||||
|
|
||||||
@Scheduled(
|
@Scheduled(
|
||||||
fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}",
|
fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}",
|
||||||
initialDelay = 100)
|
initialDelay = 100)
|
||||||
public void processEvents() {
|
public void worker1() {
|
||||||
|
processEvents("worker1", this.events1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Collection<EventData> events2 = new ArrayList<>();
|
||||||
|
|
||||||
|
@Scheduled(
|
||||||
|
fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}",
|
||||||
|
initialDelay = 300)
|
||||||
|
public void worker2() {
|
||||||
|
processEvents("worker2", this.events2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Collection<EventData> events3 = new ArrayList<>();
|
||||||
|
|
||||||
|
@Scheduled(
|
||||||
|
fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}",
|
||||||
|
initialDelay = 600)
|
||||||
|
public void worker3() {
|
||||||
|
processEvents("worker3", this.events3);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Collection<EventData> events4 = new ArrayList<>();
|
||||||
|
|
||||||
|
@Scheduled(
|
||||||
|
fixedDelayString = "${sebserver.webservice.api.exam.session.event.batch.interval:1000}",
|
||||||
|
initialDelay = 900)
|
||||||
|
public void worker4() {
|
||||||
|
processEvents("worker4", this.events4);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void processOneTime() {
|
||||||
|
processEvents("One Time Call", new ArrayList<>());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processEvents(final String workerName, final Collection<EventData> events) {
|
||||||
|
|
||||||
|
long start = 0L;
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
start = Utils.getMillisecondsNow();
|
||||||
|
}
|
||||||
|
|
||||||
final int size = this.eventDataQueue.size();
|
final int size = this.eventDataQueue.size();
|
||||||
if (size > 1000) {
|
if (size > 1000) {
|
||||||
log.warn("-----> There are more then 1000 SEB client logs in the waiting queue: {}", size);
|
log.warn("-----> There are more then 1000 SEB client logs in the waiting queue: {}, worker: {}",
|
||||||
|
size,
|
||||||
|
workerName);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (size == 0) {
|
if (size == 0) {
|
||||||
|
@ -108,14 +152,14 @@ public class SEBClientEventBatchService {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
||||||
this.events.clear();
|
events.clear();
|
||||||
this.eventDataQueue.drainTo(this.events);
|
this.eventDataQueue.drainTo(events);
|
||||||
|
|
||||||
if (this.events.isEmpty()) {
|
if (events.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final List<ClientEventRecord> events = this.events
|
final List<ClientEventRecord> records = events
|
||||||
.stream()
|
.stream()
|
||||||
.map(this::convertData)
|
.map(this::convertData)
|
||||||
.map(this::storeNotifications)
|
.map(this::storeNotifications)
|
||||||
|
@ -126,12 +170,19 @@ public class SEBClientEventBatchService {
|
||||||
|
|
||||||
this.transactionTemplate
|
this.transactionTemplate
|
||||||
.execute(status -> {
|
.execute(status -> {
|
||||||
events.stream().forEach(this.clientEventMapper::insert);
|
records.stream().forEach(this.clientEventMapper::insert);
|
||||||
return null;
|
return null;
|
||||||
});
|
});
|
||||||
|
|
||||||
this.sqlSessionTemplate.flushStatements();
|
this.sqlSessionTemplate.flushStatements();
|
||||||
|
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("SEBClientEventBatchService worker {} processes batch of size {} in {} ms",
|
||||||
|
workerName,
|
||||||
|
size,
|
||||||
|
start - Utils.getMillisecondsNow());
|
||||||
|
}
|
||||||
|
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
log.error("Failed to process SEB events from eventDataQueue: ", e);
|
log.error("Failed to process SEB events from eventDataQueue: ", e);
|
||||||
}
|
}
|
||||||
|
@ -200,7 +251,7 @@ public class SEBClientEventBatchService {
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
log.error("Failed to verify and process notification for SEB event: {}", eventData);
|
log.error("Failed to verify and process notification for SEB event: {}", eventData, e);
|
||||||
return eventData;
|
return eventData;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -561,7 +561,7 @@ public class SebConnectionTest extends ExamAPIIntegrationTester {
|
||||||
// check correct response
|
// check correct response
|
||||||
assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus());
|
assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus());
|
||||||
|
|
||||||
this.sebClientEventBatchStore.processEvents();
|
this.sebClientEventBatchStore.processOneTime();
|
||||||
|
|
||||||
// check event stored on db
|
// check event stored on db
|
||||||
List<ClientEventRecord> events = this.clientEventRecordMapper
|
List<ClientEventRecord> events = this.clientEventRecordMapper
|
||||||
|
@ -587,7 +587,7 @@ public class SebConnectionTest extends ExamAPIIntegrationTester {
|
||||||
10000.0,
|
10000.0,
|
||||||
"testEvent2");
|
"testEvent2");
|
||||||
|
|
||||||
this.sebClientEventBatchStore.processEvents();
|
this.sebClientEventBatchStore.processOneTime();
|
||||||
// check correct response
|
// check correct response
|
||||||
assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus());
|
assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus());
|
||||||
|
|
||||||
|
@ -622,7 +622,7 @@ public class SebConnectionTest extends ExamAPIIntegrationTester {
|
||||||
"testEvent1");
|
"testEvent1");
|
||||||
// check correct response
|
// check correct response
|
||||||
assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus());
|
assertTrue(HttpStatus.NO_CONTENT.value() == sendEvent.getStatus());
|
||||||
this.sebClientEventBatchStore.processEvents();
|
this.sebClientEventBatchStore.processOneTime();
|
||||||
final List<ClientEventRecord> events = this.clientEventRecordMapper
|
final List<ClientEventRecord> events = this.clientEventRecordMapper
|
||||||
.selectByExample()
|
.selectByExample()
|
||||||
.build()
|
.build()
|
||||||
|
|
|
@ -210,7 +210,7 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitForExecutor() {
|
private void waitForExecutor() {
|
||||||
this.sebClientEventBatchStore.processEvents();
|
this.sebClientEventBatchStore.processOneTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in a new issue