package eu.openaire.urls_controller.util;

import com.google.common.collect.HashMultimap;
import eu.openaire.urls_controller.configuration.ImpalaConnector;
import eu.openaire.urls_controller.models.Payload;
import eu.openaire.urls_controller.models.Task;
import eu.openaire.urls_controller.models.UrlReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.configurationprocessor.json.JSONException;
import org.springframework.boot.configurationprocessor.json.JSONObject;

import javax.servlet.http.HttpServletRequest;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


public class FileUtils {

    private static final Logger logger = LoggerFactory.getLogger(FileUtils.class);

    public static ThreadLocal<Scanner> inputScanner = new ThreadLocal<>();  // Every Thread has its own variable.
    private static final ThreadLocal<Integer> fileIndex = new ThreadLocal<>();
    private static final ThreadLocal<Integer> unretrievableInputLines = new ThreadLocal<>();
    public static ThreadLocal<Integer> duplicateIdUrlEntries = new ThreadLocal<>();

    public static final int jsonBatchSize = 3000;
    private static final String utf8Charset = "UTF-8";
    public static String inputFileFullPath;
    private static final String workingDir = System.getProperty("user.dir") + File.separator;


    public FileUtils() throws RuntimeException {
        inputFileFullPath = workingDir + "src" + File.separator + "main" + File.separator + "resources";
        String resourceFileName = "testInputFiles" + File.separator + "orderedList1000.json";
        inputFileFullPath += File.separator + resourceFileName;

        InputStream inputStream = getClass().getClassLoader().getResourceAsStream(resourceFileName);
        if ( inputStream == null )
            throw new RuntimeException("No resourceFile was found with name \"" + resourceFileName + "\".");

        logger.debug("Going to retrieve the data from the inputResourceFile: " + resourceFileName);

        FileUtils.inputScanner.set(new Scanner(inputStream, utf8Charset));
        fileIndex.set(0);   // Re-initialize the file-number-pointer.
        unretrievableInputLines.set(0);
        duplicateIdUrlEntries.set(0);
    }


    /**
     * In each insertion a new parquet-file is created, so we end up with millions of files. Parquet is great for fast SELECTs, so we have to stick with it and merge those files.
     * This method creates a clone of the original table, in order to end up with only one parquet-file. It then drops the original table
     * and renames the clone to the original's name.
     * It returns the errorMsg if an error appears, otherwise it returns "null".
     */
    public static String mergeParquetFiles(String tableName, Connection con) {
        String errorMsg;
        if ( tableName == null ) {
            errorMsg = "No tableName was given. We do not know for which table we should merge the underlying files!";
            logger.error(errorMsg);
            return errorMsg;
        }

        Statement statement;
        try {
            statement = con.createStatement();
        } catch (SQLException sqle) {
            errorMsg = "Problem when creating a connection-statement!\n";
            logger.error(errorMsg + sqle.getMessage());
            return errorMsg;
        }

        try {
            statement.execute("CREATE TABLE " + ImpalaConnector.databaseName + "."
                    + tableName + "_tmp stored as parquet AS SELECT * FROM " + ImpalaConnector.databaseName + "." + tableName);
            statement.execute("DROP TABLE " + ImpalaConnector.databaseName + "." + tableName + " PURGE");
            statement.execute("ALTER TABLE " + ImpalaConnector.databaseName + "." + tableName + "_tmp RENAME TO " + ImpalaConnector.databaseName + "." + tableName);
            statement.execute("COMPUTE STATS " + ImpalaConnector.databaseName + "." + tableName);
        } catch (SQLException sqle) {
            errorMsg = "Problem when executing the \"clone-drop-rename\" queries!\n";
            logger.error(errorMsg + getCutBatchExceptionMessage(sqle.getMessage()), sqle);
            return errorMsg;
        } finally {
            // Make sure we close the statement.
            try {
                statement.close();
            } catch (SQLException sqle3) {
                logger.error("Could not close the statement for executing queries in the Impala-database.\n" + sqle3);
            }
        }

        return null;    // No errorMsg, everything is fine.
    }


    private static final Pattern FILENAME_ID = Pattern.compile("([\\w_:]+)\\.[\\w]{2,10}$");
    private static final Pattern FILENAME_WITH_EXTENSION = Pattern.compile(".*/([\\w_:]+\\.[\\w]{2,10})$");

    public static final String baseTargetLocation = System.getProperty("user.dir") + File.separator + "fullTexts" + File.separator;

    private static final int numOfFullTextsPerBatch = 70;  // The HTTP-headers cannot be too large (it failed with 100 fileNames).


    public static boolean getAndUploadFullTexts(List<UrlReport> urlReports, HttpServletRequest request, AtomicLong assignmentsBatchCounter, String workerId) {
        // The Controller has to request the files from the Worker, in order to upload them to the S3-ObjectStore.
        // We will have to UPDATE the "location" of each of those files in the UrlReports and then insert them all into the database.

        if ( request == null ) {
            logger.error("The \"HttpServletRequest\" is null!");
            return false;
        }

        String remoteAddr = request.getHeader("X-FORWARDED-FOR");
        if ( remoteAddr == null || "".equals(remoteAddr) )
            remoteAddr = request.getRemoteAddr();

        // Get the file-locations.
        List<String> allFileNames = new ArrayList<>(urlReports.size()/2);
        for ( UrlReport urlReport : urlReports ) {
            UrlReport.StatusType statusType = urlReport.getStatus();
            if ( (statusType == null) || statusType.equals(UrlReport.StatusType.non_accessible) )
                continue;
            Payload payload = urlReport.getPayload();
            if ( payload != null ) {
                String fileLocation = payload.getLocation();
                if ( fileLocation != null ) {   // If the docFile was downloaded (without an error)..
                    Matcher matcher = FILENAME_WITH_EXTENSION.matcher(fileLocation);
                    if ( !matcher.matches() )
                        continue;
                    String fileNameWithExtension = matcher.group(1);
                    if ( (fileNameWithExtension == null) || fileNameWithExtension.isEmpty() )
                        continue;
                    allFileNames.add(fileNameWithExtension);
                }
            }
        }

        int numAllFullTexts = allFileNames.size();
        if ( numAllFullTexts == 0 ) {
            logger.warn("The number of files retrieved by the Worker was < 0 > for assignments_" + assignmentsBatchCounter);
            return true;    // It was handled, no error.
        }

        // Request the full-texts in batches, compressed in zip.
        int numOfBatches = (numAllFullTexts / numOfFullTextsPerBatch);
        if ( (numAllFullTexts % numOfFullTextsPerBatch) > 0 )   // Add an extra batch for the remaining files. This guarantees at least one batch will exist, no matter how few (>0) the files are.
            numOfBatches ++;

        logger.debug("The assignments_" + assignmentsBatchCounter + " have " + numAllFullTexts + " fullTexts. Going to request them from the Worker, in " + numOfBatches + " batches.");
        // Any full-texts left over by the division above are requested in the extra (last) batch.
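        // Worked example of the batch-arithmetic above (the 150 is only illustrative): with numOfFullTextsPerBatch = 70,
        // 150 fullTexts give (150 / 70) = 2 full batches, plus 1 extra batch for the remaining (150 % 70) = 10 files, so numOfBatches = 3.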
        String baseUrl = "http://" + remoteAddr + ":1881/api/full-texts/getFullTexts/" + assignmentsBatchCounter + "/" + numOfBatches + "/";

        // Index all UrlReports to be searched more efficiently later.
        HashMap<String, Payload> payloadsHashMap = new HashMap<>(urlReports.size());
        for ( UrlReport urlReport : urlReports ) {
            Payload payload = urlReport.getPayload();
            if ( payload != null )
                payloadsHashMap.put(payload.getId(), payload);
        }

        String curAssignmentsBaseLocation = baseTargetLocation + "assignments_" + assignmentsBatchCounter + File.separator;
        File curAssignmentsBaseDir = new File(curAssignmentsBaseLocation);

        int failedBatches = 0;
        for ( int i=1; i <= numOfBatches; ++i ) {
            List<String> fileNamesForCurBatch = getFileNamesForBatch(allFileNames, numAllFullTexts, i, numOfBatches);
            HttpURLConnection conn = getConnection(baseUrl, assignmentsBatchCounter, i, fileNamesForCurBatch, numOfBatches, workerId);
            if ( conn == null ) {
                updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMap, fileNamesForCurBatch);
                failedBatches ++;
                continue;   // To the next batch.
            }

            String targetLocation = curAssignmentsBaseLocation + "batch_" + i + File.separator;
            File curBatchDir = new File(targetLocation);
            try {
                // Create the directory into which the files will be extracted.
                Path targetPath = Files.createDirectories(Paths.get(targetLocation));

                // Save and unzip the received file. Then iterate over the PDFs, upload each one of them and get the S3-Url.
                String zipFileFullPath = targetLocation + "fullTexts_" + assignmentsBatchCounter + "_" + i + ".zip";
                File zipFile = new File(zipFileFullPath);

                if ( ! saveZipFile(conn, zipFile) ) {
                    updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMap, fileNamesForCurBatch);
                    deleteDirectory(curBatchDir);
                    failedBatches ++;
                    continue;   // To the next batch.
                }
                //logger.debug("The zip file has been saved: " + zipFileFullPath);  // DEBUG!

                FileUnZipper.unzipFolder(Paths.get(zipFileFullPath), targetPath);

                String[] fileNames = curBatchDir.list();
                if ( (fileNames == null) || (fileNames.length == 0) ) {
                    logger.error("No fileNames were extracted from directory: " + targetLocation);
                    updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMap, fileNamesForCurBatch);
                    deleteDirectory(curBatchDir);
                    failedBatches ++;
                    continue;   // To the next batch.
                }

                // Iterate over the files and upload them to S3.
                int numUploadedFiles = 0;
                for ( String fileName : fileNames ) {
                    String fileFullPath = targetLocation + fileName;
                    if ( fileFullPath.equals(zipFileFullPath) ) // Exclude the zip-file from uploading.
                        continue;

                    // Get the ID of the file.
                    Matcher matcher = FILENAME_ID.matcher(fileName);
                    if ( !matcher.matches() )
                        continue;
                    String id = matcher.group(1);
                    if ( (id == null) || id.isEmpty() )
                        continue;

                    Payload payload = payloadsHashMap.get(id);
                    if ( payload == null )
                        continue;

                    String location = payload.getLocation();
                    if ( location == null )
                        continue;

                    if ( ! location.endsWith(fileName) ) {  // That should NEVER happen...
                        logger.error("The location \"" + location + "\" of the payload matched with the ID \"" + id + "\" does not end with the fileName it was supposed to: \"" + fileName + "\"");
                        continue;
                    }

                    String s3Url = S3ObjectStoreMinIO.uploadToS3(fileName, fileFullPath);
                    if ( s3Url != null ) {
                        payload.setLocation(s3Url); // Update the file-location to the new S3-url.
                        numUploadedFiles ++;
                    } else
                        setUnretrievedFullText(payload);
                }

                logger.info("Finished uploading " + numUploadedFiles + " full-texts of assignments_" + assignmentsBatchCounter + " to the S3-ObjectStore.");
            } catch (Exception e) {
                logger.error("Could not extract and upload the full-texts for batch_" + i + " of assignments_" + assignmentsBatchCounter + "\n" + e.getMessage(), e);  // It shows the response body (after Spring v.2.5.6).
                updateUrlReportsForCurBatchTOHaveNoFullTextFiles(payloadsHashMap, fileNamesForCurBatch);
                failedBatches ++;
            } finally {
                deleteDirectory(curBatchDir);   // Delete the files of this batch (including the zip-file).
            }
        }   // End of batches.

        // Delete this assignments-num directory.
        deleteDirectory(curAssignmentsBaseDir);

        // Check if none of the batches could be handled.
        if ( failedBatches == numOfBatches ) {
            logger.error("None of the " + numOfBatches + " batches could be handled!");
            return false;
        } else {
            replaceNotUploadedFileLocations(urlReports);
            return true;
        }
    }


    private static HttpURLConnection getConnection(String baseUrl, AtomicLong assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) {
        String requestUrl = getRequestUrlForBatch(baseUrl, batchNum, fileNamesForCurBatch);
        logger.info("Going to request the batch_" + batchNum + " (out of " + totalBatches + ") with " + fileNamesForCurBatch.size() + " fullTexts, of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and baseRequestUrl: " + baseUrl + "[fileNames]");

        HttpURLConnection conn = null;
        try {
            conn = (HttpURLConnection) new URL(requestUrl).openConnection();
            conn.setRequestMethod("GET");
            conn.setRequestProperty("User-Agent", "UrlsController");
            conn.connect();
            int statusCode = conn.getResponseCode();
            if ( statusCode != 200 ) {
                logger.warn("HTTP-" + statusCode + ": Problem when requesting the ZipFile of batch_" + batchNum + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl);
                return null;
            }
        } catch (Exception e) {
            logger.warn("Problem when requesting the ZipFile of batch_" + batchNum + " of assignments_" + assignmentsBatchCounter + " from the Worker with ID \"" + workerId + "\" and requestUrl: " + requestUrl + "\n" + e.getMessage());
            return null;
        }
        return conn;
    }


    private static List<String> getFileNamesForBatch(List<String> allFileNames, int numAllFullTexts, int curBatch, int numOfBatches) {
        int initialIndex = ((curBatch-1) * numOfFullTextsPerBatch);
        int endingIndex = (curBatch * numOfFullTextsPerBatch);
        if ( endingIndex > numAllFullTexts )    // This might be the case when the "numAllFullTexts" is too small.
            endingIndex = numAllFullTexts;

        List<String> fileNamesOfCurBatch = new ArrayList<>(numOfFullTextsPerBatch);
        for ( int i = initialIndex; i < endingIndex; ++i ) {
            try {
                fileNamesOfCurBatch.add(allFileNames.get(i));
            } catch (IndexOutOfBoundsException ioobe) {
                logger.error("IOOBE for i=" + i + "\n" + ioobe.getMessage(), ioobe);
            }
        }
        return fileNamesOfCurBatch;
    }


    private static final StringBuilder sb = new StringBuilder(numOfFullTextsPerBatch * 100);    // TODO - Make it THREAD-LOCAL, if we move to multi-thread batch requests.
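    // A minimal sketch of the THREAD-LOCAL variant hinted at in the TODO above, in case batch-requests ever run in parallel
    // (illustrative only; the shared builder above is what is actually used and "threadLocalSb" is a hypothetical name):
    //   private static final ThreadLocal<StringBuilder> threadLocalSb =
    //           ThreadLocal.withInitial(() -> new StringBuilder(numOfFullTextsPerBatch * 100));
    // Inside "getRequestUrlForBatch()", "threadLocalSb.get()" would then replace the shared "sb".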
    private static String getRequestUrlForBatch(String baseUrl, int curBatch, List<String> fileNamesForCurBatch) {
        sb.append(baseUrl).append(curBatch).append("/");
        int numFullTextsCurBatch = fileNamesForCurBatch.size();
        for ( int j=0; j < numFullTextsCurBatch; ++j ) {
            sb.append(fileNamesForCurBatch.get(j));
            if ( j < (numFullTextsCurBatch -1) )
                sb.append(",");
        }
        String requestUrl = sb.toString();
        sb.setLength(0);    // Reset for the next batch.
        return requestUrl;
    }


    /**
     * This method updates the UrlReports to not point to any downloaded fullText files.
     * This is useful when the uploading process of the fullTexts to the S3-ObjectStore fails.
     * In that case, we don't want any "links" to locally-stored files, which will be deleted.
     * @param urlReports
     */
    public static void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports) {
        for ( UrlReport urlReport : urlReports ) {
            Payload payload = urlReport.getPayload();
            if ( payload != null )
                setUnretrievedFullText(payload);
        }
    }


    private static void replaceNotUploadedFileLocations(List<UrlReport> urlReports) {
        for ( UrlReport urlReport : urlReports ) {
            Payload payload = urlReport.getPayload();
            if ( payload != null ) {
                String fileLocation = payload.getLocation();
                if ( (fileLocation != null) && (! fileLocation.startsWith(S3ObjectStoreMinIO.endpoint)) )
                    setUnretrievedFullText(payload);
            }
        }
    }


    public static void updateUrlReportsForCurBatchTOHaveNoFullTextFiles(HashMap<String, Payload> payloadsHashMap, List<String> fileNames) {
        for ( String fileName : fileNames ) {
            // Get the ID of the file.
            Matcher matcher = FILENAME_ID.matcher(fileName);
            if ( !matcher.matches() )
                continue;
            String id = matcher.group(1);
            if ( (id == null) || id.isEmpty() )
                continue;
            Payload payload = payloadsHashMap.get(id);
            if ( payload != null )
                setUnretrievedFullText(payload);    // It changes the payload in the original UrlReport-list.
        }
    }


    public static void setUnretrievedFullText(Payload payload) {
        // Mark the full-text as not-retrieved, since it will be deleted from local storage. The retrieved link to the full-text will be kept.
        payload.setLocation(null);
        payload.setHash(null);
        payload.setMime_type(null);
        payload.setSize(null);
    }


    private static final int bufferSize = 20971520; // 20 MB


    public static boolean saveZipFile(HttpURLConnection conn, File zipFile) {
        FileOutputStream outStream = null;
        InputStream inStream = null;
        try {
            inStream = conn.getInputStream();
            outStream = new FileOutputStream(zipFile);

            byte[] byteBuffer = new byte[bufferSize];   // 20 MB
            int bytesRead = -1;
            while ( (bytesRead = inStream.read(byteBuffer, 0, bufferSize)) != -1 ) {
                outStream.write(byteBuffer, 0, bytesRead);
            }
            return true;
        } catch (Exception e) {
            logger.error("Could not save the zip file \"" + zipFile.getName() + "\": " + e.getMessage(), e);
            return false;
        } finally {
            try {
                if ( inStream != null )
                    inStream.close();
                if ( outStream != null )
                    outStream.close();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }


    public static boolean deleteDirectory(File curBatchDir) {
        try {
            org.apache.commons.io.FileUtils.deleteDirectory(curBatchDir);
            return true;
        } catch (IOException e) {
            logger.error("The following directory could not be deleted: " + curBatchDir.getName(), e);
            return false;
        }
    }


    private static String getCutBatchExceptionMessage(String sqleMessage) {
        // The sqleMessage contains the actual message, followed by the very long batch-data. This makes the logs unreadable, so we shorten the message before logging.
        int maxEnding = 1500;
        if ( sqleMessage.length() > maxEnding )
            return (sqleMessage.substring(0, maxEnding) + "...");
        else
            return sqleMessage;
    }


    // This is currently not used, but it may be useful in a future scenario.
    private static long getInputFileLinesNum() {
        long numOfLines = 0;
        try {
            numOfLines = Files.lines(Paths.get(inputFileFullPath)).count();
            logger.debug("The numOfLines in the inputFile is " + numOfLines);
        } catch (IOException e) {
            logger.error("Could not retrieve the numOfLines. " + e);
            return -1;
        }
        return numOfLines;
    }


    /**
     * This method decodes a Json String and returns a Task with its members.
     * @param jsonLine String
     * @return Task
     */
    public static Task jsonDecoder(String jsonLine) {
        // Get the ID and the url and put them in a Task object.
        String idStr = null;
        String urlStr = null;
        try {
            JSONObject jObj = new JSONObject(jsonLine); // Construct a JSONObject from the retrieved jsonLine.
            idStr = jObj.get("id").toString();
            urlStr = jObj.get("url").toString();
        } catch (JSONException je) {
            logger.warn("JSONException caught when trying to parse and extract values from jsonLine: \t" + jsonLine, je);
            return null;
        }

        if ( urlStr.isEmpty() ) {
            if ( !idStr.isEmpty() ) // If we only have the id, then log it.
                logger.warn("The url was not found for id: \"" + idStr + "\"");
            return null;
        }
        return new Task(idStr, urlStr, null);
    }


    /**
     * This method parses a Json file and extracts the urls, along with the IDs.
     * @return the HashMultimap with the id-url pairs
     */
    public static HashMultimap<String, String> getNextIdUrlPairBatchFromJson() {
        Task inputIdUrlTuple;
        int expectedPathsPerID = 5;
        int expectedIDsPerBatch = jsonBatchSize / expectedPathsPerID;

        HashMultimap<String, String> idAndUrlMappedInput = HashMultimap.create(expectedIDsPerBatch, expectedPathsPerID);

        int curBeginning = fileIndex.get();

        while ( inputScanner.get().hasNextLine() && (fileIndex.get() < (curBeginning + jsonBatchSize)) ) {  // While (!EOF) and inside the current url-batch, iterate through lines.
            //logger.debug("fileIndex: " + FileUtils.fileIndex.get());  // DEBUG!

            // Take each line.
            String retrievedLineStr = inputScanner.get().nextLine();
            //logger.debug("Loaded from inputFile: " + retrievedLineStr);  // DEBUG!

            fileIndex.set(fileIndex.get() +1);

            if ( retrievedLineStr.isEmpty() ) {
                unretrievableInputLines.set(unretrievableInputLines.get() +1);
                continue;
            }

            if ( (inputIdUrlTuple = jsonDecoder(retrievedLineStr)) == null ) {  // Decode the jsonLine and take the two attributes.
                logger.warn("A problematic inputLine was found: \t" + retrievedLineStr);
                unretrievableInputLines.set(unretrievableInputLines.get() +1);
                continue;
            }

            if ( !idAndUrlMappedInput.put(inputIdUrlTuple.getId(), inputIdUrlTuple.getUrl()) ) {
                // We have a duplicate url in the input.. log it here, as we cannot pass it through the HashMultimap. It's possible that this, as well as the original, might be/give a docUrl.
                duplicateIdUrlEntries.set(duplicateIdUrlEntries.get() +1);
            }
        }

        return idAndUrlMappedInput;
    }


    /**
     * This method returns the number of (non-heading, non-empty) lines we have read from the inputFile.
     * @return loadedUrls
     */
    public static int getCurrentlyLoadedUrls() {    // In the end, it gives the total number of urls we have processed.
        return FileUtils.fileIndex.get() - FileUtils.unretrievableInputLines.get();
    }


    /**
     * This method checks if there is no more input-data and returns true in that case.
     * Otherwise, it returns false, if there is more input-data to be loaded.
     * An error is logged if no input-urls could be retrieved at all.
     * @param isEmptyOfData
     * @param isFirstRun
     * @return finished loading / not finished
     */
    public static boolean isFinishedLoading(boolean isEmptyOfData, boolean isFirstRun) {
        if ( isEmptyOfData ) {
            if ( isFirstRun )
                logger.error("Could not retrieve any urls from the inputFile!");
            else
                logger.debug("Done loading " + FileUtils.getCurrentlyLoadedUrls() + " urls from the inputFile.");
            return true;
        }
        return false;
    }

}
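/*
 * A minimal usage sketch for the two stand-alone helpers above, assuming a JDBC Connection "con" to Impala
 * is obtained elsewhere (e.g. through the project's ImpalaConnector) and that the caller has its own logger.
 * The table-name and the sample json-line are illustrative only:
 *
 *   String errorMsg = FileUtils.mergeParquetFiles("payload", con);
 *   if ( errorMsg != null )
 *       logger.error("Merging the parquet-files failed: " + errorMsg);
 *
 *   Task task = FileUtils.jsonDecoder("{\"id\":\"openaire____::0123\",\"url\":\"https://example.org/fulltext.pdf\"}");
 *   if ( task != null )
 *       logger.debug("Decoded task with id: " + task.getId() + " and url: " + task.getUrl());
 */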