fixed asynchronous download by using response output-stream directly

This commit is contained in:
anhefti 2019-08-01 09:04:29 +02:00
parent e5f8a995e6
commit ccf241ef47
15 changed files with 230 additions and 101 deletions

View file

@ -266,6 +266,10 @@
<artifactId>spring-security-jwt</artifactId>
<version>1.0.9.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Apache HTTP -->
<dependency>

View file

@ -323,6 +323,10 @@ public final class Utils {
}
public static String toString(final byte[] byteArray) {
if (byteArray == null) {
return null;
}
return toString(ByteBuffer.wrap(byteArray));
}

View file

@ -8,10 +8,18 @@
package ch.ethz.seb.sebserver.gui.service.remote.webservice.api;
import java.io.IOException;
import org.apache.http.HttpHeaders;
import org.apache.tomcat.util.http.fileupload.ByteArrayOutputStream;
import org.apache.tomcat.util.http.fileupload.IOUtils;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.security.oauth2.client.OAuth2RestTemplate;
import org.springframework.security.oauth2.common.OAuth2AccessToken;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import ch.ethz.seb.sebserver.gbl.util.Result;
@ -26,24 +34,82 @@ public abstract class AbstractExportCall extends RestCall<byte[]> {
super(typeKey, httpMethod, contentType, path);
}
// We need a WebClient here to separate the request from the usual RestTemplate
// and allow also to get async responses
// The OAut2 bearer is get from the current OAuth2RestTemplate
// TODO create better API for this on RestCallBuilder site
@Override
protected Result<byte[]> exchange(final RestCallBuilder builder) {
try {
final ResponseEntity<byte[]> responseEntity = builder
.getRestTemplate()
.exchange(
final OAuth2RestTemplate restTemplate = (OAuth2RestTemplate) builder.getRestTemplate();
final OAuth2AccessToken accessToken = restTemplate.getAccessToken();
final String value = accessToken.getValue();
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
WebClient.create()
.method(this.httpMethod)
.uri(
builder.buildURI(),
this.httpMethod,
builder.buildRequestEntity(),
byte[].class,
builder.getURIVariables());
builder.getURIVariables())
.header(HttpHeaders.AUTHORIZATION, "Bearer " + value)
.headers(h -> h.addAll(builder.buildRequestEntity().getHeaders()))
.body(BodyInserters.fromObject("grant_type=client_credentials&scope=read,write"))
.accept(MediaType.APPLICATION_OCTET_STREAM)
.retrieve()
.bodyToFlux(DataBuffer.class)
.map(source -> {
if (responseEntity.getStatusCode() == HttpStatus.OK) {
return Result.of(responseEntity.getBody());
}
try {
IOUtils.copyLarge(source.asInputStream(), byteArrayOutputStream);
} catch (final IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return source;
})
.blockLast();
return Result.ofRuntimeError(
"Error while trying to export from webservice. Response: " + responseEntity);
final byte[] byteArray = byteArrayOutputStream.toByteArray();
return Result.of(byteArray);
// final byte[] value = builder
// .getRestTemplate()
// .execute(
// builder.buildURI(),
// this.httpMethod,
// request -> {
// },
// response -> {
// final InputStream input = IOUtils.toBufferedInputStream(response.getBody());
// final ByteArrayOutputStream output = new ByteArrayOutputStream();
// IOUtils.copyLarge(input, output);
// return output.toByteArray();
// },
// builder.getURIVariables());
//
// System.out.println("************************ getResponse " + Utils.toString(value));
//
// return Result.of(value);
//
// final ResponseEntity<byte[]> responseEntity = builder
// .getRestTemplate()
// .exchange(
// builder.buildURI(),
// this.httpMethod,
// builder.buildRequestEntity(),
// byte[].class,
// builder.getURIVariables());
// if (responseEntity.getStatusCode() == HttpStatus.OK) {
// final byte[] body = responseEntity.getBody();
// System.out.println("************************ getResponse " + Utils.toString(body));
// return Result.of(body);
// }
//
// return Result.ofRuntimeError(
// "Error while trying to export from webservice. Response: " + responseEntity);
} catch (final Throwable t) {
return Result.ofError(t);
}

View file

@ -54,17 +54,15 @@ public class NoneEncryptor implements SebConfigCryptor {
IOUtils.copyLarge(input, output);
input.close();
output.flush();
output.close();
} catch (final IOException e) {
log.error("Error while streaming plain data to output: ", e);
} finally {
try {
input.close();
output.flush();
output.close();
} catch (final IOException e) {
log.error("Failed to close InputStream");
log.error("Failed to close InputStream and OutputStream");
}
if (log.isDebugEnabled()) {

View file

@ -64,17 +64,17 @@ public class PasswordEncryptor implements SebConfigCryptor {
IOUtils.copyLarge(input, encryptOutput);
input.close();
encryptOutput.flush();
} catch (final CryptorException e) {
log.error("Error while trying to stream and encrypt data: ", e);
} catch (final IOException e) {
log.error("Error while trying to read/write form/to streams: ", e);
} finally {
try {
if (encryptOutput != null)
input.close();
if (encryptOutput != null) {
encryptOutput.flush();
encryptOutput.close();
}
} catch (final IOException e) {
log.error("Failed to close AES256JNCryptorOutputStream: ", e);
}

View file

@ -190,14 +190,16 @@ public class SebClientConfigServiceImpl implements SebClientConfigService {
} catch (final Exception e) {
log.error("Error while zip and encrypt seb client config stream: ", e);
try {
if (pIn != null)
if (pIn != null) {
pIn.close();
}
} catch (final IOException e1) {
log.error("Failed to close PipedInputStream: ", e1);
}
try {
if (pOut != null)
if (pOut != null) {
pOut.close();
}
} catch (final IOException e1) {
log.error("Failed to close PipedOutputStream: ", e1);
}

View file

@ -72,7 +72,7 @@ public final class SebConfigEncryptionServiceImpl implements SebConfigEncryption
log.debug("Password encryption with strategy: {}", strategy);
}
pout.write(strategy.header);
output.write(strategy.header);
getEncryptor(strategy)
.getOrThrow()
@ -80,16 +80,19 @@ public final class SebConfigEncryptionServiceImpl implements SebConfigEncryption
IOUtils.copyLarge(pin, output);
pin.close();
pout.flush();
pout.close();
} catch (final IOException e) {
log.error("Error while stream encrypted data: ", e);
} finally {
try {
if (pin != null)
if (pin != null) {
pin.close();
}
if (pout != null) {
pout.flush();
pout.close();
}
output.flush();
output.close();
} catch (final IOException e1) {
log.error("Failed to close PipedInputStream: ", e1);
}
@ -126,22 +129,23 @@ public final class SebConfigEncryptionServiceImpl implements SebConfigEncryption
IOUtils.copyLarge(pin, output);
pin.close();
pout.flush();
pout.close();
} catch (final IOException e) {
log.error("Error while stream decrypted data: ", e);
} finally {
try {
if (pin != null)
if (pin != null) {
pin.close();
}
} catch (final IOException e1) {
log.error("Failed to close PipedInputStream: ", e1);
}
try {
if (pout != null)
if (pout != null) {
pout.flush();
pout.close();
}
output.flush();
output.close();
} catch (final IOException e1) {
log.error("Failed to close PipedOutputStream: ", e1);
}

View file

@ -197,10 +197,6 @@ public class SebExamConfigServiceImpl implements SebExamConfigService {
final String configKey = DigestUtils.sha256Hex(pin);
pout.flush();
pout.close();
pin.close();
return Result.of(configKey);
} catch (final Exception e) {
@ -208,14 +204,16 @@ public class SebExamConfigServiceImpl implements SebExamConfigService {
return Result.ofError(e);
} finally {
try {
if (pin != null)
if (pin != null) {
pin.close();
}
} catch (final IOException e1) {
log.error("Failed to close PipedInputStream: ", e1);
}
try {
if (pout != null)
if (pout != null) {
pout.close();
}
} catch (final IOException e1) {
log.error("Failed to close PipedOutputStream: ", e1);
}
@ -250,22 +248,21 @@ public class SebExamConfigServiceImpl implements SebExamConfigService {
IOUtils.copyLarge(pin, out);
pout.flush();
pout.close();
pin.close();
} catch (final Exception e) {
log.error("Error while stream plain text SEB clonfiguration data: ", e);
} finally {
try {
if (pin != null)
if (pin != null) {
pin.close();
}
} catch (final IOException e1) {
log.error("Failed to close PipedInputStream: ", e1);
}
try {
if (pout != null)
if (pout != null) {
pout.flush();
pout.close();
}
} catch (final IOException e1) {
log.error("Failed to close PipedOutputStream: ", e1);
}

View file

@ -43,16 +43,17 @@ public class ZipServiceImpl implements ZipService {
IOUtils.copyLarge(in, zipOutputStream);
in.close();
zipOutputStream.flush();
zipOutputStream.close();
} catch (final IOException e) {
log.error("Error while streaming data to zipped output: ", e);
} finally {
try {
if (zipOutputStream != null)
in.close();
if (zipOutputStream != null) {
zipOutputStream.flush();
zipOutputStream.close();
}
out.flush();
out.close();
} catch (final IOException e) {
log.error("Failed to close ZipOutputStream: ", e);
}

View file

@ -8,16 +8,21 @@
package ch.ethz.seb.sebserver.webservice.weblayer.api;
import java.io.IOException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import org.mybatis.dynamic.sql.SqlTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
import ch.ethz.seb.sebserver.gbl.api.API;
import ch.ethz.seb.sebserver.gbl.api.POSTMapper;
@ -43,6 +48,8 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.validation.BeanValidationSe
@RequestMapping("${sebserver.webservice.api.admin.endpoint}" + API.CONFIGURATION_NODE_ENDPOINT)
public class ConfigurationNodeController extends EntityController<ConfigurationNode, ConfigurationNode> {
private static final Logger log = LoggerFactory.getLogger(ConfigurationNodeController.class);
private final ConfigurationDAO configurationDAO;
private final SebExamConfigService sebExamConfigService;
@ -123,21 +130,34 @@ public class ConfigurationNodeController extends EntityController<ConfigurationN
path = API.MODEL_ID_VAR_PATH_SEGMENT + API.CONFIGURATION_PLAIN_XML_DOWNLOAD_PATH_SEGMENT,
method = RequestMethod.GET,
produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
public ResponseEntity<StreamingResponseBody> downloadPlainXMLConfig(
public void downloadPlainXMLConfig(
@PathVariable final Long modelId,
@RequestParam(
name = API.PARAM_INSTITUTION_ID,
required = true,
defaultValue = UserService.USERS_INSTITUTION_AS_DEFAULT) final Long institutionId) {
defaultValue = UserService.USERS_INSTITUTION_AS_DEFAULT) final Long institutionId,
final HttpServletResponse response) throws IOException {
this.entityDAO.byPK(modelId)
.flatMap(this.authorization::checkRead)
.map(this.userActivityLogDAO::logExport);
final StreamingResponseBody stream = out -> this.sebExamConfigService
.exportPlainXML(out, institutionId, modelId);
final ServletOutputStream outputStream = response.getOutputStream();
return new ResponseEntity<>(stream, HttpStatus.OK);
try {
this.sebExamConfigService.exportPlainXML(
outputStream,
institutionId,
modelId);
response.setStatus(HttpStatus.OK.value());
} catch (final Exception e) {
log.error("Unexpected error while trying to downstream exam config: ", e);
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value());
} finally {
outputStream.flush();
outputStream.close();
}
}
}

View file

@ -25,7 +25,7 @@ public class ControllerConfig implements WebMvcConfigurer {
@Override
public void configureAsyncSupport(final AsyncSupportConfigurer configurer) {
configurer.setTaskExecutor(threadPoolTaskExecutor());
configurer.setDefaultTimeout(30_000);
configurer.setDefaultTimeout(30000);
}
public AsyncTaskExecutor threadPoolTaskExecutor() {

View file

@ -15,6 +15,7 @@ import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -22,7 +23,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
@ -30,7 +30,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
import ch.ethz.seb.sebserver.gbl.api.API;
import ch.ethz.seb.sebserver.gbl.api.APIMessage;
@ -247,11 +246,14 @@ public class ExamAPI_V1_Controller {
method = RequestMethod.GET,
consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE,
produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
public ResponseEntity<StreamingResponseBody> getConfig(
public void getConfig(
@RequestHeader(name = API.EXAM_API_SEB_CONNECTION_TOKEN, required = true) final String connectionToken,
@RequestBody(required = false) final MultiValueMap<String, String> formParams,
final Principal principal,
final HttpServletRequest request) {
final HttpServletRequest request,
final HttpServletResponse response) throws IOException {
final ServletOutputStream outputStream = response.getOutputStream();
try {
// if an examId is provided with the request, update the connection first
@ -272,28 +274,35 @@ public class ExamAPI_V1_Controller {
throw new IllegalStateException("Missing exam identider or requested exam is not running");
}
} catch (final Exception e) {
log.error("Unexpected error: ", e);
final StreamingResponseBody stream = out -> {
final APIMessage errorMessage = APIMessage.ErrorMessage.GENERIC.of(e.getMessage());
out.write(Utils.toByteArray(this.jsonMapper.writeValueAsString(errorMessage)));
};
return new ResponseEntity<>(stream, HttpStatus.BAD_REQUEST);
final APIMessage errorMessage = APIMessage.ErrorMessage.GENERIC.of(e.getMessage());
outputStream.write(Utils.toByteArray(this.jsonMapper.writeValueAsString(errorMessage)));
response.setStatus(HttpStatus.BAD_REQUEST.value());
outputStream.flush();
outputStream.close();
return;
}
final StreamingResponseBody stream = out -> {
try {
try {
this.examSessionService
.streamDefaultExamConfig(
connectionToken,
out);
} catch (final Exception e) {
final APIMessage errorMessage = APIMessage.ErrorMessage.GENERIC.of(e.getMessage());
out.write(Utils.toByteArray(this.jsonMapper.writeValueAsString(errorMessage)));
}
};
this.examSessionService
.streamDefaultExamConfig(
connectionToken,
outputStream);
return new ResponseEntity<>(stream, HttpStatus.OK);
response.setStatus(HttpStatus.OK.value());
} catch (final Exception e) {
final APIMessage errorMessage = APIMessage.ErrorMessage.GENERIC.of(e.getMessage());
outputStream.write(Utils.toByteArray(this.jsonMapper.writeValueAsString(errorMessage)));
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR.value());
} finally {
outputStream.flush();
outputStream.close();
}
}
@RequestMapping(

View file

@ -8,30 +8,34 @@
package ch.ethz.seb.sebserver.webservice.weblayer.api;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.mybatis.dynamic.sql.SqlTable;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.validation.FieldError;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
import ch.ethz.seb.sebserver.gbl.Constants;
import ch.ethz.seb.sebserver.gbl.api.API;
import ch.ethz.seb.sebserver.gbl.api.APIMessage;
import ch.ethz.seb.sebserver.gbl.api.APIMessage.APIMessageException;
import ch.ethz.seb.sebserver.gbl.api.EntityType;
import ch.ethz.seb.sebserver.gbl.api.POSTMapper;
import ch.ethz.seb.sebserver.gbl.model.Domain;
import ch.ethz.seb.sebserver.gbl.model.sebconfig.SebClientConfig;
import ch.ethz.seb.sebserver.gbl.model.user.PasswordChange;
import ch.ethz.seb.sebserver.gbl.model.user.UserLogActivityType;
import ch.ethz.seb.sebserver.gbl.profile.WebServiceProfile;
import ch.ethz.seb.sebserver.gbl.util.Result;
import ch.ethz.seb.sebserver.webservice.datalayer.batis.mapper.SebClientConfigRecordDynamicSqlSupport;
@ -74,26 +78,43 @@ public class SebClientConfigController extends ActivatableEntityController<SebCl
path = API.SEB_CLIENT_CONFIG_DOWNLOAD_PATH_SEGMENT + API.MODEL_ID_VAR_PATH_SEGMENT,
method = RequestMethod.GET,
produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
public ResponseEntity<StreamingResponseBody> downloadSEBConfig(
@PathVariable final String modelId) {
public void downloadSEBConfig(
@PathVariable final String modelId,
final HttpServletResponse response) throws IOException {
this.entityDAO.byModelId(modelId)
.flatMap(this.authorization::checkWrite)
.map(this.userActivityLogDAO::logExport);
this.userActivityLogDAO.log(
UserLogActivityType.EXPORT,
EntityType.SEB_CLIENT_CONFIGURATION,
modelId,
"Export of SEB Client Configuration");
final ServletOutputStream outputStream = response.getOutputStream();
PipedOutputStream pout = null;
PipedInputStream pin = null;
try {
pout = new PipedOutputStream();
pin = new PipedInputStream(pout);
final StreamingResponseBody stream = out -> {
this.sebClientConfigService.exportSebClientConfiguration(
out,
pout,
modelId);
};
return new ResponseEntity<>(stream, HttpStatus.OK);
IOUtils.copyLarge(pin, outputStream);
response.setStatus(HttpStatus.OK.value());
outputStream.flush();
} finally {
outputStream.flush();
outputStream.close();
}
// final StreamingResponseBody stream = out -> {
// this.sebClientConfigService.exportSebClientConfiguration(
// out,
// modelId);
// };
//
// return new ResponseEntity<>(stream, HttpStatus.OK);
}
@Override

View file

@ -19,6 +19,7 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -287,6 +288,10 @@ public class HTTPClientBot {
final byte[] config = exchange.getBody();
if (ArrayUtils.isEmpty(config)) {
log.error("No Exam config get from API. processing anyway");
}
if (log.isDebugEnabled()) {
log.debug("ConnectionBot {} : successfully requested exam config: " + Utils.toString(config),
this.name);

View file

@ -39,7 +39,6 @@ import org.springframework.security.web.FilterChainProxy;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.ResultActions;
import org.springframework.test.web.servlet.request.MockHttpServletRequestBuilder;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
@ -280,8 +279,7 @@ public abstract class ExamAPIIntegrationTester {
}
final ResultActions result = this.mockMvc
.perform(builder)
.andDo(MvcResult::getAsyncResult);
.perform(builder);
return result.andReturn().getResponse();
}