From f6e53ca289172637795ed4ab7a5169c5694fd222 Mon Sep 17 00:00:00 2001 From: LSmyrnaios Date: Tue, 22 Jun 2021 05:58:07 +0300 Subject: [PATCH] Integrate the "PublicationsRetriever" program as a plugin, which downloads the full-texts of the publications. Afterwards, the retrieved data info is transferred to the Controller. The "PublicationsRetriever" can be installed locally as a library, using the "installPublicationsRetriever.sh" script. --- build.gradle | 11 +- installPublicationsRetriever.sh | 17 ++ .../urls_worker/UrlsWorkerApplication.java | 26 ++- .../components/ScheduledTasks.java | 9 +- .../PublicationsRetrieverPlugin.java | 196 ++++++++++++++++++ .../urls_worker/util/AssignmentHandler.java | 45 +++- src/main/resources/application.properties | 4 +- 7 files changed, 297 insertions(+), 11 deletions(-) create mode 100755 installPublicationsRetriever.sh create mode 100644 src/main/java/eu/openaire/urls_worker/plugins/publicationsRetriever/PublicationsRetrieverPlugin.java diff --git a/build.gradle b/build.gradle index 4bd3f2b..1168915 100644 --- a/build.gradle +++ b/build.gradle @@ -17,8 +17,12 @@ sourceCompatibility = '1.8' repositories { mavenCentral() + flatDir { + dirs 'libs' + } } + dependencies { runtimeOnly 'org.springframework.boot:spring-boot-devtools' @@ -33,8 +37,11 @@ dependencies { implementation "org.projectlombok:lombok:1.18.20" implementation group: 'javax.validation', name: 'validation-api', version: '2.0.1.Final' - // https://mvnrepository.com/artifact/com.google.guava/guava - implementation group: 'com.google.guava', name: 'guava', version: '30.1.1-jre' // It will be usefull later.. + implementation ("eu.openaire:publications_retriever:1.0-SNAPSHOT") { + exclude group: 'ch.qos.logback', module: 'logback-core' + exclude group: 'ch.qos.logback', module: 'logback-classic' + exclude group: 'org.slf4j', module: 'slf4j-api' + } testImplementation group: 'org.springframework.security', name: 'spring-security-test', version: springSecurityVersion testImplementation 'org.springframework.boot:spring-boot-starter-test' diff --git a/installPublicationsRetriever.sh b/installPublicationsRetriever.sh new file mode 100755 index 0000000..457ec13 --- /dev/null +++ b/installPublicationsRetriever.sh @@ -0,0 +1,17 @@ +cd "${0%/*}" || (echo "Could not chdir to this script's dir!" && exit) # Change the working directory to the script's directory, when running from other location. + +cd libs || exit +git clone https://github.com/LSmyrnaios/PublicationsRetriever.git # We assume there is no previously source-code here, if so, it will be overwritten. + +# Do not need to perform a string-replace in "build.gradle", since it automatically gets all ".jar" files. + +# Keep a backup of the existing JAR file. +mv ./publications_retriever-1.0-SNAPSHOT.jar ./publications_retriever-1.0-SNAPSHOT_BACKUP.jar + +cd PublicationsRetriever && mvn clean install + +# Copy the created JAR file to the top libs directory. +cp target/publications_retriever-1.0-SNAPSHOT.jar ../publications_retriever-1.0-SNAPSHOT.jar + +# Delete the directory with the source-code. +cd ../ && rm -rf PublicationsRetriever diff --git a/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java b/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java index b2c4584..2481066 100644 --- a/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java +++ b/src/main/java/eu/openaire/urls_worker/UrlsWorkerApplication.java @@ -1,17 +1,41 @@ package eu.openaire.urls_worker; - +import eu.openaire.publications_retriever.PublicationsRetriever; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; +import javax.annotation.PreDestroy; +import java.util.concurrent.TimeUnit; + @SpringBootApplication @EnableScheduling public class UrlsWorkerApplication { + private static final Logger logger = LoggerFactory.getLogger(UrlsWorkerApplication.class); + public static void main(String[] args) { SpringApplication.run(UrlsWorkerApplication.class, args); } + @PreDestroy + public static void preDestroy() + { + if ( PublicationsRetriever.executor != null ) + { + PublicationsRetriever.executor.shutdown(); // Define that no new tasks will be scheduled. + try { + if ( !PublicationsRetriever.executor.awaitTermination(1, TimeUnit.MINUTES) ) { + logger.warn("The working threads did not finish on time! Stopping them immediately.."); + PublicationsRetriever.executor.shutdownNow(); + } + } catch (InterruptedException e) { + PublicationsRetriever.executor.shutdownNow(); + } + } + } + } diff --git a/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java b/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java index fb7899d..6db8359 100644 --- a/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java +++ b/src/main/java/eu/openaire/urls_worker/components/ScheduledTasks.java @@ -1,5 +1,6 @@ package eu.openaire.urls_worker.components; +import eu.openaire.urls_worker.plugins.publicationsRetriever.PublicationsRetrieverPlugin; import eu.openaire.urls_worker.util.AssignmentHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,4 +28,10 @@ public class ScheduledTasks { if ( AssignmentHandler.isAvailableForWork ) AssignmentHandler.handleAssignment(); } -} \ No newline at end of file + + //@Scheduled(fixedRate = 20_000) // Every 20 secs. + public void testUrlConnection() { + String urlToCheck = "https://zenodo.org/record/1145726"; + PublicationsRetrieverPlugin.connectWithUrlTest(urlToCheck); + } +} diff --git a/src/main/java/eu/openaire/urls_worker/plugins/publicationsRetriever/PublicationsRetrieverPlugin.java b/src/main/java/eu/openaire/urls_worker/plugins/publicationsRetriever/PublicationsRetrieverPlugin.java new file mode 100644 index 0000000..eb83ea8 --- /dev/null +++ b/src/main/java/eu/openaire/urls_worker/plugins/publicationsRetriever/PublicationsRetrieverPlugin.java @@ -0,0 +1,196 @@ +package eu.openaire.urls_worker.plugins.publicationsRetriever; + +import com.google.common.hash.Hashing; +import com.google.common.io.Files; +import edu.uci.ics.crawler4j.url.URLCanonicalizer; +import eu.openaire.publications_retriever.PublicationsRetriever; +import eu.openaire.publications_retriever.util.file.FileUtils; +import eu.openaire.publications_retriever.util.http.ConnSupportUtils; +import eu.openaire.publications_retriever.util.http.HttpConnUtils; +import eu.openaire.publications_retriever.util.url.DataToBeLogged; +import eu.openaire.publications_retriever.util.url.LoaderAndChecker; +import eu.openaire.publications_retriever.util.url.UrlUtils; +import eu.openaire.urls_worker.models.Payload; +import eu.openaire.urls_worker.models.Task; +import eu.openaire.urls_worker.models.UrlReport; +import eu.openaire.urls_worker.util.AssignmentHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.nio.charset.StandardCharsets; +import java.nio.file.Paths; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; + + +public class PublicationsRetrieverPlugin { + + private static final Logger logger = LoggerFactory.getLogger(PublicationsRetrieverPlugin.class); + + private static final String workingDir = System.getProperty("user.dir") + File.separator; + private static String assignmentsBasePath = workingDir + "assignments" + File.separator; + private static String assignmentsBaseFullTextsPath = assignmentsBasePath + "fullTexts" + File.separator; + + static { + File assignmentsDir = new File(assignmentsBaseFullTextsPath); + if ( !assignmentsDir.exists() ) { + if ( !assignmentsDir.mkdirs() ) { // Create the directory. + logger.error("Could not create the \"assignments directories\": \"" + assignmentsBaseFullTextsPath + "\". Using the \"workingDir\" instead (" + workingDir + ")."); + assignmentsBasePath = workingDir; + assignmentsBaseFullTextsPath = assignmentsBasePath; + } + } + + // Specify some configurations + LoaderAndChecker.retrieveDocuments = true; + LoaderAndChecker.retrieveDatasets = false; + FileUtils.shouldDownloadDocFiles = true; + PublicationsRetriever.targetUrlType = "docUrl"; + + int workerThreadsCount = Runtime.getRuntime().availableProcessors() * PublicationsRetriever.threadsMultiplier; + logger.info("Use " + workerThreadsCount + " worker-threads."); + PublicationsRetriever.executor = Executors.newFixedThreadPool(workerThreadsCount); + } + + + public static void processTasks(int assignmentId, Collection tasks) throws RuntimeException, FileNotFoundException + { + ConnSupportUtils.setKnownMimeTypes(); + + FileUtils.storeDocFilesDir = assignmentsBaseFullTextsPath + "assignment_" + assignmentId + "_fullTexts" + File.separator; // It needs the last separator, because of how the docFiles are named and stored. + + FileUtils.setOutput(new FileOutputStream(assignmentsBasePath + "assignment_" + assignmentId + "_generic_results.json")); + + int tasksSize = tasks.size(); + int batchCount = 0; + int taskCount = 0; + List> callableTasks = new ArrayList<>(FileUtils.jsonBatchSize); + + // Start loading and checking urls. + for ( Task task : tasks ) + { + callableTasks.add(() -> { + String id = task.getId(); + String url = task.getUrl(); + + if ( (url = LoaderAndChecker.handleUrlChecks(id, url)) == null ) { + return false; + } // The "url" might have changed (inside "handleUrlChecks()"). + + String urlToCheck = url; + String sourceUrl = urlToCheck; // Hold it here for the logging-messages. + if ( !sourceUrl.contains("#/") && (urlToCheck = URLCanonicalizer.getCanonicalURL(sourceUrl, null, StandardCharsets.UTF_8)) == null ) { + logger.warn("Could not canonicalize url: " + sourceUrl); + UrlUtils.logOutputData(id, sourceUrl, null, "unreachable", "Discarded at loading time, due to canonicalization's problems.", null, true, "true", "false", "false", "false"); + LoaderAndChecker.connProblematicUrls.incrementAndGet(); + return false; + } + + if ( UrlUtils.docOrDatasetUrlsWithIDs.containsKey(url) ) { // If we got into an already-found docUrl, log it and return. + ConnSupportUtils.handleReCrossedDocUrl(id, url, url, url, logger, true); + return true; + } + + boolean isPossibleDocOrDatasetUrl = false; // Used for specific connection settings. + String lowerCaseRetrievedUrl = url.toLowerCase(); + // Check if it's a possible-DocUrl, if so, this info will be used for optimal web-connection later. + if ( (LoaderAndChecker.retrieveDocuments && LoaderAndChecker.DOC_URL_FILTER.matcher(lowerCaseRetrievedUrl).matches()) + || (LoaderAndChecker.retrieveDatasets && LoaderAndChecker.DATASET_URL_FILTER.matcher(lowerCaseRetrievedUrl).matches()) ) { + //logger.debug("Possible docUrl or datasetUrl: " + url); + isPossibleDocOrDatasetUrl = true; + } + + try { // Check if it's a docUrl, if not, it gets crawled. + HttpConnUtils.connectAndCheckMimeType(id, sourceUrl, urlToCheck, urlToCheck, null, true, isPossibleDocOrDatasetUrl); + } catch (Exception e) { + String wasUrlValid = "true"; + if ( e instanceof RuntimeException ) { + String message = e.getMessage(); + if ( (message != null) && message.contains("HTTP 404 Client Error") ) + wasUrlValid = "false"; + } + UrlUtils.logOutputData(id, urlToCheck, null, "unreachable", "Discarded at loading time, due to connectivity problems.", null, true, "true", wasUrlValid, "false", "false"); + } + return true; + }); + + if ( ((++taskCount) >= FileUtils.jsonBatchSize) || (taskCount >= tasksSize) ) + { + logger.info("Batch counter: " + (++batchCount) + " | progress: " + PublicationsRetriever.df.format(((batchCount-1) * taskCount) * 100.0 / tasksSize) + "% | every batch contains " + FileUtils.jsonBatchSize + " id-url pairs."); + LoaderAndChecker.invokeAllTasksAndWait(callableTasks); + addUrlReportsToWorkerReport(); + callableTasks = new ArrayList<>(FileUtils.jsonBatchSize); // Reset the thread-tasks-list for the next batch. + } + }// end tasks-for-loop + } + + + public static void addUrlReportsToWorkerReport() + { + for ( DataToBeLogged data : FileUtils.dataToBeLoggedList ) + { + String status = null, fileLocation = null, md5 = null; + Long size = null; + if ( data.getWasDocumentOrDatasetAccessible().equals("true") ) + { + status = "accessible"; + fileLocation = data.getComment(); + if ( fileLocation.contains(UrlUtils.alreadyDownloadedByIDMessage) ) { + // The file of this docUrl was already downloaded by another docUrl. + String previousId = fileLocation.substring(UrlUtils.alreadyDownloadedByIDMessage.length() +1); + //logger.debug("previousId: " + previousId); // DEBUG! + // Search that ID inside the list and if that instance gave the docUrl (there might be multiple ID instances) then get the file-location. + for ( DataToBeLogged data_2 : FileUtils.dataToBeLoggedList ) { + if ( data_2.getUrlId().equals(previousId) && data_2.getWasDocumentOrDatasetAccessible().equals("true") ) { + fileLocation = data_2.getComment(); + break; + } + } + } + else if ( fileLocation.contains("DocFileNotRetrievedException") ) + fileLocation = null; + + if ( fileLocation != null ) { + try { + File docFile = new File(fileLocation); + if ( docFile.isFile() ) { + md5 = Files.hash(docFile, Hashing.md5()).toString(); // These hashing functions are deprecated, but just to inform us that MD5 is not secure. Luckily, we use MD5 just to identify duplicate files. + //logger.debug("MD5 for file \"" + docFile.getName() + "\": " + md5); // DEBUG! + size = java.nio.file.Files.size(Paths.get(fileLocation)); + //logger.debug("Size of file \"" + docFile.getName() + "\": " + size); // DEBUG! + } else + logger.error("No file was found with path: " + fileLocation); + } catch (Exception e) { + if ( md5 == null ) + logger.error("Could not retrieve the MD5-hash for the file: " + fileLocation); + + if ( size == null ) + logger.error("Could not retrieve the size of the file: " + fileLocation); + + e.printStackTrace(); + } + } else + fileLocation = "File not retrieved"; + } else + status = "non-accessible"; + + Payload payload = new Payload(data.getUrlId(), data.getSourceUrl(), data.getDocOrDatasetUrl(), new Date(), "application/pdf", size, "more_info", md5, fileLocation, null); + AssignmentHandler.urlReports.add(new UrlReport(status, payload)); + } + FileUtils.dataToBeLoggedList.clear(); // Empty the list, to be re-populated by the next batch / assignment. + } + + + public static boolean connectWithUrlTest(String urlToCheck) { + try { + return HttpConnUtils.connectAndCheckMimeType("null", urlToCheck, urlToCheck, urlToCheck, null, true, false); // Sent the < null > in quotes to avoid an NPE in the concurrent data-structures. + } catch (Exception e) { + UrlUtils.logOutputData(null, urlToCheck, null, "unreachable", "Discarded at loading time, due to connectivity problems.", null, true, "true", "true", "false", "false"); + return false; + } + } +} diff --git a/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java b/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java index 2e420d9..bb155b7 100644 --- a/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java +++ b/src/main/java/eu/openaire/urls_worker/util/AssignmentHandler.java @@ -6,10 +6,15 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import eu.openaire.urls_worker.models.Assignment; import eu.openaire.urls_worker.models.Task; +import eu.openaire.urls_worker.models.UrlReport; import eu.openaire.urls_worker.payloads.requests.AssignmentRequest; +import eu.openaire.urls_worker.payloads.responces.WorkerReport; +import eu.openaire.urls_worker.plugins.publicationsRetriever.PublicationsRetrieverPlugin; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.web.client.RestTemplateBuilder; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import org.springframework.web.client.RestTemplate; import java.util.*; @@ -21,6 +26,9 @@ public class AssignmentHandler { public static boolean isAvailableForWork = true; + public static List urlReports = null; + + public static Assignment requestAssignment() { RestTemplate restTemplate = new RestTemplateBuilder().build(); @@ -80,21 +88,46 @@ public class AssignmentHandler { tasksForPlugins.put(task.getDatasource().getId(), task); } + urlReports = new ArrayList<>(tasks.size()); // Define the new UrlReport-list. It is reinitialized each time. + + // TODO - Decide which tasks run with what plugin (depending on their datasource). + // First run -in parallel- the tasks which require some specific plugin. // Then run the remaining tasks in the generic plugin (which handles parallelism itself). + // For now, let's just run all tasks in the generic plugin. - - - // Check availability-endpoint using Thread-sleep here. DEBUG! try { - Thread.sleep(30_000); // 30 seconds - } catch (InterruptedException ie) { - logger.warn("Sleeping interrupted!"); + PublicationsRetrieverPlugin.processTasks(assignment.getAssignmentId(), tasksForPlugins.values()); + } catch (Exception e) { + logger.error(e.getMessage(), e); } + postWorkerReport(assignment.getAssignmentId()); + isAvailableForWork = true; } + + public static boolean postWorkerReport(int assignmentId) + { + RestTemplate restTemplate = new RestTemplateBuilder().build(); + String url = "http://localhost:8080/api/urls/addWorkerReport"; + + try { + ResponseEntity responseEntity = restTemplate.postForEntity(url, new WorkerReport(WorkerConstants.WORKER_ID, assignmentId, urlReports), String.class); + int responseCode = responseEntity.getStatusCode().value(); + if ( responseCode != HttpStatus.OK.value() ) { + logger.error("Connection problem with the submission of the WorkerReport of assignment_" + assignmentId + " to the Controller. Error-code was: " + responseCode); + return false; + } + } catch (Exception e) { + logger.error("Error when submitting the WorkerReport of assignment_" + assignmentId + " to the Controller: ", e); + return false; + } + + return true; + } + } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 7b089d6..94f58f6 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -16,10 +16,12 @@ server.port = 8081 server.servlet.context-path=/api # LOGGING LEVELS -spring.output.ansi.enabled=always +logging.config=classpath:logback-spring.xml logging.level.root=WARN logging.level.org.springframework.web=INFO logging.level.eu.openaire.urls_worker=DEBUG +logging.level.eu.openaire.publications_retriever=DEBUG +spring.output.ansi.enabled=always ## MULTIPART (MultipartProperties)