diff --git a/src/main/java/eu/openaire/urls_controller/components/BulkImport.java b/src/main/java/eu/openaire/urls_controller/components/BulkImport.java
index fec6c09..7f1cfa4 100644
--- a/src/main/java/eu/openaire/urls_controller/components/BulkImport.java
+++ b/src/main/java/eu/openaire/urls_controller/components/BulkImport.java
@@ -14,7 +14,7 @@ public class BulkImport {
 
     private String bulkImportReportLocation;
 
-    private int numOfThreadsPerBulkImportProcedure;
+    private int numOfThreadsForBulkImportProcedures;
 
     private Map<String, BulkImportSource> bulkImportSources;
 
@@ -37,12 +37,12 @@ public class BulkImport {
         this.bulkImportReportLocation = bulkImportReportLocation;
     }
 
-    public int getNumOfThreadsPerBulkImportProcedure() {
-        return numOfThreadsPerBulkImportProcedure;
+    public int getNumOfThreadsForBulkImportProcedures() {
+        return numOfThreadsForBulkImportProcedures;
     }
 
-    public void setNumOfThreadsPerBulkImportProcedure(int numOfThreadsPerBulkImportProcedure) {
-        this.numOfThreadsPerBulkImportProcedure = numOfThreadsPerBulkImportProcedure;
+    public void setNumOfThreadsForBulkImportProcedures(int numOfThreadsForBulkImportProcedures) {
+        this.numOfThreadsForBulkImportProcedures = numOfThreadsForBulkImportProcedures;
     }
 
     public Map<String, BulkImportSource> getBulkImportSources() {
@@ -58,7 +58,7 @@ public class BulkImport {
         return "BulkImport{" +
                 "baseBulkImportLocation='" + baseBulkImportLocation + '\'' +
                 ", bulkImportReportLocation='" + bulkImportReportLocation + '\'' +
-                ", numOfThreadsPerBulkImportProcedure=" + numOfThreadsPerBulkImportProcedure +
+                ", numOfThreadsForBulkImportProcedures=" + numOfThreadsForBulkImportProcedures +
                 ", bulkImportSources=" + bulkImportSources +
                 '}';
     }
 
diff --git a/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java b/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java
index f8931d2..4be541b 100644
--- a/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java
+++ b/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java
@@ -45,9 +45,9 @@ public class BulkImportController {
 
     private final HashMap<String, BulkImportSource> bulkImportSources;
 
-    public static final Set<String> bulkImportDirs = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+    public static final Set<String> bulkImportDirsUnderProcessing = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
 
-    public static int numOfThreadsPerBulkImportProcedure;
+    public static int numOfThreadsForBulkImportProcedures;
 
     public static ExecutorService bulkImportExecutor;
 
@@ -65,9 +65,9 @@ public class BulkImportController {
 
         this.bulkImportService = bulkImportService;
 
-        numOfThreadsPerBulkImportProcedure = bulkImport.getNumOfThreadsPerBulkImportProcedure();
-        logger.info("Will use " + numOfThreadsPerBulkImportProcedure + " threads per bulk-import procedure.");
-        bulkImportExecutor = Executors.newFixedThreadPool(numOfThreadsPerBulkImportProcedure); // At most < numOfThreadsPerBulkImportProcedure > threads will be used per bulk-import procedure..
+        numOfThreadsForBulkImportProcedures = bulkImport.getNumOfThreadsForBulkImportProcedures();
+        logger.info("Will use " + numOfThreadsForBulkImportProcedures + " threads for the bulk-import procedures.");
+        bulkImportExecutor = Executors.newFixedThreadPool(numOfThreadsForBulkImportProcedures); // At most <numOfThreadsForBulkImportProcedures> threads will be used, by all bulk-import procedures combined.
     }
 
@@ -146,7 +146,7 @@ public class BulkImportController {
             return ResponseEntity.badRequest().body(errorMsg);
         }
-        // The above check does not catch the case were the directory has at least one subdirectory, but no full-texts files.
-        // The "iterator()" will have a "next" entry, but no full-text file will exist. Although, that case will be rare and will be caught later on, after this procedure being accepted.
+        // The above check does not catch the case where the directory has at least one subdirectory, but no full-text files.
+        // The "iterator()" will have a "next" entry, but no full-text file will exist. However, that case will be rare and will be caught later on, after this procedure has been accepted.
     } catch (Exception e) {
         String errorMsg = "Error when checking if the givenDir \"" + givenDir + "\" is empty!";
         logger.error(errorMsg);
@@ -161,7 +161,7 @@ public class BulkImportController {
         }
 
         // Detect if the same directory is scheduled for being processed. In that case, return a 429.
-        if ( ! bulkImportDirs.add(bulkImportDir) ) {
+        if ( ! bulkImportDirsUnderProcessing.add(bulkImportDir) ) {
             // We allow multiple jobs for the same provenance, running at the same time, but not multiple jobs for the same bulkImportDirectory.
             String errorMsg = "There is a bulk-import request for the directory \"" + bulkImportDir + "\" that is being handled at the moment. Please wait until it's finished being processed, before making another request.";
             logger.error(errorMsg);
@@ -172,13 +172,14 @@ public class BulkImportController {
         try {
             Files.createDirectories(currentBulkImportReportLocationDir); // No-op if dir exists. It does not throw a "alreadyExistsException"
         } catch (Exception e) {
-            String errorMsg = "Could nor create the \"bulkImportReportLocation\" for provenance \"" + provenance + "\" : " + currentBulkImportReportLocationDir;
+            String errorMsg = "Could not create the \"bulkImportReportLocation\" for provenance \"" + provenance + "\" : " + currentBulkImportReportLocationDir;
             logger.error(errorMsg, e);
+            bulkImportDirsUnderProcessing.remove(bulkImportDir);
             return ResponseEntity.internalServerError().body(errorMsg);
         }
 
-        // Generate the "bulkImportReportID". We are removing the ending "slash" ("/") from the "relativeBulkImportDir".
-        String bulkImportReportID = provenance + "/" + relativeBulkImportDir.substring(0, (relativeBulkImportDir.length() -1)) + "_report_" + GenericUtils.getRandomNumber(10000, 99999);
+        // Generate the "bulkImportReportID". The "relativeBulkImportDir" ends with a "slash" ("/"), so the "report_<random>" part is appended right after it.
+        String bulkImportReportID = provenance + "/" + relativeBulkImportDir + "report_" + GenericUtils.getRandomNumber(10000, 99999);
         String bulkImportReportFullPath = this.bulkImportReportLocation + bulkImportReportID + ".json";
 
         String msg = "The bulkImportFullTexts request for " + provenance + " procedure and bulkImportDir: " + givenBulkDir + " was accepted and will be scheduled for execution. "
@@ -189,8 +190,10 @@ public class BulkImportController {
         bulkImportReport.addEvent(msg);
 
         String errorMsg = fileUtils.writeToFile(bulkImportReportFullPath, bulkImportReport.getJsonReport(), true);
-        if ( errorMsg != null )
+        if ( errorMsg != null ) {
+            bulkImportDirsUnderProcessing.remove(bulkImportDir);
             return ResponseEntity.internalServerError().body(errorMsg);
+        }
 
         logger.info(msg + " \"bulkImportReportID\": " + bulkImportReportID);
 
@@ -201,6 +204,7 @@ public class BulkImportController {
             bulkImportService.bulkImportFullTextsFromDirectory(bulkImportReport, finalRelativeBulkImportDir, finalBulkImportDir, givenDir, provenance, bulkImportSource, shouldDeleteFilesOnFinish)
         );
 
+        // This directory will be removed from "bulkImportDirsUnderProcessing" when the background job finishes.
         return ResponseEntity.ok().body(new BulkImportResponse(msg, bulkImportReportID)); // The response is automatically serialized to json and it's of type "application/json".
     }
 
diff --git a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
index 3c929f7..9ab38c1 100644
--- a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
@@ -73,7 +73,7 @@ public class BulkImportServiceImpl implements BulkImportService {
             logger.error(errorMsg);
             bulkImportReport.addEvent(errorMsg);
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
-            BulkImportController.bulkImportDirs.remove(bulkImportDirName);
+            BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
             return false;
         }
 
@@ -81,7 +81,7 @@
         if ( fileLocations == null ) {
             bulkImportReport.addEvent("Could not retrieve the files for bulk-import!");
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
-            BulkImportController.bulkImportDirs.remove(bulkImportDirName);
+            BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
             return false;
         }
 
@@ -91,7 +91,7 @@
             logger.warn(errorMsg);
             bulkImportReport.addEvent(errorMsg);
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
-            BulkImportController.bulkImportDirs.remove(bulkImportDirName);
+            BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
             return false;
         }
 
@@ -106,7 +106,7 @@
             logger.error(errorMsg, e);
             bulkImportReport.addEvent(errorMsg);
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
-            BulkImportController.bulkImportDirs.remove(bulkImportDirName);
+            BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
             return false;
         }
 
@@ -117,17 +117,18 @@
             logger.error(errorMsg);
             bulkImportReport.addEvent(errorMsg);
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
-            BulkImportController.bulkImportDirs.remove(bulkImportDirName);
+            BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
             return false;
         }
 
         long timeMillis = System.currentTimeMillis(); // Store it here, in order to have the same for all current records.
 
         List<Callable<Integer>> callableTasksForFileSegments = new ArrayList<>(numOfFiles);
-        List<List<String>> subLists = Lists.partition(fileLocations, BulkImportController.numOfThreadsPerBulkImportProcedure); // Divide the initial list to "numOfThreadsPerBulkImportProcedure" subLists. The last one may have marginally fewer files.
+        int sizeOfEachSegment = Math.max(1, (numOfFiles / BulkImportController.numOfThreadsForBulkImportProcedures)); // Guard against a zero segment-size when there are fewer files than threads, since "Lists.partition" rejects a size of 0.
+        List<List<String>> subLists = Lists.partition(fileLocations, sizeOfEachSegment); // Divide the initial list into segments of "sizeOfEachSegment" files each. The last one may have fewer files.
         int subListsSize = subLists.size();
 
-        bulkImportReport.addEvent("Going to import the files in " + subListsSize + " segments, in parallel.");
+        bulkImportReport.addEvent("Going to import the files in parallel, after dividing them into " + subListsSize + " segments.");
         fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
 
         for ( int i = 0; i < subListsSize; ++i ) {
@@ -138,16 +139,18 @@
         }
 
         int numFailedSegments = 0;
-        int numFailedFiles = 0;
+        int numFailedFilesForSegment = 0;
+        int numAllFailedFiles = 0;
         try {
             List<Future<Integer>> futures = BulkImportController.bulkImportExecutor.invokeAll(callableTasksForFileSegments); // This waits for all tasks to finish.
-            int sizeOfFutures = futures.size();
-            for ( int i = 0; i < sizeOfFutures; ++i ) {
+            int sizeOfFutures = futures.size(); // This is the same as the "subListsSize".
+            for ( int i = 0; i < sizeOfFutures; ++i )
+            { // For each segment..
                 try {
-                    numFailedFiles += futures.get(i).get();
-                    if ( numFailedFiles == subLists.get(i).size() ) { // Get and see if it was successfully or not, or if an exception is thrown..
+                    numFailedFilesForSegment = futures.get(i).get();
+                    numAllFailedFiles += numFailedFilesForSegment;
+                    if ( numFailedFilesForSegment == subLists.get(i).size() )
                         numFailedSegments++;
-                    }
                     // In case all the files failed to be bulk-imported, then we will detect it in the "numSuccessfulSegments"-check later.
                     // The failed-to-be-imported files, will not be deleted, even if the user specifies that he wants to delete the directory.
                 } catch (ExecutionException ee) {
@@ -164,7 +167,7 @@
             logger.error(errorMsg, e);
             bulkImportReport.addEvent(errorMsg);
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
-            BulkImportController.bulkImportDirs.remove(bulkImportDirName);
+            BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
             return false;
         } finally {
             logger.debug("Deleting local parquet directory: " + localParquetDir);
 
@@ -173,15 +176,15 @@
 
         // Check the results.
         String msg;
-        if ( numFailedFiles == numOfFiles ) {
+        if ( numAllFailedFiles == numOfFiles ) {
            String errorMsg = "None of the files inside the bulkImportDir: " + bulkImportDirName + " were imported!";
            logger.error(errorMsg);
            bulkImportReport.addEvent(errorMsg);
            fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
-           BulkImportController.bulkImportDirs.remove(bulkImportDirName);
+           BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
            return false;
-        } else if ( numFailedFiles > 0 ) { // Some failed, but not all.
-            msg = numFailedFiles + " files" + (numFailedSegments > 0 ? (" and " + numFailedSegments + " whole segments") : "") + " failed to be bulk-imported, from the bulkImportDir: " + bulkImportDirName;
+        } else if ( numAllFailedFiles > 0 ) { // Some failed, but not all.
+            msg = numAllFailedFiles + " files" + (numFailedSegments > 0 ? (" and " + numFailedSegments + " whole segments") : "") + " failed to be bulk-imported, from the bulkImportDir: " + bulkImportDirName;
             logger.warn(msg);
         } else {
             msg = "All " + numOfFiles + " files, from bulkImportDir: " + bulkImportDirName + " were bulkImported successfully.";
@@ -197,7 +200,7 @@
             ImpalaConnector.databaseLock.unlock();
             bulkImportReport.addEvent(mergeErrorMsg);
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
-            BulkImportController.bulkImportDirs.remove(bulkImportDirName);
+            BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
             return false;
         }
         ImpalaConnector.databaseLock.unlock();
@@ -210,7 +213,7 @@
         // Also, we do not want to write the object in the end (in its final form), since we want the user to have the ability to request the report at any time,
         // after submitting the bulk-import request, to see its progress (since the number of file may be very large and the processing may take many hours).
 
-        BulkImportController.bulkImportDirs.remove(bulkImportDirName);
+        BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
         return true;
     }
 
@@ -222,7 +225,7 @@
         String bulkImportReportLocation = bulkImportReport.getReportLocation();
 
         int numOfFilesInSegment = fileLocationsSegment.size();
-        String msg = "Going to import " + numOfFilesInSegment + " files for segment-" + segmentCounter + " , of bulkImport procedure: " + provenance + " | dir: " + bulkImportDirName;
+        String msg = "Going to import " + numOfFilesInSegment + " files, for segment-" + segmentCounter + ", of bulkImport procedure: " + provenance + " | dir: " + bulkImportDirName;
         logger.debug(msg);
         bulkImportReport.addEvent(msg);
 
@@ -250,7 +253,7 @@
         int numOfPayloadRecords = payloadRecords.size();
         if ( numOfPayloadRecords == 0 ) {
-            String errorMsg = "No payload-records were generated for any of the files inside the bulkImportDir: " + bulkImportDirName;
+            String errorMsg = "No payload-records were generated for any of the files, of segment-" + segmentCounter + ", inside the bulkImportDir: " + bulkImportDirName;
             logger.warn(errorMsg);
             bulkImportReport.addEvent(errorMsg);
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
@@ -258,13 +261,13 @@
             return numOfFilesInSegment;
         } else if ( numOfPayloadRecords != numOfFilesInSegment ) {
-            // Write this important note here, in order to certainly be in the report, even if a parquet-file failure happens and the method exists early.
-            String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be imported, for segment-" + segmentCounter + " !";
+            // Write this important note here, in order to certainly be in the report, even if a parquet-file failure happens and the method exits early.
+            String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be bulk-imported, for segment-" + segmentCounter + " !";
             logger.warn(errorMsg);
             bulkImportReport.addEvent(errorMsg);
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
         }
 
-        // Construct the parquet file, upload it to HDFS and load them it in the "payload_bulk_import" table.
+        // Construct the parquet file, upload it to HDFS and load it in the "payload_bulk_import" table.
         String parquetFileName = "payloads_" + segmentCounter + ".parquet";
         String fullLocalParquetFilePath = localParquetDir + parquetFileName;
 
@@ -272,10 +275,10 @@
             logger.trace("Going to write " + numOfPayloadRecords + " payload-records to the parquet file: " + fullLocalParquetFilePath); // DEBUG!
 
         if ( ! parquetFileUtils.writeToParquet(payloadRecords, ParquetFileUtils.payloadsSchema, fullLocalParquetFilePath) ) {
-            bulkImportReport.addEvent("Could not write the payload-records to the parquet-file: " + parquetFileName + " !");
+            bulkImportReport.addEvent("Could not write the payload-records for segment-" + segmentCounter + " to the parquet-file: " + parquetFileName + " !");
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
             // None of the files of this segment will be deleted, in any case.
-            return numOfFilesInSegment;
+            return numOfFilesInSegment; // All files of this segment have failed.
         }
 
         if ( logger.isTraceEnabled() )
@@ -287,7 +290,7 @@
             bulkImportReport.addEvent("Could not upload the parquet-file " + parquetFileName + " to HDFS!");
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
             // None of the files of this segment will be deleted, in any case.
-            return numOfFilesInSegment;
+            return numOfFilesInSegment; // All files of this segment have failed.
         }
 
         if ( logger.isTraceEnabled() )
@@ -296,14 +299,14 @@
         ImpalaConnector.databaseLock.lock();
         if ( !parquetFileUtils.loadParquetDataIntoTable((currentBulkImportHdfsDir + parquetFileName), "payload_bulk_import") ) {
             ImpalaConnector.databaseLock.unlock();
-            bulkImportReport.addEvent("Could not load the payload-records to the database!");
+            bulkImportReport.addEvent("Could not load the payload-records to the database, for segment-" + segmentCounter + "!");
             fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true);
             // None of the files of this segment will be deleted, in any case.
-            return numOfFilesInSegment;
+            return numOfFilesInSegment; // All files of this segment have failed.
         }
         ImpalaConnector.databaseLock.unlock();
 
-        String segmentSuccessMsg = "Finished importing " + numOfPayloadRecords + " files, out of " + numOfFilesInSegment + " , for segment-" + segmentCounter + ".";
+        String segmentSuccessMsg = "Finished importing " + numOfPayloadRecords + " files, out of " + numOfFilesInSegment + ", for segment-" + segmentCounter + ".";
         logger.info(segmentSuccessMsg);
         bulkImportReport.addEvent(segmentSuccessMsg);
 
@@ -407,7 +410,7 @@
         record.put("size", ((size != null) ? String.valueOf(size) : null));
         record.put("hash", fileHash); // This is already checked and will not be null here.
         record.put("location", s3Url);
-        record.put("provenance", provenance);
+        record.put("provenance", ("bulk:" + provenance)); // Add a prefix, to make it clearer that this record comes from a bulk-import, when looking at all the records in the "payload" VIEW.
         return record;
     }
 
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 4c383f5..a02f56d 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -39,7 +39,7 @@ services:
   bulk-import:
     baseBulkImportLocation: /mnt/bulk_import/
     bulkImportReportLocation: /reports/bulkImportReports/
-    numOfThreadsPerBulkImportProcedure: 6
+    numOfThreadsForBulkImportProcedures: 6
    bulkImportSources: # These sources are accepted for bulk-import requests and are excluded from crawling.
      arxivImport:
        datasourceID: opendoar____::6f4922f45568161a8cdf4ad2299f6d23
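
A note on the new segment-sizing in BulkImportServiceImpl: "Lists.partition" receives the segment *size* (not the number of segments), and with integer division the size becomes 0 whenever fewer files than threads are submitted, which "Lists.partition" rejects with an IllegalArgumentException; hence the "Math.max(1, ...)" floor above. A minimal, self-contained sketch of the arithmetic (class name and numbers are illustrative, not from the codebase):

```java
import com.google.common.collect.Lists;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PartitionSketch {
    public static void main(String[] args) {
        int numOfThreads = 6;  // mirrors "numOfThreadsForBulkImportProcedures"
        List<String> fileLocations = IntStream.range(0, 4)  // fewer files than threads
                .mapToObj(i -> "file_" + i + ".pdf")
                .collect(Collectors.toList());

        // Integer division alone would give 4 / 6 = 0, and Lists.partition(list, 0) throws.
        int sizeOfEachSegment = Math.max(1, (fileLocations.size() / numOfThreads));

        List<List<String>> subLists = Lists.partition(fileLocations, sizeOfEachSegment);
        System.out.println(subLists.size() + " segments: " + subLists);
        // Prints 4 segments of 1 file each. With 100 files and 6 threads the size is 16,
        // so Lists.partition yields 7 segments; the extra segment simply queues on the fixed pool.
    }
}
```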
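On the renamed "bulkImportDirsUnderProcessing": a Set built with "Collections.newSetFromMap(new ConcurrentHashMap<>())" makes "add()" an atomic test-and-set, which is what lets the controller reject a duplicate request for the same directory with a 429 and without any explicit locking, and why the patch now also removes the directory on every failure exit-path. A stripped-down sketch of the idiom (all names here are illustrative, not from the codebase):

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class UnderProcessingGuard {
    // add() returns false if the key is already present, so it acts as an atomic "try-lock" per directory.
    private static final Set<String> dirsUnderProcessing =
            Collections.newSetFromMap(new ConcurrentHashMap<>());

    public static boolean tryStart(String dir) {
        return dirsUnderProcessing.add(dir);  // false -> another request already owns this dir (respond with 429)
    }

    public static void finish(String dir) {
        dirsUnderProcessing.remove(dir);  // must run on every exit path, success or failure
    }

    public static void main(String[] args) {
        System.out.println(tryStart("/mnt/bulk_import/arxiv/"));  // true
        System.out.println(tryStart("/mnt/bulk_import/arxiv/"));  // false: already being processed
        finish("/mnt/bulk_import/arxiv/");
        System.out.println(tryStart("/mnt/bulk_import/arxiv/"));  // true again
    }
}
```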
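Finally, the split of the old "numFailedFiles" counter is more than a rename: each future returns the failed-file count of one segment, and the old code compared the *running total* against a single segment's size, so a segment could be wrongly counted as fully failed once earlier failures had accumulated. A hedged sketch of the corrected tallying pattern, with stand-in callables instead of the real file-import tasks:

```java
import java.util.List;
import java.util.concurrent.*;

public class InvokeAllTallySketch {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        // Each callable returns the number of failed files in its segment (0, 1 and 5 here, as stand-ins).
        List<Callable<Integer>> tasks = List.of(() -> 0, () -> 1, () -> 5);
        List<List<String>> segments = List.of(
                List.of("a", "b"), List.of("c", "d"), List.of("e", "f", "g", "h", "i"));

        int numAllFailedFiles = 0, numFailedSegments = 0;
        List<Future<Integer>> futures = executor.invokeAll(tasks);  // blocks until all segments finish
        for (int i = 0; i < futures.size(); ++i) {
            try {
                int numFailedFilesForSegment = futures.get(i).get();  // per-segment count, reset each iteration
                numAllFailedFiles += numFailedFilesForSegment;        // running total, compared against numOfFiles later
                if (numFailedFilesForSegment == segments.get(i).size())
                    numFailedSegments++;  // every file of this one segment failed
            } catch (ExecutionException ee) {
                numAllFailedFiles += segments.get(i).size();  // one possible policy: treat an unexpected task failure as a whole failed segment
                numFailedSegments++;
            }
        }
        System.out.println(numAllFailedFiles + " failed files in total; fully-failed segments: " + numFailedSegments);
        // Prints: 6 failed files in total; fully-failed segments: 1
        executor.shutdown();
    }
}
```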