2021-07-29 08:01:53 +02:00
package eu.openaire.urls_worker.plugins ;
2021-06-22 04:58:07 +02:00
import edu.uci.ics.crawler4j.url.URLCanonicalizer ;
import eu.openaire.publications_retriever.PublicationsRetriever ;
import eu.openaire.publications_retriever.util.file.FileUtils ;
import eu.openaire.publications_retriever.util.http.ConnSupportUtils ;
import eu.openaire.publications_retriever.util.http.HttpConnUtils ;
import eu.openaire.publications_retriever.util.url.DataToBeLogged ;
import eu.openaire.publications_retriever.util.url.LoaderAndChecker ;
import eu.openaire.publications_retriever.util.url.UrlUtils ;
2021-12-06 23:52:40 +01:00
import eu.openaire.urls_worker.UrlsWorkerApplication ;
2022-01-03 23:23:45 +01:00
import eu.openaire.urls_worker.components.ScheduledTasks ;
2021-07-05 14:00:29 +02:00
import eu.openaire.urls_worker.models.Assignment ;
2021-08-05 14:09:28 +02:00
import eu.openaire.urls_worker.models.Error ;
2021-06-22 04:58:07 +02:00
import eu.openaire.urls_worker.models.Payload ;
import eu.openaire.urls_worker.models.UrlReport ;
2021-11-26 16:04:31 +01:00
import eu.openaire.urls_worker.services.FileStorageService ;
2021-11-27 01:37:33 +01:00
import eu.openaire.urls_worker.util.AssignmentsHandler ;
2021-06-22 04:58:07 +02:00
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
2021-12-17 07:24:09 +01:00
import java.io.File ;
2021-06-22 04:58:07 +02:00
import java.nio.charset.StandardCharsets ;
2021-11-27 01:37:33 +01:00
import java.sql.Timestamp ;
2021-12-17 07:24:09 +01:00
import java.util.ArrayList ;
import java.util.Collection ;
import java.util.List ;
2021-06-22 04:58:07 +02:00
import java.util.concurrent.Callable ;
import java.util.concurrent.Executors ;
/**
 * Worker-side plugin around the PublicationsRetriever library.
 * It configures the library's static settings (in the constructor), runs the url-checks of an assignments-batch
 * through the library's executor and converts the library's logged results into UrlReports.
 */
