- The Worker stores the files in its local file-system and sends them to the Controller in batches, when the latter requests them. Once all the files of a given assignments-number have been sent, they are deleted from the Worker by a scheduled job.
- Implement the "getFullTexts"-endpoint, which returns the requested full-texts in a zip file. - Implement the "getFullText"-endpoint, which returns the requested full-text. - Implement the "getHandledAssignmentsCounts"-endpoint which returns the assignments-numbers, which were handled by that worker. - Make sure each urlReport has the same "Date" for a given assignments-number. Also, make sure the "size" and "hash" have a "null" value, in case the full-text was not found. - Check and log thread-pool shutdown errors. - Add the stack-trace in the error-logs, instead of the Stderr. - Update SpringBoot dependency. - Change log levels. - Code cleanup.
This commit is contained in:
parent
3220c97373
commit
20b71164d5
|
@ -1,5 +1,5 @@
|
|||
plugins {
|
||||
id 'org.springframework.boot' version '2.5.6'
|
||||
id 'org.springframework.boot' version '2.6.0'
|
||||
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
|
||||
id 'java'
|
||||
}
|
||||
|
|
|
@ -2,19 +2,31 @@ package eu.openaire.urls_worker;
|
|||
|
||||
import eu.openaire.publications_retriever.PublicationsRetriever;
|
||||
import eu.openaire.publications_retriever.util.file.FileUtils;
|
||||
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;
|
||||
import eu.openaire.urls_worker.util.UriBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
import org.springframework.web.cors.CorsConfiguration;
|
||||
import org.springframework.web.cors.CorsConfigurationSource;
|
||||
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.io.File;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Scanner;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableConfigurationProperties
|
||||
@EnableScheduling
|
||||
public class UrlsWorkerApplication {
|
||||
|
||||
|
@ -28,6 +40,7 @@ public class UrlsWorkerApplication {
|
|||
public static void main(String[] args) {
|
||||
|
||||
setInputData(); // This may cause the Server to terminate early, in case the workerId or the controllerBaseUrl cannot be found.
|
||||
new PublicationsRetrieverPlugin();
|
||||
|
||||
SpringApplication.run(UrlsWorkerApplication.class, args);
|
||||
}
|
||||
|
@ -38,18 +51,43 @@ public class UrlsWorkerApplication {
|
|||
{
|
||||
if ( PublicationsRetriever.executor != null )
|
||||
{
|
||||
logger.info("Shutting down the threads..");
|
||||
PublicationsRetriever.executor.shutdown(); // Define that no new tasks will be scheduled.
|
||||
try {
|
||||
if ( !PublicationsRetriever.executor.awaitTermination(1, TimeUnit.MINUTES) ) {
|
||||
logger.warn("The working threads did not finish on time! Stopping them immediately..");
|
||||
PublicationsRetriever.executor.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
PublicationsRetriever.executor.shutdownNow();
|
||||
} catch (SecurityException se) {
|
||||
logger.error("Could not shutdown the threads in any way..!", se);
|
||||
} catch (InterruptedException ie) {
|
||||
try {
|
||||
PublicationsRetriever.executor.shutdownNow();
|
||||
} catch (SecurityException se) {
|
||||
logger.error("Could not shutdown the threads in any way..!", se);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
public CorsConfigurationSource corsConfigurationSource() {
|
||||
CorsConfiguration configuration = new CorsConfiguration();
|
||||
configuration.setAllowedOrigins(Collections.singletonList("*"));
|
||||
configuration.setAllowedMethods(Arrays.asList("GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"));
|
||||
configuration.setAllowedHeaders(Arrays.asList("authorization", "content-type", "x-auth-token"));
|
||||
configuration.setExposedHeaders(Collections.singletonList("x-auth-token"));
|
||||
UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
|
||||
source.registerCorsConfiguration("/**", configuration);
|
||||
return source;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public CommandLineRunner setServerBaseUrl(Environment environment)
|
||||
{
|
||||
return args -> new UriBuilder(environment);
|
||||
}
|
||||
|
||||
|
||||
private static void setInputData()
|
||||
{
|
||||
|
@ -89,9 +127,8 @@ public class UrlsWorkerApplication {
|
|||
|
||||
} catch (Exception e) {
|
||||
String errorMsg = "An error prevented the retrieval of the workerId and the controllerBaseUrl from the file: " + inputDataFilePath + "\n" + e.getMessage();
|
||||
logger.error(errorMsg);
|
||||
logger.error(errorMsg, e);
|
||||
System.err.println(errorMsg);
|
||||
e.printStackTrace();
|
||||
System.exit(63);
|
||||
} finally {
|
||||
if ( myReader != null )
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
package eu.openaire.urls_worker.components;
|
||||
|
||||
import eu.openaire.urls_worker.controllers.FullTextsController;
|
||||
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;
|
||||
import eu.openaire.urls_worker.util.AssignmentHandler;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -7,8 +8,14 @@ import org.slf4j.LoggerFactory;
|
|||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
|
||||
|
||||
@Component
|
||||
|
@ -27,13 +34,50 @@ public class ScheduledTasks {
|
|||
public void handleNewAssignments() {
|
||||
if ( AssignmentHandler.isAvailableForWork )
|
||||
AssignmentHandler.handleAssignments();
|
||||
else
|
||||
logger.debug("The worker is not available for work at the moment..");
|
||||
else {
|
||||
//logger.debug("The worker is not available for work at the moment.."); // JUST FOR DEBUG!
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Scheduled(fixedRate = 7_200_000) // Every 2 hours.
|
||||
public void deleteHandledAssignmentsFullTexts()
|
||||
{
|
||||
Set<Map.Entry<Long, Boolean>> entrySet = FullTextsController.assignmentsNumsHandledAndLocallyDeleted.entrySet();
|
||||
if ( entrySet.isEmpty() )
|
||||
return;
|
||||
|
||||
logger.info("Going to delete the locally stored fullTexts.");
|
||||
|
||||
for ( Map.Entry<Long,Boolean> entry : entrySet )
|
||||
{
|
||||
if ( entry.getValue().equals(true) ) // It is already deleted, move on.
|
||||
continue;
|
||||
|
||||
Long curAssignments = entry.getKey();
|
||||
String currentAssignmentsBasePath = PublicationsRetrieverPlugin.assignmentsBasePath + "assignments_" + curAssignments + "_fullTexts" + File.separator;
|
||||
logger.debug("Going to delete the files from assignments: " + currentAssignmentsBasePath);
|
||||
|
||||
File curDir = new File(currentAssignmentsBasePath);
|
||||
if ( !curDir.isDirectory() ) {
|
||||
logger.error("This assignments-dir does not exist: " + currentAssignmentsBasePath);
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
FileUtils.deleteDirectory(curDir);
|
||||
FullTextsController.assignmentsNumsHandledAndLocallyDeleted.put(curAssignments, true); // Set the is-handled to true.
|
||||
} catch (IOException e) {
|
||||
logger.error("The following directory could not be deleted: " + currentAssignmentsBasePath, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//@Scheduled(fixedRate = 20_000) // Every 20 secs.
|
||||
public void testUrlConnection() {
|
||||
String urlToCheck = "https://zenodo.org/record/1145726";
|
||||
PublicationsRetrieverPlugin.connectWithUrlTest(urlToCheck);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,107 @@
|
|||
package eu.openaire.urls_worker.controllers;
|
||||
|
||||
import eu.openaire.urls_worker.services.FileStorageService;
|
||||
import eu.openaire.urls_worker.util.FilesZipper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("full-texts/")
|
||||
public class FullTextsController {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(GeneralController.class);
|
||||
|
||||
private final FileStorageService fileStorageService;
|
||||
|
||||
public static HashMap<Long, Boolean> assignmentsNumsHandledAndLocallyDeleted = new HashMap<>();
|
||||
|
||||
public static String assignmentsBaseDir = null;
|
||||
|
||||
|
||||
public FullTextsController(FileStorageService fileStorageService) {
|
||||
this.fileStorageService = fileStorageService;
|
||||
assignmentsBaseDir = FileStorageService.assignmentsLocation.toString() + File.separator;
|
||||
}
|
||||
|
||||
|
||||
@GetMapping("getFullTexts/{assignmentsCounter:[\\d]+}/{totalZipBatches:[\\d]+}/{zipBatchCounter:[\\d]+}/{fileNamesWithExtensions}")
|
||||
public ResponseEntity<?> getMultipleFullTexts(@PathVariable long assignmentsCounter, @PathVariable int totalZipBatches, @PathVariable int zipBatchCounter, @PathVariable List<String> fileNamesWithExtensions, HttpServletRequest request) {
|
||||
|
||||
logger.info("Received a \"getMultipleFullTexts\" request for returning a zip-file containing " + fileNamesWithExtensions.size() + " full-texts, from assignments-" + assignmentsCounter + ", for batch-" + zipBatchCounter);
|
||||
|
||||
String currentAssignmentsBaseFullTextsPath = assignmentsBaseDir + "assignments_" + assignmentsCounter + "_fullTexts" + File.separator;
|
||||
|
||||
File zipFile = FilesZipper.zipMultipleFilesAndGetZip(assignmentsCounter, zipBatchCounter, fileNamesWithExtensions, currentAssignmentsBaseFullTextsPath);
|
||||
if ( zipFile == null ) {
|
||||
String errorMsg = "Failed to create the zip file for \"zipBatchCounter\"-" + zipBatchCounter;
|
||||
logger.error(errorMsg);
|
||||
return ResponseEntity.internalServerError().body(errorMsg);
|
||||
}
|
||||
|
||||
String zipName = zipFile.getName();
|
||||
String zipFileFullPath = currentAssignmentsBaseFullTextsPath + zipName;
|
||||
ByteArrayOutputStream byteArrayOutputStream = this.fileStorageService.loadFileAsAStream(zipFileFullPath);
|
||||
if ( byteArrayOutputStream == null )
|
||||
return ResponseEntity.status(HttpStatus.NOT_FOUND).body("Could not load file: " + zipFileFullPath);
|
||||
|
||||
// If this is the last batch for this assignments-count, then make sure it is deleted in the next scheduled delete-operation.
|
||||
if ( zipBatchCounter == totalZipBatches ) {
|
||||
assignmentsNumsHandledAndLocallyDeleted.put(assignmentsCounter, false);
|
||||
logger.debug("Will return the last batch of Assignments_" + assignmentsCounter + " to the Controller and these assignments will be deleted later.");
|
||||
}
|
||||
|
||||
String contentType = request.getServletContext().getMimeType(zipFileFullPath);
|
||||
if ( contentType == null )
|
||||
contentType = "application/octet-stream";
|
||||
|
||||
logger.info("Sending the zip file \"" + zipFileFullPath + "\".");
|
||||
return ResponseEntity.ok()
|
||||
.contentType(MediaType.parseMediaType(contentType))
|
||||
.header(HttpHeaders.CONTENT_DISPOSITION, "inline; filename=\"" + zipName + "\"")
|
||||
.body(byteArrayOutputStream.toByteArray());
|
||||
}
|
||||
|
||||
|
||||
@GetMapping("getFullText/{assignmentsCounter:[\\d]+]}/{fileNameWithExtension:[\\w]+.[\\w]{2,10}}")
|
||||
public ResponseEntity<?> getFullText(@PathVariable long assignmentsCounter, @PathVariable String fileNameWithExtension, HttpServletRequest request) {
|
||||
|
||||
logger.info("Received a \"getFullText\" request.");
|
||||
String fullTextFile = assignmentsBaseDir + "assignments_" + assignmentsCounter + "_fullTexts" + File.separator + fileNameWithExtension;
|
||||
File file = new File(fullTextFile);
|
||||
if ( !file.isFile() ) {
|
||||
logger.error("The file \"" + fullTextFile + "\" does not exist!");
|
||||
return ResponseEntity.notFound().build();
|
||||
}
|
||||
|
||||
Resource resource = this.fileStorageService.loadFileAsResource(fullTextFile);
|
||||
if ( resource == null )
|
||||
return ResponseEntity.internalServerError().body("Could not load file: " + fullTextFile);
|
||||
|
||||
String contentType = null;
|
||||
contentType = request.getServletContext().getMimeType(file.getAbsolutePath());
|
||||
if ( contentType == null ) {
|
||||
contentType = "application/octet-stream";
|
||||
}
|
||||
|
||||
return ResponseEntity.ok()
|
||||
.contentType(MediaType.parseMediaType(contentType))
|
||||
.header(HttpHeaders.CONTENT_DISPOSITION, "inline; filename=\"" + resource.getFilename() + "\"")
|
||||
.body(resource);
|
||||
}
|
||||
|
||||
}
|
|
@ -10,6 +10,10 @@ import org.springframework.http.HttpStatus;
|
|||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
@RestController
|
||||
@RequestMapping("")
|
||||
|
@ -37,11 +41,23 @@ public class GeneralController {
|
|||
if ( AssignmentHandler.isAvailableForWork ) {
|
||||
logger.info("The worker is available for an assignment.");
|
||||
return ResponseEntity.status(200).body(new WorkerResponse(UrlsWorkerApplication.workerId, WorkerConstants.ASSIGNMENTS_LIMIT));
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
logger.info("The worker is busy with another assignment.");
|
||||
return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@GetMapping("getHandledAssignmentsCounts")
|
||||
public ResponseEntity<?> getHandledAssignmentsCounts()
|
||||
{
|
||||
List<Long> handledAssignmentsCounts = new ArrayList<>(FullTextsController.assignmentsNumsHandledAndLocallyDeleted.size()/2);
|
||||
for ( Map.Entry<Long,Boolean> entry : FullTextsController.assignmentsNumsHandledAndLocallyDeleted.entrySet() )
|
||||
{
|
||||
if ( entry.getValue().equals(true) )
|
||||
handledAssignmentsCounts.add(entry.getKey());
|
||||
}
|
||||
return ResponseEntity.ok(handledAssignmentsCounts);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
package eu.openaire.urls_worker.exceptions;
|
||||
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.web.bind.annotation.ResponseStatus;
|
||||
|
||||
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
|
||||
public class FileStorageException extends RuntimeException {
|
||||
|
||||
public FileStorageException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public FileStorageException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
|
@ -4,7 +4,6 @@ import edu.uci.ics.crawler4j.url.URLCanonicalizer;
|
|||
import eu.openaire.publications_retriever.PublicationsRetriever;
|
||||
import eu.openaire.publications_retriever.exceptions.DocFileNotRetrievedException;
|
||||
import eu.openaire.publications_retriever.util.file.FileUtils;
|
||||
import eu.openaire.publications_retriever.util.file.S3ObjectStoreMinIO;
|
||||
import eu.openaire.publications_retriever.util.http.ConnSupportUtils;
|
||||
import eu.openaire.publications_retriever.util.http.HttpConnUtils;
|
||||
import eu.openaire.publications_retriever.util.url.DataToBeLogged;
|
||||
|
@ -14,14 +13,13 @@ import eu.openaire.urls_worker.models.Assignment;
|
|||
import eu.openaire.urls_worker.models.Error;
|
||||
import eu.openaire.urls_worker.models.Payload;
|
||||
import eu.openaire.urls_worker.models.UrlReport;
|
||||
import eu.openaire.urls_worker.services.FileStorageService;
|
||||
import eu.openaire.urls_worker.util.AssignmentHandler;
|
||||
import eu.openaire.urls_worker.util.WorkerConstants;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Callable;
|
||||
|
@ -32,20 +30,9 @@ public class PublicationsRetrieverPlugin {
|
|||
|
||||
private static final Logger logger = LoggerFactory.getLogger(PublicationsRetrieverPlugin.class);
|
||||
|
||||
private static final String workingDir = System.getProperty("user.dir") + File.separator;
|
||||
private static String assignmentsBasePath = workingDir + "assignments" + File.separator;
|
||||
private static String assignmentsBaseFullTextsPath = assignmentsBasePath + "fullTexts" + File.separator;
|
||||
public static String assignmentsBasePath;
|
||||
|
||||
static {
|
||||
File assignmentsDir = new File(assignmentsBaseFullTextsPath);
|
||||
if ( !assignmentsDir.exists() ) {
|
||||
if ( !assignmentsDir.mkdirs() ) { // Create the directory.
|
||||
logger.error("Could not create the \"assignments directories\": \"" + assignmentsBaseFullTextsPath + "\". Using the \"workingDir\" instead (" + workingDir + ").");
|
||||
assignmentsBasePath = workingDir;
|
||||
assignmentsBaseFullTextsPath = assignmentsBasePath;
|
||||
}
|
||||
}
|
||||
|
||||
// Specify some configurations
|
||||
LoaderAndChecker.retrieveDocuments = true;
|
||||
LoaderAndChecker.retrieveDatasets = false;
|
||||
|
@ -54,22 +41,34 @@ public class PublicationsRetrieverPlugin {
|
|||
PublicationsRetriever.targetUrlType = "docUrl";
|
||||
FileUtils.jsonBatchSize = WorkerConstants.ASSIGNMENTS_LIMIT;
|
||||
|
||||
FileUtils.shouldUploadFilesToS3 = true;
|
||||
new S3ObjectStoreMinIO(); // Check here on how to create the credentials-file: https://github.com/LSmyrnaios/PublicationsRetriever/blob/master/README.md
|
||||
|
||||
int workerThreadsCount = Runtime.getRuntime().availableProcessors() * PublicationsRetriever.threadsMultiplier;
|
||||
logger.info("Use " + workerThreadsCount + " worker-threads.");
|
||||
PublicationsRetriever.executor = Executors.newFixedThreadPool(workerThreadsCount);
|
||||
}
|
||||
|
||||
|
||||
/**
 * Sets the static "assignmentsBasePath" to the assignments-location of the FileStorageService,
 * making sure that it ends with the file-separator.
 */
public PublicationsRetrieverPlugin() {
	String basePath = FileStorageService.assignmentsLocation.toString();
	assignmentsBasePath = basePath.endsWith(File.separator) ? basePath : (basePath + File.separator);
}
|
||||
|
||||
private static final List<Callable<Boolean>> callableTasks = new ArrayList<>(FileUtils.jsonBatchSize);
|
||||
|
||||
public static void processAssignments(Long assignmentRequestCounter, Collection<Assignment> assignments) throws RuntimeException, FileNotFoundException
|
||||
{
|
||||
ConnSupportUtils.setKnownMimeTypes();
|
||||
FileUtils.storeDocFilesDir = assignmentsBaseFullTextsPath + "assignment_" + assignmentRequestCounter + "_fullTexts" + File.separator; // It needs the last separator, because of how the docFiles are named and stored.
|
||||
FileUtils.setOutput(new FileOutputStream(assignmentsBasePath + "assignment_" + assignmentRequestCounter + "_generic_results.json"));
|
||||
FileUtils.storeDocFilesDir = assignmentsBasePath + "assignments_" + assignmentRequestCounter + "_fullTexts" + File.separator; // It needs the last separator, because of how the docFiles are named and stored.
|
||||
|
||||
File curAssignmentsDirs = new File(FileUtils.storeDocFilesDir);
|
||||
if ( !curAssignmentsDirs.exists() ) {
|
||||
if ( !curAssignmentsDirs.mkdirs() ) { // Create the directories.
|
||||
String workingDir = System.getProperty("user.dir") + File.separator;
|
||||
logger.error("Could not create the \"assignments_fullTexts directories\": \"" + FileUtils.storeDocFilesDir + "\". Using the \"workingDir\" instead (" + workingDir + ").");
|
||||
FileUtils.storeDocFilesDir = assignmentsBasePath = workingDir;
|
||||
}
|
||||
}
|
||||
|
||||
ConnSupportUtils.setKnownMimeTypes();
|
||||
int tasksNumber = assignments.size();
|
||||
int batchCount = 0;
|
||||
int tasksCount = 0;
|
||||
|
@ -137,6 +136,8 @@ public class PublicationsRetrieverPlugin {
|
|||
|
||||
public static void addUrlReportsToWorkerReport()
|
||||
{
|
||||
Date date = new Date();
|
||||
|
||||
for ( DataToBeLogged data : FileUtils.dataToBeLoggedList )
|
||||
{
|
||||
UrlReport.StatusType status = null;
|
||||
|
@ -183,11 +184,18 @@ public class PublicationsRetrieverPlugin {
|
|||
if ( docOrDatasetUrl.equals(UrlUtils.unreachableDocOrDatasetUrlIndicator) || docOrDatasetUrl.equals(UrlUtils.duplicateUrlIndicator) )
|
||||
docOrDatasetUrl = null;
|
||||
|
||||
Payload payload = new Payload(data.getUrlId(), data.getSourceUrl(), docOrDatasetUrl, new Date(), mimeType, size, hash, fileLocation, "crawl:PublicationsRetriever");
|
||||
// Cleanup some data.
|
||||
if ( (size != null) && (size == 0L) )
|
||||
size = null;
|
||||
|
||||
if ( (hash != null) && (hash.equals("null")) )
|
||||
hash = null;
|
||||
|
||||
Payload payload = new Payload(data.getUrlId(), data.getSourceUrl(), docOrDatasetUrl, date, mimeType, size, hash, fileLocation, "crawl:PublicationsRetriever");
|
||||
// TODO - If support is added for other doc-formats other than "pdf", then make sure the "mime_type" is correctly specified.
|
||||
|
||||
AssignmentHandler.urlReports.add(new UrlReport(status, payload, error));
|
||||
}
|
||||
}// end-for
|
||||
FileUtils.dataToBeLoggedList.clear(); // Empty the list, to be re-populated by the next batch / assignment.
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
package eu.openaire.urls_worker.services;
|
||||
|
||||
import eu.openaire.urls_worker.exceptions.FileStorageException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.core.io.UrlResource;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.file.*;
|
||||
import java.util.Properties;
|
||||
|
||||
|
||||
@Service
|
||||
public class FileStorageService {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(FileStorageService.class);
|
||||
|
||||
public static Path assignmentsLocation = null;
|
||||
|
||||
static {
|
||||
String springPropertiesFile = System.getProperty("user.dir") + File.separator + "src" + File.separator + "main" + File.separator + "resources" + File.separator + "application.properties";
|
||||
FileReader fReader = null;
|
||||
try {
|
||||
fReader = new FileReader(springPropertiesFile);
|
||||
Properties props = new Properties();
|
||||
props.load(fReader); // Load jdbc related properties.
|
||||
String assignmentsDir = props.getProperty("file.assignments-dir");
|
||||
assignmentsLocation = Paths.get(assignmentsDir).toAbsolutePath().normalize();
|
||||
} catch (java.io.FileNotFoundException fnfe) {
|
||||
logger.error("The properties file was not found!", fnfe);
|
||||
System.exit(-10);
|
||||
} catch (IOException ioe) {
|
||||
logger.error("I/O error when reading the properties file!", ioe);
|
||||
System.exit(-10);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Autowired
|
||||
public FileStorageService() throws FileStorageException {
|
||||
try {
|
||||
Files.createDirectories(assignmentsLocation);
|
||||
} catch (Exception ex) {
|
||||
throw new FileStorageException("Could not create the directory where the uploaded files will be stored.", ex);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static final int bufferSize = 20971520;
|
||||
public ByteArrayOutputStream loadFileAsAStream(String fullFileName)
|
||||
{
|
||||
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(bufferSize); // 20 MB
|
||||
try {
|
||||
FileInputStream fileInputStream = new FileInputStream(fullFileName);
|
||||
int bytesRead;
|
||||
byte[] byteBuffer = new byte[bufferSize];
|
||||
while ( (bytesRead = fileInputStream.read(byteBuffer, 0, bufferSize)) > 0 )
|
||||
byteArrayOutputStream.write(byteBuffer, 0, bytesRead);
|
||||
|
||||
return byteArrayOutputStream;
|
||||
} catch (Exception e) {
|
||||
if ( e instanceof FileNotFoundException )
|
||||
logger.error("File \"" + fullFileName + "\" is not a file!");
|
||||
else
|
||||
logger.error(e.getMessage(), e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public Resource loadFileAsResource(String fullFileName) {
|
||||
try {
|
||||
Path filePath = assignmentsLocation.resolve(fullFileName).normalize();
|
||||
Resource resource = new UrlResource(filePath.toUri());
|
||||
return resource.exists() ? resource : null;
|
||||
} catch (Exception e) {
|
||||
logger.error("Error when loading file: " + fullFileName, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -125,7 +125,6 @@ public class AssignmentHandler {
|
|||
{
|
||||
String postUrl = UrlsWorkerApplication.controllerBaseUrl + "urls/addWorkerReport";
|
||||
logger.info("Going to post the WorkerReport of assignment_" + assignmentRequestCounter + " to the controller-server: " + postUrl);
|
||||
|
||||
try {
|
||||
ResponseEntity<String> responseEntity = restTemplate.postForEntity(postUrl, new WorkerReport(UrlsWorkerApplication.workerId, assignmentRequestCounter, urlReports), String.class);
|
||||
int responseCode = responseEntity.getStatusCodeValue();
|
||||
|
@ -135,13 +134,11 @@ public class AssignmentHandler {
|
|||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Error when submitting the WorkerReport of assignment_" + assignmentRequestCounter + " to the Controller: ", e);
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
} finally {
|
||||
urlReports.clear(); // Reset, without de-allocating.
|
||||
assignmentsForPlugins.clear();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
package eu.openaire.urls_worker.util;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.List;
|
||||
import java.util.zip.ZipEntry;
|
||||
import java.util.zip.ZipOutputStream;
|
||||
|
||||
|
||||
public class FilesZipper
|
||||
{
|
||||
private static final Logger logger = LoggerFactory.getLogger(FilesZipper.class);
|
||||
|
||||
|
||||
public static File zipMultipleFilesAndGetZip(long assignmentsCounter, int zipBatchCounter, List<String> filesToZip, String baseDirectory)
|
||||
{
|
||||
File zipFile = null;
|
||||
ZipOutputStream zos = null;
|
||||
try {
|
||||
String zipFilename = baseDirectory + "assignments_" + assignmentsCounter + "_full-texts_" + zipBatchCounter + ".zip";
|
||||
// For example: assignments_2_full-texts_4.zip | where < 4 > is referred to the 4th batch of files requested by the controller.
|
||||
zipFile = new File(zipFilename);
|
||||
zos = new ZipOutputStream(new FileOutputStream(zipFile));
|
||||
|
||||
// Iterate over the given full-texts and add them to the zip.
|
||||
for ( String file : filesToZip )
|
||||
{
|
||||
zipAFile(file, zos, baseDirectory);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("", e);
|
||||
return null;
|
||||
} finally {
|
||||
try {
|
||||
if ( zos != null )
|
||||
zos.close();
|
||||
} catch (IOException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
return zipFile;
|
||||
}
|
||||
|
||||
|
||||
private static boolean zipAFile(String fileName, ZipOutputStream zos, String baseDir)
|
||||
{
|
||||
final int BUFFER = 1048576; // 1 MB
|
||||
byte[] data = new byte[BUFFER];
|
||||
BufferedInputStream bis = null;
|
||||
String fullFileName = baseDir + fileName;
|
||||
try {
|
||||
FileInputStream fis = new FileInputStream(fullFileName);
|
||||
bis = new BufferedInputStream(fis, BUFFER);
|
||||
zos.putNextEntry(new ZipEntry(fileName));
|
||||
int count;
|
||||
while ( (count = bis.read(data, 0, BUFFER)) != -1 ) {
|
||||
zos.write(data, 0, count);
|
||||
}
|
||||
zos.closeEntry(); // close the entry here (not the ZipOutputStream)
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
logger.error("Error zipping file: " + fullFileName, fnfe.getMessage());
|
||||
return false;
|
||||
} catch (Exception e) {
|
||||
if ( ! e.getMessage().contains("duplicate") )
|
||||
logger.error("Error zipping file: " + fullFileName, e);
|
||||
return false;
|
||||
} finally {
|
||||
try {
|
||||
if ( bis != null )
|
||||
bis.close();
|
||||
} catch (IOException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
|
@ -17,8 +17,9 @@ server.servlet.context-path=/api
|
|||
|
||||
# LOGGING LEVELS
|
||||
logging.config=classpath:logback-spring.xml
|
||||
logging.level.root=WARN
|
||||
logging.level.root=INFO
|
||||
logging.level.org.springframework.web=INFO
|
||||
logging.level.org.springframework.security=WARN
|
||||
logging.level.eu.openaire.urls_worker=DEBUG
|
||||
logging.level.eu.openaire.publications_retriever=DEBUG
|
||||
spring.output.ansi.enabled=always
|
||||
|
@ -37,3 +38,6 @@ spring.servlet.multipart.max-file-size=200MB
|
|||
|
||||
# Max Request Size
|
||||
spring.servlet.multipart.max-request-size=215MB
|
||||
|
||||
# All files uploaded by this SpringApp will be stored in this directory (your/dir)
|
||||
file.assignments-dir=./assignments/
|
||||
|
|
Loading…
Reference in New Issue