From b6e8cd18897d58d26cce0bc826bf51f068ce58e2 Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Thu, 11 May 2023 03:07:55 +0300 Subject: [PATCH] New feature: BulkImport full-text files from compatible datasources. --- build.gradle | 4 + docker-compose.yml | 6 + .../openaire/urls_controller/Application.java | 3 + .../components/BulkImport.java | 111 +++++ .../components/ScheduledTasks.java | 50 +- .../controllers/FullTextsController.java | 204 ++++++++ .../models/BulkImportReport.java | 105 +++++ .../urls_controller/models/DocFileData.java | 115 +++++ .../models/FileLocationData.java | 89 ++++ .../services/FullTextsService.java | 18 + .../services/FullTextsServiceImpl.java | 443 ++++++++++++++++++ .../services/UrlsServiceImpl.java | 31 +- .../urls_controller/util/FileUtils.java | 87 +++- .../urls_controller/util/GenericUtils.java | 23 + .../util/ParquetFileUtils.java | 41 +- src/main/resources/application.yml | 20 +- 16 files changed, 1289 insertions(+), 61 deletions(-) create mode 100644 src/main/java/eu/openaire/urls_controller/components/BulkImport.java create mode 100644 src/main/java/eu/openaire/urls_controller/controllers/FullTextsController.java create mode 100644 src/main/java/eu/openaire/urls_controller/models/BulkImportReport.java create mode 100644 src/main/java/eu/openaire/urls_controller/models/DocFileData.java create mode 100644 src/main/java/eu/openaire/urls_controller/models/FileLocationData.java create mode 100644 src/main/java/eu/openaire/urls_controller/services/FullTextsService.java create mode 100644 src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java diff --git a/build.gradle b/build.gradle index 65a1d8a..80a5376 100644 --- a/build.gradle +++ b/build.gradle @@ -112,6 +112,10 @@ dependencies { // https://mvnrepository.com/artifact/org.json/json implementation 'org.json:json:20230227' + // https://mvnrepository.com/artifact/com.google.code.gson/gson + implementation 'com.google.code.gson:gson:2.10.1' + + // https://mvnrepository.com/artifact/io.micrometer/micrometer-registry-prometheus runtimeOnly 'io.micrometer:micrometer-registry-prometheus:1.10.6' diff --git a/docker-compose.yml b/docker-compose.yml index 73b2d97..92696a6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,6 +17,12 @@ services: - type: bind source: $HOME/logs target: /logs + - type: bind + source: /mnt/bulkImport + target: /mnt/bulkImport + - type: bind + source: $HOME/bulkImportReports + target: /bulkImportReports build: dockerfile: ./Dockerfile context: . 
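For orientation, the two new bind mounts above (for /mnt/bulkImport and the bulkImportReports directory) are consumed by the new "bulkimport" configuration block that the BulkImport @ConfigurationProperties component (added further down in this patch) binds to. The following is only an illustrative sketch of that block, assuming placeholder values: the key names mirror the fields of BulkImport and BulkImport.BulkImportSource, while the source name "arxivImport", the datasourceID, the prefixes and the paths are hypothetical — the real values live in the src/main/resources/application.yml hunk of this patch, which is not reproduced in this excerpt.

# Hypothetical sketch only -- key names follow the BulkImport component, all values are placeholders.
bulkimport:
  baseBulkImportLocation: /mnt/bulkImport/          # presumably the bind-mount target declared in docker-compose.yml above
  bulkImportReportLocation: /bulkImportReports/     # presumably the report bind-mount target declared above
  bulkImportSources:
    arxivImport:                                    # the map-key doubles as the "provenance" request-parameter
      datasourceID: opendoar____::1234567890abcdef1234567890abcdef   # placeholder OpenAIRE datasource id
      datasourcePrefix: arXiv_______                # placeholder prefix used to build the OpenAIRE record id
      pdfUrlPrefix: https://arxiv.org/pdf/          # placeholder; prefix + fileNameID becomes the record's "actual_url"
      mimeType: application/pdf

With such a source configured, a bulk-import would presumably be triggered with a request like GET /bulkImportFullTexts?provenance=arxivImport&bulkImportDir=someSubDir/&shouldDeleteFilesOnFinish=false (parameter names taken from the FullTextsController added below), and its progress could later be retrieved via GET /getBulkImportReport?id=... using the reportFileID returned by the first call.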
diff --git a/src/main/java/eu/openaire/urls_controller/Application.java b/src/main/java/eu/openaire/urls_controller/Application.java index 4f4f92a..59a029f 100644 --- a/src/main/java/eu/openaire/urls_controller/Application.java +++ b/src/main/java/eu/openaire/urls_controller/Application.java @@ -1,5 +1,6 @@ package eu.openaire.urls_controller; +import eu.openaire.urls_controller.services.FullTextsServiceImpl; import eu.openaire.urls_controller.services.UrlsServiceImpl; import eu.openaire.urls_controller.util.FileUtils; import eu.openaire.urls_controller.util.UriBuilder; @@ -51,6 +52,8 @@ public class Application { shutdownThreads(UrlsServiceImpl.insertsExecutor); shutdownThreads(FileUtils.hashMatchingExecutor); + shutdownThreads(FullTextsServiceImpl.backgroundExecutor); + shutdownThreads(FullTextsServiceImpl.bulkImportExecutor); logger.info("Exiting.."); } diff --git a/src/main/java/eu/openaire/urls_controller/components/BulkImport.java b/src/main/java/eu/openaire/urls_controller/components/BulkImport.java new file mode 100644 index 0000000..7b4f4c2 --- /dev/null +++ b/src/main/java/eu/openaire/urls_controller/components/BulkImport.java @@ -0,0 +1,111 @@ +package eu.openaire.urls_controller.components; + + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +import java.util.Map; + +@Component +@ConfigurationProperties(prefix = "bulkimport") +public class BulkImport { + + private String baseBulkImportLocation; + + private String bulkImportReportLocation; + + private Map bulkImportSources; + + public BulkImport() { + } + + public String getBaseBulkImportLocation() { + return baseBulkImportLocation; + } + + public void setBaseBulkImportLocation(String baseBulkImportLocation) { + this.baseBulkImportLocation = baseBulkImportLocation; + } + + public String getBulkImportReportLocation() { + return bulkImportReportLocation; + } + + public void setBulkImportReportLocation(String bulkImportReportLocation) { + this.bulkImportReportLocation = bulkImportReportLocation; + } + + public Map getBulkImportSources() { + return bulkImportSources; + } + + public void setBulkImportSources(Map bulkImportSources) { + this.bulkImportSources = bulkImportSources; + } + + @Override + public String toString() { + return "BulkImport{" + + "baseBulkImportLocation='" + baseBulkImportLocation + '\'' + + ", bulkImportReportLocation='" + bulkImportReportLocation + '\'' + + ", bulkImportSources=" + bulkImportSources + + '}'; + } + + + public static class BulkImportSource { + String datasourceID; + String datasourcePrefix; + String pdfUrlPrefix; + String mimeType; + + + public BulkImportSource() { + } + + + public String getDatasourceID() { + return datasourceID; + } + + public void setDatasourceID(String datasourceID) { + this.datasourceID = datasourceID; + } + + public String getDatasourcePrefix() { + return datasourcePrefix; + } + + public void setDatasourcePrefix(String datasourcePrefix) { + this.datasourcePrefix = datasourcePrefix; + } + + public String getPdfUrlPrefix() { + return pdfUrlPrefix; + } + + public void setPdfUrlPrefix(String pdfUrlPrefix) { + this.pdfUrlPrefix = pdfUrlPrefix; + } + + public String getMimeType() { + return mimeType; + } + + public void setMimeType(String mimeType) { + this.mimeType = mimeType; + } + + + @Override + public String toString() { + return "BulkImportSource{" + + "datasourceID='" + datasourceID + '\'' + + ", datasourcePrefix='" + datasourcePrefix + '\'' + + ", pdfUrlPrefix='" + pdfUrlPrefix + '\'' + + ", 
mimeType='" + mimeType + '\'' + + '}'; + } + } + +} diff --git a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java index e1686fd..b3bd0bb 100644 --- a/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_controller/components/ScheduledTasks.java @@ -1,12 +1,18 @@ package eu.openaire.urls_controller.components; +import eu.openaire.urls_controller.services.FullTextsServiceImpl; +import eu.openaire.urls_controller.util.GenericUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -//import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import java.text.SimpleDateFormat; -import java.util.Date; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; @Component @@ -14,10 +20,40 @@ public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); - private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss"); - //@Scheduled(fixedRate = 600_000) // Run every 10 mins: 600_000 - public void reportCurrentTime() { - logger.info("Server is live! Time is now {}", dateFormat.format(new Date())); + @Scheduled(fixedDelay = 3_600_000) // Execute this method 1 hour after the last execution, in order for some tasks to have been gathered. + //@Scheduled(fixedDelay = 20_000) // Just for testing (every 20 secs). + public void executeBackgroundTasks() + { + List> tempList = new ArrayList<>(FullTextsServiceImpl.backgroundCallableTasks); // Copy the list in order to know what was executed and delete only that data later. + // So the items added while this execution happens, will be left in the list, while the other will be deleted. + int numOfTasks = tempList.size(); // Since the temp-list is a deep-copy and not a reference, new tasks that are added will not be executed. + if ( numOfTasks == 0 ) + return; + + logger.debug(numOfTasks + " background tasks were found inside the \"backgroundCallableTasks\" list and are about to be executed."); + // Execute the tasks and wait for them to finish. + try { + List> futures = FullTextsServiceImpl.backgroundExecutor.invokeAll(tempList); + int sizeOfFutures = futures.size(); + for ( int i = 0; i < sizeOfFutures; ++i ) { + try { + Boolean value = futures.get(i).get(); // Get and see if an exception is thrown.. + // Add check for the value, if wanted.. (we don't care at the moment) + } catch (ExecutionException ee) { + String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP). + logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + "\n" + stackTraceMessage); + } catch (CancellationException ce) { + logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage()); + } catch (IndexOutOfBoundsException ioobe) { + logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage()); + } finally { + FullTextsServiceImpl.backgroundCallableTasks.remove(tempList.get(i)); // Remove this object from the global list. Do not use indexes, since they will be different after each deletion and addition. 
+ } + } + } catch (Exception e) { + logger.error("", e); + } } + } diff --git a/src/main/java/eu/openaire/urls_controller/controllers/FullTextsController.java b/src/main/java/eu/openaire/urls_controller/controllers/FullTextsController.java new file mode 100644 index 0000000..6522bd2 --- /dev/null +++ b/src/main/java/eu/openaire/urls_controller/controllers/FullTextsController.java @@ -0,0 +1,204 @@ +package eu.openaire.urls_controller.controllers; + +import eu.openaire.urls_controller.components.BulkImport; +import eu.openaire.urls_controller.models.BulkImportReport; +import eu.openaire.urls_controller.services.FullTextsService; +import eu.openaire.urls_controller.services.FullTextsServiceImpl; +import eu.openaire.urls_controller.util.FileUtils; +import eu.openaire.urls_controller.util.GenericUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.*; +import java.util.Collections; +import java.util.HashMap; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@RestController +@RequestMapping("") +public class FullTextsController { + + private static final Logger logger = LoggerFactory.getLogger(FullTextsController.class); + + @Autowired + private FileUtils fileUtils; + + private final FullTextsService fullTextsService; + + private final String baseBulkImportLocation; + + private final String bulkImportReportLocation; + + private final HashMap bulkImportSources; + + public static final Set bulkImportDirs = Collections.newSetFromMap(new ConcurrentHashMap()); + + + + public FullTextsController(FullTextsService fullTextsService, BulkImport bulkImport) + { + String bulkImportReportLocation1; + this.baseBulkImportLocation = bulkImport.getBaseBulkImportLocation(); + + this.bulkImportSources = new HashMap<>(bulkImport.getBulkImportSources()); + + bulkImportReportLocation1 = bulkImport.getBulkImportReportLocation(); + if ( !bulkImportReportLocation1.endsWith("/") ) + bulkImportReportLocation1 += "/"; + this.bulkImportReportLocation = bulkImportReportLocation1; + + this.fullTextsService = fullTextsService; + } + + + private static final Pattern LAST_DIR_REGEX = Pattern.compile("^.*/([^/]+[/]?)$"); + + @GetMapping("bulkImportFullTexts") + public ResponseEntity bulkImportFullTexts(@RequestParam String provenance, @RequestParam String bulkImportDir, @RequestParam boolean shouldDeleteFilesOnFinish) { + + BulkImport.BulkImportSource bulkImportSource = bulkImportSources.get(provenance); + if ( bulkImportSource == null ) { + String errorMsg = "The provided provenance \"" + provenance + "\" is not in the list of the bulk-imported sources, so no configuration-rules are available!"; + logger.error(errorMsg); + return ResponseEntity.badRequest().body(errorMsg); // It's the user's fault that gave an unsupported provenance. + } + + // Check if the given directory parameter exists. 
+ if ( bulkImportDir.isEmpty() ) { + String errorMsg = "The \"bulkImportDir\" was missing from the request!"; + logger.error(errorMsg); + return ResponseEntity.badRequest().body(errorMsg); + } + + String givenBulkDir = bulkImportDir; // Keep the given value here, to not expose the full-path, in case the user has not provided an absolut path. + + // Make sure the whole path ends with "/", so that we can easily append file-names later. + if ( !bulkImportDir.endsWith("/") ) + bulkImportDir += "/"; + + String relativeBulkImportDir = null; + + // Check if we have "relative-path" so that we can append it to the "baseBulkImportLocation". + if ( !bulkImportDir.startsWith("/") ) { + // A relative path was given. + relativeBulkImportDir = bulkImportDir; + bulkImportDir = baseBulkImportLocation + bulkImportDir; + } else { + String errMsg = "The bulkImportDir \"" + bulkImportDir + "\" was problematic!"; + Matcher matcher = LAST_DIR_REGEX.matcher(bulkImportDir); + if ( !matcher.matches() ) { + logger.error(errMsg); + return ResponseEntity.badRequest().body(errMsg); + } + relativeBulkImportDir = matcher.group(1); + if ( (relativeBulkImportDir == null) || relativeBulkImportDir.isEmpty() ) { + logger.error(errMsg); + return ResponseEntity.badRequest().body(errMsg); + } + } + + // The "relativeBulkImportDir" should always be guaranteed to end with "/"! Otherwise, the import-procedure will fail. + logger.info("Received a \"bulkImportFullTexts\" request for \"" + provenance + "\" procedure and bulkImportDir: \"" + bulkImportDir + "\"."); + + // Check whether the given directory is accessible. + File givenDir = new File(bulkImportDir); + if ( !givenDir.isDirectory() ) { + String errorMsg = "The bulkImportDir \"" + bulkImportDir + "\" is invalid!"; + logger.error(errorMsg); + return ResponseEntity.badRequest().body(errorMsg); + } + + // Efficiently check if the dir is empty, without loading all the file-entries in memory. + try ( DirectoryStream directory = Files.newDirectoryStream(givenDir.toPath()) ) { + if ( !directory.iterator().hasNext() ) { + String errorMsg = "The givenDir \"" + givenDir + "\" is empty!"; + logger.warn(errorMsg); + return ResponseEntity.badRequest().body(errorMsg); + } + // The above check does not catch the case were the directory has at least one subdirectory, but no full-texts files. + // The "iterator()" will have a "next" entry, but no full-text file will exist. Although, that case will be rare. + } catch (Exception e) { + String errorMsg = "Error when checking if the givenDir \"" + givenDir + "\" is empty!"; + logger.error(errorMsg); + return ResponseEntity.internalServerError().body(errorMsg); + } + + // Detect if the same directory is scheduled for being processed. In that case, return a 429. + if ( ! bulkImportDirs.add(bulkImportDir) ) { + // We allow multiple jobs for the same provenance, running at the same time, but not multiple jobs for the same bulkImportDirectory. + String errorMsg = "There is a bulk-import request for the directory \"" + bulkImportDir + "\" that is being handled at the moment. Please wait until it's finished being processed, before making another request."; + logger.error(errorMsg); + return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).body(errorMsg); + } + + Path currentBulkImportReportLocationDir = Paths.get(this.bulkImportReportLocation, provenance); + try { + Files.createDirectories(currentBulkImportReportLocationDir); // No-op if dir exists. 
It does not throw a "alreadyExistsException" + } catch (Exception e) { + String errorMsg = "Could nor create the \"bulkImportReportLocation\" for provenance \"" + provenance + "\" : " + currentBulkImportReportLocationDir; + logger.error(errorMsg, e); + return ResponseEntity.internalServerError().body(errorMsg); + } + + String bulkImportReportID = provenance + "/" + relativeBulkImportDir.substring(0, (relativeBulkImportDir.length() -1)) + "_report_" + GenericUtils.getRandomNumber(10000, 99999); + String bulkImportReportFullPath = this.bulkImportReportLocation + bulkImportReportID + ".json"; + + String msg = "The 'bulkImportFullTexts' request for '" + provenance + "' procedure and bulkImportDir: '" + givenBulkDir + "' was accepted and will be scheduled for execution. " + + (shouldDeleteFilesOnFinish ? "The successfully imported files will be deleted." : "All files will remain inside the directory after processing.") + + " You can request a report at any moment, using this reportFileID: " + bulkImportReportID; + + BulkImportReport bulkImportReport = new BulkImportReport(provenance, bulkImportReportFullPath, bulkImportReportID); + bulkImportReport.addEvent(msg); + + String errorMsg = fileUtils.writeToFile(bulkImportReportFullPath, bulkImportReport.getJsonReport()); + if ( errorMsg != null ) + return ResponseEntity.internalServerError().body(errorMsg); + + logger.info(msg); + + // Add this to a background job, since it will take a lot of time to be completed, and the caller will get a "read-timeout" at least and a socket-timeout at most (in case of a network failure during those hours). + String finalBulkImportDir = bulkImportDir; + String finalRelativeBulkImportDir = relativeBulkImportDir; + FullTextsServiceImpl.backgroundCallableTasks.add(() -> + fullTextsService.bulkImportFullTextsFromDirectory(bulkImportReport, finalRelativeBulkImportDir, finalBulkImportDir, givenDir, provenance, bulkImportSource, shouldDeleteFilesOnFinish) + ); + + return ResponseEntity.ok().body(msg); + } + + + @GetMapping("getBulkImportReport") + public ResponseEntity getBulkImportReport(@RequestParam("id") String bulkImportReportId) + { + // Write the contents of the report-file to a string (efficiently!) and return the whole content as an HTTP-response. + StringBuilder stringBuilder = new StringBuilder(2_000); + String line; + try ( BufferedReader in = new BufferedReader(new InputStreamReader(Files.newInputStream(Paths.get(this.bulkImportReportLocation, bulkImportReportId + ".json"))), FileUtils.tenMb) ) { + while ( (line = in.readLine()) != null ) + stringBuilder.append(line).append("\n"); // The "readLine()" does not return the line-term char. + } catch (NoSuchFileException nsfe) { + logger.warn("The requested report-file with ID: \"" + bulkImportReportId + "\" was not found!"); + return ResponseEntity.notFound().build(); + } catch (Exception e) { + String errorMsg = "Failed to read the contents of report-file with ID: " + bulkImportReportId; + logger.error(errorMsg, e); + return ResponseEntity.internalServerError().body(errorMsg); // It's ok to give the file-path to the user, since the report already contains the file-path. 
+ } + + return ResponseEntity.ok().body(stringBuilder.toString()); + } + +} diff --git a/src/main/java/eu/openaire/urls_controller/models/BulkImportReport.java b/src/main/java/eu/openaire/urls_controller/models/BulkImportReport.java new file mode 100644 index 0000000..57c15ff --- /dev/null +++ b/src/main/java/eu/openaire/urls_controller/models/BulkImportReport.java @@ -0,0 +1,105 @@ +package eu.openaire.urls_controller.models; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.LinkedHashMultimap; +import com.google.common.collect.Multimaps; +import com.google.common.collect.SetMultimap; +import com.google.gson.Gson; +import eu.openaire.urls_controller.util.GenericUtils; + +import java.util.Collection; +import java.util.Map; + + +@JsonInclude(JsonInclude.Include.NON_NULL) +public class BulkImportReport { + + @JsonProperty + private String provenance; + + @JsonProperty + private String reportLocation; + + @JsonProperty + private String reportID; + + // This will not be serialized, since Gson cannot serialize Multimaps. Instead, it will be converted to the "simpler" map below. + transient private SetMultimap eventsMultimap = Multimaps.synchronizedSetMultimap(LinkedHashMultimap.create()); + // We need a "LinkedHashMultimap", se that the order of the keys (timestamps) stay ascending, so the final report makes sense in chronological order. + // We need for one key (timestamp) to have multiple values (events), in order to not lose events happening at the same time. + + @JsonProperty + private Map> eventsMap; + + + public BulkImportReport(String provenance, String reportLocation, String reportID) { + this.provenance = provenance; + this.reportLocation = reportLocation; + this.reportID = reportID; + } + + + public void addEvent(String event) { + eventsMultimap.put(GenericUtils.getReadableCurrentTimeAndZone(), event); + } + + public String getJsonReport() + { + //Convert the LinkedHashMultiMap to Map>, since Gson cannot serialize Multimaps. 
+ eventsMap = eventsMultimap.asMap(); + return new Gson().toJson(this); + } + + public String getProvenance() { + return provenance; + } + + public void setProvenance(String provenance) { + this.provenance = provenance; + } + + public String getReportLocation() { + return reportLocation; + } + + public void setReportLocation(String reportLocation) { + this.reportLocation = reportLocation; + } + + public String getReportID() { + return reportID; + } + + public void setReportID(String reportID) { + this.reportID = reportID; + } + + public SetMultimap getEventsMultimap() { + return eventsMultimap; + } + + public void setEventsMultimap(SetMultimap eventsMultimap) { + this.eventsMultimap = eventsMultimap; + } + + public Map> getEventsMap() { + return eventsMap; + } + + public void setEventsMap(Map> eventsMap) { + this.eventsMap = eventsMap; + } + + @Override + public String toString() { + return "BulkImportReport{" + + "provenance='" + provenance + '\'' + + ", reportLocation='" + reportLocation + '\'' + + ", reportID='" + reportID + '\'' + + ", eventsMultimap=" + eventsMultimap + + ", eventsMap=" + eventsMap + + '}'; + } + +} diff --git a/src/main/java/eu/openaire/urls_controller/models/DocFileData.java b/src/main/java/eu/openaire/urls_controller/models/DocFileData.java new file mode 100644 index 0000000..3869c90 --- /dev/null +++ b/src/main/java/eu/openaire/urls_controller/models/DocFileData.java @@ -0,0 +1,115 @@ +package eu.openaire.urls_controller.models; + +import com.google.common.hash.Hashing; +import com.google.common.io.Files; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.nio.file.Paths; + +public class DocFileData { + + private static final Logger logger = LoggerFactory.getLogger(DocFileData.class); + + private File docFile; + private String hash; + private Long size; + private String location; + + private FileOutputStream fileOutputStream; + + + public DocFileData(File docFile, String hash, Long size, String location, FileOutputStream fileOutputStream) { + this.docFile = docFile; + this.hash = hash; + this.size = size; + this.location = location; + this.fileOutputStream = fileOutputStream; + } + + + public DocFileData(File docFile, String hash, Long size, String location) { + this.docFile = docFile; + this.hash = hash; + this.size = size; + this.location = location; + } + + public DocFileData(File docFile, FileOutputStream fileOutputStream) { + this.docFile = docFile; + this.fileOutputStream = fileOutputStream; + } + + public File getDocFile() { + return docFile; + } + + public void setDocFile(File docFile) { + this.docFile = docFile; + } + + public String getHash() { + return hash; + } + + public void setHash(String hash) { + this.hash = hash; + } + + public Long getSize() { + return size; + } + + public void setSize(Long size) { + this.size = size; + } + + /** + * Set this as a separate method (not automatically applied in the contractor), in order to avoid long thread-blocking in the caller method, which downloads and constructs this object inside a synchronized block. + * */ + public void calculateAndSetHashAndSize() { + if ( this.docFile == null ) { // Verify the "docFile" is already set, otherwise we get an NPE. 
+ logger.warn("The \"docFile\" was not previously set!"); + return; + } + + String fileLocation = this.docFile.getAbsolutePath(); + try { + this.hash = Files.asByteSource(this.docFile).hash(Hashing.md5()).toString(); // These hashing functions are deprecated, but just to inform us that MD5 is not secure. Luckily, we use MD5 just to identify duplicate files. + //logger.debug("MD5 for file \"" + docFile.getName() + "\": " + this.hash); // DEBUG! + this.size = java.nio.file.Files.size(Paths.get(fileLocation)); + //logger.debug("Size of file \"" + docFile.getName() + "\": " + this.size); // DEBUG! + } catch (Exception e) { + logger.error("Could not retrieve the size " + ((this.hash == null) ? "and the MD5-hash " : "") + "of the file: " + fileLocation, e); + } + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } + + public FileOutputStream getFileOutputStream() { + return fileOutputStream; + } + + public void setFileOutputStream(FileOutputStream fileOutputStream) { + this.fileOutputStream = fileOutputStream; + } + + @Override + public String toString() { + return "DocFileData{" + + "docFile=" + docFile + + ", hash='" + hash + '\'' + + ", size=" + size + + ", location='" + location + '\'' + + ", fileOutputStream=" + fileOutputStream + + '}'; + } +} diff --git a/src/main/java/eu/openaire/urls_controller/models/FileLocationData.java b/src/main/java/eu/openaire/urls_controller/models/FileLocationData.java new file mode 100644 index 0000000..58c766d --- /dev/null +++ b/src/main/java/eu/openaire/urls_controller/models/FileLocationData.java @@ -0,0 +1,89 @@ +package eu.openaire.urls_controller.models; + +import eu.openaire.urls_controller.util.FileUtils; + +import java.util.regex.Matcher; + +public class FileLocationData { + + String fileDir; + + String fileName; + + String filenameWithoutExtension; + + String fileNameID; + + String dotFileExtension; + + public FileLocationData(String fileLocation) throws RuntimeException { + // Extract and set LocationData. + Matcher matcher = FileUtils.FILENAME_ID_EXTENSION.matcher(fileLocation); + if ( !matcher.matches() ) + throw new RuntimeException("Failed to match the \"" + fileLocation + "\" with the regex: " + FileUtils.FILENAME_ID_EXTENSION); + fileDir = matcher.group(1); + if ( (fileDir == null) || fileDir.isEmpty() ) + throw new RuntimeException("Failed to extract the \"fileDir\" from \"" + fileLocation + "\"."); + fileName = matcher.group(2); + if ( (fileName == null) || fileName.isEmpty() ) + throw new RuntimeException("Failed to extract the \"fileName\" from \"" + fileLocation + "\"."); + // The "matcher.group(3)" returns the "filenameWithoutExtension", which is currently not used. 
+ fileNameID = matcher.group(4); + if ( (fileNameID == null) || fileNameID.isEmpty() ) + throw new RuntimeException("Failed to extract the \"fileNameID\" from \"" + fileLocation + "\"."); + dotFileExtension = matcher.group(5); + if ( (dotFileExtension == null) || dotFileExtension.isEmpty() ) + throw new RuntimeException("Failed to extract the \"dotFileExtension\" from \"" + fileLocation + "\"."); + } + + public String getFileDir() { + return fileDir; + } + + public void setFileDir(String fileDir) { + this.fileDir = fileDir; + } + + public String getFileName() { + return fileName; + } + + public void setFileName(String fileName) { + this.fileName = fileName; + } + + public String getFilenameWithoutExtension() { + return filenameWithoutExtension; + } + + public void setFilenameWithoutExtension(String filenameWithoutExtension) { + this.filenameWithoutExtension = filenameWithoutExtension; + } + + public String getFileNameID() { + return fileNameID; + } + + public void setFileNameID(String fileNameID) { + this.fileNameID = fileNameID; + } + + public String getDotFileExtension() { + return dotFileExtension; + } + + public void setDotFileExtension(String dotFileExtension) { + this.dotFileExtension = dotFileExtension; + } + + @Override + public String toString() { + return "FileLocationData{" + + "fileDir='" + fileDir + '\'' + + ", fileName='" + fileName + '\'' + + ", filenameWithoutExtension='" + filenameWithoutExtension + '\'' + + ", fileNameID='" + fileNameID + '\'' + + ", dotFileExtension='" + dotFileExtension + '\'' + + '}'; + } +} diff --git a/src/main/java/eu/openaire/urls_controller/services/FullTextsService.java b/src/main/java/eu/openaire/urls_controller/services/FullTextsService.java new file mode 100644 index 0000000..6cb9cb4 --- /dev/null +++ b/src/main/java/eu/openaire/urls_controller/services/FullTextsService.java @@ -0,0 +1,18 @@ +package eu.openaire.urls_controller.services; + +import eu.openaire.urls_controller.components.BulkImport; +import eu.openaire.urls_controller.models.BulkImportReport; + +import java.io.File; +import java.util.List; + +public interface FullTextsService { + + + Boolean bulkImportFullTextsFromDirectory(BulkImportReport bulkImportReport, String relativeBulkImportDir, String bulkImportDirName, File bulkImportDir, String provenance, BulkImport.BulkImportSource bulkImportSource, boolean shouldDeleteFilesOnFinish); + + List getFileLocationsInsideDir(String directory); + + String getMD5hash(String string); + +} diff --git a/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java new file mode 100644 index 0000000..2864c6d --- /dev/null +++ b/src/main/java/eu/openaire/urls_controller/services/FullTextsServiceImpl.java @@ -0,0 +1,443 @@ +package eu.openaire.urls_controller.services; + + +import com.google.common.collect.Lists; +import eu.openaire.urls_controller.components.BulkImport; +import eu.openaire.urls_controller.configuration.ImpalaConnector; +import eu.openaire.urls_controller.controllers.FullTextsController; +import eu.openaire.urls_controller.models.BulkImportReport; +import eu.openaire.urls_controller.models.DocFileData; +import eu.openaire.urls_controller.models.FileLocationData; +import eu.openaire.urls_controller.util.FileUtils; +import eu.openaire.urls_controller.util.GenericUtils; +import eu.openaire.urls_controller.util.ParquetFileUtils; +import org.apache.avro.generic.GenericData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import 
org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.EmptyResultDataAccessException; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; + +import javax.xml.bind.DatatypeConverter; +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.MessageDigest; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +@Service +public class FullTextsServiceImpl implements FullTextsService { + + private static final Logger logger = LoggerFactory.getLogger(FullTextsServiceImpl.class); + + + @Autowired + private FileUtils fileUtils; + + + @Autowired + private ParquetFileUtils parquetFileUtils; + + @Autowired + private JdbcTemplate jdbcTemplate; + + public static final ExecutorService backgroundExecutor = Executors.newFixedThreadPool(2); // At most 2 threads will be used. + + public static final List> backgroundCallableTasks = Collections.synchronizedList(new ArrayList<>()); + + + private static final int numOfBulkImportThreads = 4; + public static final ExecutorService bulkImportExecutor = Executors.newFixedThreadPool(numOfBulkImportThreads); // At most 4 threads will be used. + + + /** + * Given a directory with full-text-files, this method imports the full-texts files in the PDF Aggregation Service. + * Also, it provides the guarantee that the failed files will not be deleted! A file can "fail" if any of the expected results fail (upload-to-S3, parquet-creation and upload, load-to-db, ect) + * */ + public Boolean bulkImportFullTextsFromDirectory(BulkImportReport bulkImportReport, String relativeBulkImportDir, String bulkImportDirName, File bulkImportDir, String provenance, BulkImport.BulkImportSource bulkImportSource, boolean shouldDeleteFilesOnFinish) + { + String bulkImportReportLocation = bulkImportReport.getReportLocation(); + + // Write to bulkImport-report file. + bulkImportReport.addEvent("Initializing the bulkImport '" + provenance + "' procedure with bulkImportDir '" + bulkImportDirName + "'."); + // Do not write immediately to the file, wait for the following checks. + + if ( (ParquetFileUtils.payloadsSchema == null) // Parse the schema if it's not already parsed. 
+ && ((ParquetFileUtils.payloadsSchema = ParquetFileUtils.parseSchema(ParquetFileUtils.payloadSchemaFilePath)) == null ) ) { + String errorMsg = "The 'payloadsSchema' could not be parsed!"; + logger.error(errorMsg); + bulkImportReport.addEvent(errorMsg); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + FullTextsController.bulkImportDirs.remove(bulkImportDirName); + return false; + } + + List fileLocations = getFileLocationsInsideDir(bulkImportDirName); + if ( fileLocations == null ) { + bulkImportReport.addEvent("Could not retrieve the files for bulk-import!"); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + FullTextsController.bulkImportDirs.remove(bulkImportDirName); + return false; + } + + int numOfFiles = fileLocations.size(); + if ( numOfFiles == 0 ) { + String errorMsg = "No files were found inside the bulkImportDir: " + bulkImportDirName; + logger.warn(errorMsg); + bulkImportReport.addEvent(errorMsg); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + FullTextsController.bulkImportDirs.remove(bulkImportDirName); + return false; + } + + logger.trace("fileLocations:\n" + fileLocations); + + String localParquetDir = parquetFileUtils.parquetBaseLocalDirectoryPath + "bulk_import_" + provenance + File.separator + relativeBulkImportDir; // This ends with "/". + try { + Files.createDirectories(Paths.get(localParquetDir)); // No-op if it already exists. + } catch (Exception e) { + String errorMsg = "Could not create the local parquet-directory: " + localParquetDir; + logger.error(errorMsg, e); + bulkImportReport.addEvent(errorMsg); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + FullTextsController.bulkImportDirs.remove(bulkImportDirName); + return false; + } + + // Create a new directory on HDFS, with this bulkImportDir name. So, that there will not be any "load data" operation to fail because another thread has loaded that base-dir right before. + String currentBulkImportHdfsDir = parquetFileUtils.parquetHDFSDirectoryPathPayloadsBulkImport + relativeBulkImportDir; + if ( ! parquetFileUtils.applyHDFOperation(parquetFileUtils.webHDFSBaseUrl + currentBulkImportHdfsDir + parquetFileUtils.mkDirsAndParams) ) { // N0-op if it already exists. It is very quick. + String errorMsg = "Could not create the hdfs-directory: " + currentBulkImportHdfsDir; + logger.error(errorMsg); + bulkImportReport.addEvent(errorMsg); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + FullTextsController.bulkImportDirs.remove(bulkImportDirName); + return false; + } + + long timeMillis = System.currentTimeMillis(); // Store it here, in order to have the same for all current records. + + List> callables = new ArrayList<>(numOfFiles); + List> subLists = Lists.partition(fileLocations, numOfBulkImportThreads); // Divide the initial list to "numOfBulkImportThreads" subLists. The last one may have marginally fewer files. + int subListsSize = subLists.size(); + + bulkImportReport.addEvent("Going to import the files in " + subListsSize + " segments, in parallel."); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + + for ( int i = 0; i < subListsSize; ++i ) { + int finalI = i; + callables.add(() -> { // Handle inserts to the "attempt" table. Insert 20% of the "attempt" queries. 
+ return processBulkImportedFilesSegment(bulkImportReport, finalI, subLists.get(finalI), bulkImportDirName, localParquetDir, currentBulkImportHdfsDir, provenance, bulkImportSource, timeMillis, shouldDeleteFilesOnFinish); + }); + } + + int numFailedSegments = 0; + int numFailedFiles = 0; + try { + List> futures = bulkImportExecutor.invokeAll(callables); // This waits for all tasks to finish. + int sizeOfFutures = futures.size(); + for ( int i = 0; i < sizeOfFutures; ++i ) { + try { + numFailedFiles += futures.get(i).get(); + if ( numFailedFiles == subLists.get(i).size() ) { // Get and see if it was successfully or not, or if an exception is thrown.. + numFailedSegments++; + } + // In case all the files failed to be bulk-imported, then we will detect it in the "numSuccessfulSegments"-check later. + // The failed-to-be-imported files, will not be deleted, even if the user specifies that he wants to delete the directory. + } catch (ExecutionException ee) { + String stackTraceMessage = GenericUtils.getSelectiveStackTrace(ee, null, 15); // These can be serious errors like an "out of memory exception" (Java HEAP). + logger.error("Task_" + (i+1) + " failed with: " + ee.getMessage() + "\n" + stackTraceMessage); + } catch (CancellationException ce) { + logger.error("Task_" + (i+1) + " was cancelled: " + ce.getMessage()); + } catch (IndexOutOfBoundsException ioobe) { + logger.error("IOOBE for task_" + i + " in the futures-list! " + ioobe.getMessage()); + } + } + } catch (Exception e) { + String errorMsg = "An error occurred when trying to bulk-import data from bulkImportDir: " + bulkImportDirName; + logger.error(errorMsg, e); + bulkImportReport.addEvent(errorMsg); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + FullTextsController.bulkImportDirs.remove(bulkImportDirName); + return false; + } finally { + logger.debug("Deleting local parquet directory: " + localParquetDir); + fileUtils.deleteDirectory(new File(localParquetDir)); // It may not exist at all, if none of the parquet files were created. + } + + // Check the results. + String msg; + if ( numFailedFiles == numOfFiles ) { + String errorMsg = "None of the files inside the bulkImportDir '" + bulkImportDirName + "' were imported!"; + logger.error(errorMsg); + bulkImportReport.addEvent(errorMsg); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + FullTextsController.bulkImportDirs.remove(bulkImportDirName); + return false; + } else if ( numFailedFiles > 0 ) { // Some failed, but not all. + msg = numFailedFiles + " files" + (numFailedSegments > 0 ? (" and " + numFailedSegments + " whole segments") : "") + " failed to be bulk-imported, from the bulkImportDir: " + bulkImportDirName; + logger.warn(msg); + } else { + msg = "All " + numOfFiles + " files, from bulkImportDir '" + bulkImportDirName + "' were bulkImported successfully."; + logger.info(msg); + } + bulkImportReport.addEvent(msg); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + + // Merge the parquet files inside the table "payload_bulk_import", to improve performance of future operations. 
+ ImpalaConnector.databaseLock.lock(); + String mergeErrorMsg = fileUtils.mergeParquetFiles("payload_bulk_import", "", null); + if ( mergeErrorMsg != null ) { + ImpalaConnector.databaseLock.unlock(); + bulkImportReport.addEvent(mergeErrorMsg); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + FullTextsController.bulkImportDirs.remove(bulkImportDirName); + return false; + } + ImpalaConnector.databaseLock.unlock(); + + String successMsg = "Finished the bulk-import procedure for '" + provenance + "' and bulkImportDir: " + bulkImportDirName; + logger.info(successMsg); + bulkImportReport.addEvent(successMsg); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + + FullTextsController.bulkImportDirs.remove(bulkImportDirName); + return true; + } + + + private int processBulkImportedFilesSegment(BulkImportReport bulkImportReport, int segmentCounter, List fileLocationsSegment, String bulkImportDirName, String localParquetDir, String currentBulkImportHdfsDir, + String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, boolean shouldDeleteFilesOnFinish) + { + // Inside this thread, process a segment of the files. + String bulkImportReportLocation = bulkImportReport.getReportLocation(); + + int numOfFilesInSegment = fileLocationsSegment.size(); + String msg = "Going to import " + numOfFilesInSegment + " files for segment-" + segmentCounter + " , of bulkImport procedure '" + provenance + "' | dir: '" + bulkImportDirName + "'.."; + logger.debug(msg); + bulkImportReport.addEvent(msg); + + List payloadRecords = new ArrayList<>(numOfFilesInSegment); + + // Use a HashSet for the failed files, in order to not remove them in the end. + HashSet failedFiles = new HashSet<>(); + int counter = 0; + + // Upload files to S3 and collect payloadRecords. + for ( String fileLocation: fileLocationsSegment ) { + GenericData.Record record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis); + if ( record != null ) + payloadRecords.add(record); + else { + bulkImportReport.addEvent("An error caused the file: '" + fileLocation + "' to not be imported!"); + failedFiles.add(fileLocation); + } + + if ( ((++counter) % 100) == 0 ) { // Every 100 files, report the status. + bulkImportReport.addEvent("Progress for segment-" + segmentCounter + " : " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files."); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + } + } + + int numOfPayloadRecords = payloadRecords.size(); + if ( numOfPayloadRecords == 0 ) { + String errorMsg = "No payload-records were generated for any of the files inside the bulkImportDir: " + bulkImportDirName; + logger.warn(errorMsg); + bulkImportReport.addEvent(errorMsg); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + // None of the files of this segment will be deleted, in any case. + return numOfFilesInSegment; + } else if ( numOfPayloadRecords != numOfFilesInSegment ) { + // Write this important note here, in order to certainly be in the report, even if a parquet-file failure happens and the method exists early. 
+ String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be imported, for segment-" + segmentCounter + "!"; + logger.warn(errorMsg); + bulkImportReport.addEvent(errorMsg); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + } + + // Construct the parquet file, upload it to HDFS and load them it in the "payload_bulk_import" table. + String parquetFileName = "payloads_" + segmentCounter + ".parquet"; + String fullLocalParquetFilePath = localParquetDir + parquetFileName; + logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath); // DEBUG! + + if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) { + bulkImportReport.addEvent("Could not write the payload-records to the parquet-file: '" + parquetFileName + "'!"); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + // None of the files of this segment will be deleted, in any case. + return numOfFilesInSegment; + } + //logger.trace("Parquet file '" + parquetFileName + "' was created and filled."); // DEBUG! + + // Upload and insert the data to the "payload" Impala table. (no database-locking is required) + String errorMsg = parquetFileUtils.uploadParquetFileToHDFS(fullLocalParquetFilePath, parquetFileName, currentBulkImportHdfsDir); + if ( errorMsg != null ) { // The possible error-message returned, is already logged by the Controller. + bulkImportReport.addEvent("Could not upload the parquet-file '" + parquetFileName + "' to HDFS!"); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + // None of the files of this segment will be deleted, in any case. + return numOfFilesInSegment; + } + + ImpalaConnector.databaseLock.lock(); + if ( !parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import") ) { + ImpalaConnector.databaseLock.unlock(); + bulkImportReport.addEvent("Could not load the payload-records to the database!"); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport()); + // None of the files of this segment will be deleted, in any case. + return numOfFilesInSegment; + } + ImpalaConnector.databaseLock.unlock(); + + String segmentSuccessMsg = "Finished importing " + numOfPayloadRecords + " files, out of " + numOfFilesInSegment + " , for segment-" + segmentCounter + "."; + logger.info(segmentSuccessMsg); + bulkImportReport.addEvent(segmentSuccessMsg); + + if ( shouldDeleteFilesOnFinish ) { + segmentSuccessMsg = "As the user requested, the successfully imported files of '" + provenance + "' procedure, of bulk-import segment-" + segmentCounter + ", from directory '" + bulkImportDirName + "', will be deleted."; + logger.info(segmentSuccessMsg); + bulkImportReport.addEvent(segmentSuccessMsg); + + // Delete all files except the ones in the "failedHashSet" + for ( String fileLocation : fileLocationsSegment ) { + if ( !failedFiles.contains(fileLocation) ) + if ( !fileUtils.deleteFile(fileLocation) ) + bulkImportReport.addEvent("The file '" + fileLocation + "' could not be deleted! Please make sure you have provided the WRITE-permission."); + } + } + + return (numOfFilesInSegment - numOfPayloadRecords); // Return the numOfFailedFiles. 
+ } + + + private GenericData.Record processBulkImportedFile(String fileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis) + { + File fullTextFile = new File(fileLocation); + DocFileData docFileData = new DocFileData(fullTextFile, null, null, null); + docFileData.calculateAndSetHashAndSize(); + + // Check if this file is already found by crawling. Even though we started excluding this datasource from crawling, many full-texts have already been downloaded. + // Also, it may be the case that this file was downloaded by another datasource. + + String fileHash = docFileData.getHash(); + if ( fileHash == null ) + return null; // No check of past found full-text can be made nor the S3-fileName can be created. + + FileLocationData fileLocationData; + try { + fileLocationData = new FileLocationData(fileLocation); + } catch (RuntimeException re) { + logger.error(re.getMessage()); + return null; + } + + String datasourceId = bulkImportSource.getDatasourceID(); + String datasourcePrefix = bulkImportSource.getDatasourcePrefix(); + String fileNameID = fileLocationData.getFileNameID(); + + String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID); // This is the urls with the ArvixId. + String originalUrl = actualUrl; // We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link. + + final String getFileLocationForHashQuery = "select `location` from " + ImpalaConnector.databaseName + ".payload where `hash` = ? limit 1"; + final int[] hashArgType = new int[] {Types.VARCHAR}; + String alreadyFoundFileLocation = null; + ImpalaConnector.databaseLock.lock(); + try { + alreadyFoundFileLocation = jdbcTemplate.queryForObject(getFileLocationForHashQuery, new Object[]{fileHash}, hashArgType, String.class); + } catch (EmptyResultDataAccessException erdae) { + // No fileLocation is found, it's ok. It will be null by default. + } catch (Exception e) { + logger.error("Error when executing or acquiring data from the the 'getFileLocationForHashQuery'!\n", e); + // Continue with bulk-importing the file and uploading it to S3. + } finally { + ImpalaConnector.databaseLock.unlock(); + } + + String idMd5hash = getMD5hash(fileNameID.toLowerCase()); + if ( idMd5hash == null ) + return null; + + // openaire id = + "::" + + String openAireId = (datasourcePrefix + "::" + idMd5hash); + //logger.trace("openAireId: " + openAireId); + + String s3Url = null; + + if ( alreadyFoundFileLocation != null ) // If the full-text of this record is already-found and uploaded. + { + // This full-text was found to already be in the database. + // If it has the same datasourceID, then it likely was crawled before from an ID belonging to this datasource. + // If also has the same ID, then the exact same record from that datasource was retrieved previously. + // Else, the file was downloaded by another record of this datasource. + // ELse if the datasourceID is not the same, then the same file was retrieved from another datasource. + // The above analysis is educational, it does not need to take place and is not currently used. + + s3Url = alreadyFoundFileLocation; + } else { + try { + s3Url = fileUtils.constructFileNameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), openAireId, fileLocationData.getDotFileExtension(), datasourceId, fileHash); // This throws Exception, in case the uploading failed. + if ( s3Url == null ) + return null; // In case the 'datasourceID' or 'hash' is null. 
Which should never happen here, since both of them are checked before the execution reaches here. + } catch (Exception e) { + logger.error("Could not upload the file '" + fileLocationData.getFileName() + "' to the S3 ObjectStore!", e); + return null; + } + } + + GenericData.Record record = new GenericData.Record(ParquetFileUtils.payloadsSchema); + record.put("id", openAireId); + record.put("original_url", originalUrl); + record.put("actual_url", actualUrl); + record.put("date", timeMillis); + record.put("mimetype", bulkImportSource.getMimeType()); + Long size = docFileData.getSize(); + record.put("size", ((size != null) ? String.valueOf(size) : null)); + record.put("hash", fileHash); // This is already checked and will not be null here. + record.put("location", s3Url); + record.put("provenance", provenance); + + return record; + } + + + public List getFileLocationsInsideDir(String directory) + { + List fileLocations = null; + + try ( Stream walkStream = Files.find(Paths.get(directory), Integer.MAX_VALUE, (filePath, fileAttr) -> fileAttr.isRegularFile()) ) + // In case we ever include other type-of-Files inside the same directory, we need to add this filter: "&& !filePath.toString().endsWith("name.ext")" + { + fileLocations = walkStream.map(Path::toString).collect(Collectors.toList()); + } catch (Exception e) { + String errorMsg = "Could not retrieve the files from directory: '" + directory + "'!"; + logger.error(errorMsg, e); + return null; + } + + return fileLocations; + } + + + public String getMD5hash(String string) + { + String md5 = null; + try { + MessageDigest md5MD = MessageDigest.getInstance("MD5"); // New instance for any new request. Otherwise, we need to synchronize the use of that object among multiple threads. + md5MD.update(string.getBytes()); + md5 = DatatypeConverter.printHexBinary(md5MD.digest()).toLowerCase(); + } catch (Exception e) { + logger.error("Error when getting the MD5-hash for: " + string, e); + return null; + } + return md5; + } + +} diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java index d83e499..26887e6 100644 --- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java @@ -1,5 +1,6 @@ package eu.openaire.urls_controller.services; +import eu.openaire.urls_controller.components.BulkImport; import eu.openaire.urls_controller.configuration.ImpalaConnector; import eu.openaire.urls_controller.models.*; import eu.openaire.urls_controller.payloads.responces.AssignmentsResponse; @@ -23,6 +24,7 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -55,19 +57,30 @@ public class UrlsServiceImpl implements UrlsService { public static final ExecutorService insertsExecutor = Executors.newFixedThreadPool(6); - public UrlsServiceImpl(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord, - @Value("${services.pdfaggregation.controller.datasources.excludedIDs}") List excludedIDs) { + public UrlsServiceImpl(@Value("${services.pdfaggregation.controller.maxAttemptsPerRecord}") int maxAttemptsPerRecord, BulkImport bulkImport) + { maxAttemptsPerRecordAtomic = new AtomicInteger(maxAttemptsPerRecord); - // The "excludedIDs" will not be null, 
as it will be defined inside the "application.yml" file. - // In case no IDs for excluded Datasources are given, then the "excludedIDs" list will just be empty. - int exclusionListSize = excludedIDs.size(); - if ( exclusionListSize == 0 ) + HashMap bulkImportSources = new HashMap<>(bulkImport.getBulkImportSources()); + // The "bulkImportSources" will not be null, as it will be defined inside the "application.yml" file. + // In case no bulkImport Datasources are given, then the "bulkImportSources" list will just be empty. + if ( bulkImportSources.isEmpty() ) return; // So the "excludedDatasourceIDsStringList" -code should be placed last in this Constructor-method. + logger.trace("BulkImportSources:\n" + bulkImportSources); + + List excludedIDs = new ArrayList<>(); + for ( BulkImport.BulkImportSource source : bulkImportSources.values() ) { + String datasourceID = source.getDatasourceID(); + if ( (datasourceID == null) || datasourceID.isEmpty() ) + throw new RuntimeException("One of the bulk-imported datasourceIDs was not found! | source: " + source); + excludedIDs.add(datasourceID); + } + + int exclusionListSize = excludedIDs.size(); // This list will not be empty. + // Prepare the "excludedDatasourceIDsStringList" to be used inside the "findAssignmentsQuery". Create the following string-pattern: // ("ID_1", "ID_2", ...) - final StringBuilder sb = new StringBuilder((exclusionListSize * 46) + (exclusionListSize -1) +2 ); sb.append("("); for ( int i=0; i < exclusionListSize; ++i ) { @@ -78,7 +91,7 @@ public class UrlsServiceImpl implements UrlsService { sb.append(")"); excludedDatasourceIDsStringList = sb.toString(); - logger.info("The following datasources will be excluded from crawling: " + excludedDatasourceIDsStringList); + logger.info("The following bulkImport-datasources will be excluded from crawling: " + excludedDatasourceIDsStringList); } @@ -302,7 +315,7 @@ public class UrlsServiceImpl implements UrlsService { return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(errorMsg); } finally { logger.debug("Deleting parquet directory: " + currentParquetPath); - FileUtils.deleteDirectory(new File(currentParquetPath)); + fileUtils.deleteDirectory(new File(currentParquetPath)); } logger.debug("Going to merge the parquet files for the tables which were altered."); diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index f5f79a8..60eec41 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -6,6 +6,7 @@ import com.google.common.collect.SetMultimap; import eu.openaire.urls_controller.configuration.ImpalaConnector; import eu.openaire.urls_controller.models.Payload; import eu.openaire.urls_controller.models.UrlReport; +import org.apache.commons.io.FileDeleteStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -31,6 +32,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -109,7 +112,7 @@ public class FileUtils { // The following regex might be usefull in a future scenario. It extracts the "plain-filename" and "file-ID" and the "file-extension". 
// Possible full-filenames are: "path1/path2/ID.pdf", "ID2.pdf", "path1/path2/ID(12).pdf", "ID2(25).pdf" - private static final Pattern FILENAME_ID_EXTENSION = Pattern.compile("(?:[^.()]+/)?((([^./()]+)[^./]*)(\\.[\\w]{2,10}))$"); + public static final Pattern FILENAME_ID_EXTENSION = Pattern.compile("(?:([^.()]+)/)?((([^/()]+)[^./]*)(\\.[\\w]{2,10}))$"); private static final int numOfFullTextsPerBatch = 70; // The HTTP-headers cannot be too large (It failed with 100 fileNames). @@ -183,7 +186,7 @@ public class FileUtils { logger.error("Failed to match the \"fileLocation\": \"" + fileLocation + "\" of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION); return null; } - String fileNameWithExtension = matcher.group(1); + String fileNameWithExtension = matcher.group(2); if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() ) { logger.error("Failed to extract the \"fileNameWithExtension\" from \"fileLocation\": \"" + fileLocation + "\", of id: \"" + payload.getId() + "\", originalUrl: \"" + payload.getOriginal_url() + "\", using this regex: " + FILENAME_ID_EXTENSION); return null; @@ -382,13 +385,13 @@ public class FileUtils { logger.error("Failed to match the \"" + fileName + "\" with the regex: " + FILENAME_ID_EXTENSION); continue; } - // The "matcher.group(2)" returns the "filenameWithoutExtension", which is currently not used. - String fileNameID = matcher.group(3); + // The "matcher.group(3)" returns the "filenameWithoutExtension", which is currently not used. + String fileNameID = matcher.group(4); if ( (fileNameID == null) || fileNameID.isEmpty() ) { logger.error("Failed to extract the \"fileNameID\" from \"" + fileName + "\"."); continue; } - String dotFileExtension = matcher.group(4); + String dotFileExtension = matcher.group(5); if ( (dotFileExtension == null) || dotFileExtension.isEmpty() ) { logger.error("Failed to extract the \"dotFileExtension\" from \"" + fileName + "\"."); continue; @@ -420,23 +423,10 @@ public class FileUtils { continue; } - if ( datasourceId == null ) { - logger.error("The retrieved \"datasourceId\" was \"null\" for file: " + fileName); + String s3Url = constructFileNameAndUploadToS3(targetDirectory, fileName, fileNameID, dotFileExtension, datasourceId, hash); + if ( s3Url == null ) continue; - } - if ( hash == null ) { - logger.error("The retrieved \"hash\" was \"null\" for file: " + fileName); - continue; - } - - String fileFullPath = targetDirectory + fileName; // The fullPath to the local file. - - // Use the "fileNameID" and not the "filenameWithoutExtension", as we want to avoid keeping the possible "parenthesis" with the increasing number (about the duplication of ID-fileName). - // Now we append the file-hash, so it is guaranteed that the filename will be unique. - fileName = datasourceId + "/" + fileNameID + "::" + hash + dotFileExtension; // This is the fileName to be used in the objectStore, not of the local file! 
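To make the group renumbering above concrete, here is a small, self-contained check of the updated FILENAME_ID_EXTENSION pattern (illustrative class name; the sample filenames are taken from the comment above):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class FilenameRegexCheck {

        // Same pattern as the updated FILENAME_ID_EXTENSION above.
        static final Pattern FILENAME_ID_EXTENSION = Pattern.compile("(?:([^.()]+)/)?((([^/()]+)[^./]*)(\\.[\\w]{2,10}))$");

        public static void main(String[] args) {
            for ( String fileLocation : new String[] {"path1/path2/ID.pdf", "ID2(25).pdf"} ) {
                Matcher matcher = FILENAME_ID_EXTENSION.matcher(fileLocation);
                if ( !matcher.matches() )
                    continue;
                // group(2) = fileNameWithExtension, group(3) = filenameWithoutExtension,
                // group(4) = fileNameID, group(5) = dotFileExtension.
                System.out.println(matcher.group(2) + " | " + matcher.group(4) + " | " + matcher.group(5));
                // Prints: "ID.pdf | ID | .pdf" and then "ID2(25).pdf | ID2 | .pdf"
            }
        }
    }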
- - String s3Url = s3ObjectStore.uploadToS3(fileName, fileFullPath); setFullTextForMultiplePayloads(fileRelatedPayloads, s3Url); //numUploadedFiles ++; } catch (Exception e) { @@ -450,6 +440,28 @@ public class FileUtils { } + public String constructFileNameAndUploadToS3(String fileDir, String fileName, String openAireID, String dotFileExtension, String datasourceId, String hash) throws Exception + { + if ( datasourceId == null ) { + logger.error("The retrieved \"datasourceId\" was \"null\" for file: " + fileName); + return null; + } + + if ( hash == null ) { + logger.error("The retrieved \"hash\" was \"null\" for file: " + fileName); + return null; + } + + String fileFullPath = fileDir + File.separator + fileName; // The fullPath to the local file. + + // Use the "fileNameID" and not the "filenameWithoutExtension", as we want to avoid keeping the possible "parenthesis" with the increasing number (about the duplication of ID-fileName). + // Now we append the file-hash, so it is guaranteed that the filename will be unique. + fileName = datasourceId + "/" + openAireID + "::" + hash + dotFileExtension; // This is the fileName to be used in the objectStore, not of the local file! + + return s3ObjectStore.uploadToS3(fileName, fileFullPath); + } + + public String getMessageFromResponseBody(HttpURLConnection conn, boolean isError) { final StringBuilder msgStrB = new StringBuilder(500); try ( BufferedReader br = new BufferedReader(new InputStreamReader((isError ? conn.getErrorStream() : conn.getInputStream()))) ) { // Try-with-resources @@ -561,11 +573,11 @@ public class FileUtils { } - public static boolean deleteDirectory(File directory) + public boolean deleteDirectory(File directory) { try { org.apache.commons.io.FileUtils.deleteDirectory(directory); - return true; + return true; // Will return "true" also in case this directory does not exist. So, no Exception will be thrown for that case. } catch (IOException e) { logger.error("The following directory could not be deleted: " + directory.getName(), e); return false; @@ -575,4 +587,35 @@ public class FileUtils { } } + + public boolean deleteFile(String fileFullPathString) + { + try { + FileDeleteStrategy.FORCE.delete(new File(fileFullPathString)); + } catch (IOException e) { + logger.error("Error when deleting the file: " + fileFullPathString); + return false; + } + return true; + } + + + Lock fileWriteLock = new ReentrantLock(true); + + public String writeToFile(String fileFullPath, String stringToWrite) + { + fileWriteLock.lock(); + try ( BufferedWriter bufferedWriter = new BufferedWriter(Files.newBufferedWriter(Paths.get(fileFullPath)), FileUtils.tenMb) ) + { + bufferedWriter.write(stringToWrite); // This will overwrite the file. If the new string is smaller, then it does not matter. 
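The object-store key produced by the new constructFileNameAndUploadToS3() helper above follows the "datasourceId/openAireID::hash.ext" scheme. A minimal sketch of just the naming step, with made-up sample values (the real ones come from the database lookup and the file's MD5 hash):

    public class S3KeyNamingSketch {

        public static void main(String[] args) {
            // Sample values for illustration only.
            String datasourceId = "opendoar____::6f4922f45568161a8cdf4ad2299f6d23";
            String openAireID = "doi_________::0123456789abcdef0123456789abcdef";
            String hash = "90f2a8d7b2cb3c34986fbf8f17ed2b6c";
            String dotFileExtension = ".pdf";

            // Same concatenation as in constructFileNameAndUploadToS3(): appending the hash
            // guarantees a unique key, even when the same openAireID appears more than once.
            String s3Key = datasourceId + "/" + openAireID + "::" + hash + dotFileExtension;
            System.out.println(s3Key);
            // Prints: opendoar____::6f4922f45568161a8cdf4ad2299f6d23/doi_________::0123456789abcdef0123456789abcdef::90f2a8d7b2cb3c34986fbf8f17ed2b6c.pdf
        }
    }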
+ } catch (Exception e) { + String errorMsg = "Failed to create or acquire the file \"" + fileFullPath + "\"!"; + logger.error(errorMsg, e); + return errorMsg; + } finally { + fileWriteLock.unlock(); + } + return null; + } + } \ No newline at end of file diff --git a/src/main/java/eu/openaire/urls_controller/util/GenericUtils.java b/src/main/java/eu/openaire/urls_controller/util/GenericUtils.java index 91f770b..a578312 100644 --- a/src/main/java/eu/openaire/urls_controller/util/GenericUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/GenericUtils.java @@ -1,10 +1,33 @@ package eu.openaire.urls_controller.util; +import java.text.SimpleDateFormat; +import java.util.Date; + public class GenericUtils { + private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd 'at' HH:mm:ss.SSS z"); + + public static String getReadableCurrentTimeAndZone() { + return (simpleDateFormat.format(new Date(System.currentTimeMillis()))); + } + public static int getRandomNumber(int min, int max) { return (int)(Math.random() * (max - min +1) + min); } + + public static String getSelectiveStackTrace(Throwable thr, String initialMessage, int numOfLines) + { + StackTraceElement[] stels = thr.getStackTrace(); + StringBuilder sb = new StringBuilder(numOfLines *100); + if ( initialMessage != null ) + sb.append(initialMessage).append(" Stacktrace:").append("\n"); // This StringBuilder is thread-safe as a local-variable. + for ( int i = 0; (i < stels.length) && (i <= numOfLines); ++i ) { + sb.append(stels[i]); + if (i < numOfLines) sb.append("\n"); + } + return sb.toString(); + } + } diff --git a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java index 98a5529..65ba5ca 100644 --- a/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/ParquetFileUtils.java @@ -59,18 +59,18 @@ public class ParquetFileUtils { private JdbcTemplate jdbcTemplate; @Value("${hdfs.baseUrl}") - private String webHDFSBaseUrl; + public String webHDFSBaseUrl; private final String hdfsHttpAuthString; private final String hdfsUserName; - private final String payloadSchemaFilePath = "schemas/payload.avsc"; + public static final String payloadSchemaFilePath = "schemas/payload.avsc"; - private final String attemptSchemaFilePath = "schemas/attempt.avsc"; + private static final String attemptSchemaFilePath = "schemas/attempt.avsc"; - public Schema payloadsSchema; + public static Schema payloadsSchema = null; public Schema attemptsSchema; public final String parquetHDFSDirectoryPathAttempts; @@ -79,12 +79,17 @@ public class ParquetFileUtils { public final String parquetHDFSDirectoryPathPayloadsAggregated; public final String parquetHDFSDirectoryPathPayloadsBulkImport; + public String mkDirsAndParams; + + //public String setPermAndParams; + public ParquetFileUtils(@Value("${hdfs.baseUrl}") String webHDFSBaseUrl, @Value("${hdfs.httpAuth}") String hdfsHttpAuthString, @Value("${hdfs.userName}") String hdfsUserName, @Value("${hdfs.password}") String hdfsPassword, @Value("${services.pdfaggregation.controller.parquetLocalDirectoryPath}") String parquetBaseDirectoryPath, @Value("${hdfs.parquetRemoteBaseDirectoryPath}") String hdfsParquetBaseDir, @Value("${services.pdfaggregation.controller.isTestEnvironment}") boolean isTestEnvironment, FileUtils fileUtils) throws IOException { + this.mkDirsAndParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName; if ( 
webHDFSBaseUrl.endsWith("/") ) // We don't wand an ending slash in the url (as it causes problems when the file=path is added). this.webHDFSBaseUrl = webHDFSBaseUrl.substring(0, (webHDFSBaseUrl.length() -1)); else @@ -128,11 +133,13 @@ public class ParquetFileUtils { this.parquetHDFSDirectoryPathPayloadsAggregated = hdfsParquetBaseDir + "payloads_aggregated/"; this.parquetHDFSDirectoryPathPayloadsBulkImport = hdfsParquetBaseDir + "payloads_bulk_import/"; this.fileUtils = fileUtils; + this.mkDirsAndParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName; // All permissions for user, group and others must be set, in order for this service' user to have access to the hdfs directory. + //this.setPermAndParams = "?op=SETPERMISSION&permission=777&user.name=" + hdfsUserName; createRemoteParquetDirectories(hdfsParquetBaseDir); } - public Schema parseSchema(String schemaResourcePath) { + public static Schema parseSchema(String schemaResourcePath) { try { return (new Schema.Parser()).parse(new ClassPathResource(schemaResourcePath).getInputStream()); } catch (Throwable e) { @@ -464,8 +471,6 @@ public class ParquetFileUtils { // The WebHDFS uses the "mkdirs" operations which creates all the non-existent directories in the specified path. // So with one request we will create the "parquet_uploads/" and the "parquet_uploads/attempts/" and with the seconds request, the "parquet_uploads/payloads/" directory. - String mkDirsParams = "?op=MKDIRS&permission=777&user.name=" + hdfsUserName; - logger.info("Going to check if the remote parquet directories exist."); String listMainDirectoryUrl = webHDFSBaseUrl + parquetBaseRemoteDirectory + "?op=LISTSTATUS&user.name=" + hdfsUserName; @@ -492,9 +497,9 @@ public class ParquetFileUtils { if ( statusCode == 404 ) { logger.info("The directory \"" + parquetBaseRemoteDirectory + "\" does not exist. We will create it, along with its sub-directories."); - attemptCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsParams); - payloadAggregatedCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsParams); - payloadBulkImportCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsParams); + attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams); + payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams); + payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams); } else { // Check the json-response, to see if all the subdirectories exist. @@ -549,19 +554,19 @@ public class ParquetFileUtils { // For each missing subdirectories, run the mkDirs-request. if ( !foundAttemptsDir ) { logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" does not exist! Going to create it."); - attemptCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsParams); + attemptCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathAttempts + mkDirsAndParams); } else logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathAttempts + "\" exists."); if ( !foundPayloadsAggregatedDir ) { logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" does not exist! 
Going to create it."); - payloadAggregatedCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsParams); + payloadAggregatedCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsAggregated + mkDirsAndParams); } else logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsAggregated + "\" exists."); if ( !foundPayloadsBulkImportDir ) { logger.debug("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" does not exist! Going to create it."); - payloadBulkImportCreationSuccessful = createHDFSDirectory(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsParams); + payloadBulkImportCreationSuccessful = applyHDFOperation(webHDFSBaseUrl + parquetHDFSDirectoryPathPayloadsBulkImport + mkDirsAndParams); } else logger.info("The remote parquet directory \"" + parquetHDFSDirectoryPathPayloadsBulkImport + "\" exists."); } @@ -575,10 +580,10 @@ public class ParquetFileUtils { } - public boolean createHDFSDirectory(String createDirectoryUrl) + public boolean applyHDFOperation(String hdfsOperationUrl) { try { - URL url = new URL(createDirectoryUrl); + URL url = new URL(hdfsOperationUrl); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestMethod("PUT"); conn.setRequestProperty("Authorization", hdfsHttpAuthString); @@ -586,15 +591,15 @@ public class ParquetFileUtils { conn.connect(); int statusCode = conn.getResponseCode(); if ( statusCode == -1 ) { - logger.error("Problem when getting the \"status-code\" for url: " + createDirectoryUrl); + logger.error("Problem when getting the \"status-code\" for url: " + hdfsOperationUrl); return false; } else if ( statusCode != 200 ) { - String errorMsg = "We expected a \"200 OK\" response, but got: \"" + statusCode + "\" instead, for url: " + createDirectoryUrl; + String errorMsg = "We expected a \"200 OK\" response, but got: \"" + statusCode + "\" instead, for url: " + hdfsOperationUrl; logger.error(errorMsg + "\n\n" + fileUtils.getMessageFromResponseBody(conn, true)); return false; } - logger.trace("Creation was successful for hdfs-dir-url: " + createDirectoryUrl + "\n" + fileUtils.getMessageFromResponseBody(conn, false)); + logger.trace("The Operation was successful for hdfs-op-url: " + hdfsOperationUrl + "\n" + fileUtils.getMessageFromResponseBody(conn, false)); } catch (Exception e) { logger.error("", e); return false; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index be9ef81..bfb66e3 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -29,12 +29,22 @@ services: shouldEmptyBucket: false shouldShowAllS3Buckets: true - datasources: # Provide a list of datasource IDs, which should be excluded from crawling. Their content is either bulk-imported or is known to be restricted. - excludedIDs: > # Use comma-seperated values (one in each line for best readability), as Spring has is currently incapable of parsing Dropwizard-styled lists (at least without additional config). - opendoar____::6f4922f45568161a8cdf4ad2299f6d23 - # Since we use a multi-line value from our list, we add the ID-explanations here (otherwise comments will be part of values): - # First-id: arXiv.org e-Print Archive +bulkImport: + baseBulkImportLocation: /mnt/bulkImport/ + bulkImportReportLocation: /bulkImportReports/ + bulkImportSources: # These sources are accepted for bulk-import requests and are excluded from crawling. 
+ arxivImport: + datasourceID: opendoar____::6f4922f45568161a8cdf4ad2299f6d23 + datasourcePrefix: arXiv_______ # For a PID-providing datasource, we use the PID-prefix here (not the datasource-prefix "od________18"). + pdfUrlPrefix: https://arxiv.org/pdf/ + mimeType: application/pdf +# otherImport: +# datasourceID: othersource__::0123 +# datasourcePrefix: other_______ +# pdfUrlPrefix: https://example.org/pdf/ +# mimeType: application/pdf + spring: application:
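The YAML entry names above ("arxivImport", and the commented-out "otherImport") become the keys of the map returned by getBulkImportSources(), as the HashMap copy in the UrlsServiceImpl constructor suggests. A minimal, illustrative sketch of looking one up by name, assuming the map is keyed by the YAML entry name; this is not the controller's actual validation logic:

    import java.util.Map;

    import eu.openaire.urls_controller.components.BulkImport;

    public class BulkImportSourceLookup {

        // Illustrative only. The "bulkImport" argument is the injected configuration-properties
        // bean, as used in the UrlsServiceImpl constructor above.
        public static BulkImport.BulkImportSource resolve(BulkImport bulkImport, String providedSourceName) {
            Map<String, BulkImport.BulkImportSource> sources = bulkImport.getBulkImportSources();
            BulkImport.BulkImportSource source = sources.get(providedSourceName);  // e.g. "arxivImport"
            if ( source == null )
                throw new IllegalArgumentException("No bulkImport source is configured under the name: " + providedSourceName);
            return source;  // Its datasourceID (see getDatasourceID()) is also excluded from crawling.
        }
    }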