217 lines
12 KiB
Java
217 lines
12 KiB
Java
package eu.openaire.urls_worker.plugins;
|
|
|
|
import edu.uci.ics.crawler4j.url.URLCanonicalizer;
|
|
import eu.openaire.publications_retriever.PublicationsRetriever;
|
|
import eu.openaire.publications_retriever.util.file.FileUtils;
|
|
import eu.openaire.publications_retriever.util.http.ConnSupportUtils;
|
|
import eu.openaire.publications_retriever.util.http.HttpConnUtils;
|
|
import eu.openaire.publications_retriever.util.url.DataToBeLogged;
|
|
import eu.openaire.publications_retriever.util.url.LoaderAndChecker;
|
|
import eu.openaire.publications_retriever.util.url.UrlUtils;
|
|
import eu.openaire.urls_worker.UrlsWorkerApplication;
|
|
import eu.openaire.urls_worker.models.Assignment;
|
|
import eu.openaire.urls_worker.models.Error;
|
|
import eu.openaire.urls_worker.models.Payload;
|
|
import eu.openaire.urls_worker.models.UrlReport;
|
|
import eu.openaire.urls_worker.services.FileStorageService;
|
|
import eu.openaire.urls_worker.util.AssignmentsHandler;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
import java.io.*;
|
|
import java.nio.charset.StandardCharsets;
|
|
import java.sql.Timestamp;
|
|
import java.util.*;
|
|
import java.util.concurrent.Callable;
|
|
import java.util.concurrent.Executors;
|
|
|
|
|
|
public class PublicationsRetrieverPlugin {
|
|
|
|
private static final Logger logger = LoggerFactory.getLogger(PublicationsRetrieverPlugin.class);
|
|
|
|
public static String assignmentsBasePath;
|
|
|
|
|
|
public PublicationsRetrieverPlugin() {
|
|
// Specify some configurations
|
|
LoaderAndChecker.retrieveDocuments = true;
|
|
LoaderAndChecker.retrieveDatasets = false;
|
|
ConnSupportUtils.setKnownMimeTypes();
|
|
FileUtils.shouldDownloadDocFiles = true;
|
|
FileUtils.docFileNameType = FileUtils.DocFileNameType.idName;
|
|
PublicationsRetriever.targetUrlType = "docUrl";
|
|
FileUtils.jsonBatchSize = UrlsWorkerApplication.maxAssignmentsLimitPerBatch;
|
|
|
|
assignmentsBasePath = FileStorageService.assignmentsLocation.toString();
|
|
if ( !assignmentsBasePath.endsWith(File.separator) )
|
|
assignmentsBasePath += File.separator;
|
|
|
|
ConnSupportUtils.shouldBlockMost5XXDomains = false;
|
|
LoaderAndChecker.setCouldRetryRegex();
|
|
|
|
PublicationsRetriever.threadsMultiplier = 6;
|
|
int workerThreadsCount = Runtime.getRuntime().availableProcessors() * PublicationsRetriever.threadsMultiplier;
|
|
logger.info("Use " + workerThreadsCount + " worker-threads.");
|
|
PublicationsRetriever.executor = Executors.newFixedThreadPool(workerThreadsCount);
|
|
}
|
|
|
|
private static final List<Callable<Boolean>> callableTasks = new ArrayList<>(FileUtils.jsonBatchSize);
|
|
|
|
public static void processAssignments(Long assignmentRequestCounter, Collection<Assignment> assignments) throws RuntimeException
|
|
{
|
|
FileUtils.storeDocFilesDir = assignmentsBasePath + "assignments_" + assignmentRequestCounter + "_fullTexts" + File.separator; // It needs the last separator, because of how the docFiles are named and stored.
|
|
|
|
File curAssignmentsDirs = new File(FileUtils.storeDocFilesDir);
|
|
try {
|
|
if ( !curAssignmentsDirs.exists() ) {
|
|
if ( !curAssignmentsDirs.mkdirs() ) { // Create the directories.
|
|
String workingDir = System.getProperty("user.dir") + File.separator;
|
|
logger.error("Could not create the \"assignments_fullTexts directories\": \"" + FileUtils.storeDocFilesDir + "\". Using the \"workingDir\" instead (" + workingDir + ").");
|
|
FileUtils.storeDocFilesDir = assignmentsBasePath = workingDir;
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
String errorMsg = "Failed to create the full-texts directory for assignments_" + assignmentRequestCounter;
|
|
logger.error(errorMsg, e);
|
|
throw new RuntimeException(errorMsg + ": " + e.getMessage());
|
|
}
|
|
|
|
// Start loading and checking urls.
|
|
for ( Assignment assignment : assignments )
|
|
{
|
|
callableTasks.add(() -> {
|
|
String id = assignment.getId();
|
|
String url = assignment.getOriginalUrl();
|
|
|
|
if ( (url = LoaderAndChecker.handleUrlChecks(id, url)) == null ) {
|
|
return false;
|
|
} // The "url" might have changed (inside "handleUrlChecks()").
|
|
|
|
String urlToCheck = url;
|
|
String sourceUrl = urlToCheck; // Hold it here for the logging-messages.
|
|
if ( !sourceUrl.contains("#/") && (urlToCheck = URLCanonicalizer.getCanonicalURL(sourceUrl, null, StandardCharsets.UTF_8)) == null ) {
|
|
logger.warn("Could not canonicalize url: " + sourceUrl);
|
|
UrlUtils.logOutputData(id, sourceUrl, null, "unreachable", "Discarded at loading time, due to canonicalization's problems.", null, true, "true", "false", "false", "false", "false", null, null);
|
|
LoaderAndChecker.connProblematicUrls.incrementAndGet();
|
|
return false;
|
|
}
|
|
|
|
if ( UrlUtils.docOrDatasetUrlsWithIDs.containsKey(url) ) { // If we got into an already-found docUrl, log it and return.
|
|
ConnSupportUtils.handleReCrossedDocUrl(id, url, url, url, logger, true);
|
|
return true;
|
|
}
|
|
|
|
boolean isPossibleDocOrDatasetUrl = false; // Used for specific connection settings.
|
|
String lowerCaseRetrievedUrl = url.toLowerCase();
|
|
// Check if it's a possible-DocUrl, if so, this info will be used for optimal web-connection later.
|
|
if ( (LoaderAndChecker.retrieveDocuments && LoaderAndChecker.DOC_URL_FILTER.matcher(lowerCaseRetrievedUrl).matches())
|
|
|| (LoaderAndChecker.retrieveDatasets && LoaderAndChecker.DATASET_URL_FILTER.matcher(lowerCaseRetrievedUrl).matches()) ) {
|
|
//logger.debug("Possible docUrl or datasetUrl: " + url);
|
|
isPossibleDocOrDatasetUrl = true;
|
|
}
|
|
|
|
try { // Check if it's a docUrl, if not, it gets crawled.
|
|
HttpConnUtils.connectAndCheckMimeType(id, sourceUrl, urlToCheck, urlToCheck, null, true, isPossibleDocOrDatasetUrl);
|
|
} catch (Exception e) {
|
|
List<String> list = LoaderAndChecker.getWasValidAndCouldRetry(e);
|
|
String wasUrlValid = list.get(0);
|
|
String couldRetry = list.get(1);
|
|
UrlUtils.logOutputData(id, urlToCheck, null, "unreachable", "Discarded at loading time, due to connectivity problems.", null, true, "true", wasUrlValid, "false", "false", couldRetry, null, null);
|
|
return false;
|
|
}
|
|
return true;
|
|
});
|
|
}
|
|
|
|
int numFailedTasks = LoaderAndChecker.invokeAllTasksAndWait(callableTasks);
|
|
if ( numFailedTasks > 0 )
|
|
logger.warn(numFailedTasks + " tasks failed, from assignments_" + assignmentRequestCounter);
|
|
addUrlReportsToWorkerReport();
|
|
callableTasks.clear(); // Reset the thread-tasks-list for the next batch.
|
|
|
|
UrlUtils.docOrDatasetUrlsWithIDs.clear(); // This HashTable is useful only for a single assignments-batch.
|
|
// In the next batch, the previously stored files might have been already uploaded by the Controller and deleted by the worker. Also, they will be stored in a different directory anyway.
|
|
}
|
|
|
|
|
|
public static void addUrlReportsToWorkerReport()
|
|
{
|
|
Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
|
|
|
|
for ( DataToBeLogged data : FileUtils.dataToBeLoggedList )
|
|
{
|
|
UrlReport.StatusType status = null;
|
|
String fileLocation = null, comment = data.getComment(), mimeType = null, hash = data.getHash();
|
|
Long size = data.getSize();
|
|
Error error = null;
|
|
|
|
if ( "true".equals(data.getWasDocumentOrDatasetAccessible()) ) // The reversed order defends against a potential NPE.
|
|
{
|
|
status = UrlReport.StatusType.accessible;
|
|
if ( comment.contains(UrlUtils.alreadyDownloadedByIDMessage) ) {
|
|
// The file of this docUrl was already downloaded by another docUrl.
|
|
String initialId = comment.substring(UrlUtils.alreadyDownloadedByIDMessage.length()); // The fileName starts right after the "message".
|
|
//logger.debug("initialId: " + initialId); // DEBUG!
|
|
// Search that ID inside the list and if that instance gave the docUrl (there might be multiple ID instances) then get the file-location.
|
|
for ( DataToBeLogged data_2 : FileUtils.dataToBeLoggedList ) {
|
|
if ( data_2.getUrlId().equals(initialId) && data_2.getWasDocumentOrDatasetAccessible().equals("true") ) {
|
|
fileLocation = data_2.getComment();
|
|
size = data_2.getSize();
|
|
hash = data_2.getHash();
|
|
mimeType = "application/pdf"; // TODO - If support is added for other doc-formats other than "pdf", then make sure the "mime_type" is correctly specified.
|
|
break;
|
|
}
|
|
}
|
|
// TODO - The case where the "twin-ID" is not found, should "never" happen. But should we check? How to handle if that is the case..?
|
|
}
|
|
else if ( ! comment.equals(HttpConnUtils.docFileNotRetrievedMessage) ) { // If it was downloaded without an error.
|
|
fileLocation = comment; // This is the full-file-path.
|
|
mimeType = "application/pdf";
|
|
} else // Else the file was not retrieved, so all file-related data are kept "null".
|
|
error = new Error(Error.ErrorType.couldRetry, comment); // We can still try to download it in the future.
|
|
|
|
if ( error == null ) // If the file was retrieved, in any time.
|
|
error = new Error(Error.ErrorType.couldRetry, null); // We do not want to send a "null" object, since it just adds more complicated handling in the controller..
|
|
}
|
|
else {
|
|
status = UrlReport.StatusType.non_accessible;
|
|
if ( "true".equals(data.getCouldRetry()) )
|
|
error = new Error(Error.ErrorType.couldRetry, comment);
|
|
else
|
|
error = new Error(Error.ErrorType.noRetry, comment);
|
|
}
|
|
|
|
String docOrDatasetUrl = data.getDocOrDatasetUrl();
|
|
if ( docOrDatasetUrl.equals(UrlUtils.unreachableDocOrDatasetUrlIndicator) || docOrDatasetUrl.equals(UrlUtils.duplicateUrlIndicator) )
|
|
docOrDatasetUrl = null;
|
|
|
|
// Cleanup some data.
|
|
if ( (size != null) && (size == 0L) )
|
|
size = null;
|
|
|
|
if ( (hash != null) && (hash.equals("null")) )
|
|
hash = null;
|
|
|
|
Payload payload = new Payload(data.getUrlId(), data.getSourceUrl(), docOrDatasetUrl, timestamp, mimeType, size, hash, fileLocation, "crawl:PublicationsRetriever");
|
|
// TODO - If support is added for other doc-formats other than "pdf", then make sure the "mime_type" is correctly specified.
|
|
|
|
AssignmentsHandler.urlReports.add(new UrlReport(status, payload, error));
|
|
}// end-for
|
|
FileUtils.dataToBeLoggedList.clear(); // Empty the list, to be re-populated by the next batch / assignment.
|
|
}
|
|
|
|
|
|
public static boolean connectWithUrlTest(String urlToCheck) {
|
|
try {
|
|
return HttpConnUtils.connectAndCheckMimeType("null", urlToCheck, urlToCheck, urlToCheck, null, true, false); // Sent the < null > in quotes to avoid an NPE in the concurrent data-structures.
|
|
} catch (Exception e) {
|
|
List<String> list = LoaderAndChecker.getWasValidAndCouldRetry(e);
|
|
String wasUrlValid = list.get(0);
|
|
String couldRetry = list.get(1);
|
|
UrlUtils.logOutputData(null, urlToCheck, null, "unreachable", "Discarded at loading time, due to connectivity problems.", null, true, "true", wasUrlValid, "false", "false", couldRetry, null, null);
|
|
return false;
|
|
}
|
|
}
|
|
}
|