package eu.openaire.urls_worker.plugins;

import edu.uci.ics.crawler4j.url.URLCanonicalizer;

import eu.openaire.publications_retriever.PublicationsRetriever;
import eu.openaire.publications_retriever.exceptions.DocFileNotRetrievedException;
import eu.openaire.publications_retriever.util.file.FileUtils;
import eu.openaire.publications_retriever.util.http.ConnSupportUtils;
import eu.openaire.publications_retriever.util.http.HttpConnUtils;
import eu.openaire.publications_retriever.util.url.DataToBeLogged;
import eu.openaire.publications_retriever.util.url.LoaderAndChecker;
import eu.openaire.publications_retriever.util.url.UrlUtils;

import eu.openaire.urls_worker.models.Assignment;
import eu.openaire.urls_worker.models.Error;
import eu.openaire.urls_worker.models.Payload;
import eu.openaire.urls_worker.models.UrlReport;
import eu.openaire.urls_worker.services.FileStorageService;
import eu.openaire.urls_worker.util.AssignmentsHandler;
import eu.openaire.urls_worker.util.WorkerConstants;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
public class PublicationsRetrieverPlugin {
private static final Logger logger = LoggerFactory . getLogger ( PublicationsRetrieverPlugin . class ) ;
2021-11-26 16:04:31 +01:00
public static String assignmentsBasePath ;
2021-06-22 04:58:07 +02:00
static {
// Specify some configurations
LoaderAndChecker . retrieveDocuments = true ;
LoaderAndChecker . retrieveDatasets = false ;
FileUtils . shouldDownloadDocFiles = true ;
2021-07-29 08:01:53 +02:00
FileUtils . docFileNameType = FileUtils . DocFileNameType . idName ;
2021-06-22 04:58:07 +02:00
PublicationsRetriever . targetUrlType = " docUrl " ;
2021-10-30 16:14:18 +02:00
FileUtils . jsonBatchSize = WorkerConstants . ASSIGNMENTS_LIMIT ;
2021-06-22 04:58:07 +02:00
int workerThreadsCount = Runtime . getRuntime ( ) . availableProcessors ( ) * PublicationsRetriever . threadsMultiplier ;
logger . info ( " Use " + workerThreadsCount + " worker-threads. " ) ;
PublicationsRetriever . executor = Executors . newFixedThreadPool ( workerThreadsCount ) ;
}
2021-11-26 16:04:31 +01:00
public PublicationsRetrieverPlugin ( ) {
assignmentsBasePath = FileStorageService . assignmentsLocation . toString ( ) ;
if ( ! assignmentsBasePath . endsWith ( File . separator ) )
assignmentsBasePath + = File . separator ;
}
2021-10-30 16:14:18 +02:00
private static final List < Callable < Boolean > > callableTasks = new ArrayList < > ( FileUtils . jsonBatchSize ) ;
2021-06-22 04:58:07 +02:00
2021-09-08 04:02:14 +02:00
public static void processAssignments ( Long assignmentRequestCounter , Collection < Assignment > assignments ) throws RuntimeException , FileNotFoundException
2021-06-22 04:58:07 +02:00
{
2021-11-26 16:04:31 +01:00
FileUtils . storeDocFilesDir = assignmentsBasePath + " assignments_ " + assignmentRequestCounter + " _fullTexts " + File . separator ; // It needs the last separator, because of how the docFiles are named and stored.
File curAssignmentsDirs = new File ( FileUtils . storeDocFilesDir ) ;
if ( ! curAssignmentsDirs . exists ( ) ) {
if ( ! curAssignmentsDirs . mkdirs ( ) ) { // Create the directories.
String workingDir = System . getProperty ( " user.dir " ) + File . separator ;
logger . error ( " Could not create the \" assignments_fullTexts directories \" : \" " + FileUtils . storeDocFilesDir + " \" . Using the \" workingDir \" instead ( " + workingDir + " ). " ) ;
FileUtils . storeDocFilesDir = assignmentsBasePath = workingDir ;
}
}
2021-06-22 04:58:07 +02:00
2021-11-26 16:04:31 +01:00
ConnSupportUtils . setKnownMimeTypes ( ) ;
2021-09-21 15:21:39 +02:00
int tasksNumber = assignments . size ( ) ;
2021-06-22 04:58:07 +02:00
int batchCount = 0 ;
2021-09-21 15:21:39 +02:00
int tasksCount = 0 ;
2021-06-22 04:58:07 +02:00
// Start loading and checking urls.
2021-07-05 14:00:29 +02:00
for ( Assignment assignment : assignments )
2021-06-22 04:58:07 +02:00
{
callableTasks . add ( ( ) - > {
2021-07-05 14:00:29 +02:00
String id = assignment . getId ( ) ;
String url = assignment . getOriginalUrl ( ) ;
2021-06-22 04:58:07 +02:00
if ( ( url = LoaderAndChecker . handleUrlChecks ( id , url ) ) = = null ) {
return false ;
} // The "url" might have changed (inside "handleUrlChecks()").
String urlToCheck = url ;
String sourceUrl = urlToCheck ; // Hold it here for the logging-messages.
if ( ! sourceUrl . contains ( " #/ " ) & & ( urlToCheck = URLCanonicalizer . getCanonicalURL ( sourceUrl , null , StandardCharsets . UTF_8 ) ) = = null ) {
logger . warn ( " Could not canonicalize url: " + sourceUrl ) ;
2021-09-08 04:02:14 +02:00
UrlUtils . logOutputData ( id , sourceUrl , null , " unreachable " , " Discarded at loading time, due to canonicalization's problems. " , null , true , " true " , " false " , " false " , " false " , " false " , null , null ) ;
2021-06-22 04:58:07 +02:00
LoaderAndChecker . connProblematicUrls . incrementAndGet ( ) ;
return false ;
}
if ( UrlUtils . docOrDatasetUrlsWithIDs . containsKey ( url ) ) { // If we got into an already-found docUrl, log it and return.
ConnSupportUtils . handleReCrossedDocUrl ( id , url , url , url , logger , true ) ;
return true ;
}
boolean isPossibleDocOrDatasetUrl = false ; // Used for specific connection settings.
String lowerCaseRetrievedUrl = url . toLowerCase ( ) ;
// Check if it's a possible-DocUrl, if so, this info will be used for optimal web-connection later.
if ( ( LoaderAndChecker . retrieveDocuments & & LoaderAndChecker . DOC_URL_FILTER . matcher ( lowerCaseRetrievedUrl ) . matches ( ) )
| | ( LoaderAndChecker . retrieveDatasets & & LoaderAndChecker . DATASET_URL_FILTER . matcher ( lowerCaseRetrievedUrl ) . matches ( ) ) ) {
//logger.debug("Possible docUrl or datasetUrl: " + url);
isPossibleDocOrDatasetUrl = true ;
}
try { // Check if it's a docUrl, if not, it gets crawled.
HttpConnUtils . connectAndCheckMimeType ( id , sourceUrl , urlToCheck , urlToCheck , null , true , isPossibleDocOrDatasetUrl ) ;
} catch ( Exception e ) {
2021-09-01 18:42:32 +02:00
List < String > list = LoaderAndChecker . getWasValidAndCouldRetry ( e ) ;
String wasUrlValid = list . get ( 0 ) ;
String couldRetry = list . get ( 1 ) ;
2021-09-08 04:02:14 +02:00
UrlUtils . logOutputData ( id , urlToCheck , null , " unreachable " , " Discarded at loading time, due to connectivity problems. " , null , true , " true " , wasUrlValid , " false " , " false " , couldRetry , null , null ) ;
return false ;
2021-06-22 04:58:07 +02:00
}
return true ;
} ) ;
2021-09-21 15:21:39 +02:00
// Invoke the tasks every time we reach the "jsonBatchSize" tasks, or we are at the end of the list.
2021-10-30 16:14:18 +02:00
tasksCount + + ;
if ( ( tasksCount = = FileUtils . jsonBatchSize ) | | ( tasksCount = = tasksNumber ) )
2021-06-22 04:58:07 +02:00
{
2021-10-30 16:14:18 +02:00
logger . info ( " Batch counter: " + ( + + batchCount ) + " | progress: " + PublicationsRetriever . df . format ( ( batchCount * tasksCount ) * 100 . 0 / tasksNumber ) + " % | every batch contains " + FileUtils . jsonBatchSize + " id-url pairs. " ) ;
2021-06-22 04:58:07 +02:00
LoaderAndChecker . invokeAllTasksAndWait ( callableTasks ) ;
addUrlReportsToWorkerReport ( ) ;
2021-09-08 04:02:14 +02:00
callableTasks . clear ( ) ; // Reset the thread-tasks-list for the next batch.
2021-06-22 04:58:07 +02:00
}
} // end tasks-for-loop
}
2021-08-05 14:09:28 +02:00
private static final String DocFileNotRetrievedExceptionName = DocFileNotRetrievedException . class . getName ( ) ; // Keep it here for easily spot if the exception changes inside the PublicationsRetriever library.
2021-06-22 04:58:07 +02:00
public static void addUrlReportsToWorkerReport ( )
{
2021-11-27 01:37:33 +01:00
Timestamp timestamp = new Timestamp ( System . currentTimeMillis ( ) ) ; // Store it here, in order to have the same for all current records.
2021-11-26 16:04:31 +01:00
2021-06-22 04:58:07 +02:00
for ( DataToBeLogged data : FileUtils . dataToBeLoggedList )
{
2021-09-22 15:36:48 +02:00
UrlReport . StatusType status = null ;
String fileLocation = null , comment = data . getComment ( ) , mimeType = null , hash = data . getHash ( ) ;
2021-09-21 15:21:39 +02:00
Long size = data . getSize ( ) ;
2021-08-05 14:09:28 +02:00
Error error = null ;
2021-06-22 04:58:07 +02:00
if ( data . getWasDocumentOrDatasetAccessible ( ) . equals ( " true " ) )
{
2021-09-22 15:36:48 +02:00
status = UrlReport . StatusType . accessible ;
2021-08-05 14:09:28 +02:00
if ( comment . contains ( UrlUtils . alreadyDownloadedByIDMessage ) ) {
2021-06-22 04:58:07 +02:00
// The file of this docUrl was already downloaded by another docUrl.
2021-08-05 14:09:28 +02:00
String previousId = comment . substring ( UrlUtils . alreadyDownloadedByIDMessage . length ( ) + 1 ) ;
2021-06-22 04:58:07 +02:00
//logger.debug("previousId: " + previousId); // DEBUG!
// Search that ID inside the list and if that instance gave the docUrl (there might be multiple ID instances) then get the file-location.
for ( DataToBeLogged data_2 : FileUtils . dataToBeLoggedList ) {
if ( data_2 . getUrlId ( ) . equals ( previousId ) & & data_2 . getWasDocumentOrDatasetAccessible ( ) . equals ( " true " ) ) {
fileLocation = data_2 . getComment ( ) ;
2021-09-21 15:21:39 +02:00
size = data_2 . getSize ( ) ;
hash = data_2 . getHash ( ) ;
mimeType = " application/pdf " ; // TODO - If support is added for other doc-formats other than "pdf", then make sure the "mime_type" is correctly specified.
2021-06-22 04:58:07 +02:00
break ;
}
}
2021-09-21 15:21:39 +02:00
// TODO - The case where the "twin-ID" is not found, should "never" happen. But should we check? How to handle if that is the case..?
2021-06-22 04:58:07 +02:00
}
2021-08-05 14:09:28 +02:00
else if ( comment . contains ( DocFileNotRetrievedExceptionName ) )
fileLocation = " File not retrieved " ;
else {
fileLocation = comment ;
2021-09-08 04:02:14 +02:00
mimeType = " application/pdf " ;
2021-08-05 14:09:28 +02:00
}
2021-08-05 19:41:32 +02:00
error = new Error ( null , null ) ; // We do not want to send a "null" object, since it just adds more complicated handling in the controller..
}
else {
2021-09-22 15:36:48 +02:00
status = UrlReport . StatusType . non_accessible ;
2021-08-05 14:09:28 +02:00
if ( data . getCouldRetry ( ) . equals ( " true " ) )
error = new Error ( Error . ErrorType . couldRetry , comment ) ;
else
error = new Error ( Error . ErrorType . noRetry , comment ) ;
}
String docOrDatasetUrl = data . getDocOrDatasetUrl ( ) ;
if ( docOrDatasetUrl . equals ( UrlUtils . unreachableDocOrDatasetUrlIndicator ) | | docOrDatasetUrl . equals ( UrlUtils . duplicateUrlIndicator ) )
docOrDatasetUrl = null ;
2021-11-26 16:04:31 +01:00
// Cleanup some data.
if ( ( size ! = null ) & & ( size = = 0L ) )
size = null ;
if ( ( hash ! = null ) & & ( hash . equals ( " null " ) ) )
hash = null ;
2021-11-27 01:37:33 +01:00
Payload payload = new Payload ( data . getUrlId ( ) , data . getSourceUrl ( ) , docOrDatasetUrl , timestamp , mimeType , size , hash , fileLocation , " crawl:PublicationsRetriever " ) ;
2021-08-05 14:09:28 +02:00
// TODO - If support is added for other doc-formats other than "pdf", then make sure the "mime_type" is correctly specified.
2021-06-22 04:58:07 +02:00
2021-11-27 01:37:33 +01:00
AssignmentsHandler . urlReports . add ( new UrlReport ( status , payload , error ) ) ;
2021-11-26 16:04:31 +01:00
} // end-for
2021-06-22 04:58:07 +02:00
FileUtils . dataToBeLoggedList . clear ( ) ; // Empty the list, to be re-populated by the next batch / assignment.
}
public static boolean connectWithUrlTest ( String urlToCheck ) {
try {
return HttpConnUtils . connectAndCheckMimeType ( " null " , urlToCheck , urlToCheck , urlToCheck , null , true , false ) ; // Sent the < null > in quotes to avoid an NPE in the concurrent data-structures.
} catch ( Exception e ) {
2021-09-01 18:42:32 +02:00
List < String > list = LoaderAndChecker . getWasValidAndCouldRetry ( e ) ;
String wasUrlValid = list . get ( 0 ) ;
String couldRetry = list . get ( 1 ) ;
2021-09-08 04:02:14 +02:00
UrlUtils . logOutputData ( null , urlToCheck , null , " unreachable " , " Discarded at loading time, due to connectivity problems. " , null , true , " true " , wasUrlValid , " false " , " false " , couldRetry , null , null ) ;
2021-06-22 04:58:07 +02:00
return false ;
}
}
}