2021-05-18 16:23:20 +02:00
package eu.openaire.urls_controller.util ;
import com.google.common.collect.HashMultimap ;
2021-11-30 17:23:27 +01:00
import eu.openaire.urls_controller.configuration.ImpalaConnector ;
import eu.openaire.urls_controller.models.Payload ;
2021-05-18 16:23:20 +02:00
import eu.openaire.urls_controller.models.Task ;
2021-11-30 17:23:27 +01:00
import eu.openaire.urls_controller.models.UrlReport ;
2021-05-18 16:23:20 +02:00
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
import org.springframework.boot.configurationprocessor.json.JSONException ;
import org.springframework.boot.configurationprocessor.json.JSONObject ;
2021-11-30 17:23:27 +01:00
import javax.servlet.http.HttpServletRequest ;
2021-12-06 19:18:30 +01:00
import java.io.* ;
2021-11-30 17:23:27 +01:00
import java.net.HttpURLConnection ;
import java.net.URL ;
2021-05-18 16:23:20 +02:00
import java.nio.file.Files ;
2021-11-30 17:23:27 +01:00
import java.nio.file.Path ;
2021-05-18 16:23:20 +02:00
import java.nio.file.Paths ;
2021-11-30 17:23:27 +01:00
import java.sql.Connection ;
import java.sql.SQLException ;
import java.sql.Statement ;
import java.util.ArrayList ;
import java.util.HashMap ;
import java.util.List ;
2021-05-18 16:23:20 +02:00
import java.util.Scanner ;
2021-11-30 17:23:27 +01:00
import java.util.regex.Matcher ;
import java.util.regex.Pattern ;
2021-05-18 16:23:20 +02:00
public class FileUtils {
private static final Logger logger = LoggerFactory . getLogger ( FileUtils . class ) ;
2021-06-10 19:24:51 +02:00
public static ThreadLocal < Scanner > inputScanner = new ThreadLocal < Scanner > ( ) ; // Every Thread has its own variable.
2021-11-04 10:57:19 +01:00
private static final ThreadLocal < Integer > fileIndex = new ThreadLocal < Integer > ( ) ;
private static final ThreadLocal < Integer > unretrievableInputLines = new ThreadLocal < Integer > ( ) ;
2021-06-10 19:24:51 +02:00
public static ThreadLocal < Integer > duplicateIdUrlEntries = new ThreadLocal < Integer > ( ) ;
public static final int jsonBatchSize = 3000 ;
2021-05-18 16:23:20 +02:00
private static final String utf8Charset = " UTF-8 " ;
public static String inputFileFullPath ;
2021-06-10 19:24:51 +02:00
private static final String workingDir = System . getProperty ( " user.dir " ) + File . separator ;
2021-05-18 16:23:20 +02:00
2021-06-10 13:21:39 +02:00
/**
 * Opens the bundled test input file ("testInputFiles/orderedList1000.json") from the classpath and
 * (re-)initializes the per-thread reading state (scanner and counters) of the calling thread.
 * It also records the file-system path of that file in "inputFileFullPath".
 *
 * @throws RuntimeException if the resource file cannot be found on the classpath.
 */
public FileUtils() throws RuntimeException
{
    // Classpath resource names are always '/'-separated, whatever the OS, while file-system paths use the OS-specific separator.
    // Using "File.separator" for the resource name made the lookup fail on Windows.
    String resourceFileName = "testInputFiles/orderedList1000.json";
    inputFileFullPath = workingDir + "src" + File.separator + "main" + File.separator + "resources"
            + File.separator + "testInputFiles" + File.separator + "orderedList1000.json";

    InputStream inputStream = getClass().getClassLoader().getResourceAsStream(resourceFileName);
    if ( inputStream == null )
        throw new RuntimeException("No resourceFile was found with name \"" + resourceFileName + "\".");

    logger.debug("Going to retrieve the data from the inputResourceFile: " + resourceFileName);

    FileUtils.inputScanner.set(new Scanner(inputStream, utf8Charset));
    fileIndex.set(0);    // Re-initialize the file-number-pointer.
    unretrievableInputLines.set(0);
    duplicateIdUrlEntries.set(0);
}
/ * *
* In each insertion , a new parquet - file is created , so we end up with millions of files . Parquet is great for fast - select , so have to stick with it and merge those files . .
* This method , creates a clone of the original table in order to have only one parquet file in the end . Drops the original table .
* Renames the clone to the original ' s name .
* Returns the errorMsg , if an error appears , otherwise is returns " null " .
* * /
public static String mergeParquetFiles ( String tableName , Connection con )
{
String errorMsg ;
if ( tableName = = null ) {
errorMsg = " No tableName was given. Do not know the tableName for which we should merger the underlying files for! " ;
logger . error ( errorMsg ) ;
return errorMsg ;
}
Statement statement ;
try {
statement = con . createStatement ( ) ;
} catch ( SQLException sqle ) {
errorMsg = " Problem when creating a connection-statement! \ n " ;
logger . error ( errorMsg + sqle . getMessage ( ) ) ;
return errorMsg ;
}
try {
statement . execute ( " CREATE TABLE " + ImpalaConnector . databaseName + " . " + tableName + " _tmp stored as parquet AS SELECT * FROM " + ImpalaConnector . databaseName + " . " + tableName ) ;
statement . execute ( " DROP TABLE " + ImpalaConnector . databaseName + " . " + tableName + " PURGE " ) ;
statement . execute ( " ALTER TABLE " + ImpalaConnector . databaseName + " . " + tableName + " _tmp RENAME TO " + ImpalaConnector . databaseName + " . " + tableName ) ;
statement . execute ( " COMPUTE STATS " + ImpalaConnector . databaseName + " . " + tableName ) ;
} catch ( SQLException sqle ) {
errorMsg = " Problem when executing the \" clone-drop-rename \" queries! \ n " ;
logger . error ( errorMsg + getCutBatchExceptionMessage ( sqle . getMessage ( ) ) , sqle ) ;
return errorMsg ;
} finally {
// Make sure we close the statement.
try { statement . close ( ) ; }
catch ( SQLException sqle3 ) { logger . error ( " Could not close the statement for executing queries in the Impala-database. \ n " + sqle3 ) ; }
}
return null ; // No errorMsg, everything is fine.
}
private static final Pattern FILENAME_ID = Pattern . compile ( " ([ \\ w_:]+) \\ .[ \\ w]{2,10}$ " ) ;
private static final Pattern FILENAME_WITH_EXTENSION = Pattern . compile ( " .*/([ \\ w_:]+ \\ .[ \\ w]{2,10})$ " ) ;
public static final String baseTargetLocation = System . getProperty ( " user.dir " ) + File . separator + " fullTexts " + File . separator ;
private static final int numOfFullTextsPerBatch = 70 ; // The HTTP-headers cannot be too large (It failed with 100 fileNames).
2021-12-06 19:18:30 +01:00
public static boolean getAndUploadFullTexts ( List < UrlReport > urlReports , HttpServletRequest request , long assignmentsBatchCounter , String workerId )
2021-11-30 17:23:27 +01:00
{
// The Controller have to request the files from the Worker, in order to upload them to the S3.
// We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.
if ( request = = null ) {
logger . error ( " The \" HttpServletRequest \" is null! " ) ;
return false ;
}
String remoteAddr = request . getHeader ( " X-FORWARDED-FOR " ) ;
if ( remoteAddr = = null | | " " . equals ( remoteAddr ) )
remoteAddr = request . getRemoteAddr ( ) ;
// Get the file-locations.
List < String > allFileNames = new ArrayList < > ( urlReports . size ( ) / 2 ) ;
for ( UrlReport urlReport : urlReports ) {
UrlReport . StatusType statusType = urlReport . getStatus ( ) ;
if ( ( statusType = = null ) | | statusType . equals ( UrlReport . StatusType . non_accessible ) ) {
continue ;
}
Payload payload = urlReport . getPayload ( ) ;
if ( payload ! = null ) {
String fileLocation = payload . getLocation ( ) ;
if ( fileLocation ! = null ) { // If the docFile was downloaded (without an error)..
Matcher matcher = FILENAME_WITH_EXTENSION . matcher ( fileLocation ) ;
if ( ! matcher . matches ( ) ) {
continue ;
}
String fileNameWithExtension = matcher . group ( 1 ) ;
if ( ( fileNameWithExtension = = null ) | | fileNameWithExtension . isEmpty ( ) ) {
continue ;
}
allFileNames . add ( fileNameWithExtension ) ;
}
}
}
int numAllFullTexts = allFileNames . size ( ) ;
if ( numAllFullTexts = = 0 ) {
logger . warn ( " The file retrieved by the Worker where < 0 > for assignments_ " + assignmentsBatchCounter ) ;
return true ; // It was handled, no error.
}
// Request the full-texts in batches, compressed in zip.
int numOfBatches = ( numAllFullTexts / numOfFullTextsPerBatch ) ;
if ( ( numAllFullTexts % numOfFullTextsPerBatch ) > 0 ) // Add an extra batch for the remaining files. This guarantees at least one batch will exist no matter how few (>0) the files are.
numOfBatches + + ;
logger . debug ( " The assignments_ " + assignmentsBatchCounter + " have " + numAllFullTexts + " fullTexts. Going to request them from the Worker, in " + numOfBatches + " batches. " ) ;
// Check if one full text is left out because of the division. Put it int the last batch.
String baseUrl = " http:// " + remoteAddr + " :1881/api/full-texts/getFullTexts/ " + assignmentsBatchCounter + " / " + numOfBatches + " / " ;
// Index all UrlReports to be more efficiently searched later.
HashMap < String , Payload > payloadsHashMap = new HashMap < > ( urlReports . size ( ) ) ;
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport . getPayload ( ) ;
if ( payload ! = null )
payloadsHashMap . put ( payload . getId ( ) , payload ) ;
}
String curAssignmentsBaseLocation = baseTargetLocation + " assignments_ " + assignmentsBatchCounter + File . separator ;
File curAssignmentsBaseDir = new File ( curAssignmentsBaseLocation ) ;
int failedBatches = 0 ;
2021-12-06 19:18:30 +01:00
for ( int batchCounter = 1 ; batchCounter < = numOfBatches ; + + batchCounter )
2021-11-30 17:23:27 +01:00
{
2021-12-06 19:18:30 +01:00
List < String > fileNamesForCurBatch = getFileNamesForBatch ( allFileNames , numAllFullTexts , batchCounter ) ;
HttpURLConnection conn = getConnection ( baseUrl , assignmentsBatchCounter , batchCounter , fileNamesForCurBatch , numOfBatches , workerId ) ;
2021-11-30 17:23:27 +01:00
if ( conn = = null ) {
updateUrlReportsForCurBatchTOHaveNoFullTextFiles ( payloadsHashMap , fileNamesForCurBatch ) ;
failedBatches + + ;
continue ; // To the next batch.
}
2021-12-06 19:18:30 +01:00
String targetLocation = curAssignmentsBaseLocation + " batch_ " + batchCounter + File . separator ;
2021-11-30 17:23:27 +01:00
File curBatchDir = new File ( targetLocation ) ;
try {
// Get the extracted files.,
Path targetPath = Files . createDirectories ( Paths . get ( targetLocation ) ) ;
// Unzip the file. Iterate over the PDFs and upload each one of them and get the S3-Url
2021-12-06 19:18:30 +01:00
String zipFileFullPath = targetLocation + " fullTexts_ " + assignmentsBatchCounter + " _ " + batchCounter + " .zip " ;
2021-11-30 17:23:27 +01:00
File zipFile = new File ( zipFileFullPath ) ;
if ( ! saveZipFile ( conn , zipFile ) ) {
updateUrlReportsForCurBatchTOHaveNoFullTextFiles ( payloadsHashMap , fileNamesForCurBatch ) ;
deleteDirectory ( curBatchDir ) ;
failedBatches + + ;
continue ; // To the next batch.
}
//logger.debug("The zip file has been saved: " + zipFileFullPath); // DEBUG!
FileUnZipper . unzipFolder ( Paths . get ( zipFileFullPath ) , targetPath ) ;
String [ ] fileNames = curBatchDir . list ( ) ;
if ( ( fileNames = = null ) | | ( fileNames . length = = 0 ) ) {
logger . error ( " No filenames where extracted from directory: " + targetLocation ) ;
updateUrlReportsForCurBatchTOHaveNoFullTextFiles ( payloadsHashMap , fileNamesForCurBatch ) ;
deleteDirectory ( curBatchDir ) ;
failedBatches + + ;
continue ; // To the next batch.
}
// Iterate over the files and upload them to S3.
int numUploadedFiles = 0 ;
for ( String fileName : fileNames )
{
String fileFullPath = targetLocation + fileName ;
if ( fileFullPath . equals ( zipFileFullPath ) ) { // Exclude the zip-file from uploading.
continue ;
}
// Get the ID of the file.
Matcher matcher = FILENAME_ID . matcher ( fileName ) ;
if ( ! matcher . matches ( ) ) {
continue ;
}
String id = matcher . group ( 1 ) ;
if ( ( id = = null ) | | id . isEmpty ( ) ) {
continue ;
}
Payload payload = payloadsHashMap . get ( id ) ;
if ( payload = = null ) {
continue ;
}
String location = payload . getLocation ( ) ;
if ( location = = null ) {
continue ;
}
if ( ! location . endsWith ( fileName ) ) { // That should NEVER happen...
logger . error ( " The location \" " + location + " \" of the payload matched with the ID \" " + id + " \" is not ending with the filename it was supposed to \" " + fileName + " \" " ) ;
continue ;
}
String s3Url = S3ObjectStoreMinIO . uploadToS3 ( fileName , fileFullPath ) ;
if ( s3Url ! = null ) {
payload . setLocation ( s3Url ) ; // Update the file-location to the new S3-url.
numUploadedFiles + + ;
} else
setUnretrievedFullText ( payload ) ;
}
2021-12-06 19:18:30 +01:00
logger . info ( " Finished uploading " + numUploadedFiles + " full-texts of assignments_ " + assignmentsBatchCounter + " , batch_ " + batchCounter + " on S3-ObjectStore. " ) ;
2021-11-30 17:23:27 +01:00
} catch ( Exception e ) {
2021-12-06 19:18:30 +01:00
logger . error ( " Could not extract and upload the full-texts for batch_ " + batchCounter + " of assignments_ " + assignmentsBatchCounter + " \ n " + e . getMessage ( ) , e ) ; // It shows the response body (after Spring v.2.5.6).
2021-11-30 17:23:27 +01:00
updateUrlReportsForCurBatchTOHaveNoFullTextFiles ( payloadsHashMap , fileNamesForCurBatch ) ;
failedBatches + + ;
} finally {
deleteDirectory ( curBatchDir ) ; // Delete the files of this batch (including the zip-file).
}
} // End of batches.
// Delete this assignments-num directory.
deleteDirectory ( curAssignmentsBaseDir ) ;
// Check if none of the batches were handled..
if ( failedBatches = = numOfBatches ) {
2021-12-06 19:18:30 +01:00
logger . error ( " None of the " + numOfBatches + " batches could be handled for assignments_ " + assignmentsBatchCounter + " , for worker: " + workerId ) ;
2021-11-30 17:23:27 +01:00
return false ;
} else {
replaceNotUploadedFileLocations ( urlReports ) ;
return true ;
}
}
2021-12-06 19:18:30 +01:00
private static HttpURLConnection getConnection ( String baseUrl , long assignmentsBatchCounter , int batchNum , List < String > fileNamesForCurBatch , int totalBatches , String workerId )
2021-11-30 17:23:27 +01:00
{
2021-12-06 19:18:30 +01:00
baseUrl + = batchNum + " / " ;
String requestUrl = getRequestUrlForBatch ( baseUrl , fileNamesForCurBatch ) ;
2021-11-30 17:23:27 +01:00
logger . info ( " Going to request the batch_ " + batchNum + " (out of " + totalBatches + " ) with " + fileNamesForCurBatch . size ( ) + " fullTexts, of assignments_ " + assignmentsBatchCounter + " from the Worker with ID \" " + workerId + " \" and baseRequestUrl: " + baseUrl + " [fileNames] " ) ;
HttpURLConnection conn = null ;
try {
conn = ( HttpURLConnection ) new URL ( requestUrl ) . openConnection ( ) ;
conn . setRequestMethod ( " GET " ) ;
conn . setRequestProperty ( " User-Agent " , " UrlsController " ) ;
conn . connect ( ) ;
int statusCode = conn . getResponseCode ( ) ;
if ( statusCode ! = 200 ) {
2021-12-06 19:18:30 +01:00
logger . warn ( " HTTP- " + statusCode + " : " + getErrorMessageFromResponseBody ( conn ) + " \ nProblem when requesting the ZipFile of batch_ " + batchNum + " from the Worker with ID \" " + workerId + " \" and requestUrl: " + requestUrl ) ;
2021-11-30 17:23:27 +01:00
return null ;
}
} catch ( Exception e ) {
logger . warn ( " Problem when requesting the ZipFile of batch_ " + batchNum + " of assignments_ " + assignmentsBatchCounter + " from the Worker with ID \" " + workerId + " \" and requestUrl: " + requestUrl + " \ n " + e . getMessage ( ) ) ;
return null ;
}
return conn ;
}
2021-12-06 19:18:30 +01:00
private static String getErrorMessageFromResponseBody ( HttpURLConnection conn )
{
StringBuilder errorMsgStrB = new StringBuilder ( 500 ) ;
try ( BufferedReader br = new BufferedReader ( new InputStreamReader ( conn . getErrorStream ( ) ) ) ) { // Try-with-resources
String inputLine ;
while ( ( inputLine = br . readLine ( ) ) ! = null )
{
if ( ! inputLine . isEmpty ( ) ) {
errorMsgStrB . append ( inputLine ) ;
}
}
return ( errorMsgStrB . length ( ) ! = 0 ) ? errorMsgStrB . toString ( ) : null ; // Make sure we return a "null" on empty string, to better handle the case in the caller-function.
} catch ( IOException ioe ) {
logger . error ( " IOException when retrieving the error-message: " + ioe . getMessage ( ) ) ;
return null ;
} catch ( Exception e ) {
logger . error ( " Could not extract the errorMessage! " , e ) ;
return null ;
}
}
private static List < String > getFileNamesForBatch ( List < String > allFileNames , int numAllFullTexts , int curBatch )
2021-11-30 17:23:27 +01:00
{
int initialIndex = ( ( curBatch - 1 ) * numOfFullTextsPerBatch ) ;
int endingIndex = ( curBatch * numOfFullTextsPerBatch ) ;
if ( endingIndex > numAllFullTexts ) // This might be the case, when the "numAllFullTexts" is too small.
endingIndex = numAllFullTexts ;
List < String > fileNamesOfCurBatch = new ArrayList < > ( numOfFullTextsPerBatch ) ;
for ( int i = initialIndex ; i < endingIndex ; + + i ) {
try {
fileNamesOfCurBatch . add ( allFileNames . get ( i ) ) ;
} catch ( IndexOutOfBoundsException ioobe ) {
logger . error ( " IOOBE for i= " + i + " \ n " + ioobe . getMessage ( ) , ioobe ) ;
}
}
return fileNamesOfCurBatch ;
}
2021-12-06 19:18:30 +01:00
private static final StringBuilder sb = new StringBuilder ( numOfFullTextsPerBatch * 50 ) ;
2021-11-30 17:23:27 +01:00
// TODO - Make it THREAD-LOCAL, if we move to multi-thread batch requests.
2021-12-06 19:18:30 +01:00
private static String getRequestUrlForBatch ( String baseUrl , List < String > fileNamesForCurBatch )
2021-11-30 17:23:27 +01:00
{
2021-12-06 19:18:30 +01:00
sb . append ( baseUrl ) ;
2021-11-30 17:23:27 +01:00
int numFullTextsCurBatch = fileNamesForCurBatch . size ( ) ;
for ( int j = 0 ; j < numFullTextsCurBatch ; + + j ) {
sb . append ( fileNamesForCurBatch . get ( j ) ) ;
if ( j < ( numFullTextsCurBatch - 1 ) )
sb . append ( " , " ) ;
}
String requestUrl = sb . toString ( ) ;
sb . setLength ( 0 ) ; // Reset for the next batch.
return requestUrl ;
}
/ * *
* This method updates the UrlReports to not point to any downloaded fullText files .
* This is useful when the uploading process of the fullTexts to the S3 - ObjectStore fails .
* Then , we don ' t want any " links " to locally stored files , which will be deleted .
* @param urlReports
* @return
* /
public static void updateUrlReportsToHaveNoFullTextFiles ( List < UrlReport > urlReports )
{
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport . getPayload ( ) ;
if ( payload ! = null )
setUnretrievedFullText ( payload ) ;
}
}
private static void replaceNotUploadedFileLocations ( List < UrlReport > urlReports )
{
for ( UrlReport urlReport : urlReports ) {
Payload payload = urlReport . getPayload ( ) ;
if ( payload ! = null ) {
String fileLocation = payload . getLocation ( ) ;
if ( ( fileLocation ! = null ) & & ( ! fileLocation . startsWith ( S3ObjectStoreMinIO . endpoint ) ) )
setUnretrievedFullText ( payload ) ;
}
}
}
public static void updateUrlReportsForCurBatchTOHaveNoFullTextFiles ( HashMap < String , Payload > payloadsHashMap , List < String > fileNames )
{
for ( String fileName : fileNames ) {
// Get the ID of the file.
Matcher matcher = FILENAME_ID . matcher ( fileName ) ;
if ( ! matcher . matches ( ) ) {
continue ;
}
String id = matcher . group ( 1 ) ;
if ( ( id = = null ) | | id . isEmpty ( ) ) {
continue ;
}
Payload payload = payloadsHashMap . get ( id ) ;
if ( payload ! = null )
setUnretrievedFullText ( payload ) ; // It changes the payload in the original UrlReport list.
}
}
/**
 * Marks the full-text of the given payload as not-retrieved, since its local file will be deleted.
 * Only the file-related fields are cleared; the rest of the payload (e.g. the retrieved link to the full-text) is kept.
 *
 * @param payload the payload to update in place (must not be null).
 */
