forked from lsmyrnaios/UrlsWorker
- Upgrade the zip-file delivery by using the "InputStreamResource". This way is more reliable, have better performance and uses less memory.
- Use the "InputStreamResource" also in "get(single)FullText"-endpoint, in order to avoid loading a big full-text file in memory. - Decrease the system-reserved memory by 128 MB. - Fix path-variable regexes for "getFullText"-endpoint. - Optimize imports. - Code cleanup.
This commit is contained in:
parent
4fb5becace
commit
c46c8c448a
|
@ -65,7 +65,7 @@ if [[ justInstall -eq 0 ]]; then
|
||||||
|
|
||||||
# Update the max-heap-size based on the machine's physical memory.
|
# Update the max-heap-size based on the machine's physical memory.
|
||||||
machine_memory_mb=$(grep MemTotal /proc/meminfo | awk '{print $2}' | xargs -I {} echo "scale=4; {}/1024" | bc) # It returns the size in MB.
|
machine_memory_mb=$(grep MemTotal /proc/meminfo | awk '{print $2}' | xargs -I {} echo "scale=4; {}/1024" | bc) # It returns the size in MB.
|
||||||
max_heap_size_mb=$(echo "($machine_memory_mb - 1024)/1" | bc) # Leave 1024 MB to the system (the "()/1" is used to take the floor value).
|
max_heap_size_mb=$(echo "($machine_memory_mb - 896)/1" | bc) # Leave 896 MB to the system (the "()/1" is used to take the floor value).
|
||||||
# Now, we replace the "-Xmx" parameter inside the "./build.gradle" file, with "-Xmx${max_heap_size}m"
|
# Now, we replace the "-Xmx" parameter inside the "./build.gradle" file, with "-Xmx${max_heap_size}m"
|
||||||
echo -e "\n\nThe max-heap-size (-Xmx) will be set to: ${max_heap_size_mb}m\n\n"
|
echo -e "\n\nThe max-heap-size (-Xmx) will be set to: ${max_heap_size_mb}m\n\n"
|
||||||
sed -i "s/'-Xmx[0-9]\+[gm]'/'-Xmx${max_heap_size_mb}m'/g" ./build.gradle
|
sed -i "s/'-Xmx[0-9]\+[gm]'/'-Xmx${max_heap_size_mb}m'/g" ./build.gradle
|
||||||
|
|
|
@ -3,6 +3,7 @@ package eu.openaire.urls_worker.components;
|
||||||
import eu.openaire.urls_worker.controllers.FullTextsController;
|
import eu.openaire.urls_worker.controllers.FullTextsController;
|
||||||
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;
|
import eu.openaire.urls_worker.plugins.PublicationsRetrieverPlugin;
|
||||||
import eu.openaire.urls_worker.util.AssignmentsHandler;
|
import eu.openaire.urls_worker.util.AssignmentsHandler;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
|
@ -15,8 +16,6 @@ import java.util.Date;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
|
||||||
|
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
public class ScheduledTasks {
|
public class ScheduledTasks {
|
||||||
|
|
|
@ -4,19 +4,17 @@ import eu.openaire.urls_worker.services.FileStorageService;
|
||||||
import eu.openaire.urls_worker.util.FilesZipper;
|
import eu.openaire.urls_worker.util.FilesZipper;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.core.io.Resource;
|
import org.springframework.core.io.InputStreamResource;
|
||||||
import org.springframework.http.HttpHeaders;
|
import org.springframework.http.HttpHeaders;
|
||||||
import org.springframework.http.HttpStatus;
|
|
||||||
import org.springframework.http.MediaType;
|
import org.springframework.http.MediaType;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
import org.springframework.web.bind.annotation.GetMapping;
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
import org.springframework.web.bind.annotation.PathVariable;
|
import org.springframework.web.bind.annotation.PathVariable;
|
||||||
import org.springframework.web.bind.annotation.RequestMapping;
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
import org.springframework.web.bind.annotation.RestController;
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
|
|
||||||
|
|
||||||
import javax.servlet.http.HttpServletRequest;
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -26,21 +24,18 @@ public class FullTextsController {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(GeneralController.class);
|
private static final Logger logger = LoggerFactory.getLogger(GeneralController.class);
|
||||||
|
|
||||||
private final FileStorageService fileStorageService;
|
|
||||||
|
|
||||||
public static HashMap<Long, Boolean> assignmentsNumsHandledAndLocallyDeleted = new HashMap<>();
|
public static HashMap<Long, Boolean> assignmentsNumsHandledAndLocallyDeleted = new HashMap<>();
|
||||||
|
|
||||||
public static String assignmentsBaseDir = null;
|
public static String assignmentsBaseDir = null;
|
||||||
|
|
||||||
|
|
||||||
public FullTextsController(FileStorageService fileStorageService) {
|
public FullTextsController() {
|
||||||
this.fileStorageService = fileStorageService;
|
|
||||||
assignmentsBaseDir = FileStorageService.assignmentsLocation.toString() + File.separator;
|
assignmentsBaseDir = FileStorageService.assignmentsLocation.toString() + File.separator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@GetMapping("getFullTexts/{assignmentsCounter:[\\d]+}/{totalZipBatches:[\\d]+}/{zipBatchCounter:[\\d]+}/{fileNamesWithExtensions}")
|
@GetMapping("getFullTexts/{assignmentsCounter:[\\d]+}/{totalZipBatches:[\\d]+}/{zipBatchCounter:[\\d]+}/{fileNamesWithExtensions}")
|
||||||
public ResponseEntity<?> getMultipleFullTexts(@PathVariable long assignmentsCounter, @PathVariable int totalZipBatches, @PathVariable int zipBatchCounter, @PathVariable List<String> fileNamesWithExtensions, HttpServletRequest request) {
|
public Object getMultipleFullTexts(@PathVariable long assignmentsCounter, @PathVariable int totalZipBatches, @PathVariable int zipBatchCounter, @PathVariable List<String> fileNamesWithExtensions) {
|
||||||
|
|
||||||
int fileNamesListNum = fileNamesWithExtensions.size();
|
int fileNamesListNum = fileNamesWithExtensions.size();
|
||||||
if ( (fileNamesListNum == 1) && (fileNamesWithExtensions.get(0).length() == 0) ) { // In case the last "/" in the url was given, then this list will not be empty, but have one empty item instead.
|
if ( (fileNamesListNum == 1) && (fileNamesWithExtensions.get(0).length() == 0) ) { // In case the last "/" in the url was given, then this list will not be empty, but have one empty item instead.
|
||||||
|
@ -73,28 +68,29 @@ public class FullTextsController {
|
||||||
return ResponseEntity.internalServerError().body(errorMsg);
|
return ResponseEntity.internalServerError().body(errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
String zipName = zipFile.getName();
|
|
||||||
String zipFileFullPath = currentAssignmentsBaseFullTextsPath + zipName;
|
|
||||||
StreamingResponseBody streamingResponseBody = this.fileStorageService.loadFileAsAStream(zipFileFullPath);
|
|
||||||
if ( streamingResponseBody == null )
|
|
||||||
return ResponseEntity.status(HttpStatus.NOT_FOUND).body("Could not load zip-file: " + zipFileFullPath);
|
|
||||||
|
|
||||||
// If this is the last batch for this assignments-count, then make sure it is deleted in the next scheduled delete-operation.
|
// If this is the last batch for this assignments-count, then make sure it is deleted in the next scheduled delete-operation.
|
||||||
if ( zipBatchCounter == totalZipBatches ) {
|
if ( zipBatchCounter == totalZipBatches ) {
|
||||||
assignmentsNumsHandledAndLocallyDeleted.put(assignmentsCounter, false);
|
assignmentsNumsHandledAndLocallyDeleted.put(assignmentsCounter, false);
|
||||||
logger.debug("Will return the last batch (" + zipBatchCounter + ") of Assignments_" + assignmentsCounter + " to the Controller and these assignments will be deleted later.");
|
logger.debug("Will return the last batch (" + zipBatchCounter + ") of Assignments_" + assignmentsCounter + " to the Controller and these assignments will be deleted later.");
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.info("Sending the zip file \"" + zipFileFullPath + "\".");
|
String zipName = zipFile.getName();
|
||||||
return ResponseEntity.ok()
|
String zipFileFullPath = currentAssignmentsBaseFullTextsPath + zipName;
|
||||||
// Do not set any contentType here, Spring will handle it itself (otherwise it fails with "application/zip" and "application/octet-stream").
|
try {
|
||||||
.header(HttpHeaders.CONTENT_DISPOSITION, "inline; filename=\"" + zipName + "\"")
|
return ResponseEntity.ok()
|
||||||
.body(streamingResponseBody);
|
.contentType(MediaType.APPLICATION_OCTET_STREAM)
|
||||||
|
.header(HttpHeaders.CONTENT_DISPOSITION, "inline; filename=\"" + zipName + "\"")
|
||||||
|
.body(new InputStreamResource(new FileInputStream(zipFileFullPath)));
|
||||||
|
} catch (Exception e) {
|
||||||
|
String errorMsg = "Could not load the FileInputStream of the zip-file \"" + zipFileFullPath + "\"!";
|
||||||
|
logger.error(errorMsg, e);
|
||||||
|
return ResponseEntity.internalServerError().body(errorMsg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@GetMapping("getFullText/{assignmentsCounter:[\\d]+]}/{fileNameWithExtension:[\\w]+.[\\w]{2,10}}")
|
@GetMapping("getFullText/{assignmentsCounter:[\\d]+}/{fileNameWithExtension:[\\w_:]+.[\\w]{2,10}}")
|
||||||
public ResponseEntity<?> getFullText(@PathVariable long assignmentsCounter, @PathVariable String fileNameWithExtension, HttpServletRequest request) {
|
public ResponseEntity<?> getFullText(@PathVariable long assignmentsCounter, @PathVariable String fileNameWithExtension) {
|
||||||
|
|
||||||
logger.info("Received a \"getFullText\" request.");
|
logger.info("Received a \"getFullText\" request.");
|
||||||
String fullTextFileFullPath = assignmentsBaseDir + "assignments_" + assignmentsCounter + "_fullTexts" + File.separator + fileNameWithExtension;
|
String fullTextFileFullPath = assignmentsBaseDir + "assignments_" + assignmentsCounter + "_fullTexts" + File.separator + fileNameWithExtension;
|
||||||
|
@ -104,20 +100,16 @@ public class FullTextsController {
|
||||||
return ResponseEntity.notFound().build();
|
return ResponseEntity.notFound().build();
|
||||||
}
|
}
|
||||||
|
|
||||||
Resource resource = this.fileStorageService.loadFileAsResource(fullTextFileFullPath);
|
try {
|
||||||
if ( resource == null )
|
return ResponseEntity.ok()
|
||||||
return ResponseEntity.internalServerError().body("Could not load file: " + fullTextFileFullPath);
|
.contentType(MediaType.APPLICATION_OCTET_STREAM)
|
||||||
|
.header(HttpHeaders.CONTENT_DISPOSITION, "inline; filename=\"" + file.getName() + "\"")
|
||||||
String contentType = null;
|
.body(new InputStreamResource(new FileInputStream(fullTextFileFullPath)));
|
||||||
contentType = request.getServletContext().getMimeType(fullTextFileFullPath);
|
} catch (Exception e) {
|
||||||
if ( contentType == null ) {
|
String errorMsg = "Could not load the FileInputStream of the full-text-file \"" + fullTextFileFullPath + "\"!";
|
||||||
contentType = "application/octet-stream";
|
logger.error(errorMsg, e);
|
||||||
|
return ResponseEntity.internalServerError().body(errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ResponseEntity.ok()
|
|
||||||
.contentType(MediaType.parseMediaType(contentType))
|
|
||||||
.header(HttpHeaders.CONTENT_DISPOSITION, "inline; filename=\"" + resource.getFilename() + "\"")
|
|
||||||
.body(resource);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,12 +3,13 @@ package eu.openaire.urls_worker.controllers;
|
||||||
import eu.openaire.urls_worker.UrlsWorkerApplication;
|
import eu.openaire.urls_worker.UrlsWorkerApplication;
|
||||||
import eu.openaire.urls_worker.payloads.responces.WorkerResponse;
|
import eu.openaire.urls_worker.payloads.responces.WorkerResponse;
|
||||||
import eu.openaire.urls_worker.util.AssignmentsHandler;
|
import eu.openaire.urls_worker.util.AssignmentsHandler;
|
||||||
import eu.openaire.urls_worker.util.WorkerConstants;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.GetMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
|
@ -18,10 +18,12 @@ import eu.openaire.urls_worker.util.AssignmentsHandler;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.*;
|
import java.io.File;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.sql.Timestamp;
|
import java.sql.Timestamp;
|
||||||
import java.util.*;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
|
||||||
|
|
|
@ -7,12 +7,14 @@ import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.core.io.Resource;
|
import org.springframework.core.io.Resource;
|
||||||
import org.springframework.core.io.UrlResource;
|
import org.springframework.core.io.UrlResource;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
|
|
||||||
|
|
||||||
import java.io.*;
|
import java.io.File;
|
||||||
import java.nio.file.*;
|
import java.io.FileReader;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
|
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
|
@ -36,7 +38,7 @@ public class FileStorageService {
|
||||||
System.exit(-10);
|
System.exit(-10);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
logger.error("I/O error when reading the properties file!", ioe);
|
logger.error("I/O error when reading the properties file!", ioe);
|
||||||
System.exit(-10);
|
System.exit(-11);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,40 +53,6 @@ public class FileStorageService {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static final int bufferSize = 10485760; // 10 MB
|
|
||||||
public StreamingResponseBody loadFileAsAStream(String fullFileName)
|
|
||||||
{
|
|
||||||
StreamingResponseBody streamingResponseBody = null;
|
|
||||||
AtomicReference<FileInputStream> fileInputStream = new AtomicReference<>();
|
|
||||||
try {
|
|
||||||
streamingResponseBody = response -> { // This does not need to be explicitly closed.
|
|
||||||
fileInputStream.set(new FileInputStream(fullFileName));
|
|
||||||
int bytesRead;
|
|
||||||
byte[] byteBuffer = new byte[bufferSize];
|
|
||||||
FileInputStream fInS = fileInputStream.get();
|
|
||||||
while ( (bytesRead = fInS.read(byteBuffer, 0, bufferSize)) > 0 )
|
|
||||||
response.write(byteBuffer, 0, bytesRead);
|
|
||||||
};
|
|
||||||
return streamingResponseBody;
|
|
||||||
} catch (Exception e) {
|
|
||||||
if ( e instanceof FileNotFoundException )
|
|
||||||
logger.error("File \"" + fullFileName + "\" is not a file!");
|
|
||||||
else
|
|
||||||
logger.error(e.getMessage(), e);
|
|
||||||
return null;
|
|
||||||
} finally {
|
|
||||||
FileInputStream fInS = fileInputStream.get();
|
|
||||||
if ( fInS != null ) {
|
|
||||||
try {
|
|
||||||
fInS.close();
|
|
||||||
} catch (IOException ex) {
|
|
||||||
logger.error("", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public Resource loadFileAsResource(String fullFileName) {
|
public Resource loadFileAsResource(String fullFileName) {
|
||||||
try {
|
try {
|
||||||
Path filePath = assignmentsLocation.resolve(fullFileName).normalize();
|
Path filePath = assignmentsLocation.resolve(fullFileName).normalize();
|
||||||
|
|
|
@ -17,7 +17,9 @@ import org.springframework.web.client.RestClientException;
|
||||||
import org.springframework.web.client.RestTemplate;
|
import org.springframework.web.client.RestTemplate;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.*;
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
|
||||||
public class AssignmentsHandler {
|
public class AssignmentsHandler {
|
||||||
|
@ -27,7 +29,6 @@ public class AssignmentsHandler {
|
||||||
public static boolean isAvailableForWork = true;
|
public static boolean isAvailableForWork = true;
|
||||||
public static List<UrlReport> urlReports = null;
|
public static List<UrlReport> urlReports = null;
|
||||||
private static final int expectedDatasourcesPerRequest = 1400; // Per 10_000 assignments.
|
private static final int expectedDatasourcesPerRequest = 1400; // Per 10_000 assignments.
|
||||||
private static int expectedAssignmentsPerDatasource = 0;
|
|
||||||
public static Multimap<String, Assignment> assignmentsForPlugins = null;
|
public static Multimap<String, Assignment> assignmentsForPlugins = null;
|
||||||
private static final boolean askForTest = false; // Enable this only for testing.
|
private static final boolean askForTest = false; // Enable this only for testing.
|
||||||
|
|
||||||
|
@ -41,10 +42,11 @@ public class AssignmentsHandler {
|
||||||
public AssignmentsHandler()
|
public AssignmentsHandler()
|
||||||
{
|
{
|
||||||
urlReports = new ArrayList<>(UrlsWorkerApplication.maxAssignmentsLimitPerBatch);
|
urlReports = new ArrayList<>(UrlsWorkerApplication.maxAssignmentsLimitPerBatch);
|
||||||
expectedAssignmentsPerDatasource = (UrlsWorkerApplication.maxAssignmentsLimitPerBatch / expectedDatasourcesPerRequest);
|
int expectedAssignmentsPerDatasource = (UrlsWorkerApplication.maxAssignmentsLimitPerBatch / expectedDatasourcesPerRequest);
|
||||||
assignmentsForPlugins = HashMultimap.create(expectedDatasourcesPerRequest, expectedAssignmentsPerDatasource);
|
assignmentsForPlugins = HashMultimap.create(expectedDatasourcesPerRequest, expectedAssignmentsPerDatasource);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static AssignmentsRequest requestAssignments()
|
public static AssignmentsRequest requestAssignments()
|
||||||
{
|
{
|
||||||
String requestUrl = UrlsWorkerApplication.controllerBaseUrl + "urls" + (askForTest ? "/test" : "") + "?workerId=" + UrlsWorkerApplication.workerId + "&workerAssignmentsLimit=" + UrlsWorkerApplication.maxAssignmentsLimitPerBatch;
|
String requestUrl = UrlsWorkerApplication.controllerBaseUrl + "urls" + (askForTest ? "/test" : "") + "?workerId=" + UrlsWorkerApplication.workerId + "&workerAssignmentsLimit=" + UrlsWorkerApplication.maxAssignmentsLimitPerBatch;
|
||||||
|
|
Loading…
Reference in New Issue