Refactor the full-texts deletion process to reduce storage space and complexity:
- Delete the full-texts of each assignments-batch after the whole procedure for that batch is finished, whether successfully or not.
- Do not check for remaining files when the Worker shuts down, since the files are deleted anyway, even when their handling was problematic. The full-texts do not need to be kept in case of an error, since the Controller will reassign the non-downloaded id-url records to some (maybe different) Worker, where these files will be downloaded and handled again.
- Also, change "assignmentsNumsHandled" to hold data only for assignments which are handled all the way, including the upload of the full-texts by the Controller and the insertion of the WorkerReport into the database.
This commit is contained in:
parent
326af0f12d
commit
d37cd738a0
|
@ -2,7 +2,7 @@ package eu.openaire.urls_worker;
|
||||||
|
|
||||||
import eu.openaire.publications_retriever.PublicationsRetriever;
|
import eu.openaire.publications_retriever.PublicationsRetriever;
|
||||||
import eu.openaire.publications_retriever.util.file.FileUtils;
|
import eu.openaire.publications_retriever.util.file.FileUtils;
|
||||||
import eu.openaire.urls_worker.components.ScheduledTasks;
|
import eu.openaire.urls_worker.controllers.FullTextsController;
|
||||||
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;
|
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;
|
||||||
import eu.openaire.urls_worker.util.AssignmentsHandler;
|
import eu.openaire.urls_worker.util.AssignmentsHandler;
|
||||||
import eu.openaire.urls_worker.util.UriBuilder;
|
import eu.openaire.urls_worker.util.UriBuilder;
|
||||||
|
@ -95,8 +95,7 @@ public class UrlsWorkerApplication {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ScheduledTasks.isLastTime = true;
|
FullTextsController.deleteDirectory(-1);
|
||||||
ScheduledTasks.deleteHandledAssignmentsFullTexts();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
|
|
@ -1,25 +1,13 @@
|
||||||
package eu.openaire.urls_worker.components;
|
package eu.openaire.urls_worker.components;
|
||||||
|
|
||||||
import eu.openaire.urls_worker.UrlsWorkerApplication;
|
import eu.openaire.urls_worker.UrlsWorkerApplication;
|
||||||
import eu.openaire.urls_worker.controllers.FullTextsController;
|
|
||||||
import eu.openaire.urls_worker.controllers.GeneralController;
|
import eu.openaire.urls_worker.controllers.GeneralController;
|
||||||
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;
|
|
||||||
import eu.openaire.urls_worker.util.AssignmentsHandler;
|
import eu.openaire.urls_worker.util.AssignmentsHandler;
|
||||||
import org.apache.commons.io.FileUtils;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.nio.file.Paths;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.stream.Stream;
|
|
||||||
|
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
public class ScheduledTasks {
|
public class ScheduledTasks {
|
||||||
|
@ -44,66 +32,4 @@ public class ScheduledTasks {
|
||||||
AssignmentsHandler.handleAssignments();
|
AssignmentsHandler.handleAssignments();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static boolean isLastTime = false;
|
|
||||||
|
|
||||||
|
|
||||||
@Scheduled(fixedDelay = 43_200_000, initialDelay = 43_200_000) // Every 12 hours, after 12 hours from the start of this app.
|
|
||||||
public static void deleteHandledAssignmentsFullTexts()
|
|
||||||
{
|
|
||||||
Set<Map.Entry<Long, Boolean>> entrySet = FullTextsController.assignmentsNumsHandledAndApprovedToBeDeleted.entrySet();
|
|
||||||
if ( entrySet.isEmpty() )
|
|
||||||
return;
|
|
||||||
|
|
||||||
logger.info("Going to delete the locally stored fullTexts.");
|
|
||||||
|
|
||||||
for ( Map.Entry<Long,Boolean> entry : entrySet )
|
|
||||||
{
|
|
||||||
if ( entry.getValue().equals(true) ) // It is already deleted, move on.
|
|
||||||
continue;
|
|
||||||
|
|
||||||
Long curAssignments = entry.getKey();
|
|
||||||
String currentAssignmentsBasePath = PublicationsRetrieverPlugin.assignmentsBasePath + "assignments_" + curAssignments + "_fullTexts" + File.separator;
|
|
||||||
logger.debug("Going to delete the files from assignments: " + currentAssignmentsBasePath);
|
|
||||||
|
|
||||||
File curDir = new File(currentAssignmentsBasePath);
|
|
||||||
if ( !curDir.isDirectory() ) {
|
|
||||||
logger.error("This assignments-dir does not exist: " + currentAssignmentsBasePath);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
FileUtils.deleteDirectory(curDir);
|
|
||||||
FullTextsController.assignmentsNumsHandledAndApprovedToBeDeleted.put(curAssignments, true); // Set the is-handled to true.
|
|
||||||
} catch (IOException e) {
|
|
||||||
logger.error("The following directory could not be deleted: " + currentAssignmentsBasePath, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( isLastTime ) { // Delete the parent "assignments" directory if not files are left behind.
|
|
||||||
// In case something went wrong in the full-texts delivering to the controller, then the non-transferred files will remain in the Worker's local storage, for future fix.
|
|
||||||
// So, delete the parent directory, only if it's empty!
|
|
||||||
|
|
||||||
logger.info("Going to delete the parent \"" + PublicationsRetrieverPlugin.assignmentsBasePath + "\" directory.");
|
|
||||||
|
|
||||||
boolean isAnEmptyDir = false;
|
|
||||||
try ( Stream<Path> stream = Files.list(Paths.get(PublicationsRetrieverPlugin.assignmentsBasePath)) ) {
|
|
||||||
isAnEmptyDir = ! stream.findAny().isPresent();
|
|
||||||
} catch (IOException e) {
|
|
||||||
logger.error("Could not list the contents of the parent directory: " + PublicationsRetrieverPlugin.assignmentsBasePath, e);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ( isAnEmptyDir ) {
|
|
||||||
try {
|
|
||||||
FileUtils.deleteDirectory(new File(PublicationsRetrieverPlugin.assignmentsBasePath));
|
|
||||||
} catch (IOException e) {
|
|
||||||
logger.error("The following directory could not be deleted: " + PublicationsRetrieverPlugin.assignmentsBasePath, e);
|
|
||||||
}
|
|
||||||
} else
|
|
||||||
logger.warn("The parent directory \"" + PublicationsRetrieverPlugin.assignmentsBasePath + "\" was not empty! Which means there were some unhandled full-text batches!");
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
package eu.openaire.urls_worker.controllers;
|
package eu.openaire.urls_worker.controllers;
|
||||||
|
|
||||||
|
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;
|
||||||
import eu.openaire.urls_worker.services.FileStorageService;
|
import eu.openaire.urls_worker.services.FileStorageService;
|
||||||
import eu.openaire.urls_worker.util.FilesZipper;
|
import eu.openaire.urls_worker.util.FilesZipper;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.core.io.InputStreamResource;
|
import org.springframework.core.io.InputStreamResource;
|
||||||
|
@ -14,9 +16,9 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
|
@ -25,8 +27,6 @@ public class FullTextsController {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(GeneralController.class);
|
private static final Logger logger = LoggerFactory.getLogger(GeneralController.class);
|
||||||
|
|
||||||
public static HashMap<Long, Boolean> assignmentsNumsHandledAndApprovedToBeDeleted = new HashMap<>();
|
|
||||||
|
|
||||||
public static String assignmentsBaseDir = null;
|
public static String assignmentsBaseDir = null;
|
||||||
|
|
||||||
|
|
||||||
|
@ -74,11 +74,8 @@ public class FullTextsController {
|
||||||
return ResponseEntity.internalServerError().body(errorMsg);
|
return ResponseEntity.internalServerError().body(errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
// If this is the last batch for this assignments-count, then make sure it is deleted in the next scheduled delete-operation.
|
if ( zipBatchCounter == totalZipBatches )
|
||||||
if ( zipBatchCounter == totalZipBatches ) {
|
logger.debug("Will return the " + ((totalZipBatches > 1) ? "last" : "only") + " batch (" + zipBatchCounter + ") of Assignments_" + assignmentsCounter + " to the Controller.");
|
||||||
assignmentsNumsHandledAndApprovedToBeDeleted.put(assignmentsCounter, false);
|
|
||||||
logger.debug("Will return the " + ((totalZipBatches > 1) ? "last" : "only") + " batch (" + zipBatchCounter + ") of Assignments_" + assignmentsCounter + " to the Controller and these assignments will be deleted later.");
|
|
||||||
}
|
|
||||||
|
|
||||||
String zipName = zipFile.getName();
|
String zipName = zipFile.getName();
|
||||||
String zipFileFullPath = currentAssignmentsBaseFullTextsPath + zipName;
|
String zipFileFullPath = currentAssignmentsBaseFullTextsPath + zipName;
|
||||||
|
@ -92,6 +89,9 @@ public class FullTextsController {
|
||||||
logger.error(errorMsg, e);
|
logger.error(errorMsg, e);
|
||||||
return ResponseEntity.internalServerError().body(errorMsg);
|
return ResponseEntity.internalServerError().body(errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The related fulltext and zip files will be deleted in "AssignmentsHandler.postWorkerReport()", after the Controller has finished transferring them. They will be deleted even in case of a Controller-error.
|
||||||
|
// In case of an error and file-deletion, the related id-url records will just be re-processed in the future by some (maybe different) Worker.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -118,4 +118,27 @@ public class FullTextsController {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static boolean deleteDirectory(long curAssignments)
|
||||||
|
{
|
||||||
|
String directoryPath = PublicationsRetrieverPlugin.assignmentsBasePath;
|
||||||
|
|
||||||
|
if ( curAssignments != -1 ) {
|
||||||
|
directoryPath += "assignments_" + curAssignments + "_fullTexts" + File.separator;
|
||||||
|
logger.debug("Going to delete the files inside the directory of assignments_" + curAssignments);
|
||||||
|
} else
|
||||||
|
logger.debug("Going to delete the parent directory: " + directoryPath);
|
||||||
|
|
||||||
|
try {
|
||||||
|
FileUtils.deleteDirectory(new File(directoryPath));
|
||||||
|
return true;
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.error("The following directory could not be deleted: " + directoryPath, e);
|
||||||
|
return false;
|
||||||
|
} catch (IllegalArgumentException iae) {
|
||||||
|
logger.error("This assignments-dir does not exist: " + directoryPath);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package eu.openaire.urls_worker.controllers;
|
package eu.openaire.urls_worker.controllers;
|
||||||
|
|
||||||
import eu.openaire.urls_worker.UrlsWorkerApplication;
|
import eu.openaire.urls_worker.UrlsWorkerApplication;
|
||||||
|
import eu.openaire.urls_worker.util.AssignmentsHandler;
|
||||||
import eu.openaire.urls_worker.util.UriBuilder;
|
import eu.openaire.urls_worker.util.UriBuilder;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -12,9 +13,6 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
|
@ -69,13 +67,7 @@ public class GeneralController {
|
||||||
@GetMapping("getHandledAssignmentsCounts")
|
@GetMapping("getHandledAssignmentsCounts")
|
||||||
public ResponseEntity<?> getHandledAssignmentsCounts()
|
public ResponseEntity<?> getHandledAssignmentsCounts()
|
||||||
{
|
{
|
||||||
List<Long> handledAssignmentsCounts = new ArrayList<>(FullTextsController.assignmentsNumsHandledAndApprovedToBeDeleted.size()/2);
|
return ResponseEntity.ok(AssignmentsHandler.assignmentsNumsHandled);
|
||||||
for ( Map.Entry<Long,Boolean> entry : FullTextsController.assignmentsNumsHandledAndApprovedToBeDeleted.entrySet() )
|
|
||||||
{
|
|
||||||
if ( entry.getValue().equals(true) )
|
|
||||||
handledAssignmentsCounts.add(entry.getKey());
|
|
||||||
}
|
|
||||||
return ResponseEntity.ok(handledAssignmentsCounts);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ import com.google.common.collect.Multimap;
|
||||||
import eu.openaire.publications_retriever.util.url.GenericUtils;
|
import eu.openaire.publications_retriever.util.url.GenericUtils;
|
||||||
import eu.openaire.publications_retriever.util.url.UrlUtils;
|
import eu.openaire.publications_retriever.util.url.UrlUtils;
|
||||||
import eu.openaire.urls_worker.UrlsWorkerApplication;
|
import eu.openaire.urls_worker.UrlsWorkerApplication;
|
||||||
|
import eu.openaire.urls_worker.controllers.FullTextsController;
|
||||||
import eu.openaire.urls_worker.controllers.GeneralController;
|
import eu.openaire.urls_worker.controllers.GeneralController;
|
||||||
import eu.openaire.urls_worker.models.Assignment;
|
import eu.openaire.urls_worker.models.Assignment;
|
||||||
import eu.openaire.urls_worker.models.UrlReport;
|
import eu.openaire.urls_worker.models.UrlReport;
|
||||||
|
@ -21,6 +22,7 @@ import org.springframework.web.client.RestTemplate;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
@ -176,10 +178,13 @@ public class AssignmentsHandler {
|
||||||
// The scheduler will handle calling it every 15 mins, in case the Worker is available for work..
|
// The scheduler will handle calling it every 15 mins, in case the Worker is available for work..
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static HashSet<Long> assignmentsNumsHandled = new HashSet<>();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Post the worker report and wait for the Controller to request the publication-files.
|
* Post the worker report and wait for the Controller to request the publication-files.
|
||||||
* Once the Controller finishes with uploading the files to the S3-ObjectStore, it returns an "HTTP-200-OK" response to the Worker.
|
* Once the Controller finishes with uploading the files to the S3-ObjectStore, it returns an "HTTP-200-OK" response to the Worker.
|
||||||
|
* Afterwards, the Worker, even in case of an error, deletes the fulltext and zip files.
|
||||||
* */
|
* */
|
||||||
public static boolean postWorkerReport(Long assignmentRequestCounter)
|
public static boolean postWorkerReport(Long assignmentRequestCounter)
|
||||||
{
|
{
|
||||||
|
@ -187,16 +192,20 @@ public class AssignmentsHandler {
|
||||||
logger.info("Going to post the WorkerReport of assignment_" + assignmentRequestCounter + " to the controller-server: " + postUrl);
|
logger.info("Going to post the WorkerReport of assignment_" + assignmentRequestCounter + " to the controller-server: " + postUrl);
|
||||||
try {
|
try {
|
||||||
ResponseEntity<String> responseEntity = restTemplate.postForEntity(postUrl, new WorkerReport(UrlsWorkerApplication.workerId, assignmentRequestCounter, urlReports), String.class);
|
ResponseEntity<String> responseEntity = restTemplate.postForEntity(postUrl, new WorkerReport(UrlsWorkerApplication.workerId, assignmentRequestCounter, urlReports), String.class);
|
||||||
|
|
||||||
|
// The Worker sends the "WorkerReport" and, before this "POST"-request returns here, the Controller, after analyzing the report, opens new requests to the Worker in order to receive the full-texts.
|
||||||
|
// After the report and the full-texts are received and uploaded to the database/S3, the Controller returns a response to the Worker.
|
||||||
|
|
||||||
int responseCode = responseEntity.getStatusCodeValue();
|
int responseCode = responseEntity.getStatusCodeValue();
|
||||||
if ( responseCode == HttpStatus.OK.value() ) {
|
if ( responseCode == HttpStatus.OK.value() ) {
|
||||||
logger.info("The submission of the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller, and the full-text delivering, were successful!");
|
logger.info("The submission of the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller, and the full-text delivering, were successful.");
|
||||||
|
assignmentsNumsHandled.add(assignmentRequestCounter);
|
||||||
return true;
|
return true;
|
||||||
}
|
} else if ( responseCode == HttpStatus.MULTI_STATUS.value() ) {
|
||||||
else if ( responseCode == HttpStatus.MULTI_STATUS.value() ) {
|
|
||||||
logger.warn("The submission of the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller was successful, but the full-texts' delivering failed!");
|
logger.warn("The submission of the WorkerReport of assignments_" + assignmentRequestCounter + " to the Controller was successful, but the full-texts' delivering failed!");
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
logger.error("HTTP-Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller. Error-code was: " + responseCode);
|
logger.error("HTTP-Connection problem with the submission of the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller! Error-code was: " + responseCode);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -205,6 +214,11 @@ public class AssignmentsHandler {
|
||||||
} finally {
|
} finally {
|
||||||
urlReports.clear(); // Reset, without de-allocating.
|
urlReports.clear(); // Reset, without de-allocating.
|
||||||
assignmentsForPlugins.clear();
|
assignmentsForPlugins.clear();
|
||||||
|
|
||||||
|
// It is possible that one or more full-texts-batches were not sent to the Controller, or that the Controller failed to process them.
|
||||||
|
// In that case, the related "attempt"-records will keep their "success" state, but the related "payload" records will not be inserted into the database.
|
||||||
|
// When all the id-urls are processed at least one time, the Controller will start returning all the "couldRetry" records without a related "payload"-record.
|
||||||
|
FullTextsController.deleteDirectory(assignmentRequestCounter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue