New feature: BulkImport full-text files from compatible datasources.

This commit is contained in:
Lampros Smyrnaios 2023-05-11 03:07:55 +03:00
parent 42b93e9429
commit b6e8cd1889
16 changed files with 1289 additions and 61 deletions

View File

@ -112,6 +112,10 @@ dependencies {
// https://mvnrepository.com/artifact/org.json/json
implementation 'org.json:json:20230227'
// https://mvnrepository.com/artifact/com.google.code.gson/gson
implementation 'com.google.code.gson:gson:2.10.1'
// https://mvnrepository.com/artifact/io.micrometer/micrometer-registry-prometheus
runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.10.6'

View File

@ -17,6 +17,12 @@ services:
- type: bind
source: $HOME/logs
target: /logs
- type: bind
source: /mnt/bulkImport
target: /mnt/bulkImport
- type: bind
source: $HOME/bulkImportReports
target: /bulkImportReports
build:
dockerfile: ./Dockerfile
context: .

View File

@ -1,5 +1,6 @@
package eu.openaire.urls_controller;
import eu.openaire.urls_controller.services.FullTextsServiceImpl;
import eu.openaire.urls_controller.services.UrlsServiceImpl;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.UriBuilder;
@ -51,6 +52,8 @@ public class Application {
shutdownThreads(UrlsServiceImpl.insertsExecutor);
shutdownThreads(FileUtils.hashMatchingExecutor);
shutdownThreads(FullTextsServiceImpl.backgroundExecutor);
shutdownThreads(FullTextsServiceImpl.bulkImportExecutor);
logger.info("Exiting..");
}

View File

@ -0,0 +1,111 @@
package eu.openaire.urls_controller.components;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Holder of the bulk-import settings, bound from the configuration properties that use the "bulkimport" prefix.
 * It carries the base directory of the bulk-import data, the directory of the reports
 * and the per-source rules (keyed by a String).
 */
@Component
@ConfigurationProperties(prefix = "bulkimport")
public class BulkImport {

    private String baseBulkImportLocation;

    private String bulkImportReportLocation;

    private Map<String, BulkImportSource> bulkImportSources;


    public BulkImport() {
    }


    public String getBaseBulkImportLocation() {
        return this.baseBulkImportLocation;
    }

    public void setBaseBulkImportLocation(String baseBulkImportLocation) {
        this.baseBulkImportLocation = baseBulkImportLocation;
    }

    public String getBulkImportReportLocation() {
        return this.bulkImportReportLocation;
    }

    public void setBulkImportReportLocation(String bulkImportReportLocation) {
        this.bulkImportReportLocation = bulkImportReportLocation;
    }

    public Map<String, BulkImportSource> getBulkImportSources() {
        return this.bulkImportSources;
    }

    public void setBulkImportSources(Map<String, BulkImportSource> bulkImportSources) {
        this.bulkImportSources = bulkImportSources;
    }


    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("BulkImport{");
        text.append("baseBulkImportLocation='").append(baseBulkImportLocation).append('\'');
        text.append(", bulkImportReportLocation='").append(bulkImportReportLocation).append('\'');
        text.append(", bulkImportSources=").append(bulkImportSources);
        return text.append('}').toString();
    }


    /**
     * The rules of a single bulk-import source: its datasource ID and prefix, the prefix of the pdf-urls and the mime-type of its files.
     */
    public static class BulkImportSource {

        String datasourceID;
        String datasourcePrefix;
        String pdfUrlPrefix;
        String mimeType;


        public BulkImportSource() {
        }


        public String getDatasourceID() {
            return this.datasourceID;
        }

        public void setDatasourceID(String datasourceID) {
            this.datasourceID = datasourceID;
        }

        public String getDatasourcePrefix() {
            return this.datasourcePrefix;
        }

        public void setDatasourcePrefix(String datasourcePrefix) {
            this.datasourcePrefix = datasourcePrefix;
        }

        public String getPdfUrlPrefix() {
            return this.pdfUrlPrefix;
        }

        public void setPdfUrlPrefix(String pdfUrlPrefix) {
            this.pdfUrlPrefix = pdfUrlPrefix;
        }

        public String getMimeType() {
            return this.mimeType;
        }

        public void setMimeType(String mimeType) {
            this.mimeType = mimeType;
        }


        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("BulkImportSource{");
            text.append("datasourceID='").append(datasourceID).append('\'');
            text.append(", datasourcePrefix='").append(datasourcePrefix).append('\'');
            text.append(", pdfUrlPrefix='").append(pdfUrlPrefix).append('\'');
            text.append(", mimeType='").append(mimeType).append('\'');
            return text.append('}').toString();
        }
    }
}

View File

@ -1,12 +1,18 @@
package eu.openaire.urls_controller.components;
import eu.openaire.urls_controller.services.FullTextsServiceImpl;
import eu.openaire.urls_controller.util.GenericUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
//import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@Component
@ -14,10 +20,40 @@ public class ScheduledTasks {
private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
//@Scheduled(fixedRate = 600_000) // Run every 10 mins: 600_000
public void reportCurrentTime() {
logger.info("Server is live! Time is now {}", dateFormat.format(new Date()));
/**
 * Executes the background tasks which have been gathered inside the "FullTextsServiceImpl.backgroundCallableTasks" list
 * and waits for all of them to finish. Every task which was handed to the executor is removed from the global list afterwards,
 * no matter if it succeeded, failed or was cancelled.
 * Tasks which are added to the global list while this method runs are left there, for the next execution.
 */
@Scheduled(fixedDelay = 3_600_000) // Execute this method 1 hour after the last execution, in order for some tasks to have been gathered.
//@Scheduled(fixedDelay = 20_000) // Just for testing (every 20 secs).
public void executeBackgroundTasks()
{
    // Take a snapshot (a shallow copy) of the global list, in order to know what was executed and delete only that data later.
    // The copy holds the same task-objects, but it is a separate list, so the tasks added in the meantime are not part of this execution.
    List<Callable<Boolean>> tasksSnapshot = new ArrayList<>(FullTextsServiceImpl.backgroundCallableTasks);
    int numOfTasks = tasksSnapshot.size();
    if ( numOfTasks == 0 )
        return;

    logger.debug("{} background tasks were found inside the \"backgroundCallableTasks\" list and are about to be executed.", numOfTasks);

    try {
        // Execute the tasks and wait for them to finish. The returned futures are in the same order as the given tasks.
        List<Future<Boolean>> futures = FullTextsServiceImpl.backgroundExecutor.invokeAll(tasksSnapshot);
        int sizeOfFutures = futures.size();
        for ( int i = 0; i < sizeOfFutures; ++i ) {
            try {
                futures.get(i).get(); // Get and see if an exception is thrown. The returned value is not checked at the moment.
            } catch (ExecutionException ee) {
                String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP).
                logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + "\n" + stackTraceMessage);
            } catch (CancellationException ce) {
                logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage());
            } finally {
                // Remove this object from the global list. Do not use indexes, since they will be different after each deletion and addition.
                FullTextsServiceImpl.backgroundCallableTasks.remove(tasksSnapshot.get(i));
            }
        }
    } catch (InterruptedException ie) {
        // Restore the interrupt-flag, so that the owner of this thread can see the interruption (e.g. during a shutdown).
        Thread.currentThread().interrupt();
        logger.warn("The execution of the background tasks was interrupted!", ie);
    } catch (Exception e) {
        logger.error("", e);
    }
}
}

View File

@ -0,0 +1,204 @@
package eu.openaire.urls_controller.controllers;
import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.models.BulkImportReport;
import eu.openaire.urls_controller.services.FullTextsService;
import eu.openaire.urls_controller.services.FullTextsServiceImpl;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.nio.file.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * REST endpoints for scheduling the bulk-import of the full-text files of a directory
 * and for retrieving the report-file of such a bulk-import.
 */
@RestController
@RequestMapping("")
public class FullTextsController {

    private static final Logger logger = LoggerFactory.getLogger(FullTextsController.class);

    @Autowired
    private FileUtils fileUtils;

    private final FullTextsService fullTextsService;

    private final String baseBulkImportLocation;

    private final String bulkImportReportLocation;

    private final HashMap<String, BulkImport.BulkImportSource> bulkImportSources;

    /** The directories which have an accepted bulk-import request that has not released them yet. It acts as a per-directory lock. */
    public static final Set<String> bulkImportDirs = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    /** Captures the last directory of a path (with its optional trailing "/") in group 1. */
    private static final Pattern LAST_DIR_REGEX = Pattern.compile("^.*/([^/]+[/]?)$");


