package eu.openaire.urls_worker.plugins;

import edu.uci.ics.crawler4j.url.URLCanonicalizer;
import eu.openaire.publications_retriever.PublicationsRetriever;
import eu.openaire.publications_retriever.exceptions.DocFileNotRetrievedException;
import eu.openaire.publications_retriever.util.file.FileUtils;
import eu.openaire.publications_retriever.util.file.S3ObjectStoreMinIO;
import eu.openaire.publications_retriever.util.http.ConnSupportUtils;
import eu.openaire.publications_retriever.util.http.HttpConnUtils;
import eu.openaire.publications_retriever.util.url.DataToBeLogged;
import eu.openaire.publications_retriever.util.url.LoaderAndChecker;
import eu.openaire.publications_retriever.util.url.UrlUtils;
import eu.openaire.urls_worker.models.Assignment;
import eu.openaire.urls_worker.models.Error;
import eu.openaire.urls_worker.models.Payload;
import eu.openaire.urls_worker.models.UrlReport;
import eu.openaire.urls_worker.util.AssignmentHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
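
/**
 * This plugin processes assignments using the "PublicationsRetriever" library:
 * it checks each id-url pair, crawls the url when needed, downloads any found full-text document,
 * optionally uploads it to the S3 object-store and transforms the logged results into "UrlReport"s.
 */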
public class PublicationsRetrieverPlugin {

    private static final Logger logger = LoggerFactory.getLogger(PublicationsRetrieverPlugin.class);

    private static final String workingDir = System.getProperty("user.dir") + File.separator;
    private static String assignmentsBasePath = workingDir + "assignments" + File.separator;
    private static String assignmentsBaseFullTextsPath = assignmentsBasePath + "fullTexts" + File.separator;

    static {
        File assignmentsDir = new File(assignmentsBaseFullTextsPath);
        if ( !assignmentsDir.exists() ) {
            if ( !assignmentsDir.mkdirs() ) { // Create the directory, along with any missing parent-directories.
                logger.error("Could not create the \"assignments directories\": \"" + assignmentsBaseFullTextsPath + "\". Using the \"workingDir\" instead (" + workingDir + ").");
                assignmentsBasePath = workingDir;
                assignmentsBaseFullTextsPath = assignmentsBasePath;
            }
        }

        // Specify some configurations.
        LoaderAndChecker.retrieveDocuments = true;
        LoaderAndChecker.retrieveDatasets = false;
        FileUtils.shouldDownloadDocFiles = true;
        FileUtils.shouldUploadFilesToS3 = true;
        FileUtils.docFileNameType = FileUtils.DocFileNameType.idName;
        PublicationsRetriever.targetUrlType = "docUrl";

        if ( FileUtils.shouldUploadFilesToS3 )
            new S3ObjectStoreMinIO(); // Check here on how to create the credentials-file: https://github.com/LSmyrnaios/PublicationsRetriever/blob/master/README.md

        int workerThreadsCount = Runtime.getRuntime().availableProcessors() * PublicationsRetriever.threadsMultiplier;
        logger.info("Using " + workerThreadsCount + " worker-threads.");
        PublicationsRetriever.executor = Executors.newFixedThreadPool(workerThreadsCount);
    }
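
    /**
     * Processes the given assignments in parallel batches of "FileUtils.jsonBatchSize" tasks.
     * Each task checks (and possibly crawls) one url; after every batch, the logged results
     * are converted into "UrlReport"s for the worker-report.
     * @param assignmentRequestCounter the number of the current assignments-request; it is used to name the output-files.
     * @param assignments the id-url pairs to be processed.
     * @throws FileNotFoundException if the json-output-file cannot be created.
     */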
    public static void processAssignments(Long assignmentRequestCounter, Collection<Assignment> assignments) throws RuntimeException, FileNotFoundException
    {
        ConnSupportUtils.setKnownMimeTypes();
        FileUtils.storeDocFilesDir = assignmentsBaseFullTextsPath + "assignment_" + assignmentRequestCounter + "_fullTexts" + File.separator; // It needs the last separator, because of how the docFiles are named and stored.
        FileUtils.setOutput(new FileOutputStream(assignmentsBasePath + "assignment_" + assignmentRequestCounter + "_generic_results.json"));

        int tasksSize = assignments.size();
        int batchCount = 0;
        int taskCount = 0;
        List<Callable<Boolean>> callableTasks = new ArrayList<>(FileUtils.jsonBatchSize);

        // Start loading and checking urls.
        for ( Assignment assignment : assignments )
        {
            callableTasks.add(() -> {
                String id = assignment.getId();
                String url = assignment.getOriginalUrl();
                if ( (url = LoaderAndChecker.handleUrlChecks(id, url)) == null )
                    return false;
                // The "url" might have changed (inside "handleUrlChecks()").

                String urlToCheck = url;
                String sourceUrl = urlToCheck; // Hold it here for the logging-messages.
                // Avoid canonicalizing urls which contain "#/", since the canonicalizer strips the url-fragment, which is needed in those cases.
                if ( !sourceUrl.contains("#/") && (urlToCheck = URLCanonicalizer.getCanonicalURL(sourceUrl, null, StandardCharsets.UTF_8)) == null ) {
                    logger.warn("Could not canonicalize url: " + sourceUrl);
                    UrlUtils.logOutputData(id, sourceUrl, null, "unreachable", "Discarded at loading time, due to canonicalization problems.", null, true, "true", "false", "false", "false", "false", null, null);
                    LoaderAndChecker.connProblematicUrls.incrementAndGet();
                    return false;
                }

                if ( UrlUtils.docOrDatasetUrlsWithIDs.containsKey(url) ) { // If we got an already-found docUrl, log it and return.
                    ConnSupportUtils.handleReCrossedDocUrl(id, url, url, url, logger, true);
                    return true;
                }

                boolean isPossibleDocOrDatasetUrl = false; // Used for specific connection settings.
                String lowerCaseRetrievedUrl = url.toLowerCase();
                // Check if it's a possible docUrl or datasetUrl; if so, this info will be used for an optimal web-connection later.
                if ( (LoaderAndChecker.retrieveDocuments && LoaderAndChecker.DOC_URL_FILTER.matcher(lowerCaseRetrievedUrl).matches())
                        || (LoaderAndChecker.retrieveDatasets && LoaderAndChecker.DATASET_URL_FILTER.matcher(lowerCaseRetrievedUrl).matches()) ) {
                    //logger.debug("Possible docUrl or datasetUrl: " + url);
                    isPossibleDocOrDatasetUrl = true;
                }

                try { // Check if it's a docUrl; if not, it gets crawled.
                    HttpConnUtils.connectAndCheckMimeType(id, sourceUrl, urlToCheck, urlToCheck, null, true, isPossibleDocOrDatasetUrl);
                } catch (Exception e) {
                    List<String> list = LoaderAndChecker.getWasValidAndCouldRetry(e);
                    String wasUrlValid = list.get(0);
                    String couldRetry = list.get(1);
                    UrlUtils.logOutputData(id, urlToCheck, null, "unreachable", "Discarded at loading time, due to connectivity problems.", null, true, "true", wasUrlValid, "false", "false", couldRetry, null, null);
                    return false;
                }
                return true;
            });

            // Fire a batch once "jsonBatchSize" tasks have accumulated, or once the last assignment has been added.
            if ( ((++taskCount) % FileUtils.jsonBatchSize == 0) || (taskCount == tasksSize) )
            {
                logger.info("Batch counter: " + (++batchCount) + " | progress: " + PublicationsRetriever.df.format(taskCount * 100.0 / tasksSize) + "% | each batch contains up to " + FileUtils.jsonBatchSize + " id-url pairs.");
                LoaderAndChecker.invokeAllTasksAndWait(callableTasks);
                addUrlReportsToWorkerReport();
                callableTasks.clear(); // Reset the thread-tasks-list for the next batch.
            }
        }// end tasks-for-loop
    }
    private static final String DocFileNotRetrievedExceptionName = DocFileNotRetrievedException.class.getName(); // Keep it here, to easily spot if the exception changes inside the PublicationsRetriever library.
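
    /**
     * Transforms the data logged by the "PublicationsRetriever" library into "UrlReport"s
     * (status, payload and error) and appends them to "AssignmentHandler.urlReports".
     * Afterwards, the logged-data list is cleared, in order to be re-populated by the next batch.
     */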
    public static void addUrlReportsToWorkerReport()
    {
        for ( DataToBeLogged data : FileUtils.dataToBeLoggedList )
        {
            String status = null, fileLocation = null, comment = data.getComment(), mimeType = null;
            Error error = null;

            if ( data.getWasDocumentOrDatasetAccessible().equals("true") )
            {
                status = "accessible";
                if ( comment.contains(UrlUtils.alreadyDownloadedByIDMessage) ) {
                    // The file of this docUrl was already downloaded by another docUrl.
                    String previousId = comment.substring(UrlUtils.alreadyDownloadedByIDMessage.length() + 1);
                    //logger.debug("previousId: " + previousId); // DEBUG!
                    // Search for that ID inside the list; if that instance gave the docUrl (there might be multiple instances of the same ID), then get the file-location.
                    for ( DataToBeLogged data_2 : FileUtils.dataToBeLoggedList ) {
                        if ( data_2.getUrlId().equals(previousId) && data_2.getWasDocumentOrDatasetAccessible().equals("true") ) {
                            fileLocation = data_2.getComment();
                            mimeType = "application/pdf";
                            break;
                        }
                    }
                }
                else if ( comment.contains(DocFileNotRetrievedExceptionName) )
                    fileLocation = "File not retrieved";
                else {
                    fileLocation = comment;
                    mimeType = "application/pdf";
                }
                error = new Error(null, null); // We do not want to send a "null" object, since that just requires more complicated handling in the Controller.
            }
            else {
                status = "non-accessible";
                if ( data.getCouldRetry().equals("true") )
                    error = new Error(Error.ErrorType.couldRetry, comment);
                else
                    error = new Error(Error.ErrorType.noRetry, comment);
            }

            String docOrDatasetUrl = data.getDocOrDatasetUrl();
            if ( docOrDatasetUrl.equals(UrlUtils.unreachableDocOrDatasetUrlIndicator) || docOrDatasetUrl.equals(UrlUtils.duplicateUrlIndicator) )
                docOrDatasetUrl = null;

            // TODO - If support is added for doc-formats other than "pdf", then make sure the "mime_type" is correctly specified.
            Payload payload = new Payload(data.getUrlId(), data.getSourceUrl(), docOrDatasetUrl, new Date(), mimeType, data.getSize(), data.getHash(), fileLocation, "crawl:PublicationsRetriever");
            AssignmentHandler.urlReports.add(new UrlReport(status, payload, error));
        }
        FileUtils.dataToBeLoggedList.clear(); // Empty the list, to be re-populated by the next batch / assignment.
    }
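
    /**
     * Connects to the given url as a quick connectivity-test; the result is logged but not reported.
     * @param urlToCheck the url to test.
     * @return true if the connection succeeded and the url was identified as a docUrl, false otherwise.
     */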
    public static boolean connectWithUrlTest(String urlToCheck) {
        try {
            // Pass the id as the string "null" (instead of an actual null), to avoid an NPE in the concurrent data-structures.
            return HttpConnUtils.connectAndCheckMimeType("null", urlToCheck, urlToCheck, urlToCheck, null, true, false);
        } catch (Exception e) {
            List<String> list = LoaderAndChecker.getWasValidAndCouldRetry(e);
            String wasUrlValid = list.get(0);
            String couldRetry = list.get(1);
            UrlUtils.logOutputData(null, urlToCheck, null, "unreachable", "Discarded at loading time, due to connectivity problems.", null, true, "true", wasUrlValid, "false", "false", couldRetry, null, null);
            return false;
        }
    }
}
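
// A minimal usage sketch (hypothetical, not part of the Worker's actual request-flow), showing how a
// caller could run one assignments-batch through the plugin and inspect the gathered UrlReports.
// Assumptions, for illustration only: the "Assignment" model has a no-arg constructor with "setId()" /
// "setOriginalUrl()" setters, and the S3 credentials-file is in place (the static-initializer creates the S3 store).
class PublicationsRetrieverPluginSketch {

    public static void main(String[] args) throws Exception {
        Assignment assignment = new Assignment(); // Hypothetical constructor and setters, assumed from the model.
        assignment.setId("doi_________::0123456789abcdef0123456789abcdef");
        assignment.setOriginalUrl("https://example.org/article/123/download.pdf");

        PublicationsRetrieverPlugin.processAssignments(1L, Collections.singletonList(assignment));

        for ( UrlReport urlReport : AssignmentHandler.urlReports )
            System.out.println(urlReport);

        PublicationsRetriever.executor.shutdown(); // Allow the JVM to exit, since the pool's threads are non-daemon.
    }
}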