SEBSERV-250 detach client event post processing (update and
notifications)
This commit is contained in:
parent
06b433e6cc
commit
b99502dd41
2 changed files with 62 additions and 26 deletions
|
@ -12,6 +12,7 @@ import java.math.BigDecimal;
|
||||||
import java.security.Principal;
|
import java.security.Principal;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
|
||||||
|
@ -19,11 +20,13 @@ import org.apache.commons.lang3.BooleanUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
import org.springframework.cache.Cache;
|
import org.springframework.cache.Cache;
|
||||||
import org.springframework.cache.CacheManager;
|
import org.springframework.cache.CacheManager;
|
||||||
import org.springframework.context.annotation.Lazy;
|
import org.springframework.context.annotation.Lazy;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import ch.ethz.seb.sebserver.gbl.async.AsyncServiceSpringConfig;
|
||||||
import ch.ethz.seb.sebserver.gbl.model.exam.Exam;
|
import ch.ethz.seb.sebserver.gbl.model.exam.Exam;
|
||||||
import ch.ethz.seb.sebserver.gbl.model.sebconfig.SEBClientConfig;
|
import ch.ethz.seb.sebserver.gbl.model.sebconfig.SEBClientConfig;
|
||||||
import ch.ethz.seb.sebserver.gbl.model.sebconfig.SEBClientConfig.VDIType;
|
import ch.ethz.seb.sebserver.gbl.model.sebconfig.SEBClientConfig.VDIType;
|
||||||
|
@ -72,6 +75,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
||||||
private final SEBClientNotificationService sebClientNotificationService;
|
private final SEBClientNotificationService sebClientNotificationService;
|
||||||
private final ExamAdminService examAdminService;
|
private final ExamAdminService examAdminService;
|
||||||
private final DistributedPingService distributedPingCache;
|
private final DistributedPingService distributedPingCache;
|
||||||
|
private final Executor indicatorUpdateExecutor;
|
||||||
private final boolean isDistributedSetup;
|
private final boolean isDistributedSetup;
|
||||||
|
|
||||||
protected SEBClientConnectionServiceImpl(
|
protected SEBClientConnectionServiceImpl(
|
||||||
|
@ -81,7 +85,8 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
||||||
final SEBClientInstructionService sebInstructionService,
|
final SEBClientInstructionService sebInstructionService,
|
||||||
final SEBClientNotificationService sebClientNotificationService,
|
final SEBClientNotificationService sebClientNotificationService,
|
||||||
final ExamAdminService examAdminService,
|
final ExamAdminService examAdminService,
|
||||||
final DistributedPingService distributedPingCache) {
|
final DistributedPingService distributedPingCache,
|
||||||
|
@Qualifier(AsyncServiceSpringConfig.EXAM_API_EXECUTOR_BEAN_NAME) final Executor indicatorUpdateExecutor) {
|
||||||
|
|
||||||
this.examSessionService = examSessionService;
|
this.examSessionService = examSessionService;
|
||||||
this.examSessionCacheService = examSessionService.getExamSessionCacheService();
|
this.examSessionCacheService = examSessionService.getExamSessionCacheService();
|
||||||
|
@ -93,6 +98,7 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
||||||
this.sebClientNotificationService = sebClientNotificationService;
|
this.sebClientNotificationService = sebClientNotificationService;
|
||||||
this.examAdminService = examAdminService;
|
this.examAdminService = examAdminService;
|
||||||
this.distributedPingCache = distributedPingCache;
|
this.distributedPingCache = distributedPingCache;
|
||||||
|
this.indicatorUpdateExecutor = indicatorUpdateExecutor;
|
||||||
this.isDistributedSetup = sebInstructionService.getWebserviceInfo().isDistributed();
|
this.isDistributedSetup = sebInstructionService.getWebserviceInfo().isDistributed();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -630,6 +636,22 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
||||||
event,
|
event,
|
||||||
activeClientConnection.getConnectionId()));
|
activeClientConnection.getConnectionId()));
|
||||||
|
|
||||||
|
this.indicatorUpdateExecutor
|
||||||
|
.execute(() -> updateIndicator(connectionToken, event, activeClientConnection));
|
||||||
|
|
||||||
|
} else {
|
||||||
|
log.warn("No active ClientConnection found for connectionToken: {}", connectionToken);
|
||||||
|
}
|
||||||
|
} catch (final Exception e) {
|
||||||
|
log.error("Failed to process SEB client event: ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateIndicator(
|
||||||
|
final String connectionToken,
|
||||||
|
final ClientEvent event,
|
||||||
|
final ClientConnectionDataInternal activeClientConnection) {
|
||||||
|
|
||||||
switch (event.eventType) {
|
switch (event.eventType) {
|
||||||
case NOTIFICATION: {
|
case NOTIFICATION: {
|
||||||
this.sebClientNotificationService
|
this.sebClientNotificationService
|
||||||
|
@ -642,17 +664,11 @@ public class SEBClientConnectionServiceImpl implements SEBClientConnectionServic
|
||||||
}
|
}
|
||||||
default: {
|
default: {
|
||||||
// update indicators
|
// update indicators
|
||||||
activeClientConnection.getIndicatorMapping(event.eventType)
|
activeClientConnection
|
||||||
|
.getIndicatorMapping(event.eventType)
|
||||||
.forEach(indicator -> indicator.notifyValueChange(event));
|
.forEach(indicator -> indicator.notifyValueChange(event));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
|
||||||
log.warn("No active ClientConnection found for connectionToken: {}", connectionToken);
|
|
||||||
}
|
|
||||||
} catch (final Exception e) {
|
|
||||||
log.error("Failed to process SEB client event: ", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -12,11 +12,15 @@ import static org.junit.Assert.*;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
import org.springframework.test.context.jdbc.Sql;
|
import org.springframework.test.context.jdbc.Sql;
|
||||||
|
|
||||||
|
import ch.ethz.seb.sebserver.gbl.async.AsyncServiceSpringConfig;
|
||||||
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
|
import ch.ethz.seb.sebserver.gbl.model.exam.Indicator.IndicatorType;
|
||||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientConnection;
|
import ch.ethz.seb.sebserver.gbl.model.session.ClientConnection;
|
||||||
import ch.ethz.seb.sebserver.gbl.model.session.ClientConnection.ConnectionStatus;
|
import ch.ethz.seb.sebserver.gbl.model.session.ClientConnection.ConnectionStatus;
|
||||||
|
@ -41,6 +45,9 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
|
||||||
private ClientEventDAO clientEventDAO;
|
private ClientEventDAO clientEventDAO;
|
||||||
@Autowired
|
@Autowired
|
||||||
private SEBClientConnectionService sebClientConnectionService;
|
private SEBClientConnectionService sebClientConnectionService;
|
||||||
|
@Autowired
|
||||||
|
@Qualifier(AsyncServiceSpringConfig.EXAM_API_EXECUTOR_BEAN_NAME)
|
||||||
|
private Executor executor;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreateLogEvents() {
|
public void testCreateLogEvents() {
|
||||||
|
@ -96,13 +103,13 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
|
||||||
this.sebClientConnectionService.notifyClientEvent(
|
this.sebClientConnectionService.notifyClientEvent(
|
||||||
"token1",
|
"token1",
|
||||||
new ClientEvent(null, connection.id, EventType.ERROR_LOG, 1L, 1L, 1.0, "some error"));
|
new ClientEvent(null, connection.id, EventType.ERROR_LOG, 1L, 1L, 1.0, "some error"));
|
||||||
|
waitForExecutor();
|
||||||
assertEquals("1", IndicatorValue.getDisplayValue(clientIndicator));
|
assertEquals("1", IndicatorValue.getDisplayValue(clientIndicator));
|
||||||
|
|
||||||
this.sebClientConnectionService.notifyClientEvent(
|
this.sebClientConnectionService.notifyClientEvent(
|
||||||
"token1",
|
"token1",
|
||||||
new ClientEvent(null, connection.id, EventType.ERROR_LOG, 1L, 1L, 1.0, "some error"));
|
new ClientEvent(null, connection.id, EventType.ERROR_LOG, 1L, 1L, 1.0, "some error"));
|
||||||
|
waitForExecutor();
|
||||||
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator));
|
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator));
|
||||||
|
|
||||||
// test reset indicator value and load it from persistent storage
|
// test reset indicator value and load it from persistent storage
|
||||||
|
@ -135,41 +142,54 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
|
||||||
.findFirst();
|
.findFirst();
|
||||||
assertTrue(findFirst.isPresent());
|
assertTrue(findFirst.isPresent());
|
||||||
final IndicatorValue clientIndicator = findFirst.get();
|
final IndicatorValue clientIndicator = findFirst.get();
|
||||||
|
|
||||||
assertEquals("0", IndicatorValue.getDisplayValue(clientIndicator));
|
assertEquals("0", IndicatorValue.getDisplayValue(clientIndicator));
|
||||||
|
|
||||||
this.sebClientConnectionService.notifyClientEvent(
|
this.sebClientConnectionService.notifyClientEvent(
|
||||||
"token2",
|
"token2",
|
||||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some error"));
|
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some error"));
|
||||||
|
waitForExecutor();
|
||||||
assertEquals("0", IndicatorValue.getDisplayValue(clientIndicator));
|
assertEquals("0", IndicatorValue.getDisplayValue(clientIndicator));
|
||||||
this.sebClientConnectionService.notifyClientEvent(
|
this.sebClientConnectionService.notifyClientEvent(
|
||||||
"token2",
|
"token2",
|
||||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<top> some error"));
|
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<top> some error"));
|
||||||
|
waitForExecutor();
|
||||||
assertEquals("1", IndicatorValue.getDisplayValue(clientIndicator));
|
assertEquals("1", IndicatorValue.getDisplayValue(clientIndicator));
|
||||||
this.sebClientConnectionService.notifyClientEvent(
|
this.sebClientConnectionService.notifyClientEvent(
|
||||||
"token2",
|
"token2",
|
||||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some error"));
|
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some error"));
|
||||||
|
waitForExecutor();
|
||||||
assertEquals("1", IndicatorValue.getDisplayValue(clientIndicator));
|
assertEquals("1", IndicatorValue.getDisplayValue(clientIndicator));
|
||||||
this.sebClientConnectionService.notifyClientEvent(
|
this.sebClientConnectionService.notifyClientEvent(
|
||||||
"token2",
|
"token2",
|
||||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<vip> some error"));
|
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<vip> some error"));
|
||||||
|
waitForExecutor();
|
||||||
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator));
|
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator));
|
||||||
this.sebClientConnectionService.notifyClientEvent(
|
this.sebClientConnectionService.notifyClientEvent(
|
||||||
"token2",
|
"token2",
|
||||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some error"));
|
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some error"));
|
||||||
|
waitForExecutor();
|
||||||
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator));
|
assertEquals("2", IndicatorValue.getDisplayValue(clientIndicator));
|
||||||
|
|
||||||
this.sebClientConnectionService.notifyClientEvent(
|
this.sebClientConnectionService.notifyClientEvent(
|
||||||
"token2",
|
"token2",
|
||||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<vip> some error"));
|
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<vip> some error"));
|
||||||
|
waitForExecutor();
|
||||||
// test reset indicator value and load it from persistent storage
|
// test reset indicator value and load it from persistent storage
|
||||||
((AbstractLogLevelCountIndicator) clientIndicator).reset();
|
((AbstractLogLevelCountIndicator) clientIndicator).reset();
|
||||||
assertEquals("3", IndicatorValue.getDisplayValue(clientIndicator));
|
assertEquals("3", IndicatorValue.getDisplayValue(clientIndicator));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void waitForExecutor() {
|
||||||
|
try {
|
||||||
|
while (((ThreadPoolTaskExecutor) this.executor).getActiveCount() > 0) {
|
||||||
|
Thread.sleep(20);
|
||||||
|
}
|
||||||
|
} catch (final Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBatteryStatusIndicator() {
|
public void testBatteryStatusIndicator() {
|
||||||
|
|
||||||
|
@ -200,23 +220,23 @@ public class ClientEventServiceTest extends AdministrationAPIIntegrationTester {
|
||||||
this.sebClientConnectionService.notifyClientEvent(
|
this.sebClientConnectionService.notifyClientEvent(
|
||||||
"token3",
|
"token3",
|
||||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some info other"));
|
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "some info other"));
|
||||||
|
waitForExecutor();
|
||||||
this.sebClientConnectionService.notifyClientEvent(
|
this.sebClientConnectionService.notifyClientEvent(
|
||||||
"token3",
|
"token3",
|
||||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<vip> some info other"));
|
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 1.0, "<vip> some info other"));
|
||||||
|
waitForExecutor();
|
||||||
assertEquals("--", IndicatorValue.getDisplayValue(clientIndicator));
|
assertEquals("--", IndicatorValue.getDisplayValue(clientIndicator));
|
||||||
|
|
||||||
this.sebClientConnectionService.notifyClientEvent(
|
this.sebClientConnectionService.notifyClientEvent(
|
||||||
"token3",
|
"token3",
|
||||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 90.0, "<battery> some info other"));
|
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 90.0, "<battery> some info other"));
|
||||||
|
waitForExecutor();
|
||||||
assertEquals("90", IndicatorValue.getDisplayValue(clientIndicator));
|
assertEquals("90", IndicatorValue.getDisplayValue(clientIndicator));
|
||||||
|
|
||||||
this.sebClientConnectionService.notifyClientEvent(
|
this.sebClientConnectionService.notifyClientEvent(
|
||||||
"token3",
|
"token3",
|
||||||
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 40.0, "<battery> some info other"));
|
new ClientEvent(null, connection.id, EventType.INFO_LOG, 1L, 1L, 40.0, "<battery> some info other"));
|
||||||
|
waitForExecutor();
|
||||||
assertEquals("40", IndicatorValue.getDisplayValue(clientIndicator));
|
assertEquals("40", IndicatorValue.getDisplayValue(clientIndicator));
|
||||||
|
|
||||||
// test reset indicator value and load it from persistent storage
|
// test reset indicator value and load it from persistent storage
|
||||||
|
|
Loading…
Reference in a new issue