    /**
     * Copies the bulk-import settings and makes sure both base-locations end with "/", since sub-paths are appended to them by plain concatenation.
     *
     * @param fullTextsService the service which performs the actual bulk-import
     * @param bulkImport the bulk-import configuration
     */
    public FullTextsController(FullTextsService fullTextsService, BulkImport bulkImport)
    {
        String baseLocation = bulkImport.getBaseBulkImportLocation();
        if ( (baseLocation != null) && !baseLocation.isEmpty() && !baseLocation.endsWith("/") )
            baseLocation += "/";
        this.baseBulkImportLocation = baseLocation;

        this.bulkImportSources = new HashMap<>(bulkImport.getBulkImportSources());

        String reportLocation = bulkImport.getBulkImportReportLocation();
        if ( !reportLocation.endsWith("/") )
            reportLocation += "/";
        this.bulkImportReportLocation = reportLocation;

        this.fullTextsService = fullTextsService;
    }


    /**
     * Validates a bulk-import request, writes the initial report-file and adds the import-job to the background tasks.
     *
     * @param provenance the key of the bulk-import source, whose rules will be used
     * @param bulkImportDir the directory with the full-texts; a path not starting with "/" is appended to the "baseBulkImportLocation"
     * @param shouldDeleteFilesOnFinish whether the successfully imported files should be deleted
     * @return 200 with the report-ID when the request is accepted, 400 for an invalid request,
     *         429 when the same directory is already being handled, 500 for a server-side failure
     */
    @GetMapping("bulkImportFullTexts")
    public ResponseEntity<?> bulkImportFullTexts(@RequestParam String provenance, @RequestParam String bulkImportDir, @RequestParam boolean shouldDeleteFilesOnFinish) {

        BulkImport.BulkImportSource bulkImportSource = bulkImportSources.get(provenance);
        if ( bulkImportSource == null ) {
            String errorMsg = "The provided provenance \"" + provenance + "\" is not in the list of the bulk-imported sources, so no configuration-rules are available!";
            logger.error(errorMsg);
            return ResponseEntity.badRequest().body(errorMsg); // It's the user's fault that gave an unsupported provenance.
        }

        // Check if the given directory parameter exists.
        if ( bulkImportDir.isEmpty() ) {
            String errorMsg = "The \"bulkImportDir\" was missing from the request!";
            logger.error(errorMsg);
            return ResponseEntity.badRequest().body(errorMsg);
        }

        String givenBulkDir = bulkImportDir; // Keep the given value here, to not expose the full-path, in case the user has not provided an absolute path.

        // Make sure the whole path ends with "/", so that we can easily append file-names later.
        if ( !bulkImportDir.endsWith("/") )
            bulkImportDir += "/";

        String relativeBulkImportDir = null;

        // Check if we have "relative-path" so that we can append it to the "baseBulkImportLocation".
        if ( !bulkImportDir.startsWith("/") ) {
            // A relative path was given.
            relativeBulkImportDir = bulkImportDir;
            bulkImportDir = baseBulkImportLocation + bulkImportDir;
        } else {
            // An absolute path was given. Keep only its last directory as the "relative" one.
            String errMsg = "The bulkImportDir \"" + bulkImportDir + "\" was problematic!";
            Matcher matcher = LAST_DIR_REGEX.matcher(bulkImportDir);
            if ( !matcher.matches() ) {
                logger.error(errMsg);
                return ResponseEntity.badRequest().body(errMsg);
            }
            relativeBulkImportDir = matcher.group(1);
            if ( (relativeBulkImportDir == null) || relativeBulkImportDir.isEmpty() ) {
                logger.error(errMsg);
                return ResponseEntity.badRequest().body(errMsg);
            }
        }

        // The "relativeBulkImportDir" should always be guaranteed to end with "/"! Otherwise, the import-procedure will fail.

        logger.info("Received a \"bulkImportFullTexts\" request for \"" + provenance + "\" procedure and bulkImportDir: \"" + bulkImportDir + "\".");

        // Check whether the given directory is accessible.
        File givenDir = new File(bulkImportDir);
        if ( !givenDir.isDirectory() ) {
            String errorMsg = "The bulkImportDir \"" + bulkImportDir + "\" is invalid!";
            logger.error(errorMsg);
            return ResponseEntity.badRequest().body(errorMsg);
        }

        // Efficiently check if the dir is empty, without loading all the file-entries in memory.
        try ( DirectoryStream<Path> directory = Files.newDirectoryStream(givenDir.toPath()) ) {
            if ( !directory.iterator().hasNext() ) {
                String errorMsg = "The givenDir \"" + givenDir + "\" is empty!";
                logger.warn(errorMsg);
                return ResponseEntity.badRequest().body(errorMsg);
            }
            // The above check does not catch the case where the directory has at least one subdirectory, but no full-texts files.
            // The "iterator()" will have a "next" entry, but no full-text file will exist. Although, that case will be rare.
        } catch (Exception e) {
            String errorMsg = "Error when checking if the givenDir \"" + givenDir + "\" is empty!";
            logger.error(errorMsg, e); // Log the cause as well, otherwise the reason of the failure is lost.
            return ResponseEntity.internalServerError().body(errorMsg);
        }

        // Detect if the same directory is scheduled for being processed. In that case, return a 429.
        if ( ! bulkImportDirs.add(bulkImportDir) ) {
            // We allow multiple jobs for the same provenance, running at the same time, but not multiple jobs for the same bulkImportDirectory.
            String errorMsg = "There is a bulk-import request for the directory \"" + bulkImportDir + "\" that is being handled at the moment. Please wait until it's finished being processed, before making another request.";
            logger.error(errorMsg);
            return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(errorMsg);
        }

        // From this point on, the directory is "locked". If the job does not get scheduled, for any reason, the lock has to be released here,
        // since no background task will exist to release it. Otherwise, every future request for this directory would get a 429.
        boolean isJobScheduled = false;
        try {
            Path currentBulkImportReportLocationDir = Paths.get(this.bulkImportReportLocation, provenance);
            try {
                Files.createDirectories(currentBulkImportReportLocationDir); // No-op if dir exists. It does not throw a "alreadyExistsException"
            } catch (Exception e) {
                String errorMsg = "Could not create the \"bulkImportReportLocation\" for provenance \"" + provenance + "\" : " + currentBulkImportReportLocationDir;
                logger.error(errorMsg, e);
                return ResponseEntity.internalServerError().body(errorMsg);
            }

            // NOTE(review): for a relative "bulkImportDir" with more than one level (e.g. "a/b/"), the report-ID contains a sub-directory, which is not created above. Confirm that "fileUtils.writeToFile()" handles that.
            String bulkImportReportID = provenance + "/" + relativeBulkImportDir.substring(0, (relativeBulkImportDir.length() -1)) + "_report_" + GenericUtils.getRandomNumber(10000, 99999);
            String bulkImportReportFullPath = this.bulkImportReportLocation + bulkImportReportID + ".json";

            String msg = "The 'bulkImportFullTexts' request for '" + provenance + "' procedure and bulkImportDir: '" + givenBulkDir + "' was accepted and will be scheduled for execution. "
                    + (shouldDeleteFilesOnFinish ? "The successfully imported files will be deleted." : "All files will remain inside the directory after processing.")
                    + " You can request a report at any moment, using this reportFileID: " + bulkImportReportID;

            BulkImportReport bulkImportReport = new BulkImportReport(provenance, bulkImportReportFullPath, bulkImportReportID);
            bulkImportReport.addEvent(msg);

            String errorMsg = fileUtils.writeToFile(bulkImportReportFullPath, bulkImportReport.getJsonReport());
            if ( errorMsg != null )
                return ResponseEntity.internalServerError().body(errorMsg);

            logger.info(msg);

            // Add this to a background job, since it will take a lot of time to be completed, and the caller will get a "read-timeout" at least and a socket-timeout at most (in case of a network failure during those hours).
            String finalBulkImportDir = bulkImportDir;
            String finalRelativeBulkImportDir = relativeBulkImportDir;
            FullTextsServiceImpl.backgroundCallableTasks.add(() ->
                    fullTextsService.bulkImportFullTextsFromDirectory(bulkImportReport, finalRelativeBulkImportDir, finalBulkImportDir, givenDir, provenance, bulkImportSource, shouldDeleteFilesOnFinish)
            );
            isJobScheduled = true;

            return ResponseEntity.ok().body(msg);
        } finally {
            if ( !isJobScheduled )
                bulkImportDirs.remove(bulkImportDir);
        }
    }