public class PublicationsRetrieverPlugin {
// Class-wide SLF4J logger.
private static final Logger logger = LoggerFactory . getLogger ( PublicationsRetrieverPlugin . class ) ;
2021-11-26 16:04:31 +01:00
// Base directory under which the full-texts directory of each assignments-batch is created.
// The constructor sets it from "FileStorageService.assignmentsLocation" and makes sure it ends with the file-separator.
// "processAssignments()" re-points it to the process working directory when a batch directory cannot be created.
public static String assignmentsBasePath ;
2021-06-22 04:58:07 +02:00
2021-11-30 05:57:51 +01:00
public PublicationsRetrieverPlugin ( ) {
2021-06-22 04:58:07 +02:00
// Specify some configurations
LoaderAndChecker . retrieveDocuments = true ;
LoaderAndChecker . retrieveDatasets = false ;
2021-11-30 05:57:51 +01:00
ConnSupportUtils . setKnownMimeTypes ( ) ;
2021-06-22 04:58:07 +02:00
FileUtils . shouldDownloadDocFiles = true ;
2021-07-29 08:01:53 +02:00
FileUtils . docFileNameType = FileUtils . DocFileNameType . idName ;
2021-06-22 04:58:07 +02:00
PublicationsRetriever . targetUrlType = " docUrl " ;
2021-12-06 23:52:40 +01:00
FileUtils . jsonBatchSize = UrlsWorkerApplication . maxAssignmentsLimitPerBatch ;
2021-06-22 04:58:07 +02:00
2021-11-30 05:57:51 +01:00
assignmentsBasePath = FileStorageService . assignmentsLocation . toString ( ) ;
if ( ! assignmentsBasePath . endsWith ( File . separator ) )
assignmentsBasePath + = File . separator ;
ConnSupportUtils . shouldBlockMost5XXDomains = false ;
LoaderAndChecker . setCouldRetryRegex ( ) ;
2021-12-20 21:25:27 +01:00
PublicationsRetriever . threadsMultiplier = 4 ;
2021-06-22 04:58:07 +02:00
int workerThreadsCount = Runtime . getRuntime ( ) . availableProcessors ( ) * PublicationsRetriever . threadsMultiplier ;
logger . info ( " Use " + workerThreadsCount + " worker-threads. " ) ;
PublicationsRetriever . executor = Executors . newFixedThreadPool ( workerThreadsCount ) ;
}
2021-10-30 16:14:18 +02:00
// Shared list of the url-check tasks of the current batch; "processAssignments()" fills it, executes it and clears it.
// NOTE(review): this static initializer runs at class-initialization, i.e. before any constructor call assigns "FileUtils.jsonBatchSize",
// so the initial capacity is whatever value that field holds at that moment — confirm this is acceptable.
private static final List < Callable < Boolean > > callableTasks = new ArrayList < > ( FileUtils . jsonBatchSize ) ;
2021-06-22 04:58:07 +02:00
2021-12-03 03:09:40 +01:00
public static void processAssignments ( Long assignmentRequestCounter , Collection < Assignment > assignments ) throws RuntimeException
2021-06-22 04:58:07 +02:00
{
2021-11-26 16:04:31 +01:00
FileUtils . storeDocFilesDir = assignmentsBasePath + " assignments_ " + assignmentRequestCounter + " _fullTexts " + File . separator ; // It needs the last separator, because of how the docFiles are named and stored.
File curAssignmentsDirs = new File ( FileUtils . storeDocFilesDir ) ;
2021-12-03 03:09:40 +01:00
try {
if ( ! curAssignmentsDirs . exists ( ) ) {
if ( ! curAssignmentsDirs . mkdirs ( ) ) { // Create the directories.
String workingDir = System . getProperty ( " user.dir " ) + File . separator ;
logger . error ( " Could not create the \" assignments_fullTexts directories \" : \" " + FileUtils . storeDocFilesDir + " \" . Using the \" workingDir \" instead ( " + workingDir + " ). " ) ;
FileUtils . storeDocFilesDir = assignmentsBasePath = workingDir ;
}
2021-11-26 16:04:31 +01:00
}
2021-12-03 03:09:40 +01:00
} catch ( Exception e ) {
String errorMsg = " Failed to create the full-texts directory for assignments_ " + assignmentRequestCounter ;
logger . error ( errorMsg , e ) ;
throw new RuntimeException ( errorMsg + " : " + e . getMessage ( ) ) ;
2021-11-26 16:04:31 +01:00
}
2021-06-22 04:58:07 +02:00
// Start loading and checking urls.
2021-07-05 14:00:29 +02:00
for ( Assignment assignment : assignments )
2021-06-22 04:58:07 +02:00
{
callableTasks . add ( ( ) - > {
2021-07-05 14:00:29 +02:00
String id = assignment . getId ( ) ;
String url = assignment . getOriginalUrl ( ) ;
2021-06-22 04:58:07 +02:00
if ( ( url = LoaderAndChecker . handleUrlChecks ( id , url ) ) = = null ) {
return false ;
} // The "url" might have changed (inside "handleUrlChecks()").
String urlToCheck = url ;
String sourceUrl = urlToCheck ; // Hold it here for the logging-messages.
if ( ! sourceUrl . contains ( " #/ " ) & & ( urlToCheck = URLCanonicalizer . getCanonicalURL ( sourceUrl , null , StandardCharsets . UTF_8 ) ) = = null ) {
logger . warn ( " Could not canonicalize url: " + sourceUrl ) ;
2021-09-08 04:02:14 +02:00
UrlUtils . logOutputData ( id , sourceUrl , null , " unreachable " , " Discarded at loading time, due to canonicalization's problems. " , null , true , " true " , " false " , " false " , " false " , " false " , null , null ) ;
2021-06-22 04:58:07 +02:00
LoaderAndChecker . connProblematicUrls . incrementAndGet ( ) ;
return false ;
}
if ( UrlUtils . docOrDatasetUrlsWithIDs . containsKey ( url ) ) { // If we got into an already-found docUrl, log it and return.
2022-01-14 14:13:00 +01:00
ConnSupportUtils . handleReCrossedDocUrl ( id , url , url , url , true ) ;
2021-06-22 04:58:07 +02:00
return true ;
}
boolean isPossibleDocOrDatasetUrl = false ; // Used for specific connection settings.
String lowerCaseRetrievedUrl = url . toLowerCase ( ) ;
// Check if it's a possible-DocUrl, if so, this info will be used for optimal web-connection later.
if ( ( LoaderAndChecker . retrieveDocuments & & LoaderAndChecker . DOC_URL_FILTER . matcher ( lowerCaseRetrievedUrl ) . matches ( ) )
| | ( LoaderAndChecker . retrieveDatasets & & LoaderAndChecker . DATASET_URL_FILTER . matcher ( lowerCaseRetrievedUrl ) . matches ( ) ) ) {
//logger.debug("Possible docUrl or datasetUrl: " + url);
isPossibleDocOrDatasetUrl = true ;
}
try { // Check if it's a docUrl, if not, it gets crawled.
HttpConnUtils . connectAndCheckMimeType ( id , sourceUrl , urlToCheck , urlToCheck , null , true , isPossibleDocOrDatasetUrl ) ;
} catch ( Exception e ) {
2021-09-01 18:42:32 +02:00
List < String > list = LoaderAndChecker . getWasValidAndCouldRetry ( e ) ;
String wasUrlValid = list . get ( 0 ) ;
String couldRetry = list . get ( 1 ) ;
2021-09-08 04:02:14 +02:00
UrlUtils . logOutputData ( id , urlToCheck , null , " unreachable " , " Discarded at loading time, due to connectivity problems. " , null , true , " true " , wasUrlValid , " false " , " false " , couldRetry , null , null ) ;
return false ;
2021-06-22 04:58:07 +02:00
}
return true ;
} ) ;
2021-12-03 03:09:40 +01:00
}
2021-06-22 04:58:07 +02:00
2021-12-03 03:09:40 +01:00
int numFailedTasks = LoaderAndChecker . invokeAllTasksAndWait ( callableTasks ) ;
2022-01-03 23:23:45 +01:00
if ( numFailedTasks = = - 1 ) { // The unknown exception is logged inside the above method.
System . err . println ( " Invoking and/or executing the callableTasks failed with the exception written in the log files! " ) ;
UrlsWorkerApplication . gentleAppShutdown ( ) ;
}
2021-12-03 03:09:40 +01:00
if ( numFailedTasks > 0 )
2021-12-03 15:29:16 +01:00
logger . warn ( numFailedTasks + " tasks failed, from assignments_ " + assignmentRequestCounter ) ;
2022-01-03 23:23:45 +01:00
2021-12-03 03:09:40 +01:00
addUrlReportsToWorkerReport ( ) ;
callableTasks . clear ( ) ; // Reset the thread-tasks-list for the next batch.
2021-12-13 20:16:30 +01:00
UrlUtils . docOrDatasetUrlsWithIDs . clear ( ) ; // This HashTable is useful only for a single assignments-batch.
// In the next batch, the previously stored files might have been already uploaded by the Controller and deleted by the worker. Also, they will be stored in a different directory anyway.
2021-06-22 04:58:07 +02:00
}
public static void addUrlReportsToWorkerReport ( )
{
2021-11-27 01:37:33 +01:00
Timestamp timestamp = new Timestamp ( System . currentTimeMillis ( ) ) ; // Store it here, in order to have the same for all current records.
2021-11-26 16:04:31 +01:00
2021-06-22 04:58:07 +02:00
for ( DataToBeLogged data : FileUtils . dataToBeLoggedList )
{
2021-09-22 15:36:48 +02:00
UrlReport . StatusType status = null ;
String fileLocation = null , comment = data . getComment ( ) , mimeType = null , hash = data . getHash ( ) ;
2021-09-21 15:21:39 +02:00
Long size = data . getSize ( ) ;
2021-08-05 14:09:28 +02:00
Error error = null ;
2021-12-16 01:04:05 +01:00
if ( " true " . equals ( data . getWasDocumentOrDatasetAccessible ( ) ) ) // The reversed order defends against a potential NPE.
2021-06-22 04:58:07 +02:00
{
2021-09-22 15:36:48 +02:00
status = UrlReport . StatusType . accessible ;
2022-01-17 09:12:48 +01:00
if ( comment . startsWith ( UrlUtils . alreadyDownloadedFromIDMessage , 0 ) ) {
2021-06-22 04:58:07 +02:00
// The file of this docUrl was already downloaded by another docUrl.
2022-01-17 09:12:48 +01:00
int indexOfAlreadyDownloadedFromSourceUrlMessage = comment . indexOf ( UrlUtils . alreadyDownloadedFromSourceUrlContinuedMessage ) ;
int indexOfAlreadyDownloadedFromSourceUrl = indexOfAlreadyDownloadedFromSourceUrlMessage + UrlUtils . alreadyDownloadedFromSourceUrlContinuedMessage . length ( ) ;
String initialId = comment . substring ( UrlUtils . alreadyDownloadedFromIDMessage . length ( ) , indexOfAlreadyDownloadedFromSourceUrlMessage ) ; // The fileName starts right after the "message".
String initialSourceUrl = comment . substring ( indexOfAlreadyDownloadedFromSourceUrl ) ;
//logger.debug("initialId: " + initialId + " | sourceUrl: " + initialSourceUrl); // DEBUG!
// Search that ID and sourceUrl inside the list, if that instance is the first-found one, then get the file-data (there might be duplicate ID-sourceUrl instances, but only one of them has the file-data).
2021-06-22 04:58:07 +02:00
for ( DataToBeLogged data_2 : FileUtils . dataToBeLoggedList ) {
2022-01-17 09:12:48 +01:00
if ( data_2 . getUrlId ( ) . equals ( initialId ) & & ( data_2 . getSourceUrl ( ) . equals ( initialSourceUrl ) )
& & ! data_2 . getComment ( ) . startsWith ( UrlUtils . alreadyDownloadedFromIDMessage ) ) {
2021-06-22 04:58:07 +02:00
fileLocation = data_2 . getComment ( ) ;
2021-09-21 15:21:39 +02:00
size = data_2 . getSize ( ) ;
hash = data_2 . getHash ( ) ;
mimeType = " application/pdf " ; // TODO - If support is added for other doc-formats other than "pdf", then make sure the "mime_type" is correctly specified.
2021-06-22 04:58:07 +02:00
break ;
}
}
2022-01-17 09:12:48 +01:00
// In case the "alreadyDownloaded" full-text is not found, we have an error.
error = new Error ( Error . ErrorType . couldRetry , comment + " | That ID-sourceUrl was not found inside the WorkerReport! " ) ; // We can still try to download it from the found docUrl, in the future.
2021-06-22 04:58:07 +02:00
}
2021-12-16 01:04:05 +01:00
else if ( ! comment . equals ( HttpConnUtils . docFileNotRetrievedMessage ) ) { // If it was downloaded without an error.
2021-12-07 18:33:10 +01:00
fileLocation = comment ; // This is the full-file-path.
2021-09-08 04:02:14 +02:00
mimeType = " application/pdf " ;
2021-12-16 01:04:05 +01:00
} else // Else the file was not retrieved, so all file-related data are kept "null".
2021-12-23 23:12:34 +01:00
error = new Error ( Error . ErrorType . couldRetry , comment ) ; // We can still try to download it from the found docUrl, in the future.
2021-12-16 01:04:05 +01:00
if ( error = = null ) // If the file was retrieved, in any time.
2022-01-17 09:12:48 +01:00
error = new Error ( Error . ErrorType . couldRetry , null ) ; // We do not want to send a "null" Error-object, since it just adds more complicated handling in the controller..
2021-08-05 19:41:32 +02:00
}
else {
2021-09-22 15:36:48 +02:00
status = UrlReport . StatusType . non_accessible ;
2021-12-16 01:04:05 +01:00
if ( " true " . equals ( data . getCouldRetry ( ) ) )
2021-08-05 14:09:28 +02:00
error = new Error ( Error . ErrorType . couldRetry , comment ) ;
else
error = new Error ( Error . ErrorType . noRetry , comment ) ;
}
String docOrDatasetUrl = data . getDocOrDatasetUrl ( ) ;
if ( docOrDatasetUrl . equals ( UrlUtils . unreachableDocOrDatasetUrlIndicator ) | | docOrDatasetUrl . equals ( UrlUtils . duplicateUrlIndicator ) )
docOrDatasetUrl = null ;
2021-12-23 23:12:34 +01:00
// Convert "null" strings to actual < null >
2021-11-26 16:04:31 +01:00
if ( ( hash ! = null ) & & ( hash . equals ( " null " ) ) )
hash = null ;
2021-11-27 01:37:33 +01:00
Payload payload = new Payload ( data . getUrlId ( ) , data . getSourceUrl ( ) , docOrDatasetUrl , timestamp , mimeType , size , hash , fileLocation , " crawl:PublicationsRetriever " ) ;
2021-08-05 14:09:28 +02:00
// TODO - If support is added for other doc-formats other than "pdf", then make sure the "mime_type" is correctly specified.
2021-06-22 04:58:07 +02:00
2021-11-27 01:37:33 +01:00
AssignmentsHandler . urlReports . add ( new UrlReport ( status , payload , error ) ) ;
2021-11-26 16:04:31 +01:00
} // end-for
2021-06-22 04:58:07 +02:00
FileUtils . dataToBeLoggedList . clear ( ) ; // Empty the list, to be re-populated by the next batch / assignment.
}
public static boolean connectWithUrlTest ( String urlToCheck ) {
2022-01-14 14:13:00 +01:00
String testID = " testID " ;
2021-06-22 04:58:07 +02:00
try {
2022-01-14 14:13:00 +01:00
return HttpConnUtils . connectAndCheckMimeType ( testID , urlToCheck , urlToCheck , urlToCheck , null , true , false ) ; // Sent the < null > in quotes to avoid an NPE in the concurrent data-structures.
2021-06-22 04:58:07 +02:00
} catch ( Exception e ) {
2021-09-01 18:42:32 +02:00
List < String > list = LoaderAndChecker . getWasValidAndCouldRetry ( e ) ;
String wasUrlValid = list . get ( 0 ) ;
String couldRetry = list . get ( 1 ) ;
2022-01-14 14:13:00 +01:00
UrlUtils . logOutputData ( testID , urlToCheck , null , " unreachable " , " Discarded at loading time, due to connectivity problems. " , null , true , " true " , wasUrlValid , " false " , " false " , couldRetry , null , null ) ;
2021-06-22 04:58:07 +02:00
return false ;
}
}
}