package eu.openaire.urls_worker.components.plugins;

import eu.openaire.publications_retriever.PublicationsRetriever;
import eu.openaire.publications_retriever.util.file.FileUtils;
import eu.openaire.publications_retriever.util.http.ConnSupportUtils;
import eu.openaire.publications_retriever.util.http.HttpConnUtils;
import eu.openaire.publications_retriever.util.url.DataToBeLogged;
import eu.openaire.publications_retriever.util.url.LoaderAndChecker;
import eu.openaire.publications_retriever.util.url.UrlUtils;
import eu.openaire.urls_worker.components.AssignmentsHandler;
import eu.openaire.urls_worker.controllers.GeneralController;
import eu.openaire.urls_worker.models.Assignment;
import eu.openaire.urls_worker.models.Error;
import eu.openaire.urls_worker.models.Payload;
import eu.openaire.urls_worker.models.UrlReport;
import eu.openaire.urls_worker.services.FileStorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.File;
import java.net.CookieStore;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
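
/**
 * A plugin which wraps the "PublicationsRetriever" library: it crawls the assignments' urls,
 * downloads any full-texts that are found and transforms the library's per-url results into
 * {@link UrlReport}s, which are gathered in "AssignmentsHandler.urlReports".
 */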
@Component
public class PublicationsRetrieverPlugin {
private static final Logger logger = LoggerFactory.getLogger(PublicationsRetrieverPlugin.class);
public static String assignmentsBasePath;
private static String workerId;
private static CookieStore cookieStore = null;
public PublicationsRetrieverPlugin(@Value("${info.workerId}") String workerId, @Value("${info.maxAssignmentsLimitPerBatch}") int maxAssignmentsLimitPerBatch, FileStorageService fileStorageService) {
// Specify some configurations.
PublicationsRetrieverPlugin.workerId = workerId; // Assign the static field directly, not through "this".
LoaderAndChecker.retrieveDocuments = true;
LoaderAndChecker.retrieveDatasets = false;
ConnSupportUtils.setKnownMimeTypes();
FileUtils.shouldDownloadDocFiles = true;
FileUtils.docFileNameType = FileUtils.DocFileNameType.idName;
PublicationsRetriever.targetUrlType = "docUrl";
FileUtils.jsonBatchSize = maxAssignmentsLimitPerBatch;
assignmentsBasePath = fileStorageService.assignmentsBaseLocation;
ConnSupportUtils.shouldBlockMost5XXDomains = false; // If this is "true", all but the "503" will be blocked. Otherwise, only the "511" will be blocked.
LoaderAndChecker.setCouldRetryRegex();
cookieStore = HttpConnUtils.cookieManager.getCookieStore();
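// Size the thread-pool from the available cpu-cores. For example, a machine with 4 cores gets (4 * 10) = 40 worker-threads, while one with 8 cores gets (8 * 6) = 48.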
int availableProcessors = Runtime.getRuntime().availableProcessors();
if ( availableProcessors <= 4 )
PublicationsRetriever.threadsMultiplier = 10;
else
PublicationsRetriever.threadsMultiplier = 6;
int workerThreadsCount = (availableProcessors * PublicationsRetriever.threadsMultiplier);
logger.info("Use " + workerThreadsCount + " worker-threads.");
PublicationsRetriever.executor = Executors.newFixedThreadPool(workerThreadsCount);
}
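
// This list is reused across batches; it is cleared at the end of each "processAssignments()" call.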
private static final List<Callable<Boolean>> callableTasks = new ArrayList<>(FileUtils.jsonBatchSize);
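
/**
 * Processes one batch of assignments: it creates the batch-specific full-texts subdirectory,
 * submits one callable-task per assignment (each task checks the url, connects to it and possibly downloads its full-text),
 * waits for the whole batch to finish and transforms the results into "UrlReport"s.
 * In the end, it clears all batch-specific state (tasks-list, caches and cookies).
 */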
public void processAssignments(Long assignmentRequestCounter, Collection<Assignment> assignments) throws RuntimeException
{
// At this point, the "assignmentsBasePath"-directory has already been successfully created.
String currentAssignmentsSubDir = "assignments_" + assignmentRequestCounter + "_fullTexts" + File.separator;
FileUtils.storeDocFilesDir = assignmentsBasePath + currentAssignmentsSubDir; // It needs the last separator, because of how the docFiles are named and stored.
File curAssignmentsDirs = new File(FileUtils.storeDocFilesDir);
try {
if ( ! curAssignmentsDirs.mkdirs() ) // Create the subdirectory. It returns "false" if the directory already exists (which should not happen, unless the clean-up of previous subdirectories failed) or if another problem occurred.
throw new RuntimeException("Could not create the \"assignments_" + assignmentRequestCounter + "_fullTexts\" directory: \"" + FileUtils.storeDocFilesDir + "\"!");
} catch (Exception e) { // Mainly a SecurityException.
throw new RuntimeException("Failed to create the full-texts directory for assignments_" + assignmentRequestCounter + ": " + e.getMessage());
}
// Start loading and checking urls.
for ( Assignment assignment : assignments )
{
callableTasks.add(() -> {
String id = assignment.getId();
String url = assignment.getOriginalUrl();
if ( (id == null) || id.isEmpty() || (url == null) || url.isEmpty() ) {
String errorMsg = "Got null or empty pair! ID=" + id + " , url=" + url;
logger.warn(errorMsg);
UrlUtils.logOutputData(id, url, null, UrlUtils.unreachableDocOrDatasetUrlIndicator, "Discarded at loading time, due to input problems. " + errorMsg, null, true, "true", "false", "false", "false", "false", null, null);
return false;
}
// The "url" might be changed inside "handleUrlChecks()".
if ( (url = LoaderAndChecker.handleUrlChecks(id, url)) == null )
return false;
String urlToCheck = url;
String sourceUrl = urlToCheck; // Hold it here for the logging-messages.
if ( (urlToCheck = LoaderAndChecker.basicURLNormalizer.filter(sourceUrl)) == null ) {
logger.warn("Could not normalize url: " + sourceUrl);
UrlUtils.logOutputData(id, sourceUrl, null, UrlUtils.unreachableDocOrDatasetUrlIndicator, "Discarded at loading time, due to canonicalization problems.", null, true, "true", "false", "false", "false", "false", null, null);
LoaderAndChecker.connProblematicUrls.incrementAndGet();
return false;
}
if ( UrlUtils.docOrDatasetUrlsWithIDs.containsKey(url) ) { // If this url is an already-found docUrl, log it and return.
ConnSupportUtils.handleReCrossedDocUrl(id, url, url, url, true);
return true;
}
boolean isPossibleDocOrDatasetUrl = false; // Used for specific connection settings.
String lowerCaseRetrievedUrl = url.toLowerCase();
// Check if it's a possible-DocUrl, if so, this info will be used for optimal web-connection later.
if ( (LoaderAndChecker.retrieveDocuments && LoaderAndChecker.DOC_URL_FILTER.matcher(lowerCaseRetrievedUrl).matches())
|| (LoaderAndChecker.retrieveDatasets && LoaderAndChecker.DATASET_URL_FILTER.matcher(lowerCaseRetrievedUrl).matches()) ) {
//logger.debug("Possible docUrl or datasetUrl: " + url);
isPossibleDocOrDatasetUrl = true;
}
try { // Check if it's a docUrl, if not, it gets crawled.
HttpConnUtils.connectAndCheckMimeType(id, sourceUrl, urlToCheck, urlToCheck, null, true, isPossibleDocOrDatasetUrl);
} catch (Exception e) {
List<String> list = LoaderAndChecker.getWasValidAndCouldRetry(e, urlToCheck);
String wasUrlValid = list.get(0);
String couldRetry = list.get(1);
UrlUtils.logOutputData(id, urlToCheck, null, UrlUtils.unreachableDocOrDatasetUrlIndicator, "Discarded at loading time, due to connectivity problems.", null, true, "true", wasUrlValid, "false", "false", couldRetry, null, null);
return false;
}
return true;
});
}
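// Run all the tasks on the shared thread-pool and wait for the whole batch to finish.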
int numFailedTasks = LoaderAndChecker.invokeAllTasksAndWait(callableTasks);
if ( numFailedTasks == -1 ) { // The unknown exception is logged inside the above method.
GeneralController.shouldShutdownWorker = true;
AssignmentsHandler.shouldNotRequestMore = true;
PublicationsRetriever.executor.shutdownNow(); // Close the thread-pool immediately. It will not be used again while the Worker is still running.
throw new RuntimeException("Invoking and/or executing the callableTasks failed with the exception (which is written in the log files)!");
}
if ( numFailedTasks > 0 )
logger.warn(numFailedTasks + " tasks failed, from assignments_" + assignmentRequestCounter);
addUrlReportsToWorkerReport(assignments);
callableTasks.clear(); // Reset the thread-tasks-list for the next batch.
UrlUtils.docOrDatasetUrlsWithIDs.clear(); // This HashTable is useful only for a single assignments-batch.
// In the next batch, the previously stored files might have been already uploaded by the Controller and deleted by the worker. Also, they will be stored in a different directory anyway.
ConnSupportUtils.domainsWithConnectionData.clear(); // This data is not useful for the next batch, since plenty of time will have passed before needing to check the "lastConnectedTime" for each domain, in order to apply the "politenessDelay".
//logger.debug("The number of cookies is: " + cookieStore.getCookies().size()); // debug!
boolean cookiesDeleted = cookieStore.removeAll();
//logger.debug(cookiesDeleted ? "The cookies were removed!" : "No cookies were removed!"); // DEBUG!
}
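
// Cached lengths of the marker-messages which the "PublicationsRetriever" library puts in the "comment"-field, when a file was already downloaded by another record. They are used below, to extract the initial ID and sourceUrl from such comments.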
private static final int lengthOfAlreadyDownloadedFromSourceUrlContinuedMessage = ConnSupportUtils.alreadyDownloadedFromSourceUrlContinuedMessage.length();
private static final int lengthOfAlreadyDownloadedFromIDMessage = ConnSupportUtils.alreadyDownloadedFromIDMessage.length();
private static final String provenance = "crawl:PublicationsRetriever";
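
/**
 * Transforms the "DataToBeLogged" results of the current batch into "UrlReport"s and adds them to the worker-report.
 * For records whose file was "already downloaded" by another record, it locates that initial record inside the same batch
 * and copies its file-data (location, size, hash). All reports of the batch get the same timestamp,
 * and each one gets the datasourceId of its assignment.
 */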
public static void addUrlReportsToWorkerReport(Collection<Assignment> assignments)
{
if ( FileUtils.dataToBeLoggedList.size() != assignments.size() ) {
logger.warn("The number of the results (" + FileUtils.dataToBeLoggedList.size() + ") is different from the number of the given assignments (" + assignments.size() + ")!");
} // TODO - Should any other step be taken, except from just showing the log-message?
// Index the UrlIds with the DatasourceIds for quick-search later. The datasourceIds are not included in the "DataToBeLogged" objects.
HashMap<String, String> urlIdsWithDatasourceIds = new HashMap<>(assignments.size());
for ( Assignment assignment : assignments )
urlIdsWithDatasourceIds.put(assignment.getId(), assignment.getDatasource().getId());
int numOfUnretrievedFiles = 0;
Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
for ( DataToBeLogged data : FileUtils.dataToBeLoggedList )
{
// TODO - Consider adding multi-thread execution for the following code.
// In that case, use "ConcurrentHashMap".
UrlReport.StatusType status = null;
String fileLocation = null, comment = data.getComment(), mimeType = null, hash = data.getHash();
Long size = data.getSize();
Error error = null;
if ( "true".equals(data.getWasDocumentOrDatasetAccessible()) ) // The reversed order defends against a potential NPE.
{
status = UrlReport.StatusType.accessible;
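// The comment is expected to have the following form (as the substring-arithmetic below implies):
// <alreadyDownloadedFromIDMessage><initialId><alreadyDownloadedFromSourceUrlContinuedMessage><initialSourceUrl>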
if ( comment.startsWith(ConnSupportUtils.alreadyDownloadedFromIDMessage, 0) ) { // If this is not the initially-found docUrl record, go search for the initial.
// The file of this docUrl was already downloaded by another docUrl.
int indexOfAlreadyDownloadedFromSourceUrlMessage = comment.indexOf(ConnSupportUtils.alreadyDownloadedFromSourceUrlContinuedMessage);
int indexOfAlreadyDownloadedFromSourceUrl = indexOfAlreadyDownloadedFromSourceUrlMessage + lengthOfAlreadyDownloadedFromSourceUrlContinuedMessage;
String initialId = comment.substring(lengthOfAlreadyDownloadedFromIDMessage, indexOfAlreadyDownloadedFromSourceUrlMessage); // The "initialId" starts right after the first message.
String initialSourceUrl = comment.substring(indexOfAlreadyDownloadedFromSourceUrl);
//logger.debug("initialId: " + initialId + " | sourceUrl: " + initialSourceUrl); // DEBUG!
// Search for that ID and sourceUrl inside the list. There might be duplicate ID-sourceUrl instances, but only one of them holds the file-data.
boolean foundAlreadyDownloadedFullText = false;
boolean foundIDUrlInWorkerReport = false;
for ( DataToBeLogged data_2 : FileUtils.dataToBeLoggedList )
{
if ( ! (data_2.getUrlId().equals(initialId) && (data_2.getSourceUrl().equals(initialSourceUrl))) )
continue;
// At this point we have found a record which has the same id and sourceUrl as the inspected record.
foundIDUrlInWorkerReport = true;
if ( "false".equals(data_2.getWasDocumentOrDatasetAccessible()) )
continue;
// At this point we have excluded any non-docUrl record, even if it has the same id and sourceUrl.
// It is possible that the same sourceUrl leads to the docUrl at one time, but fails at another, due to some kind of error.
// So, we do not want to accept a record-instance which does not lead to any file, even if another instance of the same record did lead to a file.
String tempFileLocation = data_2.getComment();
if ( tempFileLocation.startsWith(ConnSupportUtils.alreadyDownloadedFromIDMessage, 0) || tempFileLocation.startsWith(HttpConnUtils.docFileNotRetrievedMessage, 0) )
continue;
// At this point we have found that another instance of the same record gives the docFile itself, not a reference to it.
fileLocation = tempFileLocation;
size = data_2.getSize();
hash = data_2.getHash();
mimeType = "application/pdf"; // TODO - If support is added for other doc-formats other than "pdf", then make sure the "mime_type" is assigned to the value provided by the plugin (it has to be added in the future).
foundAlreadyDownloadedFullText = true;
break;
}
// In case the "alreadyDownloaded" full-text is not found, we have an error. All file-related data is "null".
if ( !foundAlreadyDownloadedFullText ) {
String addErrorMessage = ((!foundIDUrlInWorkerReport) ? " | That ID-sourceUrl was not found inside the WorkerReport!" : " | The file was not downloaded!");
error = new Error(Error.ErrorType.couldRetry, comment + addErrorMessage); // We can still try to download it from the found docUrl, in the future.
// The "fileLocation" is null.
}
}
else if ( ! comment.startsWith(HttpConnUtils.docFileNotRetrievedMessage, 0) ) { // If it was downloaded without an error.
fileLocation = comment; // This is the full-file-path.
mimeType = "application/pdf";
} else { // Else the file was not retrieved, so all file-related data are kept "null".
numOfUnretrievedFiles ++;
error = new Error(Error.ErrorType.couldRetry, comment); // We can still try to download it from the found docUrl, in the future.
}
if ( error == null ) // If the file was retrieved at any point (either now or by a previous record).
error = new Error(Error.ErrorType.couldRetry, null); // We do not want to send a "null" Error-object, since that would just complicate the handling in the Controller.
}
else {
status = UrlReport.StatusType.non_accessible;
if ( "true".equals(data.getCouldRetry()) )
error = new Error(Error.ErrorType.couldRetry, comment);
else
error = new Error(Error.ErrorType.noRetry, comment);
}
String docOrDatasetUrl = data.getDocOrDatasetUrl();
if ( docOrDatasetUrl.equals(UrlUtils.unreachableDocOrDatasetUrlIndicator) || docOrDatasetUrl.equals(UrlUtils.duplicateUrlIndicator) )
docOrDatasetUrl = null;
// Convert "null" strings to actual < null >
if ( (hash != null) && (hash.equals("null")) )
hash = null;
String urlId = data.getUrlId();
String datasourceId = urlIdsWithDatasourceIds.get(urlId);
// Each record will have the urlID, the datasourceID and possibly one filename, which may contain a different urlID.
// The Controller will select the correct datasourceID before adding it to the S3-ObjectStore filename.
Payload payload = new Payload(urlId, data.getSourceUrl(), docOrDatasetUrl, timestamp, mimeType, size, hash, fileLocation, provenance, datasourceId);
// TODO - If support is added for doc-formats other than "pdf", then make sure the "mime_type" is correctly specified.
AssignmentsHandler.urlReports.add(new UrlReport(status, payload, error));
}// end-for
FileUtils.dataToBeLoggedList.clear(); // Empty the list, to be re-populated by the next batch / assignment.
if ( numOfUnretrievedFiles > 50 )
logger.warn("The number of non-retrieved files is: " + numOfUnretrievedFiles);
}
}