    /**
     * Returns the contents of the report-file which corresponds to the given report-ID.
     *
     * @param bulkImportReportId the ID of the report, as it was given in the response of the "bulkImportFullTexts" request
     * @return 200 with the contents of the report-file, 400 if the ID is invalid or points outside the "bulkImportReportLocation",
     *         404 if the report-file does not exist, 500 if it could not be read
     */
    @GetMapping("getBulkImportReport")
    public ResponseEntity<?> getBulkImportReport(@RequestParam("id") String bulkImportReportId)
    {
        // The report-ID comes from the user, so make sure it cannot be used to read files outside the reports-directory (e.g. by using "../").
        Path reportFilePath;
        try {
            Path reportsBaseDir = Paths.get(this.bulkImportReportLocation).toAbsolutePath().normalize();
            reportFilePath = Paths.get(this.bulkImportReportLocation, bulkImportReportId + ".json").toAbsolutePath().normalize();
            if ( !reportFilePath.startsWith(reportsBaseDir) ) {
                String errorMsg = "The given report-ID \"" + bulkImportReportId + "\" is invalid!";
                logger.warn(errorMsg);
                return ResponseEntity.badRequest().body(errorMsg);
            }
        } catch (InvalidPathException ipe) {
            String errorMsg = "The given report-ID \"" + bulkImportReportId + "\" is invalid!";
            logger.warn(errorMsg, ipe);
            return ResponseEntity.badRequest().body(errorMsg);
        }

        // Write the contents of the report-file to a string (efficiently!) and return the whole content as an HTTP-response.
        StringBuilder stringBuilder = new StringBuilder(2_000);
        String line;
        try ( BufferedReader in = new BufferedReader(new InputStreamReader(Files.newInputStream(reportFilePath)), FileUtils.tenMb) ) {
            while ( (line = in.readLine()) != null )
                stringBuilder.append(line).append("\n"); // The "readLine()" does not return the line-term char.
        } catch (NoSuchFileException nsfe) {
            logger.warn("The requested report-file with ID: \"" + bulkImportReportId + "\" was not found!");
            return ResponseEntity.notFound().build();
        } catch (Exception e) {
            String errorMsg = "Failed to read the contents of report-file with ID: " + bulkImportReportId;
            logger.error(errorMsg, e);
            return ResponseEntity.internalServerError().body(errorMsg); // Only the report-ID is given back to the user, not the full file-path.
        }

        return ResponseEntity.ok().body(stringBuilder.toString());
    }
}

View File

@ -0,0 +1,105 @@
package eu.openaire.urls_controller.models;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.gson.Gson;
import eu.openaire.urls_controller.util.GenericUtils;
import java.util.Collection;
import java.util.Map;
/**
 * The report of a bulk-import procedure. It gathers timestamped events and can be serialized to JSON.
 * Events may be added from multiple threads, since they are stored in a synchronized multimap.
 */
@JsonInclude(JsonInclude.Include.NON_NULL)
public class BulkImportReport {

    // A single instance is reused for all serializations. Static fields are excluded from the Gson output by default.
    private static final Gson GSON = new Gson();

    @JsonProperty
    private String provenance;

    @JsonProperty
    private String reportLocation;

    @JsonProperty
    private String reportID;

    // This will not be serialized, since Gson cannot serialize Multimaps. Instead, it will be converted to the "simpler" map below.
    transient private SetMultimap<String, String> eventsMultimap = Multimaps.synchronizedSetMultimap(LinkedHashMultimap.create());
    // We need a "LinkedHashMultimap", so that the order of the keys (timestamps) stay ascending, so the final report makes sense in chronological order.
    // We need for one key (timestamp) to have multiple values (events), in order to not lose events happening at the same time.

    @JsonProperty
    private Map<String, Collection<String>> eventsMap;


    public BulkImportReport(String provenance, String reportLocation, String reportID) {
        this.provenance = provenance;
        this.reportLocation = reportLocation;
        this.reportID = reportID;
    }


    /**
     * Adds the given event to the report, under the current time.
     *
     * @param event the message to record
     */
    public void addEvent(String event) {
        eventsMultimap.put(GenericUtils.getReadableCurrentTimeAndZone(), event);
    }


    /**
     * Serializes this report to JSON, including the events which have been added so far.
     *
     * @return the JSON representation of this report
     */
    public String getJsonReport()
    {
        SetMultimap<String, String> events = this.eventsMultimap;
        // The "asMap()" result is a live view of the synchronized multimap. Guava requires the caller to hold the multimap's lock while a view is iterated,
        // otherwise an "addEvent()" call from another thread, during the serialization, can cause a "ConcurrentModificationException".
        synchronized ( events ) {
            //Convert the LinkedHashMultiMap<String, String> to Map<String, Collection<String>>, since Gson cannot serialize Multimaps.
            eventsMap = events.asMap();
            return GSON.toJson(this);
        }
    }


    public String getProvenance() {
        return provenance;
    }

    public void setProvenance(String provenance) {
        this.provenance = provenance;
    }

    public String getReportLocation() {
        return reportLocation;
    }

    public void setReportLocation(String reportLocation) {
        this.reportLocation = reportLocation;
    }

    public String getReportID() {
        return reportID;
    }

    public void setReportID(String reportID) {
        this.reportID = reportID;
    }

    public SetMultimap<String, String> getEventsMultimap() {
        return eventsMultimap;
    }

    public void setEventsMultimap(SetMultimap<String, String> eventsMultimap) {
        this.eventsMultimap = eventsMultimap;
    }

    public Map<String, Collection<String>> getEventsMap() {
        return eventsMap;
    }

    public void setEventsMap(Map<String, Collection<String>> eventsMap) {
        this.eventsMap = eventsMap;
    }


    @Override
    public String toString() {
        return "BulkImportReport{" +
                "provenance='" + provenance + '\'' +
                ", reportLocation='" + reportLocation + '\'' +
                ", reportID='" + reportID + '\'' +
                ", eventsMultimap=" + eventsMultimap +
                ", eventsMap=" + eventsMap +
                '}';
    }
}

View File

@ -0,0 +1,115 @@
package eu.openaire.urls_controller.models;
import com.google.common.hash.Hashing;
import com.google.common.io.Files;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.nio.file.Paths;
/**
 * Data-holder for a document-file: the file itself, its MD5-hash, its size, its location and an output-stream for it.
 */
public class DocFileData {

    private static final Logger logger = LoggerFactory.getLogger(DocFileData.class);

    private File docFile;
    private String hash;
    private Long size;
    private String location;
    private FileOutputStream fileOutputStream;


    public DocFileData(File docFile, String hash, Long size, String location, FileOutputStream fileOutputStream) {
        this.docFile = docFile;
        this.hash = hash;
        this.size = size;
        this.location = location;
        this.fileOutputStream = fileOutputStream;
    }

    /** Same as the full constructor, leaving the output-stream unset. */
    public DocFileData(File docFile, String hash, Long size, String location) {
        this(docFile, hash, size, location, null);
    }

    /** Same as the full constructor, leaving the hash, the size and the location unset. */
    public DocFileData(File docFile, FileOutputStream fileOutputStream) {
        this(docFile, null, null, null, fileOutputStream);
    }


    public File getDocFile() {
        return this.docFile;
    }

    public void setDocFile(File docFile) {
        this.docFile = docFile;
    }

    public String getHash() {
        return this.hash;
    }

    public void setHash(String hash) {
        this.hash = hash;
    }

    public Long getSize() {
        return this.size;
    }

    public void setSize(Long size) {
        this.size = size;
    }

    public String getLocation() {
        return this.location;
    }

    public void setLocation(String location) {
        this.location = location;
    }

    public FileOutputStream getFileOutputStream() {
        return this.fileOutputStream;
    }

    public void setFileOutputStream(FileOutputStream fileOutputStream) {
        this.fileOutputStream = fileOutputStream;
    }


    /**
     * Computes the MD5-hash and the size of the "docFile" and stores them in this object.
     * This is kept out of the constructors, in order to avoid long thread-blocking in the caller method, which downloads and constructs this object inside a synchronized block.
     * If the "docFile" is not set, a warning is logged and nothing is computed. A failure of the computation is logged, not thrown.
     * */
    public void calculateAndSetHashAndSize() {
        final File file = this.docFile;
        if ( file == null ) { // Without a file there is nothing to hash or measure.
            logger.warn("The \"docFile\" was not previously set!");
            return;
        }

        final String absolutePath = file.getAbsolutePath();
        try {
            // The MD5 function is marked as deprecated because it is not secure. Here it is used only to identify duplicate files.
            this.hash = Files.asByteSource(file).hash(Hashing.md5()).toString();
            this.size = java.nio.file.Files.size(Paths.get(absolutePath));
        } catch (Exception e) {
            String hashPart = "";
            if ( this.hash == null )
                hashPart = "and the MD5-hash ";
            logger.error("Could not retrieve the size " + hashPart + "of the file: " + absolutePath, e);
        }
    }


    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("DocFileData{");
        text.append("docFile=").append(docFile);
        text.append(", hash='").append(hash).append('\'');
        text.append(", size=").append(size);
        text.append(", location='").append(location).append('\'');
        text.append(", fileOutputStream=").append(fileOutputStream);
        return text.append('}').toString();
    }
}

View File

