|
|
|
@ -1,10 +1,11 @@
|
|
|
|
|
package eu.openaire.urls_worker.components.plugins;
|
|
|
|
|
|
|
|
|
|
import eu.openaire.publications_retriever.PublicationsRetriever;
|
|
|
|
|
import eu.openaire.publications_retriever.util.args.ArgsUtils;
|
|
|
|
|
import eu.openaire.publications_retriever.util.file.FileUtils;
|
|
|
|
|
import eu.openaire.publications_retriever.util.http.ConnSupportUtils;
|
|
|
|
|
import eu.openaire.publications_retriever.util.http.HttpConnUtils;
|
|
|
|
|
import eu.openaire.publications_retriever.util.url.DataToBeLogged;
|
|
|
|
|
import eu.openaire.publications_retriever.util.url.DataForOutput;
|
|
|
|
|
import eu.openaire.publications_retriever.util.url.LoaderAndChecker;
|
|
|
|
|
import eu.openaire.publications_retriever.util.url.UrlUtils;
|
|
|
|
|
import eu.openaire.urls_worker.components.AssignmentsHandler;
|
|
|
|
@ -47,7 +48,7 @@ public class PublicationsRetrieverPlugin {
|
|
|
|
|
ConnSupportUtils.setKnownMimeTypes();
|
|
|
|
|
FileUtils.shouldDownloadDocFiles = true;
|
|
|
|
|
FileUtils.docFileNameType = FileUtils.DocFileNameType.idName;
|
|
|
|
|
PublicationsRetriever.targetUrlType = "docUrl";
|
|
|
|
|
ArgsUtils.targetUrlType = "docUrl";
|
|
|
|
|
FileUtils.jsonBatchSize = maxAssignmentsLimitPerBatch;
|
|
|
|
|
|
|
|
|
|
assignmentsBasePath = fileStorageService.assignmentsBaseLocation;
|
|
|
|
@ -59,11 +60,11 @@ public class PublicationsRetrieverPlugin {
|
|
|
|
|
|
|
|
|
|
int availableProcessors = Runtime.getRuntime().availableProcessors();
|
|
|
|
|
if ( availableProcessors <= 4 )
|
|
|
|
|
PublicationsRetriever.threadsMultiplier = 10;
|
|
|
|
|
ArgsUtils.threadsMultiplier = 10;
|
|
|
|
|
else
|
|
|
|
|
PublicationsRetriever.threadsMultiplier = 6;
|
|
|
|
|
ArgsUtils.threadsMultiplier = 6;
|
|
|
|
|
|
|
|
|
|
int workerThreadsCount = (availableProcessors * PublicationsRetriever.threadsMultiplier);
|
|
|
|
|
int workerThreadsCount = (availableProcessors * ArgsUtils.threadsMultiplier);
|
|
|
|
|
logger.info("Use " + workerThreadsCount + " worker-threads.");
|
|
|
|
|
PublicationsRetriever.executor = Executors.newFixedThreadPool(workerThreadsCount);
|
|
|
|
|
}
|
|
|
|
@ -106,7 +107,7 @@ public class PublicationsRetrieverPlugin {
|
|
|
|
|
if ( (id == null) || id.isEmpty() || (url == null) || url.isEmpty() ) {
|
|
|
|
|
String errorMsg = "Got null or empty pair! ID=" + id + " , url=" + url;
|
|
|
|
|
logger.warn(errorMsg);
|
|
|
|
|
UrlUtils.logOutputData(id, url, null, UrlUtils.unreachableDocOrDatasetUrlIndicator, "Discarded at loading time, due to input problems. " + errorMsg, null, true, "true", "false", "false", "false", "false", null, null);
|
|
|
|
|
UrlUtils.addOutputData(id, url, null, UrlUtils.unreachableDocOrDatasetUrlIndicator, "Discarded at loading time, due to input problems. " + errorMsg, null, true, "true", "false", "false", "false", "false", null, null);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -117,7 +118,7 @@ public class PublicationsRetrieverPlugin {
|
|
|
|
|
String sourceUrl = urlToCheck; // Hold it here for the logging-messages.
|
|
|
|
|
if ( (urlToCheck = LoaderAndChecker.basicURLNormalizer.filter(sourceUrl)) == null ) {
|
|
|
|
|
logger.warn("Could not normalize url: " + sourceUrl);
|
|
|
|
|
UrlUtils.logOutputData(id, sourceUrl, null, UrlUtils.unreachableDocOrDatasetUrlIndicator, "Discarded at loading time, due to canonicalization's problems.", null, true, "true", "false", "false", "false", "false", null, null);
|
|
|
|
|
UrlUtils.addOutputData(id, sourceUrl, null, UrlUtils.unreachableDocOrDatasetUrlIndicator, "Discarded at loading time, due to canonicalization's problems.", null, true, "true", "false", "false", "false", "false", null, null);
|
|
|
|
|
LoaderAndChecker.connProblematicUrls.incrementAndGet();
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
@ -143,7 +144,7 @@ public class PublicationsRetrieverPlugin {
|
|
|
|
|
String wasUrlValid = list.get(0);
|
|
|
|
|
String couldRetry = list.get(1);
|
|
|
|
|
String errorMsg = "Discarded at loading time, as " + list.get(2);
|
|
|
|
|
UrlUtils.logOutputData(id, urlToCheck, null, UrlUtils.unreachableDocOrDatasetUrlIndicator, errorMsg, null, true, "true", wasUrlValid, "false", "false", couldRetry, null, null);
|
|
|
|
|
UrlUtils.addOutputData(id, urlToCheck, null, UrlUtils.unreachableDocOrDatasetUrlIndicator, errorMsg, null, true, "true", wasUrlValid, "false", "false", couldRetry, null, null);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
@ -161,13 +162,14 @@ public class PublicationsRetrieverPlugin {
|
|
|
|
|
if ( numFailedTasks > 0 )
|
|
|
|
|
logger.warn(numFailedTasks + " tasks failed, from assignments_" + assignmentRequestCounter);
|
|
|
|
|
|
|
|
|
|
callableTasks.clear(); // Reset the thread-tasks-list for the next batch.
|
|
|
|
|
ConnSupportUtils.domainsWithConnectionData.clear(); // This data is not useful for the next batch, since plenty of time will have passed before needing to check the "lastConnectedTime" for each domain, in order to apply the "politenessDelay".
|
|
|
|
|
|
|
|
|
|
addUrlReportsToWorkerReport(assignments);
|
|
|
|
|
|
|
|
|
|
callableTasks.clear(); // Reset the thread-tasks-list for the next batch.
|
|
|
|
|
UrlUtils.docOrDatasetUrlsWithIDs.clear(); // This HashTable is useful only for a single assignments-batch.
|
|
|
|
|
// In the next batch, the previously stored files might have been already uploaded by the Controller and deleted by the worker. Also, they will be stored in a different directory anyway.
|
|
|
|
|
// In the next batch, the previously stored files might have been already delivered to the Controller and deleted by the worker. Also, they will be stored in a different directory anyway.
|
|
|
|
|
|
|
|
|
|
ConnSupportUtils.domainsWithConnectionData.clear(); // This data is not useful for the next batch, since plenty of time will have passed before needing to check the "lastConnectedTime" for each domain, in order to apply the "politenessDelay".
|
|
|
|
|
|
|
|
|
|
//logger.debug("The number of cookies is: " + cookieStore.getCookies().size()); // debug!
|
|
|
|
|
boolean cookiesDeleted = cookieStore.removeAll();
|
|
|
|
@ -183,11 +185,11 @@ public class PublicationsRetrieverPlugin {
|
|
|
|
|
|
|
|
|
|
public static void addUrlReportsToWorkerReport(Collection<Assignment> assignments)
|
|
|
|
|
{
|
|
|
|
|
if ( FileUtils.dataToBeLoggedList.size() != assignments.size() ) {
|
|
|
|
|
logger.warn("The number of the results (" + FileUtils.dataToBeLoggedList.size() + ") is different from the number of the given assignments (" + assignments.size() + ")!");
|
|
|
|
|
if ( FileUtils.dataForOutput.size() != assignments.size() ) {
|
|
|
|
|
logger.warn("The number of the results (" + FileUtils.dataForOutput.size() + ") is different from the number of the given assignments (" + assignments.size() + ")!");
|
|
|
|
|
} // TODO - Should any other step be taken, except from just showing the log-message?
|
|
|
|
|
|
|
|
|
|
// Index the UrlIds with the DatasourceIds for quick-search later. The datasourceIds are not included in the "DataToBeLogged" objects.
|
|
|
|
|
// Index the UrlIds with the DatasourceIds for quick-search later. The datasourceIds are not included in the "DataForOutput" objects.
|
|
|
|
|
HashMap<String, String> urlIdsWithDatasourceIds = new HashMap<>(assignments.size());
|
|
|
|
|
for ( Assignment assignment : assignments )
|
|
|
|
|
urlIdsWithDatasourceIds.put(assignment.getId(), assignment.getDatasource().getId());
|
|
|
|
@ -195,7 +197,7 @@ public class PublicationsRetrieverPlugin {
|
|
|
|
|
int numOfUnretrievedFiles = 0;
|
|
|
|
|
Timestamp timestamp = new Timestamp(System.currentTimeMillis()); // Store it here, in order to have the same for all current records.
|
|
|
|
|
|
|
|
|
|
for ( DataToBeLogged data : FileUtils.dataToBeLoggedList )
|
|
|
|
|
for ( DataForOutput data : FileUtils.dataForOutput )
|
|
|
|
|
{
|
|
|
|
|
// TODO - Consider adding multi-thread execution for the following code.
|
|
|
|
|
// In that case, use "ConcurrentHashMap".
|
|
|
|
@ -218,7 +220,7 @@ public class PublicationsRetrieverPlugin {
|
|
|
|
|
// Search for that ID and sourceUrl inside the list; if that instance is the first-found one, then get the file-data (there might be duplicate ID-sourceUrl instances, but only one of them has the file-data).
|
|
|
|
|
boolean foundAlreadyDownloadedFullText = false;
|
|
|
|
|
boolean foundIDUrlInWorkerReport = false;
|
|
|
|
|
for ( DataToBeLogged data_2 : FileUtils.dataToBeLoggedList )
|
|
|
|
|
for ( DataForOutput data_2 : FileUtils.dataForOutput )
|
|
|
|
|
{
|
|
|
|
|
if ( ! (data_2.getUrlId().equals(initialId) && (data_2.getSourceUrl().equals(initialSourceUrl))) )
|
|
|
|
|
continue;
|
|
|
|
@ -237,7 +239,7 @@ public class PublicationsRetrieverPlugin {
|
|
|
|
|
if ( tempFileLocation.startsWith(ConnSupportUtils.alreadyDownloadedFromIDMessage, 0) || tempFileLocation.startsWith(HttpConnUtils.docFileNotRetrievedMessage, 0) )
|
|
|
|
|
continue;
|
|
|
|
|
|
|
|
|
|
// At this point we have found that another instance of the same record gives the docFile itself, not a reference to it.
|
|
|
|
|
// At this point we have found that another instance of the same record gives the docFile itself, rather than a reference to it or a problematic case.
|
|
|
|
|
fileLocation = tempFileLocation;
|
|
|
|
|
size = data_2.getSize();
|
|
|
|
|
hash = data_2.getHash();
|
|
|
|
@ -255,7 +257,7 @@ public class PublicationsRetrieverPlugin {
|
|
|
|
|
else if ( ! comment.startsWith(HttpConnUtils.docFileNotRetrievedMessage, 0) ) { // If it was downloaded without an error.
|
|
|
|
|
fileLocation = comment; // This is the full-file-path.
|
|
|
|
|
mimeType = "application/pdf";
|
|
|
|
|
} else { // Else the file was not retrieved, so all file-related data are kept "null".
|
|
|
|
|
} else { // Else the file was not retrieved, so the file-related data is kept "null".
|
|
|
|
|
numOfUnretrievedFiles ++;
|
|
|
|
|
error = new Error(Error.ErrorType.couldRetry, comment); // We can still try to download it from the found docUrl, in the future.
|
|
|
|
|
}
|
|
|
|
@ -290,7 +292,7 @@ public class PublicationsRetrieverPlugin {
|
|
|
|
|
|
|
|
|
|
AssignmentsHandler.urlReports.add(new UrlReport(status, payload, error));
|
|
|
|
|
}// end-for
|
|
|
|
|
FileUtils.dataToBeLoggedList.clear(); // Empty the list, to be re-populated by the next batch / assignment.
|
|
|
|
|
FileUtils.dataForOutput.clear(); // Empty the list, to be re-populated by the next batch / assignment.
|
|
|
|
|
|
|
|
|
|
if ( numOfUnretrievedFiles > 50 )
|
|
|
|
|
logger.warn("The number of non-retrieved files is: " + numOfUnretrievedFiles);
|
|
|
|
|