public static void setUnretrievedFullText(Payload payload)
{
    payload.setLocation(null);    // No (local or S3) file to point to.
    payload.setHash(null);    // File-derived metadata is meaningless without the file.
    payload.setMime_type(null);
    payload.setSize(null);
}
private static final int bufferSize = 20971520 ; // 20 MB
public static boolean saveZipFile ( HttpURLConnection conn , File zipFile )
{
FileOutputStream outStream = null ;
InputStream inStream = null ;
try {
inStream = conn . getInputStream ( ) ;
outStream = new FileOutputStream ( zipFile ) ;
byte [ ] byteBuffer = new byte [ bufferSize ] ; // 20 MB
int bytesRead = - 1 ;
while ( ( bytesRead = inStream . read ( byteBuffer , 0 , bufferSize ) ) ! = - 1 ) {
outStream . write ( byteBuffer , 0 , bytesRead ) ;
}
return true ;
} catch ( Exception e ) {
logger . error ( " Could not save the zip file \" " + zipFile . getName ( ) + " \" : " + e . getMessage ( ) , e ) ;
return false ;
} finally {
try {
if ( inStream ! = null )
inStream . close ( ) ;
if ( outStream ! = null )
outStream . close ( ) ;
} catch ( Exception e ) {
logger . error ( e . getMessage ( ) , e ) ;
}
}
}
/**
 * Deletes the given directory together with all its contents.
 *
 * @param curBatchDir the directory to delete.
 * @return true if the deletion completed without an I/O error, false otherwise (the error is logged).
 */