@ -0,0 +1,89 @@
package eu.openaire.urls_controller.models;
import eu.openaire.urls_controller.util.FileUtils;
import java.util.regex.Matcher;
/**
 * The parts of a file-location (directory, file-name, file-name without extension, file-name ID and dot-extension),
 * as they are extracted by the "FileUtils.FILENAME_ID_EXTENSION" regex.
 */
public class FileLocationData {

    String fileDir;

    String fileName;

    String filenameWithoutExtension;

    String fileNameID;

    String dotFileExtension;


    /**
     * Extracts the location-data from the given file-location.
     *
     * @param fileLocation the location of the file
     * @throws RuntimeException if the location does not match the regex, or if the directory, the file-name, the file-name ID or the dot-extension is missing
     */
    public FileLocationData(String fileLocation) throws RuntimeException {
        // Extract and set LocationData.
        Matcher matcher = FileUtils.FILENAME_ID_EXTENSION.matcher(fileLocation);
        if ( !matcher.matches() )
            throw new RuntimeException("Failed to match the \"" + fileLocation + "\" with the regex: " + FileUtils.FILENAME_ID_EXTENSION);

        fileDir = extractRequiredGroup(matcher, 1, "fileDir", fileLocation);
        fileName = extractRequiredGroup(matcher, 2, "fileName", fileLocation);

        // The "matcher.group(3)" returns the "filenameWithoutExtension". It is stored, so that its getter does not always return null,
        // but it is not validated, in order to not reject any location which was accepted before.
        filenameWithoutExtension = matcher.group(3);

        fileNameID = extractRequiredGroup(matcher, 4, "fileNameID", fileLocation);
        dotFileExtension = extractRequiredGroup(matcher, 5, "dotFileExtension", fileLocation);
    }


    /** Returns the value of the given regex-group, or throws a RuntimeException if that value is null or empty. */
    private static String extractRequiredGroup(Matcher matcher, int groupIndex, String partName, String fileLocation) {
        String value = matcher.group(groupIndex);
        if ( (value == null) || value.isEmpty() )
            throw new RuntimeException("Failed to extract the \"" + partName + "\" from \"" + fileLocation + "\".");
        return value;
    }


    public String getFileDir() {
        return fileDir;
    }

    public void setFileDir(String fileDir) {
        this.fileDir = fileDir;
    }

