From 20b71164d5e6d018e3a0e14f9c6b68efbc28a369 Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Fri, 26 Nov 2021 17:04:31 +0200 Subject: [PATCH] - The worker will store the files in its local file-system and will send them to the controller in batches, after the latter requests them. When all files from a given assignments-num are sent, the files will be deleted from the Worker, in a scheduled-job. - Implement the "getFullTexts"-endpoint, which returns the requested full-texts in a zip file. - Implement the "getFullText"-endpoint, which returns the requested full-text. - Implement the "getHandledAssignmentsCounts"-endpoint which returns the assignments-numbers, which were handled by that worker. - Make sure each urlReport has the same "Date" for a given assignments-number. Also, make sure the "size" and "hash" have a "null" value, in case the full-text was not found. - Check and log thread-pool shutdown errors. - Add the stack-trace in the error-logs, instead of the Stderr. - Update SpringBoot dependency. - Change log levels. - Code cleanup. --- build.gradle | 2 +- .../urls_worker/UrlsWorkerApplication.java | 45 +++++++- .../components/ScheduledTasks.java | 48 +++++++- .../controllers/FullTextsController.java | 107 ++++++++++++++++++ .../controllers/GeneralController.java | 20 +++- .../exceptions/FileStorageException.java | 16 +++ .../plugins/PublicationsRetrieverPlugin.java | 56 +++++---- .../services/FileStorageService.java | 85 ++++++++++++++ .../urls_worker/util/AssignmentHandler.java | 3 - .../urls_worker/util/FilesZipper.java | 80 +++++++++++++ src/main/resources/application.properties | 6 +- 11 files changed, 431 insertions(+), 37 deletions(-) create mode 100644 src/main/java/eu/openaire/urls_worker/controllers/FullTextsController.java create mode 100644 src/main/java/eu/openaire/urls_worker/exceptions/FileStorageException.java create mode 100644 src/main/java/eu/openaire/urls_worker/services/FileStorageService.java create mode 100644 src/main/java/eu/openaire/urls_worker/util/FilesZipper.java diff --git a/build.gradle b/build.gradle index 994c6b3..779d799 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,5 @@ plugins { - id 'org.springframework.boot' version '2.5.6' + id 'org.springframework.boot' version '2.6.0' id 'io.spring.dependency-management' version '1.0.11.RELEASE' id 'java' } diff --git a/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java b/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java index 18bf404..14e9ac2 100644 --- a/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java +++ b/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java @@ -2,19 +2,31 @@ package eu.openaire.urls_worker; import eu.openaire.publications_retriever.PublicationsRetriever; import eu.openaire.publications_retriever.util.file.FileUtils; +import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin; +import eu.openaire.urls_worker.util.UriBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.core.env.Environment; import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.web.cors.CorsConfiguration; +import org.springframework.web.cors.CorsConfigurationSource; +import org.springframework.web.cors.UrlBasedCorsConfigurationSource; import javax.annotation.PreDestroy; import java.io.File; +import java.util.Arrays; +import java.util.Collections; import java.util.Scanner; import java.util.concurrent.TimeUnit; @SpringBootApplication +@EnableConfigurationProperties @EnableScheduling public class UrlsWorkerApplication { @@ -28,6 +40,7 @@ public class UrlsWorkerApplication { public static void main(String[] args) { setInputData(); // This may cause the Server to terminate early, in case the workerId or the controllerBaseUrl cannot be found. + new PublicationsRetrieverPlugin(); SpringApplication.run(UrlsWorkerApplication.class, args); } @@ -38,18 +51,43 @@ public class UrlsWorkerApplication { { if ( PublicationsRetriever.executor != null ) { + logger.info("Shutting down the threads.."); PublicationsRetriever.executor.shutdown(); // Define that no new tasks will be scheduled. try { if ( !PublicationsRetriever.executor.awaitTermination(1, TimeUnit.MINUTES) ) { logger.warn("The working threads did not finish on time! Stopping them immediately.."); PublicationsRetriever.executor.shutdownNow(); } - } catch (InterruptedException e) { - PublicationsRetriever.executor.shutdownNow(); + } catch (SecurityException se) { + logger.error("Could not shutdown the threads in any way..!", se); + } catch (InterruptedException ie) { + try { + PublicationsRetriever.executor.shutdownNow(); + } catch (SecurityException se) { + logger.error("Could not shutdown the threads in any way..!", se); + } } } } + @Bean + public CorsConfigurationSource corsConfigurationSource() { + CorsConfiguration configuration = new CorsConfiguration(); + configuration.setAllowedOrigins(Collections.singletonList("*")); + configuration.setAllowedMethods(Arrays.asList("GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS")); + configuration.setAllowedHeaders(Arrays.asList("authorization", "content-type", "x-auth-token")); + configuration.setExposedHeaders(Collections.singletonList("x-auth-token")); + UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource(); + source.registerCorsConfiguration("/**", configuration); + return source; + } + + @Bean + public CommandLineRunner setServerBaseUrl(Environment environment) + { + return args -> new UriBuilder(environment); + } + private static void setInputData() { @@ -89,9 +127,8 @@ public class UrlsWorkerApplication { } catch (Exception e) { String errorMsg = "An error prevented the retrieval of the workerId and the controllerBaseUrl from the file: " + inputDataFilePath + "\n" + e.getMessage(); - logger.error(errorMsg); + logger.error(errorMsg, e); System.err.println(errorMsg); - e.printStackTrace(); System.exit(63); } finally { if ( myReader != null ) diff --git a/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java index 533d752..47e0929 100644 --- a/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java @@ -1,5 +1,6 @@ package eu.openaire.urls_worker.components; +import eu.openaire.urls_worker.controllers.FullTextsController; import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin; import eu.openaire.urls_worker.util.AssignmentHandler; import org.slf4j.Logger; @@ -7,8 +8,14 @@ import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import java.io.File; +import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.io.FileUtils; @Component @@ -27,13 +34,50 @@ public class ScheduledTasks { public void handleNewAssignments() { if ( AssignmentHandler.isAvailableForWork ) AssignmentHandler.handleAssignments(); - else - logger.debug("The worker is not available for work at the moment.."); + else { + //logger.debug("The worker is not available for work at the moment.."); // JUST FOR DEBUG! + } } + + @Scheduled(fixedRate = 7_200_000) // Every 2 hours. + public void deleteHandledAssignmentsFullTexts() + { + Set> entrySet = FullTextsController.assignmentsNumsHandledAndLocallyDeleted.entrySet(); + if ( entrySet.isEmpty() ) + return; + + logger.info("Going to delete the locally stored fullTexts."); + + for ( Map.Entry entry : entrySet ) + { + if ( entry.getValue().equals(true) ) // It is already deleted, move on. + continue; + + Long curAssignments = entry.getKey(); + String currentAssignmentsBasePath = PublicationsRetrieverPlugin.assignmentsBasePath + "assignments_" + curAssignments + "_fullTexts" + File.separator; + logger.debug("Going to delete the files from assignments: " + currentAssignmentsBasePath); + + File curDir = new File(currentAssignmentsBasePath); + if ( !curDir.isDirectory() ) { + logger.error("This assignments-dir does not exist: " + currentAssignmentsBasePath); + continue; + } + + try { + FileUtils.deleteDirectory(curDir); + FullTextsController.assignmentsNumsHandledAndLocallyDeleted.put(curAssignments, true); // Set the is-handled to true. + } catch (IOException e) { + logger.error("The following directory could not be deleted: " + currentAssignmentsBasePath, e); + } + } + } + + //@Scheduled(fixedRate = 20_000) // Every 20 secs. public void testUrlConnection() { String urlToCheck = "https://zenodo.org/record/1145726"; PublicationsRetrieverPlugin.connectWithUrlTest(urlToCheck); } + } diff --git a/src/main/java/eu/openaire/urls_worker/controllers/FullTextsController.java b/src/main/java/eu/openaire/urls_worker/controllers/FullTextsController.java new file mode 100644 index 0000000..95b7da7 --- /dev/null +++ b/src/main/java/eu/openaire/urls_worker/controllers/FullTextsController.java @@ -0,0 +1,107 @@ +package eu.openaire.urls_worker.controllers; + +import eu.openaire.urls_worker.services.FileStorageService; +import eu.openaire.urls_worker.util.FilesZipper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.Resource; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.servlet.http.HttpServletRequest; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.util.HashMap; +import java.util.List; + +@RestController +@RequestMapping("full-texts/") +public class FullTextsController { + + private static final Logger logger = LoggerFactory.getLogger(GeneralController.class); + + private final FileStorageService fileStorageService; + + public static HashMap assignmentsNumsHandledAndLocallyDeleted = new HashMap<>(); + + public static String assignmentsBaseDir = null; + + + public FullTextsController(FileStorageService fileStorageService) { + this.fileStorageService = fileStorageService; + assignmentsBaseDir = FileStorageService.assignmentsLocation.toString() + File.separator; + } + + + @GetMapping("getFullTexts/{assignmentsCounter:[\\d]+}/{totalZipBatches:[\\d]+}/{zipBatchCounter:[\\d]+}/{fileNamesWithExtensions}") + public ResponseEntity getMultipleFullTexts(@PathVariable long assignmentsCounter, @PathVariable int totalZipBatches, @PathVariable int zipBatchCounter, @PathVariable List fileNamesWithExtensions, HttpServletRequest request) { + + logger.info("Received a \"getMultipleFullTexts\" request for returning a zip-file containing " + fileNamesWithExtensions.size() + " full-texts, from assignments-" + assignmentsCounter + ", for batch-" + zipBatchCounter); + + String currentAssignmentsBaseFullTextsPath = assignmentsBaseDir + "assignments_" + assignmentsCounter + "_fullTexts" + File.separator; + + File zipFile = FilesZipper.zipMultipleFilesAndGetZip(assignmentsCounter, zipBatchCounter, fileNamesWithExtensions, currentAssignmentsBaseFullTextsPath); + if ( zipFile == null ) { + String errorMsg = "Failed to create the zip file for \"zipBatchCounter\"-" + zipBatchCounter; + logger.error(errorMsg); + return ResponseEntity.internalServerError().body(errorMsg); + } + + String zipName = zipFile.getName(); + String zipFileFullPath = currentAssignmentsBaseFullTextsPath + zipName; + ByteArrayOutputStream byteArrayOutputStream = this.fileStorageService.loadFileAsAStream(zipFileFullPath); + if ( byteArrayOutputStream == null ) + return ResponseEntity.status(HttpStatus.NOT_FOUND).body("Could not load file: " + zipFileFullPath); + + // If this is the last batch for this assignments-count, then make sure it is deleted in the next scheduled delete-operation. + if ( zipBatchCounter == totalZipBatches ) { + assignmentsNumsHandledAndLocallyDeleted.put(assignmentsCounter, false); + logger.debug("Will return the last batch of Assignments_" + assignmentsCounter + " to the Controller and these assignments will be deleted later."); + } + + String contentType = request.getServletContext().getMimeType(zipFileFullPath); + if ( contentType == null ) + contentType = "application/octet-stream"; + + logger.info("Sending the zip file \"" + zipFileFullPath + "\"."); + return ResponseEntity.ok() + .contentType(MediaType.parseMediaType(contentType)) + .header(HttpHeaders.CONTENT_DISPOSITION, "inline; filename=\"" + zipName + "\"") + .body(byteArrayOutputStream.toByteArray()); + } + + + @GetMapping("getFullText/{assignmentsCounter:[\\d]+]}/{fileNameWithExtension:[\\w]+.[\\w]{2,10}}") + public ResponseEntity getFullText(@PathVariable long assignmentsCounter, @PathVariable String fileNameWithExtension, HttpServletRequest request) { + + logger.info("Received a \"getFullText\" request."); + String fullTextFile = assignmentsBaseDir + "assignments_" + assignmentsCounter + "_fullTexts" + File.separator + fileNameWithExtension; + File file = new File(fullTextFile); + if ( !file.isFile() ) { + logger.error("The file \"" + fullTextFile + "\" does not exist!"); + return ResponseEntity.notFound().build(); + } + + Resource resource = this.fileStorageService.loadFileAsResource(fullTextFile); + if ( resource == null ) + return ResponseEntity.internalServerError().body("Could not load file: " + fullTextFile); + + String contentType = null; + contentType = request.getServletContext().getMimeType(file.getAbsolutePath()); + if ( contentType == null ) { + contentType = "application/octet-stream"; + } + + return ResponseEntity.ok() + .contentType(MediaType.parseMediaType(contentType)) + .header(HttpHeaders.CONTENT_DISPOSITION, "inline; filename=\"" + resource.getFilename() + "\"") + .body(resource); + } + +} diff --git a/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java b/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java index f4eaca2..7afbbf4 100644 --- a/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java +++ b/src/main/java/eu/openaire/urls_worker/controllers/GeneralController.java @@ -10,6 +10,10 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + @RestController @RequestMapping("") @@ -37,11 +41,23 @@ public class GeneralController { if ( AssignmentHandler.isAvailableForWork ) { logger.info("The worker is available for an assignment."); return ResponseEntity.status(200).body(new WorkerResponse(UrlsWorkerApplication.workerId, WorkerConstants.ASSIGNMENTS_LIMIT)); - } - else { + } else { logger.info("The worker is busy with another assignment."); return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build(); } } + + @GetMapping("getHandledAssignmentsCounts") + public ResponseEntity getHandledAssignmentsCounts() + { + List handledAssignmentsCounts = new ArrayList<>(FullTextsController.assignmentsNumsHandledAndLocallyDeleted.size()/2); + for ( Map.Entry entry : FullTextsController.assignmentsNumsHandledAndLocallyDeleted.entrySet() ) + { + if ( entry.getValue().equals(true) ) + handledAssignmentsCounts.add(entry.getKey()); + } + return ResponseEntity.ok(handledAssignmentsCounts); + } + } diff --git a/src/main/java/eu/openaire/urls_worker/exceptions/FileStorageException.java b/src/main/java/eu/openaire/urls_worker/exceptions/FileStorageException.java new file mode 100644 index 0000000..a4717d0 --- /dev/null +++ b/src/main/java/eu/openaire/urls_worker/exceptions/FileStorageException.java @@ -0,0 +1,16 @@ +package eu.openaire.urls_worker.exceptions; + +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ResponseStatus; + +@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR) +public class FileStorageException extends RuntimeException { + + public FileStorageException(String message) { + super(message); + } + + public FileStorageException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java b/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java index cd933f7..6f81f1b 100644 --- a/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java +++ b/src/main/java/eu/openaire/urls_worker/plugins/PublicationsRetrieverPlugin.java @@ -4,7 +4,6 @@ import edu.uci.ics.crawler4j.url.URLCanonicalizer; import eu.openaire.publications_retriever.PublicationsRetriever; import eu.openaire.publications_retriever.exceptions.DocFileNotRetrievedException; import eu.openaire.publications_retriever.util.file.FileUtils; -import eu.openaire.publications_retriever.util.file.S3ObjectStoreMinIO; import eu.openaire.publications_retriever.util.http.ConnSupportUtils; import eu.openaire.publications_retriever.util.http.HttpConnUtils; import eu.openaire.publications_retriever.util.url.DataToBeLogged; @@ -14,14 +13,13 @@ import eu.openaire.urls_worker.models.Assignment; import eu.openaire.urls_worker.models.Error; import eu.openaire.urls_worker.models.Payload; import eu.openaire.urls_worker.models.UrlReport; +import eu.openaire.urls_worker.services.FileStorageService; import eu.openaire.urls_worker.util.AssignmentHandler; import eu.openaire.urls_worker.util.WorkerConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; +import java.io.*; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.Callable; @@ -32,20 +30,9 @@ public class PublicationsRetrieverPlugin { private static final Logger logger = LoggerFactory.getLogger(PublicationsRetrieverPlugin.class); - private static final String workingDir = System.getProperty("user.dir") + File.separator; - private static String assignmentsBasePath = workingDir + "assignments" + File.separator; - private static String assignmentsBaseFullTextsPath = assignmentsBasePath + "fullTexts" + File.separator; + public static String assignmentsBasePath; static { - File assignmentsDir = new File(assignmentsBaseFullTextsPath); - if ( !assignmentsDir.exists() ) { - if ( !assignmentsDir.mkdirs() ) { // Create the directory. - logger.error("Could not create the \"assignments directories\": \"" + assignmentsBaseFullTextsPath + "\". Using the \"workingDir\" instead (" + workingDir + ")."); - assignmentsBasePath = workingDir; - assignmentsBaseFullTextsPath = assignmentsBasePath; - } - } - // Specify some configurations LoaderAndChecker.retrieveDocuments = true; LoaderAndChecker.retrieveDatasets = false; @@ -54,22 +41,34 @@ public class PublicationsRetrieverPlugin { PublicationsRetriever.targetUrlType = "docUrl"; FileUtils.jsonBatchSize = WorkerConstants.ASSIGNMENTS_LIMIT; - FileUtils.shouldUploadFilesToS3 = true; - new S3ObjectStoreMinIO(); // Check here on how to create the credentials-file: https://github.com/LSmyrnaios/PublicationsRetriever/blob/master/README.md - int workerThreadsCount = Runtime.getRuntime().availableProcessors() * PublicationsRetriever.threadsMultiplier; logger.info("Use " + workerThreadsCount + " worker-threads."); PublicationsRetriever.executor = Executors.newFixedThreadPool(workerThreadsCount); } + + public PublicationsRetrieverPlugin() { + assignmentsBasePath = FileStorageService.assignmentsLocation.toString(); + if ( !assignmentsBasePath.endsWith(File.separator) ) + assignmentsBasePath += File.separator; + } + private static final List> callableTasks = new ArrayList<>(FileUtils.jsonBatchSize); public static void processAssignments(Long assignmentRequestCounter, Collection assignments) throws RuntimeException, FileNotFoundException { - ConnSupportUtils.setKnownMimeTypes(); - FileUtils.storeDocFilesDir = assignmentsBaseFullTextsPath + "assignment_" + assignmentRequestCounter + "_fullTexts" + File.separator; // It needs the last separator, because of how the docFiles are named and stored. - FileUtils.setOutput(new FileOutputStream(assignmentsBasePath + "assignment_" + assignmentRequestCounter + "_generic_results.json")); + FileUtils.storeDocFilesDir = assignmentsBasePath + "assignments_" + assignmentRequestCounter + "_fullTexts" + File.separator; // It needs the last separator, because of how the docFiles are named and stored. + File curAssignmentsDirs = new File(FileUtils.storeDocFilesDir); + if ( !curAssignmentsDirs.exists() ) { + if ( !curAssignmentsDirs.mkdirs() ) { // Create the directories. + String workingDir = System.getProperty("user.dir") + File.separator; + logger.error("Could not create the \"assignments_fullTexts directories\": \"" + FileUtils.storeDocFilesDir + "\". Using the \"workingDir\" instead (" + workingDir + ")."); + FileUtils.storeDocFilesDir = assignmentsBasePath = workingDir; + } + } + + ConnSupportUtils.setKnownMimeTypes(); int tasksNumber = assignments.size(); int batchCount = 0; int tasksCount = 0; @@ -137,6 +136,8 @@ public class PublicationsRetrieverPlugin { public static void addUrlReportsToWorkerReport() { + Date date = new Date(); + for ( DataToBeLogged data : FileUtils.dataToBeLoggedList ) { UrlReport.StatusType status = null; @@ -183,11 +184,18 @@ public class PublicationsRetrieverPlugin { if ( docOrDatasetUrl.equals(UrlUtils.unreachableDocOrDatasetUrlIndicator) || docOrDatasetUrl.equals(UrlUtils.duplicateUrlIndicator) ) docOrDatasetUrl = null; - Payload payload = new Payload(data.getUrlId(), data.getSourceUrl(), docOrDatasetUrl, new Date(), mimeType, size, hash, fileLocation, "crawl:PublicationsRetriever"); + // Cleanup some data. + if ( (size != null) && (size == 0L) ) + size = null; + + if ( (hash != null) && (hash.equals("null")) ) + hash = null; + + Payload payload = new Payload(data.getUrlId(), data.getSourceUrl(), docOrDatasetUrl, date, mimeType, size, hash, fileLocation, "crawl:PublicationsRetriever"); // TODO - If support is added for other doc-formats other than "pdf", then make sure the "mime_type" is correctly specified. AssignmentHandler.urlReports.add(new UrlReport(status, payload, error)); - } + }// end-for FileUtils.dataToBeLoggedList.clear(); // Empty the list, to be re-populated by the next batch / assignment. } diff --git a/src/main/java/eu/openaire/urls_worker/services/FileStorageService.java b/src/main/java/eu/openaire/urls_worker/services/FileStorageService.java new file mode 100644 index 0000000..ffe422a --- /dev/null +++ b/src/main/java/eu/openaire/urls_worker/services/FileStorageService.java @@ -0,0 +1,85 @@ +package eu.openaire.urls_worker.services; + +import eu.openaire.urls_worker.exceptions.FileStorageException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.io.Resource; +import org.springframework.core.io.UrlResource; +import org.springframework.stereotype.Service; + +import java.io.*; +import java.nio.file.*; +import java.util.Properties; + + +@Service +public class FileStorageService { + + private static final Logger logger = LoggerFactory.getLogger(FileStorageService.class); + + public static Path assignmentsLocation = null; + + static { + String springPropertiesFile = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "application.properties"; + FileReader fReader = null; + try { + fReader = new FileReader(springPropertiesFile); + Properties props = new Properties(); + props.load(fReader); // Load jdbc related properties. + String assignmentsDir = props.getProperty("file.assignments-dir"); + assignmentsLocation = Paths.get(assignmentsDir).toAbsolutePath().normalize(); + } catch (java.io.FileNotFoundException fnfe) { + logger.error("The properties file was not found!", fnfe); + System.exit(-10); + } catch (IOException ioe) { + logger.error("I/O error when reading the properties file!", ioe); + System.exit(-10); + } + } + + + @Autowired + public FileStorageService() throws FileStorageException { + try { + Files.createDirectories(assignmentsLocation); + } catch (Exception ex) { + throw new FileStorageException("Could not create the directory where the uploaded files will be stored.", ex); + } + } + + + private static final int bufferSize = 20971520; + public ByteArrayOutputStream loadFileAsAStream(String fullFileName) + { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(bufferSize); // 20 MB + try { + FileInputStream fileInputStream = new FileInputStream(fullFileName); + int bytesRead; + byte[] byteBuffer = new byte[bufferSize]; + while ( (bytesRead = fileInputStream.read(byteBuffer, 0, bufferSize)) > 0 ) + byteArrayOutputStream.write(byteBuffer, 0, bytesRead); + + return byteArrayOutputStream; + } catch (Exception e) { + if ( e instanceof FileNotFoundException ) + logger.error("File \"" + fullFileName + "\" is not a file!"); + else + logger.error(e.getMessage(), e); + return null; + } + } + + + public Resource loadFileAsResource(String fullFileName) { + try { + Path filePath = assignmentsLocation.resolve(fullFileName).normalize(); + Resource resource = new UrlResource(filePath.toUri()); + return resource.exists() ? resource : null; + } catch (Exception e) { + logger.error("Error when loading file: " + fullFileName, e); + return null; + } + } + +} diff --git a/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java b/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java index daa1aef..36021fb 100644 --- a/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java +++ b/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java @@ -125,7 +125,6 @@ public class AssignmentHandler { { String postUrl = UrlsWorkerApplication.controllerBaseUrl + "urls/addWorkerReport"; logger.info("Going to post the WorkerReport of assignment_" + assignmentRequestCounter + " to the controller-server: " + postUrl); - try { ResponseEntity responseEntity = restTemplate.postForEntity(postUrl, new WorkerReport(UrlsWorkerApplication.workerId, assignmentRequestCounter, urlReports), String.class); int responseCode = responseEntity.getStatusCodeValue(); @@ -135,13 +134,11 @@ public class AssignmentHandler { } } catch (Exception e) { logger.error("Error when submitting the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller: ", e); - e.printStackTrace(); return false; } finally { urlReports.clear(); // Reset, without de-allocating. assignmentsForPlugins.clear(); } - return true; } diff --git a/src/main/java/eu/openaire/urls_worker/util/FilesZipper.java b/src/main/java/eu/openaire/urls_worker/util/FilesZipper.java new file mode 100644 index 0000000..c01c050 --- /dev/null +++ b/src/main/java/eu/openaire/urls_worker/util/FilesZipper.java @@ -0,0 +1,80 @@ +package eu.openaire.urls_worker.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.util.List; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + + +public class FilesZipper +{ + private static final Logger logger = LoggerFactory.getLogger(FilesZipper.class); + + + public static File zipMultipleFilesAndGetZip(long assignmentsCounter, int zipBatchCounter, List filesToZip, String baseDirectory) + { + File zipFile = null; + ZipOutputStream zos = null; + try { + String zipFilename = baseDirectory + "assignments_" + assignmentsCounter + "_full-texts_" + zipBatchCounter + ".zip"; + // For example: assignments_2_full-texts_4.zip | where < 4 > is referred to the 4th batch of files requested by the controller. + zipFile = new File(zipFilename); + zos = new ZipOutputStream(new FileOutputStream(zipFile)); + + // Iterate over the given full-texts and add them to the zip. + for ( String file : filesToZip ) + { + zipAFile(file, zos, baseDirectory); + } + } catch (Exception e) { + logger.error("", e); + return null; + } finally { + try { + if ( zos != null ) + zos.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + return zipFile; + } + + + private static boolean zipAFile(String fileName, ZipOutputStream zos, String baseDir) + { + final int BUFFER = 1048576; // 1 MB + byte[] data = new byte[BUFFER]; + BufferedInputStream bis = null; + String fullFileName = baseDir + fileName; + try { + FileInputStream fis = new FileInputStream(fullFileName); + bis = new BufferedInputStream(fis, BUFFER); + zos.putNextEntry(new ZipEntry(fileName)); + int count; + while ( (count = bis.read(data, 0, BUFFER)) != -1 ) { + zos.write(data, 0, count); + } + zos.closeEntry(); // close the entry here (not the ZipOutputStream) + } catch (FileNotFoundException fnfe) { + logger.error("Error zipping file: " + fullFileName, fnfe.getMessage()); + return false; + } catch (Exception e) { + if ( ! e.getMessage().contains("duplicate") ) + logger.error("Error zipping file: " + fullFileName, e); + return false; + } finally { + try { + if ( bis != null ) + bis.close(); + } catch (IOException e) { + logger.error(e.getMessage(), e); + } + } + return true; + } + +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 22fdc98..49316d2 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -17,8 +17,9 @@ server.servlet.context-path=/api # LOGGING LEVELS logging.config=classpath:logback-spring.xml -logging.level.root=WARN +logging.level.root=INFO logging.level.org.springframework.web=INFO +logging.level.org.springframework.security=WARN logging.level.eu.openaire.urls_worker=DEBUG logging.level.eu.openaire.publications_retriever=DEBUG spring.output.ansi.enabled=always @@ -37,3 +38,6 @@ spring.servlet.multipart.max-file-size=200MB # Max Request Size spring.servlet.multipart.max-request-size=215MB + +# All files uploaded by this SpringApp will be stored in this directory (your/dir) +file.assignments-dir=./assignments/