package eu.openaire.urls_worker.plugins ;
import com.google.common.hash.Hashing ;
import com.google.common.io.Files ;
import edu.uci.ics.crawler4j.url.URLCanonicalizer ;
import eu.openaire.publications_retriever.PublicationsRetriever ;
import eu.openaire.publications_retriever.exceptions.DocFileNotRetrievedException ;
import eu.openaire.publications_retriever.util.file.FileUtils ;
import eu.openaire.publications_retriever.util.file.S3ObjectStoreMinIO ;
import eu.openaire.publications_retriever.util.http.ConnSupportUtils ;
import eu.openaire.publications_retriever.util.http.HttpConnUtils ;
import eu.openaire.publications_retriever.util.url.DataToBeLogged ;
import eu.openaire.publications_retriever.util.url.LoaderAndChecker ;
import eu.openaire.publications_retriever.util.url.UrlUtils ;
import eu.openaire.urls_worker.models.Assignment ;
import eu.openaire.urls_worker.models.Error ;
import eu.openaire.urls_worker.models.Payload ;
import eu.openaire.urls_worker.models.UrlReport ;
import eu.openaire.urls_worker.util.AssignmentHandler ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import java.io.File ;
import java.io.FileNotFoundException ;
import java.io.FileOutputStream ;
import java.nio.charset.StandardCharsets ;
import java.nio.file.Paths ;
import java.util.* ;
import java.util.concurrent.Callable ;
import java.util.concurrent.Executors ;
public class PublicationsRetrieverPlugin {
private static final Logger logger = LoggerFactory . getLogger ( PublicationsRetrieverPlugin . class ) ;
private static final String workingDir = System . getProperty ( " user.dir " ) + File . separator ;
private static String assignmentsBasePath = workingDir + " assignments " + File . separator ;
private static String assignmentsBaseFullTextsPath = assignmentsBasePath + " fullTexts " + File . separator ;
static {
File assignmentsDir = new File ( assignmentsBaseFullTextsPath ) ;
if ( ! assignmentsDir . exists ( ) ) {
if ( ! assignmentsDir . mkdirs ( ) ) { // Create the directory.
logger . error ( " Could not create the \" assignments directories \" : \" " + assignmentsBaseFullTextsPath + " \" . Using the \" workingDir \" instead ( " + workingDir + " ). " ) ;
assignmentsBasePath = workingDir ;
assignmentsBaseFullTextsPath = assignmentsBasePath ;
}
}
// Specify some configurations
LoaderAndChecker . retrieveDocuments = true ;
LoaderAndChecker . retrieveDatasets = false ;
FileUtils . shouldDownloadDocFiles = true ;
2021-07-29 08:01:53 +02:00
FileUtils . shouldUploadFilesToS3 = true ;
FileUtils . docFileNameType = FileUtils . DocFileNameType . idName ;
2021-06-22 04:58:07 +02:00
PublicationsRetriever . targetUrlType = " docUrl " ;
2021-07-29 08:01:53 +02:00
if ( FileUtils . shouldUploadFilesToS3 )
new S3ObjectStoreMinIO ( ) ; // Check here on how to create the credentials-file: https://github.com/LSmyrnaios/PublicationsRetriever/blob/master/README.md
2021-06-22 04:58:07 +02:00
int workerThreadsCount = Runtime . getRuntime ( ) . availableProcessors ( ) * PublicationsRetriever . threadsMultiplier ;
logger . info ( " Use " + workerThreadsCount + " worker-threads. " ) ;
PublicationsRetriever . executor = Executors . newFixedThreadPool ( workerThreadsCount ) ;
}
2021-07-05 14:00:29 +02:00
public static void processAssginmets ( Long assignmentRequestCounter , Collection < Assignment > assignments ) throws RuntimeException , FileNotFoundException
2021-06-22 04:58:07 +02:00
{
ConnSupportUtils . setKnownMimeTypes ( ) ;
2021-07-05 14:00:29 +02:00
FileUtils . storeDocFilesDir = assignmentsBaseFullTextsPath + " assignment_ " + assignmentRequestCounter + " _fullTexts " + File . separator ; // It needs the last separator, because of how the docFiles are named and stored.
2021-06-22 04:58:07 +02:00
2021-07-05 14:00:29 +02:00
FileUtils . setOutput ( new FileOutputStream ( assignmentsBasePath + " assignment_ " + assignmentRequestCounter + " _generic_results.json " ) ) ;
2021-06-22 04:58:07 +02:00
2021-07-05 14:00:29 +02:00
int tasksSize = assignments . size ( ) ;
2021-06-22 04:58:07 +02:00
int batchCount = 0 ;
int taskCount = 0 ;
List < Callable < Boolean > > callableTasks = new ArrayList < > ( FileUtils . jsonBatchSize ) ;
// Start loading and checking urls.
2021-07-05 14:00:29 +02:00
for ( Assignment assignment : assignments )
2021-06-22 04:58:07 +02:00
{
callableTasks . add ( ( ) - > {
2021-07-05 14:00:29 +02:00
String id = assignment . getId ( ) ;
String url = assignment . getOriginalUrl ( ) ;
2021-06-22 04:58:07 +02:00
if ( ( url = LoaderAndChecker . handleUrlChecks ( id , url ) ) = = null ) {
return false ;
} // The "url" might have changed (inside "handleUrlChecks()").
String urlToCheck = url ;
String sourceUrl = urlToCheck ; // Hold it here for the logging-messages.
if ( ! sourceUrl . contains ( " #/ " ) & & ( urlToCheck = URLCanonicalizer . getCanonicalURL ( sourceUrl , null , StandardCharsets . UTF_8 ) ) = = null ) {
logger . warn ( " Could not canonicalize url: " + sourceUrl ) ;
2021-08-05 14:09:28 +02:00
UrlUtils . logOutputData ( id , sourceUrl , null , " unreachable " , " Discarded at loading time, due to canonicalization's problems. " , null , true , " true " , " false " , " false " , " false " , " false " ) ;
2021-06-22 04:58:07 +02:00
LoaderAndChecker . connProblematicUrls . incrementAndGet ( ) ;
return false ;
}
if ( UrlUtils . docOrDatasetUrlsWithIDs . containsKey ( url ) ) { // If we got into an already-found docUrl, log it and return.
ConnSupportUtils . handleReCrossedDocUrl ( id , url , url , url , logger , true ) ;
return true ;
}
boolean isPossibleDocOrDatasetUrl = false ; // Used for specific connection settings.
String lowerCaseRetrievedUrl = url . toLowerCase ( ) ;
// Check if it's a possible-DocUrl, if so, this info will be used for optimal web-connection later.
if ( ( LoaderAndChecker . retrieveDocuments & & LoaderAndChecker . DOC_URL_FILTER . matcher ( lowerCaseRetrievedUrl ) . matches ( ) )
| | ( LoaderAndChecker . retrieveDatasets & & LoaderAndChecker . DATASET_URL_FILTER . matcher ( lowerCaseRetrievedUrl ) . matches ( ) ) ) {
//logger.debug("Possible docUrl or datasetUrl: " + url);
isPossibleDocOrDatasetUrl = true ;
}
try { // Check if it's a docUrl, if not, it gets crawled.
HttpConnUtils . connectAndCheckMimeType ( id , sourceUrl , urlToCheck , urlToCheck , null , true , isPossibleDocOrDatasetUrl ) ;
} catch ( Exception e ) {
String wasUrlValid = " true " ;
2021-08-05 14:09:28 +02:00
String couldRetry = " false " ;
2021-06-22 04:58:07 +02:00
if ( e instanceof RuntimeException ) {
String message = e . getMessage ( ) ;
2021-08-05 14:09:28 +02:00
if ( message ! = null ) {
if ( message . contains ( " HTTP 404 Client Error " ) )
wasUrlValid = " false " ;
else if ( message . contains ( " Server Error " ) | | message . contains ( " HTTP 408 " ) )
couldRetry = " true " ; // We could retry at a later time, as the HTTP-non-404-errors can be temporal.
}
2021-06-22 04:58:07 +02:00
}
2021-08-05 14:09:28 +02:00
UrlUtils . logOutputData ( id , urlToCheck , null , " unreachable " , " Discarded at loading time, due to connectivity problems. " , null , true , " true " , wasUrlValid , " false " , " false " , couldRetry ) ;
2021-06-22 04:58:07 +02:00
}
return true ;
} ) ;
if ( ( ( + + taskCount ) > = FileUtils . jsonBatchSize ) | | ( taskCount > = tasksSize ) )
{
logger . info ( " Batch counter: " + ( + + batchCount ) + " | progress: " + PublicationsRetriever . df . format ( ( ( batchCount - 1 ) * taskCount ) * 100 . 0 / tasksSize ) + " % | every batch contains " + FileUtils . jsonBatchSize + " id-url pairs. " ) ;
LoaderAndChecker . invokeAllTasksAndWait ( callableTasks ) ;
addUrlReportsToWorkerReport ( ) ;
callableTasks = new ArrayList < > ( FileUtils . jsonBatchSize ) ; // Reset the thread-tasks-list for the next batch.
}
} // end tasks-for-loop
}
2021-08-05 14:09:28 +02:00
private static final String DocFileNotRetrievedExceptionName = DocFileNotRetrievedException . class . getName ( ) ; // Keep it here for easily spot if the exception changes inside the PublicationsRetriever library.
2021-06-22 04:58:07 +02:00
public static void addUrlReportsToWorkerReport ( )
{
for ( DataToBeLogged data : FileUtils . dataToBeLoggedList )
{
2021-07-05 14:00:29 +02:00
String status = null , fileLocation = null , hash = null ;
2021-06-22 04:58:07 +02:00
Long size = null ;
2021-08-05 14:09:28 +02:00
Error error = null ;
String comment = data . getComment ( ) ;
2021-06-22 04:58:07 +02:00
if ( data . getWasDocumentOrDatasetAccessible ( ) . equals ( " true " ) )
{
status = " accessible " ;
2021-08-05 14:09:28 +02:00
if ( comment . contains ( UrlUtils . alreadyDownloadedByIDMessage ) ) {
2021-06-22 04:58:07 +02:00
// The file of this docUrl was already downloaded by another docUrl.
2021-08-05 14:09:28 +02:00
String previousId = comment . substring ( UrlUtils . alreadyDownloadedByIDMessage . length ( ) + 1 ) ;
2021-06-22 04:58:07 +02:00
//logger.debug("previousId: " + previousId); // DEBUG!
// Search that ID inside the list and if that instance gave the docUrl (there might be multiple ID instances) then get the file-location.
for ( DataToBeLogged data_2 : FileUtils . dataToBeLoggedList ) {
if ( data_2 . getUrlId ( ) . equals ( previousId ) & & data_2 . getWasDocumentOrDatasetAccessible ( ) . equals ( " true " ) ) {
fileLocation = data_2 . getComment ( ) ;
break ;
}
}
}
2021-08-05 14:09:28 +02:00
else if ( comment . contains ( DocFileNotRetrievedExceptionName ) )
fileLocation = " File not retrieved " ;
else {
fileLocation = comment ;
2021-06-22 04:58:07 +02:00
try {
File docFile = new File ( fileLocation ) ;
if ( docFile . isFile ( ) ) {
2021-07-05 14:00:29 +02:00
hash = Files . hash ( docFile , Hashing . md5 ( ) ) . toString ( ) ; // These hashing functions are deprecated, but just to inform us that MD5 is not secure. Luckily, we use MD5 just to identify duplicate files.
//logger.debug("MD5 for file \"" + docFile.getName() + "\": " + hash); // DEBUG!
2021-06-22 04:58:07 +02:00
size = java . nio . file . Files . size ( Paths . get ( fileLocation ) ) ;
//logger.debug("Size of file \"" + docFile.getName() + "\": " + size); // DEBUG!
} else
logger . error ( " No file was found with path: " + fileLocation ) ;
} catch ( Exception e ) {
2021-07-05 14:00:29 +02:00
if ( hash = = null )
2021-06-22 04:58:07 +02:00
logger . error ( " Could not retrieve the MD5-hash for the file: " + fileLocation ) ;
if ( size = = null )
logger . error ( " Could not retrieve the size of the file: " + fileLocation ) ;
e . printStackTrace ( ) ;
}
2021-08-05 14:09:28 +02:00
}
} else {
2021-06-22 04:58:07 +02:00
status = " non-accessible " ;
2021-08-05 14:09:28 +02:00
if ( data . getCouldRetry ( ) . equals ( " true " ) )
error = new Error ( Error . ErrorType . couldRetry , comment ) ;
else
error = new Error ( Error . ErrorType . noRetry , comment ) ;
}
String docOrDatasetUrl = data . getDocOrDatasetUrl ( ) ;
if ( docOrDatasetUrl . equals ( UrlUtils . unreachableDocOrDatasetUrlIndicator ) | | docOrDatasetUrl . equals ( UrlUtils . duplicateUrlIndicator ) )
docOrDatasetUrl = null ;
Payload payload = new Payload ( data . getUrlId ( ) , data . getSourceUrl ( ) , docOrDatasetUrl , new Date ( ) , " application/pdf " , size , hash , fileLocation , " crawl:PublicationsRetriever " ) ;
// TODO - If support is added for other doc-formats other than "pdf", then make sure the "mime_type" is correctly specified.
2021-06-22 04:58:07 +02:00
2021-08-05 14:09:28 +02:00
AssignmentHandler . urlReports . add ( new UrlReport ( status , payload , error ) ) ;
2021-06-22 04:58:07 +02:00
}
FileUtils . dataToBeLoggedList . clear ( ) ; // Empty the list, to be re-populated by the next batch / assignment.
}
public static boolean connectWithUrlTest ( String urlToCheck ) {
try {
return HttpConnUtils . connectAndCheckMimeType ( " null " , urlToCheck , urlToCheck , urlToCheck , null , true , false ) ; // Sent the < null > in quotes to avoid an NPE in the concurrent data-structures.
} catch ( Exception e ) {
2021-08-05 14:09:28 +02:00
UrlUtils . logOutputData ( null , urlToCheck , null , " unreachable " , " Discarded at loading time, due to connectivity problems. " , null , true , " true " , " true " , " false " , " false " , " false " ) ;
2021-06-22 04:58:07 +02:00
return false ;
}
}
}