    public String getFileName() {
        return fileName;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    public String getFilenameWithoutExtension() {
        return filenameWithoutExtension;
    }

    public void setFilenameWithoutExtension(String filenameWithoutExtension) {
        this.filenameWithoutExtension = filenameWithoutExtension;
    }

    public String getFileNameID() {
        return fileNameID;
    }

    public void setFileNameID(String fileNameID) {
        this.fileNameID = fileNameID;
    }

    public String getDotFileExtension() {
        return dotFileExtension;
    }

    public void setDotFileExtension(String dotFileExtension) {
        this.dotFileExtension = dotFileExtension;
    }


    @Override
    public String toString() {
        return "FileLocationData{" +
                "fileDir='" + fileDir + '\'' +
                ", fileName='" + fileName + '\'' +
                ", filenameWithoutExtension='" + filenameWithoutExtension + '\'' +
                ", fileNameID='" + fileNameID + '\'' +
                ", dotFileExtension='" + dotFileExtension + '\'' +
                '}';
    }
}

View File

@ -0,0 +1,18 @@
package eu.openaire.urls_controller.services;
import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.models.BulkImportReport;
import java.io.File;
import java.util.List;
/**
 * Operations for bulk-importing full-text files from a directory.
 */
public interface FullTextsService {
/**
 * Imports the full-text files of the given directory, writing the progress-events to the given report.
 *
 * @param bulkImportReport the report which receives the events of this procedure
 * @param relativeBulkImportDir the last part of the directory-path; callers are expected to pass it with a trailing "/"
 * @param bulkImportDirName the full path of the directory, as a string
 * @param bulkImportDir the same directory as a File -- presumably; verify against the caller
 * @param provenance the key of the bulk-import source
 * @param bulkImportSource the rules of the bulk-import source
 * @param shouldDeleteFilesOnFinish whether the successfully imported files should be deleted
 * @return false when the procedure fails; NOTE(review): the success-value is not visible from here -- confirm in the implementation
 */
Boolean bulkImportFullTextsFromDirectory(BulkImportReport bulkImportReport, String relativeBulkImportDir, String bulkImportDirName, File bulkImportDir, String provenance, BulkImport.BulkImportSource bulkImportSource, boolean shouldDeleteFilesOnFinish);
/**
 * Returns the locations of the files inside the given directory.
 * NOTE(review): the bulk-import procedure treats a null result as a failure -- confirm that null is returned on error.
 */
List<String> getFileLocationsInsideDir(String directory);
/**
 * Returns the MD5 hash of the given string -- presumably in textual form; the exact format is not visible from here, TODO confirm.
 */
String getMD5hash(String string);
}

View File

@ -0,0 +1,443 @@
package eu.openaire.urls_controller.services;
import com.google.common.collect.Lists;
import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.controllers.FullTextsController;
import eu.openaire.urls_controller.models.BulkImportReport;
import eu.openaire.urls_controller.models.DocFileData;
import eu.openaire.urls_controller.models.FileLocationData;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import org.apache.avro.generic.GenericData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import javax.xml.bind.DatatypeConverter;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Service
public class FullTextsServiceImpl implements FullTextsService {
private static final Logger logger = LoggerFactory.getLogger(FullTextsServiceImpl.class);
@Autowired
private FileUtils fileUtils;
@Autowired
private ParquetFileUtils parquetFileUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
public static final ExecutorService backgroundExecutor = Executors.newFixedThreadPool(2); // At most 2 threads will be used.
public static final List<Callable<Boolean>> backgroundCallableTasks = Collections.synchronizedList(new ArrayList<>());
private static final int numOfBulkImportThreads = 4;
public static final ExecutorService bulkImportExecutor = Executors.newFixedThreadPool(numOfBulkImportThreads); // At most 4 threads will be used.
/**
* Given a directory with full-text-files, this method imports the full-texts files in the PDF Aggregation Service.
* Also, it provides the guarantee that the failed files will not be deleted! A file can "fail" if any of the expected results fail (upload-to-S3, parquet-creation and upload, load-to-db, ect)
* */
public Boolean bulkImportFullTextsFromDirectory(BulkImportReport bulkImportReport, String relativeBulkImportDir, String bulkImportDirName, File bulkImportDir, String provenance, BulkImport.BulkImportSource bulkImportSource, boolean shouldDeleteFilesOnFinish)
{
String bulkImportReportLocation = bulkImportReport.getReportLocation();
// Write to bulkImport-report file.
bulkImportReport.addEvent("Initializing the bulkImport '" + provenance + "' procedure with bulkImportDir '" + bulkImportDirName + "'.");
// Do not write immediately to the file, wait for the following checks.
if ( (ParquetFileUtils.payloadsSchema == null) // Parse the schema if it's not already parsed.
&& ((ParquetFileUtils.payloadsSchema = ParquetFileUtils.parseSchema(ParquetFileUtils.payloadSchemaFilePath)) == null ) ) {
String errorMsg = "The 'payloadsSchema' could not be parsed!";
logger.error(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return false;
}
List<String> fileLocations = getFileLocationsInsideDir(bulkImportDirName);
if ( fileLocations == null ) {
bulkImportReport.addEvent("Could not retrieve the files for bulk-import!");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return false;
}
int numOfFiles = fileLocations.size();
if ( numOfFiles == 0 ) {
String errorMsg = "No files were found inside the bulkImportDir: " + bulkImportDirName;
logger.warn(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return false;
}
logger.trace("fileLocations:\n" + fileLocations);
String localParquetDir = parquetFileUtils.parquetBaseLocalDirectoryPath + "bulk_import_" + provenance + File.separator + relativeBulkImportDir; // This ends with "/".
try {
Files.createDirectories(Paths.get(localParquetDir)); // No-op if it already exists.
} catch (Exception e) {
String errorMsg = "Could not create the local parquet-directory: " + localParquetDir;
logger.error(errorMsg, e);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return false;
}
// Create a new directory on HDFS, with this bulkImportDir name. So, that there will not be any "load data" operation to fail because another thread has loaded that base-dir right before.
String currentBulkImportHdfsDir = parquetFileUtils.parquetHDFSDirectoryPathPayloadsBulkImport + relativeBulkImportDir;
if ( ! parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + currentBulkImportHdfsDir + parquetFileUtils.mkDirsAndParams) ) { // N0-op if it already exists. It is very quick.
String errorMsg = "Could not create the hdfs-directory: " + currentBulkImportHdfsDir;
logger.error(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return false;
}
long timeMillis = System.currentTimeMillis(); // Store it here, in order to have the same for all current records.
List<Callable<Integer>> callables = new ArrayList<>(numOfFiles);
List<List<String>> subLists = Lists.partition(fileLocations, numOfBulkImportThreads); // Divide the initial list to "numOfBulkImportThreads" subLists. The last one may have marginally fewer files.
int subListsSize = subLists.size();
bulkImportReport.addEvent("Going to import the files in " + subListsSize + " segments, in parallel.");
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i;
callables.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries.
return processBulkImportedFilesSegment(bulkImportReport, finalI, subLists.get(finalI), bulkImportDirName, localParquetDir, currentBulkImportHdfsDir, provenance, bulkImportSource, timeMillis, shouldDeleteFilesOnFinish);
});
}
int numFailedSegments = 0;
int numFailedFiles = 0;
try {
List<Future<Integer>> futures = bulkImportExecutor.invokeAll(callables); // This waits for all tasks to finish.
int sizeOfFutures = futures.size();
for ( int i = 0; i < sizeOfFutures; ++i ) {
try {
numFailedFiles += futures.get(i).get();
if ( numFailedFiles == subLists.get(i).size() ) { // Get and see if it was successfully or not, or if an exception is thrown..
numFailedSegments++;
}
// In case all the files failed to be bulk-imported, then we will detect it in the "numSuccessfulSegments"-check later.
// The failed-to-be-imported files, will not be deleted, even if the user specifies that he wants to delete the directory.
} catch (ExecutionException ee) {
String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP).
logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + "\n" + stackTraceMessage);
} catch (CancellationException ce) {
logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage());
} catch (IndexOutOfBoundsException ioobe) {
logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage());
}
}
} catch (Exception e) {
String errorMsg = "An error occurred when trying to bulk-import data from bulkImportDir: " + bulkImportDirName;
logger.error(errorMsg, e);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return false;
} finally {
logger.debug("Deleting local parquet directory: " + localParquetDir);
fileUtils.deleteDirectory(new File(localParquetDir)); // It may not exist at all, if none of the parquet files were created.
}
// Check the results.
String msg;
if ( numFailedFiles == numOfFiles ) {
String errorMsg = "None of the files inside the bulkImportDir '" + bulkImportDirName + "' were imported!";
logger.error(errorMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return false;
} else if ( numFailedFiles > 0 ) { // Some failed, but not all.
msg = numFailedFiles + " files" + (numFailedSegments > 0 ? (" and " + numFailedSegments + " whole segments") : "") + " failed to be bulk-imported, from the bulkImportDir: " + bulkImportDirName;
logger.warn(msg);
} else {
msg = "All " + numOfFiles + " files, from bulkImportDir '" + bulkImportDirName + "' were bulkImported successfully.";
logger.info(msg);
}
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
// Merge the parquet files inside the table "payload_bulk_import", to improve performance of future operations.
ImpalaConnector.databaseLock.lock();
String mergeErrorMsg = fileUtils.mergeParquetFiles("payload_bulk_import", "", null);
if ( mergeErrorMsg != null ) {
ImpalaConnector.databaseLock.unlock();
bulkImportReport.addEvent(mergeErrorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return false;
}
ImpalaConnector.databaseLock.unlock();
String successMsg = "Finished the bulk-import procedure for '" + provenance + "' and bulkImportDir: " + bulkImportDirName;
logger.info(successMsg);
bulkImportReport.addEvent(successMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
FullTextsController.bulkImportDirs.remove(bulkImportDirName);
return true;
}
/**
 * Processes one segment of the files of a bulk-import procedure. It is meant to run inside a worker-thread.
 * Each file is handled by "processBulkImportedFile()". The resulting payload-records are written to a single parquet-file,
 * which is uploaded to HDFS and then loaded into the "payload_bulk_import" table.
 * If requested, the successfully imported files are deleted in the end. The failed files are never deleted.
 *
 * @param bulkImportReport the report in which the events of this procedure are recorded
 * @param segmentCounter the index of this segment, also used in the name of the parquet-file
 * @param fileLocationsSegment the locations of the files belonging to this segment
 * @param bulkImportDirName the name of the directory being bulk-imported (used in messages)
 * @param localParquetDir the local directory in which the parquet-file is created (concatenated directly with the file-name)
 * @param currentBulkImportHdfsDir the HDFS directory to which the parquet-file is uploaded
 * @param provenance the provenance to be stored in each payload-record
 * @param bulkImportSource the configuration of the datasource being imported
 * @param timeMillis the timestamp to be stored in each payload-record
 * @param shouldDeleteFilesOnFinish whether the successfully imported files should be deleted in the end
 * @return the number of files of this segment which failed to be imported. If the parquet-file could not be created,
 * uploaded or loaded, then all the files of the segment are counted as failed.
 */
private int processBulkImportedFilesSegment(BulkImportReport bulkImportReport, int segmentCounter, List<String> fileLocationsSegment, String bulkImportDirName, String localParquetDir, String currentBulkImportHdfsDir,
                                            String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, boolean shouldDeleteFilesOnFinish)
{
    // Inside this thread, process a segment of the files.
    String bulkImportReportLocation = bulkImportReport.getReportLocation();

    int numOfFilesInSegment = fileLocationsSegment.size();
    String msg = "Going to import " + numOfFilesInSegment + " files for segment-" + segmentCounter + " , of bulkImport procedure '" + provenance + "' | dir: '" + bulkImportDirName + "'..";
    logger.debug(msg);
    bulkImportReport.addEvent(msg);

    List<GenericData.Record> payloadRecords = new ArrayList<>(numOfFilesInSegment);

    // Keep track of the failed files, in order to not delete them in the end.
    HashSet<String> failedFiles = new HashSet<>();
    int counter = 0;

    // Process each file (which includes uploading it to S3, when needed) and collect the payloadRecords.
    for ( String fileLocation : fileLocationsSegment ) {
        GenericData.Record record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis);
        if ( record != null )
            payloadRecords.add(record);
        else {
            bulkImportReport.addEvent("An error caused the file: '" + fileLocation + "' to not be imported!");
            failedFiles.add(fileLocation);
        }

        if ( ((++counter) % 100) == 0 ) {	// Every 100 files, report the status.
            bulkImportReport.addEvent("Progress for segment-" + segmentCounter + " : " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files.");
            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
        }
    }

    int numOfPayloadRecords = payloadRecords.size();
    if ( numOfPayloadRecords == 0 ) {
        String errorMsg = "No payload-records were generated for any of the files inside the bulkImportDir: " + bulkImportDirName;
        logger.warn(errorMsg);
        bulkImportReport.addEvent(errorMsg);
        fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
        // None of the files of this segment will be deleted, in any case.
        return numOfFilesInSegment;
    } else if ( numOfPayloadRecords != numOfFilesInSegment ) {
        // Write this important note here, in order to certainly be in the report, even if a parquet-file failure happens and the method exits early.
        String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be imported, for segment-" + segmentCounter + "!";
        logger.warn(errorMsg);
        bulkImportReport.addEvent(errorMsg);
        fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
    }

    // Construct the parquet file, upload it to HDFS and load it in the "payload_bulk_import" table.
    String parquetFileName = "payloads_" + segmentCounter + ".parquet";
    String fullLocalParquetFilePath = localParquetDir + parquetFileName;
    logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath);	// DEBUG!

    if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) {
        bulkImportReport.addEvent("Could not write the payload-records to the parquet-file: '" + parquetFileName + "'!");
        fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
        // None of the files of this segment will be deleted, in any case.
        return numOfFilesInSegment;
    }
    //logger.trace("Parquet file '" + parquetFileName + "' was created and filled.");	// DEBUG!

    // Upload the parquet-file to HDFS. This step is executed without holding the database-lock.
    String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir);
    if ( errorMsg != null ) {	// The possible error-message returned, is already logged by the Controller.
        bulkImportReport.addEvent("Could not upload the parquet-file '" + parquetFileName + "' to HDFS!");
        fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
        // None of the files of this segment will be deleted, in any case.
        return numOfFilesInSegment;
    }

    // Load the data into the table, while holding the database-lock.
    // The "finally" guarantees that the lock is released even if the load-operation throws, so the other database-threads will not get blocked forever.
    boolean wasDataLoaded;
    ImpalaConnector.databaseLock.lock();
    try {
        wasDataLoaded = parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import");
    } finally {
        ImpalaConnector.databaseLock.unlock();
    }
    if ( !wasDataLoaded ) {
        bulkImportReport.addEvent("Could not load the payload-records to the database!");
        fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport());
        // None of the files of this segment will be deleted, in any case.
        return numOfFilesInSegment;
    }

    String segmentSuccessMsg = "Finished importing " + numOfPayloadRecords + " files, out of " + numOfFilesInSegment + " , for segment-" + segmentCounter + ".";
    logger.info(segmentSuccessMsg);
    bulkImportReport.addEvent(segmentSuccessMsg);

    if ( shouldDeleteFilesOnFinish ) {
        segmentSuccessMsg = "As the user requested, the successfully imported files of '" + provenance + "' procedure, of bulk-import segment-" + segmentCounter + ", from directory '" + bulkImportDirName + "', will be deleted.";
        logger.info(segmentSuccessMsg);
        bulkImportReport.addEvent(segmentSuccessMsg);

        // Delete all the files, except the failed ones.
        for ( String fileLocation : fileLocationsSegment ) {
            if ( failedFiles.contains(fileLocation) ) {
                continue;
            }
            if ( !fileUtils.deleteFile(fileLocation) ) {
                bulkImportReport.addEvent("The file '" + fileLocation + "' could not be deleted! Please make sure you have provided the WRITE-permission.");
            }
        }
    }

    return (numOfFilesInSegment - numOfPayloadRecords);	// Return the numOfFailedFiles.
}
/**
 * Creates the payload-record for a single bulk-imported full-text file.
 * The hash of the file is looked up in the "payload" table. If a location is already stored for that hash, it is reused,
 * otherwise the file is uploaded to the S3 ObjectStore.
 *
 * @param fileLocation the full path of the local full-text file
 * @param provenance the provenance to be stored in the record
 * @param bulkImportSource the configuration of the datasource (datasourceID, datasourcePrefix, pdfUrlPrefix, mimeType)
 * @param timeMillis the timestamp to be stored in the record
 * @return the payload-record, or "null" if the file could not be imported
 * (no file-hash, unparsable file-location, failure to produce the id-hash or failure to upload the file to S3)
 */
