diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/bulkaction/BatchActionService.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/bulkaction/BatchActionService.java index 7cc308bd..0c35fedb 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/bulkaction/BatchActionService.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/bulkaction/BatchActionService.java @@ -22,10 +22,11 @@ public interface BatchActionService { /** Use this to register a new batch action for further processing. * + * @param institutionId The institution identifier * @param actionType The batch action type * @param ids comma separated String of model ids to process * @return Result refer to the stored batch action or to an error when happened */ - Result registerNewBatchAction(BatchActionType actionType, String ids); + Result registerNewBatchAction(final Long institutionId, BatchActionType actionType, String ids); /** Use this to get all currently running batch actions for a given institution. * diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/bulkaction/impl/BatchActionServiceImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/bulkaction/impl/BatchActionServiceImpl.java index 93862ce4..9940f57e 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/bulkaction/impl/BatchActionServiceImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/bulkaction/impl/BatchActionServiceImpl.java @@ -9,17 +9,22 @@ package ch.ethz.seb.sebserver.webservice.servicelayer.bulkaction.impl; import java.time.Instant; +import java.util.Arrays; import java.util.Collection; import java.util.EnumMap; import java.util.HashSet; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.TaskScheduler; import org.springframework.stereotype.Service; +import ch.ethz.seb.sebserver.gbl.Constants; +import ch.ethz.seb.sebserver.gbl.api.API; import ch.ethz.seb.sebserver.gbl.api.API.BatchActionType; import ch.ethz.seb.sebserver.gbl.api.EntityType; import ch.ethz.seb.sebserver.gbl.model.BatchAction; @@ -29,6 +34,7 @@ import ch.ethz.seb.sebserver.gbl.util.Result; import ch.ethz.seb.sebserver.webservice.servicelayer.bulkaction.BatchActionExec; import ch.ethz.seb.sebserver.webservice.servicelayer.bulkaction.BatchActionService; import ch.ethz.seb.sebserver.webservice.servicelayer.dao.BatchActionDAO; +import ch.ethz.seb.sebserver.webservice.servicelayer.dao.FilterMap; import ch.ethz.seb.sebserver.webservice.servicelayer.dao.ResourceNotFoundException; @Service @@ -59,27 +65,59 @@ public class BatchActionServiceImpl implements BatchActionService { } @Override - public Result registerNewBatchAction(final BatchActionType actionType, final String ids) { - // TODO Auto-generated method stub - return null; + public Result registerNewBatchAction( + final Long institutionId, + final BatchActionType actionType, + final String ids) { + + return Result.tryCatch(() -> { + + final Collection sourceIds = Arrays.asList(StringUtils.split( + ids, + Constants.LIST_SEPARATOR)); + + return this.batchActionDAO + .createNew(new BatchAction(null, institutionId, actionType, sourceIds, null, null, null)) + .map(res -> { + processNextBatchAction(); + return res; + }) + .getOrThrow(); + }); } @Override public Result> getRunningActions(final Long institutionId) { - // TODO Auto-generated method stub - return null; + return this.batchActionDAO.allMatching(new FilterMap().putIfAbsent( + API.PARAM_INSTITUTION_ID, + String.valueOf(institutionId))) + .map(results -> results.stream() + .filter(action -> StringUtils.isNotBlank(action.processorId) && + !action.processorId.endsWith(BatchActionDAO.FLAG_FINISHED)) + .collect(Collectors.toList())); } @Override public Result> getRunningActions(final Long institutionId, final EntityType entityType) { - // TODO Auto-generated method stub - return null; + return this.batchActionDAO.allMatching(new FilterMap().putIfAbsent( + API.PARAM_INSTITUTION_ID, + String.valueOf(institutionId))) + .map(results -> results.stream() + .filter(action -> StringUtils.isNotBlank(action.processorId) && + !action.processorId.endsWith(BatchActionDAO.FLAG_FINISHED)) + .filter(action -> action.actionType.entityType == entityType) + .collect(Collectors.toList())); } @Override public Result> getFinishedActions(final Long institutionId) { - // TODO Auto-generated method stub - return null; + return this.batchActionDAO.allMatching(new FilterMap().putIfAbsent( + API.PARAM_INSTITUTION_ID, + String.valueOf(institutionId))) + .map(results -> results.stream() + .filter(action -> StringUtils.isNotBlank(action.processorId) && + action.processorId.endsWith(BatchActionDAO.FLAG_FINISHED)) + .collect(Collectors.toList())); } private void processNextBatchAction() { @@ -161,7 +199,6 @@ public class BatchActionServiceImpl implements BatchActionService { log.info("Skip this batch action."); } } - } } diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/BatchActionDAO.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/BatchActionDAO.java index 9a2add0d..a36bd531 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/BatchActionDAO.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/BatchActionDAO.java @@ -15,10 +15,30 @@ import ch.ethz.seb.sebserver.gbl.util.Result; public interface BatchActionDAO extends EntityDAO { + public static final String FLAG_FINISHED = "_FINISHED"; + + /** This checks if there is a pending batch action to process next. + * If so this reserves the pending batch action and mark it to be processed + * by the given pocessId. + * If there is no pending batch action this results with a ResourceNotFoundException. + * + * @param processId The process id to reserve a pending batch action before processing + * @return Result refer to the batch action to process or to an error when happened */ Result getAndReserveNext(String processId); + /** Use this to update the processing of a running batch action + * + * @param actionId The batch action identifier + * @param processId The process identifier (must match with the processId on persistent storage) + * @param modelIds Collection of model identifiers of entities that has successfully been processed. + * @return Result refer to the involved batch action or to an error when happened. */ Result updateProgress(Long actionId, String processId, Collection modelIds); + /** Use this to mark processing of a single entity of a specified batch action as successful completed. + * + * @param actionId The batch action identifier + * @param processId The process identifier (must match with the processId on persistent storage) + * @param modelId The model identifier to mark as completed for the given batch action */ void setSuccessfull(Long actionId, String processId, String modelId); /** This is used by a processing background task that is processing a batch action to finish up diff --git a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/BatchActionDAOImpl.java b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/BatchActionDAOImpl.java index a083c63d..94b99627 100644 --- a/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/BatchActionDAOImpl.java +++ b/src/main/java/ch/ethz/seb/sebserver/webservice/servicelayer/dao/impl/BatchActionDAOImpl.java @@ -48,8 +48,6 @@ import ch.ethz.seb.sebserver.webservice.servicelayer.dao.TransactionHandler; @WebServiceProfile public class BatchActionDAOImpl implements BatchActionDAO { - private static final String FLAG_FINISHED = "_FINISHED"; - private final BatchActionRecordMapper batchActionRecordMapper; public BatchActionDAOImpl(final BatchActionRecordMapper batchActionRecordMapper) { @@ -140,6 +138,22 @@ public class BatchActionDAOImpl implements BatchActionDAO { public void setSuccessfull(final Long actionId, final String processId, final String modelId) { try { + final BatchActionRecord rec = this.batchActionRecordMapper.selectByPrimaryKey(actionId); + + if (!processId.equals(rec.getProcessorId())) { + throw new RuntimeException("Batch action processor id mismatch: " + processId + " " + rec); + } + + final BatchActionRecord newRecord = new BatchActionRecord( + actionId, + null, + null, + null, + rec.getSuccessful() + Constants.LIST_SEPARATOR + modelId, + Utils.getMillisecondsNow(), + processId); + this.batchActionRecordMapper.updateByPrimaryKeySelective(newRecord); + } catch (final Exception e) { log.error("Failed to mark entity sucessfuly processed: modelId: {}, processId"); }