public static boolean deleteDirectory(File curBatchDir) {
    boolean deleted = false;
    try {
        org.apache.commons.io.FileUtils.deleteDirectory(curBatchDir);
        deleted = true;
    } catch (IOException ioe) {
        logger.error("The following directory could not be deleted: " + curBatchDir.getName(), ioe);
    }
    return deleted;
}
private static String getCutBatchExceptionMessage ( String sqleMessage )
{
// The sqleMessage contains the actual message followed by the long batch. This makes the logs unreadable. So we should shorten the message before logging.
int maxEnding = 1500 ;
if ( sqleMessage . length ( ) > maxEnding )
return ( sqleMessage . substring ( 0 , maxEnding ) + " ... " ) ;
else
return sqleMessage ;
}
2021-05-18 16:23:20 +02:00
// This is currently not used, but it may be useful in a future scenario.
/**
 * Counts the lines of the input file, located at "inputFileFullPath".
 *
 * @return the number of lines, or -1 if they could not be counted (the error is logged).
 */
private static long getInputFileLinesNum()
{
    long numOfLines;
    // The stream of "Files.lines()" keeps the file open until it is closed, so close it with try-with-resources.
    try ( java.util.stream.Stream<String> lines = Files.lines(Paths.get(inputFileFullPath)) ) {
        numOfLines = lines.count();
    } catch (IOException | UncheckedIOException e) {    // The latter is thrown when reading fails while the stream is consumed.
        logger.error("Could not retrieve the numOfLines. " + e);
        return -1;
    }
    logger.debug("The numOfLines in the inputFile is " + numOfLines);
    return numOfLines;
}
/ * *
2021-11-04 10:57:19 +01:00
* This method decodes a Json String and returns its members .
2021-05-18 16:23:20 +02:00
* @param jsonLine String
* @return HashMap < String , String >
* /
public static Task jsonDecoder ( String jsonLine )
{
// Get ID and url and put them in the HashMap
String idStr = null ;
String urlStr = null ;
try {
JSONObject jObj = new JSONObject ( jsonLine ) ; // Construct a JSONObject from the retrieved jsonLine.
idStr = jObj . get ( " id " ) . toString ( ) ;
urlStr = jObj . get ( " url " ) . toString ( ) ;
} catch ( JSONException je ) {
logger . warn ( " JSONException caught when tried to parse and extract values from jsonLine: \ t " + jsonLine , je ) ;
return null ;
}
if ( urlStr . isEmpty ( ) ) {
if ( ! idStr . isEmpty ( ) ) // If we only have the id, then go and log it.
logger . warn ( " The url was not found for id: \" " + idStr + " \" " ) ;
return null ;
}
2021-05-20 01:50:50 +02:00
return new Task ( idStr , urlStr , null ) ;
2021-05-18 16:23:20 +02:00
}
/ * *
* This method parses a Json file and extracts the urls , along with the IDs .
* @return HashMultimap < String , String >
* /
public static HashMultimap < String , String > getNextIdUrlPairBatchFromJson ( )
{
Task inputIdUrlTuple ;
int expectedPathsPerID = 5 ;
int expectedIDsPerBatch = jsonBatchSize / expectedPathsPerID ;
HashMultimap < String , String > idAndUrlMappedInput = HashMultimap . create ( expectedIDsPerBatch , expectedPathsPerID ) ;
2021-06-10 19:24:51 +02:00
int curBeginning = fileIndex . get ( ) ;
2021-05-18 16:23:20 +02:00
2021-06-10 19:24:51 +02:00
while ( inputScanner . get ( ) . hasNextLine ( ) & & ( fileIndex . get ( ) < ( curBeginning + jsonBatchSize ) ) )
2021-05-18 16:23:20 +02:00
{ // While (!EOF) and inside the current url-batch, iterate through lines.
2021-06-10 19:24:51 +02:00
//logger.debug("fileIndex: " + FileUtils.fileIndex.get()); // DEBUG!
2021-05-18 16:23:20 +02:00
// Take each line, remove potential double quotes.
2021-06-10 19:24:51 +02:00
String retrievedLineStr = inputScanner . get ( ) . nextLine ( ) ;
2021-05-18 16:23:20 +02:00
//logger.debug("Loaded from inputFile: " + retrievedLineStr); // DEBUG!
2021-06-10 19:24:51 +02:00
fileIndex . set ( fileIndex . get ( ) + 1 ) ;
2021-05-18 16:23:20 +02:00
if ( retrievedLineStr . isEmpty ( ) ) {
2021-06-10 19:24:51 +02:00
unretrievableInputLines . set ( unretrievableInputLines . get ( ) + 1 ) ;
2021-05-18 16:23:20 +02:00
continue ;
}
if ( ( inputIdUrlTuple = jsonDecoder ( retrievedLineStr ) ) = = null ) { // Decode the jsonLine and take the two attributes.
logger . warn ( " A problematic inputLine found: \ t " + retrievedLineStr ) ;
2021-06-10 19:24:51 +02:00
unretrievableInputLines . set ( unretrievableInputLines . get ( ) + 1 ) ;
2021-05-18 16:23:20 +02:00
continue ;
}
if ( ! idAndUrlMappedInput . put ( inputIdUrlTuple . getId ( ) , inputIdUrlTuple . getUrl ( ) ) ) { // We have a duplicate url in the input.. log it here as we cannot pass it through the HashMultimap. It's possible that this as well as the original might be/give a docUrl.
2021-06-10 19:24:51 +02:00
duplicateIdUrlEntries . set ( duplicateIdUrlEntries . get ( ) + 1 ) ;
2021-05-18 16:23:20 +02:00
}
}
return idAndUrlMappedInput ;
}
/ * *
* This method returns the number of ( non - heading , non - empty ) lines we have read from the inputFile .
* @return loadedUrls
* /
public static int getCurrentlyLoadedUrls ( ) // In the end, it gives the total number of urls we have processed.
{
2021-06-10 19:24:51 +02:00
return FileUtils . fileIndex . get ( ) - FileUtils . unretrievableInputLines . get ( ) ;
2021-05-18 16:23:20 +02:00
}
/ * *
* This method checks if there is no more input - data and returns true in that case .
* Otherwise , it returns false , if there is more input - data to be loaded .
* A " RuntimeException " is thrown if no input - urls were retrieved in general .
* @param isEmptyOfData
* @param isFirstRun
* @return finished loading / not finished
* @throws RuntimeException
* /
public static boolean isFinishedLoading ( boolean isEmptyOfData , boolean isFirstRun )
{
if ( isEmptyOfData ) {
if ( isFirstRun )
logger . error ( " Could not retrieve any urls from the inputFile! " ) ;
else
logger . debug ( " Done loading " + FileUtils . getCurrentlyLoadedUrls ( ) + " urls from the inputFile. " ) ;
return true ;
}
return false ;
}
}