private GenericData.Record processBulkImportedFile(String fileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis)
{
    File fullTextFile = new File(fileLocation);
    DocFileData docFileData = new DocFileData(fullTextFile, null, null, null);
    docFileData.calculateAndSetHashAndSize();

    // Check if this file is already found by crawling. Even though we started excluding this datasource from crawling, many full-texts have already been downloaded.
    // Also, it may be the case that this file was downloaded by another datasource.

    String fileHash = docFileData.getHash();
    if ( fileHash == null )
        return null;	// No check of past found full-text can be made nor the S3-fileName can be created.

    FileLocationData fileLocationData;
    try {
        fileLocationData = new FileLocationData(fileLocation);
    } catch (RuntimeException re) {
        logger.error(re.getMessage());
        return null;
    }

    String datasourceId = bulkImportSource.getDatasourceID();
    String datasourcePrefix = bulkImportSource.getDatasourcePrefix();
    String fileNameID = fileLocationData.getFileNameID();

    String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID);	// This is the url with the file-name ID (e.g. the arXiv-id).
    String originalUrl = actualUrl;	// We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link.

    // Create the id before querying the database, so that no query is executed for a file which cannot be imported anyway.
    // The lower-casing is locale-independent, otherwise the same ID could produce a different hash on machines with a different default-locale.
    String idMd5hash = getMD5hash(fileNameID.toLowerCase(java.util.Locale.ROOT));
    if ( idMd5hash == null )
        return null;

    // openaire id = <datasourcePrefix> + "::" + <md5(lowercase(fileNameID))>
    String openAireId = (datasourcePrefix + "::" + idMd5hash);
    //logger.trace("openAireId: " + openAireId);

    final String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ? limit 1";
    final int[] hashArgType = new int[] {Types.VARCHAR};
    String alreadyFoundFileLocation = null;

    ImpalaConnector.databaseLock.lock();
    try {
        alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[]{fileHash}, hashArgType, String.class);
    } catch (EmptyResultDataAccessException erdae) {
        // No fileLocation is found, it's ok. It will be null by default.
    } catch (Exception e) {
        logger.error("Error when executing or acquiring data from the the 'getFileLocationForHashQuery'!\n", e);
        // Continue with bulk-importing the file and uploading it to S3.
    } finally {
        ImpalaConnector.databaseLock.unlock();
    }

    String s3Url;
    if ( alreadyFoundFileLocation != null )	// If the full-text of this record is already-found and uploaded.
    {
        // This full-text was found to already be in the database.
        // If it has the same datasourceID, then it likely was crawled before from an ID belonging to this datasource.
        // If it also has the same ID, then the exact same record from that datasource was retrieved previously.
        // Else, the file was downloaded by another record of this datasource.
        // Else if the datasourceID is not the same, then the same file was retrieved from another datasource.
        // The above analysis is educational, it does not need to take place and is not currently used.

        s3Url = alreadyFoundFileLocation;
    } else {
        try {
            s3Url = fileUtils.constructFileNameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), openAireId, fileLocationData.getDotFileExtension(), datasourceId, fileHash);	// This throws Exception, in case the uploading failed.
            if ( s3Url == null )
                return null;	// Returned when the 'datasourceID' or the 'hash' is null. The hash is already checked above.
        } catch (Exception e) {
            logger.error("Could not upload the file '" + fileLocationData.getFileName() + "' to the S3 ObjectStore!", e);
            return null;
        }
    }

    GenericData.Record record = new GenericData.Record(ParquetFileUtils.payloadsSchema);
    record.put("id", openAireId);
    record.put("original_url", originalUrl);
    record.put("actual_url", actualUrl);
    record.put("date", timeMillis);
    record.put("mimetype", bulkImportSource.getMimeType());
    Long size = docFileData.getSize();
    record.put("size", ((size != null) ? String.valueOf(size) : null));
    record.put("hash", fileHash);	// This is already checked and will not be null here.
    record.put("location", s3Url);
    record.put("provenance", provenance);

    return record;
}
/**
 * Collects the paths of all the regular files found inside the given directory, searching its sub-directories too.
 *
 * @param directory the directory to search in
 * @return the list with the file-paths (as strings), or "null" if the files could not be retrieved
 */
public List<String> getFileLocationsInsideDir(String directory)
{
    // In case we ever include other type-of-Files inside the same directory, we need to add this filter: "&& !filePath.toString().endsWith("name.ext")"
    try ( Stream<Path> regularFiles = Files.find(Paths.get(directory), Integer.MAX_VALUE, (path, attributes) -> attributes.isRegularFile()) )
    {
        return regularFiles.map(Path::toString).collect(Collectors.toList());
    } catch (Exception e) {
        logger.error("Could not retrieve the files from directory: '" + directory + "'!", e);
        return null;
    }
}
/**
 * Calculates the MD5-hash of the given string.
 * The string is encoded as UTF-8 before hashing, so the result does not depend on the default charset of the machine.
 *
 * @param string the string to be hashed
 * @return the MD5-hash as a lowercase hex-string, or "null" if the hash could not be calculated (the error is logged)
 */
public String getMD5hash(String string)
{
    try {
        MessageDigest md5MD = MessageDigest.getInstance("MD5");	// New instance for any new request. Otherwise, we need to synchronize the use of that object among multiple threads.
        md5MD.update(string.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        return DatatypeConverter.printHexBinary(md5MD.digest()).toLowerCase();
    } catch (Exception e) {
        logger.error("Error when getting the MD5-hash for: " + string, e);
        return null;
    }
}
}

View File

@ -1,5 +1,6 @@
package eu.openaire.urls_controller.services;
import eu.openaire.urls_controller.components.BulkImport;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.*;
import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse;
@ -23,6 +24,7 @@ import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@ -55,19 +57,30 @@ public class UrlsServiceImpl implements UrlsService {
public static final ExecutorService insertsExecutor = Executors.newFixedThreadPool(6);
public UrlsServiceImpl(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord,
@Value("${services.pdfaggregation.controller.datasources.excludedIDs}") List<String> excludedIDs) {
public UrlsServiceImpl(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord, BulkImport bulkImport)
{
maxAttemptsPerRecordAtomic = new AtomicInteger(maxAttemptsPerRecord);
// The "excludedIDs" will not be null, as it will be defined inside the "application.yml" file.
// In case no IDs for excluded Datasources are given, then the "excludedIDs" list will just be empty.
int exclusionListSize = excludedIDs.size();
if ( exclusionListSize == 0 )
HashMap<String, BulkImport.BulkImportSource> bulkImportSources = new HashMap<>(bulkImport.getBulkImportSources());
// The "bulkImportSources" will not be null, as it will be defined inside the "application.yml" file.
// In case no bulkImport Datasources are given, then the "bulkImportSources" list will just be empty.
if ( bulkImportSources.isEmpty() )
return; // So the "excludedDatasourceIDsStringList" -code should be placed last in this Constructor-method.
logger.trace("BulkImportSources:\n" + bulkImportSources);
List<String> excludedIDs = new ArrayList<>();
for ( BulkImport.BulkImportSource source : bulkImportSources.values() ) {
String datasourceID = source.getDatasourceID();
if ( (datasourceID == null) || datasourceID.isEmpty() )
throw new RuntimeException("One of the bulk-imported datasourceIDs was not found! | source: " + source);
excludedIDs.add(datasourceID);
}
int exclusionListSize = excludedIDs.size(); // This list will not be empty.
// Prepare the "excludedDatasourceIDsStringList" to be used inside the "findAssignmentsQuery". Create the following string-pattern:
// ("ID_1", "ID_2", ...)
final StringBuilder sb = new StringBuilder((exclusionListSize * 46) + (exclusionListSize -1) +2 );
sb.append("(");
for ( int i=0; i < exclusionListSize; ++i ) {
@ -78,7 +91,7 @@ public class UrlsServiceImpl implements UrlsService {
sb.append(")");
excludedDatasourceIDsStringList = sb.toString();
logger.info("The following datasources will be excluded from crawling: " + excludedDatasourceIDsStringList);
logger.info("The following bulkImport-datasources will be excluded from crawling: " + excludedDatasourceIDsStringList);
}
@ -302,7 +315,7 @@ public class UrlsServiceImpl implements UrlsService {
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg);
} finally {
logger.debug("Deleting parquet directory: " + currentParquetPath);
FileUtils.deleteDirectory(new File(currentParquetPath));
fileUtils.deleteDirectory(new File(currentParquetPath));
}
logger.debug("Going to merge the parquet files for the tables which were altered.");

View File

@ -6,6 +6,7 @@ import com.google.common.collect.SetMultimap;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.UrlReport;
import org.apache.commons.io.FileDeleteStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -31,6 +32,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -109,7 +112,7 @@ public class FileUtils {
// The following regex might be useful in a future scenario. It extracts the "plain-filename" and "file-ID" and the "file-extension".
// Possible full-filenames are: "path1/path2/ID.pdf", "ID2.pdf", "path1/path2/ID(12).pdf", "ID2(25).pdf"
private static final Pattern FILENAME_ID_EXTENSION = Pattern.compile("(?:[^.()]+/)?((([^./()]+)[^./]*)(\\.[\\w]{2,10}))$");
public static final Pattern FILENAME_ID_EXTENSION = Pattern.compile("(?:([^.()]+)/)?((([^/()]+)[^./]*)(\\.[\\w]{2,10}))$");
private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
@ -183,7 +186,7 @@ public class FileUtils {
logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
return null;
}
String fileNameWithExtension = matcher.group(1);
String fileNameWithExtension = matcher.group(2);
if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) {
logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION);
return null;
@ -382,13 +385,13 @@ public class FileUtils {
logger.error("Failed to match the \"" + fileName + "\" with the regex: " + FILENAME_ID_EXTENSION);
continue;
}
// The "matcher.group(2)" returns the "filenameWithoutExtension", which is currently not used.
String fileNameID = matcher.group(3);
// The "matcher.group(3)" returns the "filenameWithoutExtension", which is currently not used.
String fileNameID = matcher.group(4);
if ( (fileNameID == null) || fileNameID.isEmpty() ) {
logger.error("Failed to extract the \"fileNameID\" from \"" + fileName + "\".");
continue;
}
String dotFileExtension = matcher.group(4);
String dotFileExtension = matcher.group(5);
if ( (dotFileExtension == null) || dotFileExtension.isEmpty() ) {
logger.error("Failed to extract the \"dotFileExtension\" from \"" + fileName + "\".");
continue;
@ -420,23 +423,10 @@ public class FileUtils {
continue;
}
if ( datasourceId == null ) {
logger.error("The retrieved \"datasourceId\" was \"null\" for file: " + fileName);
String s3Url = constructFileNameAndUploadToS3(targetDirectory, fileName, fileNameID, dotFileExtension, datasourceId, hash);
if ( s3Url == null )
continue;
}
if ( hash == null ) {
logger.error("The retrieved \"hash\" was \"null\" for file: " + fileName);
continue;
}
String fileFullPath = targetDirectory + fileName; // The fullPath to the local file.
// Use the "fileNameID" and not the "filenameWithoutExtension", as we want to avoid keeping the possible "parenthesis" with the increasing number (about the duplication of ID-fileName).
// Now we append the file-hash, so it is guaranteed that the filename will be unique.
fileName = datasourceId + "/" + fileNameID + "::" + hash + dotFileExtension; // This is the fileName to be used in the objectStore, not of the local file!
String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath);
setFullTextForMultiplePayloads(fileRelatedPayloads, s3Url);
//numUploadedFiles ++;
} catch (Exception e) {
@ -450,6 +440,28 @@ public class FileUtils {
}
/**
 * Builds the name under which the file is stored in the S3 ObjectStore and uploads the local file there.
 * The name has the form: <datasourceId>/<openAireID>::<hash><dotFileExtension>
 *
 * @param fileDir the local directory containing the file
 * @param fileName the name of the local file
 * @param openAireID the id to be used inside the objectStore file-name
 * @param dotFileExtension the file-extension, including the dot
 * @param datasourceId the id of the datasource; must not be null
 * @param hash the hash of the file; must not be null
 * @return the value returned by "s3ObjectStore.uploadToS3()", or "null" if the "datasourceId" or the "hash" is null
 * @throws Exception propagated from the upload
 */
public String constructFileNameAndUploadToS3(String fileDir, String fileName, String openAireID, String dotFileExtension, String datasourceId, String hash) throws Exception
{
    // Both values are required for the construction of the objectStore file-name. The "datasourceId" is checked first.
    final String missingValueName = (datasourceId == null) ? "datasourceId" : ((hash == null) ? "hash" : null);
    if ( missingValueName != null ) {
        logger.error("The retrieved \"" + missingValueName + "\" was \"null\" for file: " + fileName);
        return null;
    }

    final String localFileFullPath = fileDir + File.separator + fileName;

    // The given ID is used instead of the local file-name, so that a possible "parenthesis" with the increasing number (about the duplication of ID-fileName) is not kept.
    // The file-hash is appended, so it is guaranteed that the objectStore file-name will be unique.
    final String objectStoreFileName = datasourceId + "/" + openAireID + "::" + hash + dotFileExtension;

    return s3ObjectStore.uploadToS3(objectStoreFileName, localFileFullPath);
}
public String getMessageFromResponseBody(HttpURLConnection conn, boolean isError) {
final StringBuilder msgStrB = new StringBuilder(500);
try ( BufferedReader br = new BufferedReader(new InputStreamReader((isError ? conn.getErrorStream() : conn.getInputStream()))) ) { // Try-with-resources
@ -561,11 +573,11 @@ public class FileUtils {
}
public static boolean deleteDirectory(File directory)
public boolean deleteDirectory(File directory)
{
try {
org.apache.commons.io.FileUtils.deleteDirectory(directory);
return true;
return true; // Will return "true" also in case this directory does not exist. So, no Exception will be thrown for that case.
} catch (IOException e) {
logger.error("The following directory could not be deleted: " + directory.getName(), e);
return false;
@ -575,4 +587,35 @@ public class FileUtils {
}
}
/**
 * Deletes the file with the given path, using the "FORCE" delete-strategy.
 *
 * @param fileFullPathString the full path of the file to be deleted
 * @return "true" if no error occurred during the deletion, otherwise "false" (the error is logged)
 */
public boolean deleteFile(String fileFullPathString)
{
    try {
        FileDeleteStrategy.FORCE.delete(new File(fileFullPathString));
    } catch (IOException e) {
        // Log the exception as well, otherwise the reason of the failure (e.g. missing permissions) is lost.
        logger.error("Error when deleting the file: " + fileFullPathString, e);
        return false;
    }
    return true;
}
Lock fileWriteLock = new ReentrantLock(true);
/**
 * Writes the given string to the given file, replacing any previous content of that file.
 * The "fileWriteLock" is held during the writing, so the writes are executed one at a time.
 *
 * @param fileFullPath the full path of the file to write to
 * @param stringToWrite the new content of the file
 * @return "null" on success, otherwise the error-message (which is also logged)
 */
public String writeToFile(String fileFullPath, String stringToWrite)
{
    fileWriteLock.lock();
    try {
        try ( BufferedWriter writer = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), FileUtils.tenMb) ) {
            writer.write(stringToWrite);	// This will overwrite the file. If the new string is smaller, then it does not matter.
        }
        return null;
    } catch (Exception e) {
        final String failureMsg = "Failed to create or acquire the file \"" + fileFullPath + "\"!";
        logger.error(failureMsg, e);
        return failureMsg;
    } finally {
        fileWriteLock.unlock();
    }
}
}

View File

@ -1,10 +1,33 @@
package eu.openaire.urls_controller.util;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Small, stateless helper-methods, used across the application.
 * All methods are safe to be called from multiple threads.
 */
public class GenericUtils {

    // The "SimpleDateFormat" is not thread-safe, so each thread uses its own instance.
    private static final ThreadLocal<SimpleDateFormat> simpleDateFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss.SSS z"));


    /**
     * @return the current date, time and time-zone, in the form: "yyyy-MM-dd 'at' HH:mm:ss.SSS z"
     */
    public static String getReadableCurrentTimeAndZone() {
        return (simpleDateFormat.get().format(new Date(System.currentTimeMillis())));
    }


    /**
     * @param min the lowest possible value (inclusive)
     * @param max the highest possible value (inclusive)
     * @return a pseudo-random number between "min" and "max"
     */
    public static int getRandomNumber(int min, int max) {
        // The range is calculated as a "long", in order to not overflow for very large ranges.
        return (int)(Math.random() * ((long) max - min +1) + min);
    }


    /**
     * Builds a string with the first lines of the stacktrace of the given throwable.
     *
     * @param thr the throwable to take the stacktrace from
     * @param initialMessage an optional message to be placed before the stacktrace; may be null
     * @param numOfLines the maximum number of stacktrace-lines to include; a non-positive value results in no lines
     * @return the optional message-header, followed by at most "numOfLines" stacktrace-lines, separated by new-lines, without a trailing new-line
     */
    public static String getSelectiveStackTrace(Throwable thr, String initialMessage, int numOfLines)
    {
        StackTraceElement[] stels = thr.getStackTrace();
        int numOfLinesToInclude = Math.max(0, Math.min(stels.length, numOfLines));
        StringBuilder sb = new StringBuilder((numOfLinesToInclude +1) * 100);	// This StringBuilder is thread-safe as a local-variable.
        if ( initialMessage != null )
            sb.append(initialMessage).append(" Stacktrace:").append("\n");
        for ( int i = 0; i < numOfLinesToInclude; ++i ) {
            if ( i > 0 )
                sb.append("\n");	// Separate the lines, without leaving a new-line in the end.
            sb.append(stels[i]);
        }
        return sb.toString();
    }

}

View File

@ -59,18 +59,18 @@ public class ParquetFileUtils {
private JdbcTemplate jdbcTemplate;
@Value("${hdfs.baseUrl}")
private String webHDFSBaseUrl;
public String webHDFSBaseUrl;
private final String hdfsHttpAuthString;
private final String hdfsUserName;
private final String payloadSchemaFilePath = "schemas/payload.avsc";
public static final String payloadSchemaFilePath = "schemas/payload.avsc";
private final String attemptSchemaFilePath = "schemas/attempt.avsc";
private static final String attemptSchemaFilePath = "schemas/attempt.avsc";
public Schema payloadsSchema;
public static Schema payloadsSchema = null;
public Schema attemptsSchema;
public final String parquetHDFSDirectoryPathAttempts;
@ -79,12 +79,17 @@ public class ParquetFileUtils {
public final String parquetHDFSDirectoryPathPayloadsAggregated;
public final String parquetHDFSDirectoryPathPayloadsBulkImport;
public String mkDirsAndParams;
//public String setPermAndParams;
public ParquetFileUtils(@Value("${hdfs.baseUrl}") String webHDFSBaseUrl,
@Value("${hdfs.httpAuth}") String hdfsHttpAuthString, @Value("${hdfs.userName}") String hdfsUserName, @Value("${hdfs.password}") String hdfsPassword, @Value("${services.pdfaggregation.controller.parquetLocalDirectoryPath}") String parquetBaseDirectoryPath,
@Value("${hdfs.parquetRemoteBaseDirectoryPath}") String hdfsParquetBaseDir,
@Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment, FileUtils fileUtils) throws IOException
{
this.mkDirsAndParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName;
if ( webHDFSBaseUrl.endsWith("/") ) // We don't wand an ending slash in the url (as it causes problems when the file=path is added).
this.webHDFSBaseUrl = webHDFSBaseUrl.substring(0, (webHDFSBaseUrl.length() -1));
else
@ -128,11 +133,13 @@ public class ParquetFileUtils {
this.parquetHDFSDirectoryPathPayloadsAggregated = hdfsParquetBaseDir + "payloads_aggregated/";
this.parquetHDFSDirectoryPathPayloadsBulkImport = hdfsParquetBaseDir + "payloads_bulk_import/";
this.fileUtils = fileUtils;
this.mkDirsAndParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName; // All permissions for user, group and others must be set, in order for this service' user to have access to the hdfs directory.
//this.setPermAndParams = "?op=SETPERMISSION&permission=777&user.name=" + hdfsUserName;
createRemoteParquetDirectories(hdfsParquetBaseDir);
}
public Schema parseSchema(String schemaResourcePath) {
public static Schema parseSchema(String schemaResourcePath) {
try {
return (new Schema.Parser()).parse(new ClassPathResource(schemaResourcePath).getInputStream());
} catch (Throwable e) {
@ -464,8 +471,6 @@ public class ParquetFileUtils {
// The WebHDFS uses the "mkdirs" operations which creates all the non-existent directories in the specified path.
// So with one request we will create the "parquet_uploads/" and the "parquet_uploads/attempts/" and with the seconds request, the "parquet_uploads/payloads/" directory.
String mkDirsParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName;
logger.info("Going to check if the remote parquet directories exist.");
String listMainDirectoryUrl = webHDFSBaseUrl + parquetBaseRemoteDirectory + "?op=LISTSTATUS&user.name=" + hdfsUserName;
@ -492,9 +497,9 @@ public class ParquetFileUtils {
if ( statusCode == 404 ) {
logger.info("The directory \"" + parquetBaseRemoteDirectory + "\" does not exist. We will create it, along with its sub-directories.");
attemptCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsParams);
payloadAggregatedCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsParams);
payloadBulkImportCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsParams);
attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
}
else {
// Check the json-response, to see if all the subdirectories exist.
@ -549,19 +554,19 @@ public class ParquetFileUtils {
// For each missing subdirectories, run the mkDirs-request.
if ( !foundAttemptsDir ) {
logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" does not exist! Going to create it.");
attemptCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsParams);
attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams);
} else
logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" exists.");
if ( !foundPayloadsAggregatedDir ) {
logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" does not exist! Going to create it.");
payloadAggregatedCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsParams);
payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams);
} else
logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" exists.");
if ( !foundPayloadsBulkImportDir ) {
logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" does not exist! Going to create it.");
payloadBulkImportCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsParams);
payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams);
} else
logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" exists.");
}
@ -575,10 +580,10 @@ public class ParquetFileUtils {
}
public boolean createHDFSDirectory(String createDirectoryUrl)
public boolean applyHDFOperation(String hdfsOperationUrl)
{
try {
URL url = new URL(createDirectoryUrl);
URL url = new URL(hdfsOperationUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("PUT");
conn.setRequestProperty("Authorization", hdfsHttpAuthString);
@ -586,15 +591,15 @@ public class ParquetFileUtils {
conn.connect();
int statusCode = conn.getResponseCode();
if ( statusCode == -1 ) {
logger.error("Problem when getting the \"status-code\" for url: " + createDirectoryUrl);
logger.error("Problem when getting the \"status-code\" for url: " + hdfsOperationUrl);
return false;
}
else if ( statusCode != 200 ) {
String errorMsg = "We expected a \"200 OK\" response, but got: \"" + statusCode + "\" instead, for url: " + createDirectoryUrl;
String errorMsg = "We expected a \"200 OK\" response, but got: \"" + statusCode + "\" instead, for url: " + hdfsOperationUrl;
logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true));
return false;
}
logger.trace("Creation was successful for hdfs-dir-url: " + createDirectoryUrl + "\n" + fileUtils.getMessageFromResponseBody(conn, false));
logger.trace("The Operation was successful for hdfs-op-url: " + hdfsOperationUrl + "\n" + fileUtils.getMessageFromResponseBody(conn, false));
} catch (Exception e) {
logger.error("", e);
return false;

View File

@ -29,12 +29,22 @@ services:
shouldEmptyBucket: false
shouldShowAllS3Buckets: true
datasources: # Provide a list of datasource IDs, which should be excluded from crawling. Their content is either bulk-imported or is known to be restricted.
excludedIDs: > # Use comma-separated values (one on each line for best readability), as Spring is currently incapable of parsing Dropwizard-styled lists (at least without additional config).
opendoar____::6f4922f45568161a8cdf4ad2299f6d23
# Since we use a multi-line value for our list, we add the ID-explanations here (otherwise the comments would become part of the values):
# First-id: arXiv.org e-Print Archive
bulkImport:
baseBulkImportLocation: /mnt/bulkImport/
bulkImportReportLocation: /bulkImportReports/
bulkImportSources: # These sources are accepted for bulk-import requests and are excluded from crawling.
arxivImport:
datasourceID: opendoar____::6f4922f45568161a8cdf4ad2299f6d23
datasourcePrefix: arXiv_______ # For a PID-providing datasource, we use the PID-prefix here (so not the datasource-prefix: "od________18").
pdfUrlPrefix: https://arxiv.org/pdf/
mimeType: application/pdf
# otherImport:
# datasourceID: othersource__::0123
# datasourcePrefix: other_______
# pdfUrlPrefix: https://example.org/pdf/
# mimeType: application/pdf
spring:
application: