diff --git a/README.md b/README.md index 37cfa1a..6c95a19 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ -# UrlsController [![Build Status](https://jenkins-dnet.d4science.org/buildStatus/icon?job=UrlsController)](https://jenkins-dnet.d4science.org/job/UrlsController/) +# UrlsController +## [![Jenkins build status](https://jenkins-dnet.d4science.org/buildStatus/icon?job=UrlsController)](https://jenkins-dnet.d4science.org/job/UrlsController/) The Controller's Application receives requests from the [**Workers**](https://code-repo.d4science.org/lsmyrnaios/UrlsWorker) (deployed on the cloud), constructs an assignments-list with data retrieved from a database, and returns the list to the Workers.
Then, it receives the "WorkerReports", requests the full-texts from the workers in batches, and uploads them to the S3-Object-Store. Finally, it writes the related reports, along with the updated file-locations, into the database.
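For illustration, here is a minimal, self-contained sketch of the batched report-handling flow described above. Every name in it (ControllerFlowSketch, fetchFullTextsFromWorker, BATCH_SIZE, and so on) is hypothetical; the actual endpoints and classes live in this repository and in the Workers' repository.

import java.util.List;

// Hypothetical sketch of the Controller's flow: receive a WorkerReport, fetch the
// full-texts from the Worker in batches, upload them to S3, then persist the results.
public class ControllerFlowSketch {

    record WorkerReport(String workerId, List<String> fileIds) {}

    static final int BATCH_SIZE = 70; // Assumed batch size, for illustration only.

    public static void handleWorkerReport(WorkerReport report) {
        List<String> ids = report.fileIds();
        for (int i = 0; i < ids.size(); i += BATCH_SIZE) {
            List<String> batch = ids.subList(i, Math.min(i + BATCH_SIZE, ids.size()));
            byte[] archive = fetchFullTextsFromWorker(report.workerId(), batch); // One HTTP request per batch.
            uploadToS3ObjectStore(archive); // Push the received full-texts to the S3-Object-Store.
        }
        writeReportAndFileLocationsToDatabase(report); // Finally, persist the report and the updated file-locations.
    }

    // Stubs standing in for the real network, S3 and database code.
    static byte[] fetchFullTextsFromWorker(String workerId, List<String> batch) { return new byte[0]; }
    static void uploadToS3ObjectStore(byte[] archive) {}
    static void writeReportAndFileLocationsToDatabase(WorkerReport report) {}
}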
diff --git a/src/main/java/eu/openaire/urls_controller/controllers/ShutdownController.java b/src/main/java/eu/openaire/urls_controller/controllers/ShutdownController.java index 11b3c62..dedcc77 100644 --- a/src/main/java/eu/openaire/urls_controller/controllers/ShutdownController.java +++ b/src/main/java/eu/openaire/urls_controller/controllers/ShutdownController.java @@ -136,7 +136,8 @@ public class ShutdownController { @PostMapping("workerShutdownReport") public ResponseEntity workerShutdownReport(@RequestParam String workerId, HttpServletRequest request) { - String initMsg = "Received a \"workerShutdownReport\" from worker: \"" + workerId + "\"."; + String remoteAddr = GenericUtils.getRequestorAddress(request); + String initMsg = "Received a \"workerShutdownReport\" from worker: \"" + workerId + "\" [IP: " + remoteAddr + "]."; WorkerInfo workerInfo = UrlsController.workersInfoMap.get(workerId); if ( workerInfo == null ) { String errorMsg = "The worker with id \"" + workerId + "\" has not participated in the PDF-Aggregation-Service!"; @@ -144,9 +145,8 @@ public class ShutdownController { return ResponseEntity.badRequest().body(errorMsg); } - String remoteAddr = GenericUtils.getRequestorAddress(request); if ( ! remoteAddr.equals(workerInfo.getWorkerIP()) ) { - logger.error(initMsg + " The request came from another IP: " + remoteAddr + " | while this worker was registered with this IP: " + workerInfo.getWorkerIP()); + logger.error(initMsg + " The request came from an IP different from the one this worker was registered with: " + workerInfo.getWorkerIP()); return ResponseEntity.status(HttpStatus.FORBIDDEN).build(); } diff --git a/src/main/java/eu/openaire/urls_controller/models/BulkImportReport.java b/src/main/java/eu/openaire/urls_controller/models/BulkImportReport.java index b175425..83c3274 100644 --- a/src/main/java/eu/openaire/urls_controller/models/BulkImportReport.java +++ b/src/main/java/eu/openaire/urls_controller/models/BulkImportReport.java @@ -43,7 +43,7 @@ public class BulkImportReport { public void addEvent(String event) { - eventsMultimap.put(GenericUtils.getReadableCurrentTimeAndZone(), event); + eventsMultimap.put(GenericUtils.getReadableCurrentTimeAndZone(), event); // The underlying multimap is synchronized, so concurrent calls are safe.
} public String getJsonReport() diff --git a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java index fed8455..45574d4 100644 --- a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java @@ -78,7 +78,7 @@ public class BulkImportServiceImpl implements BulkImportService { String errorMsg = "The payloadsSchema could not be parsed!"; logger.error(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } @@ -88,7 +88,7 @@ public class BulkImportServiceImpl implements BulkImportService { String errorMsg = "Could not retrieve the files for bulk-import!"; logger.error(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } @@ -98,7 +98,7 @@ public class BulkImportServiceImpl implements BulkImportService { String errorMsg = "No files were found inside the bulkImportDir: " + bulkImportDirName; logger.warn(errorMsg); bulkImportReport.addEvent(errorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } @@ -113,7 +113,7 @@ public class BulkImportServiceImpl implements BulkImportService { String errorMsg = "Could not create the local parquet-directory: " + localParquetDir; logger.error(errorMsg + additionalLoggingMsg, e); bulkImportReport.addEvent(errorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } @@ -124,7 +124,7 @@ public class BulkImportServiceImpl implements BulkImportService { String errorMsg = "Could not create the remote HDFS-directory: " + currentBulkImportHdfsDir; logger.error(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } @@ -139,7 +139,7 @@ public class BulkImportServiceImpl implements BulkImportService { msg = "Going to bulk-import the " + numOfFiles + " files in parallel, after dividing them into " + subListsSize + " segments."; logger.debug(msg + additionalLoggingMsg); bulkImportReport.addEvent(msg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); for ( int i = 0; i < subListsSize; ++i ) { int finalI = i; @@ -180,7 +180,7 @@ 
public class BulkImportServiceImpl implements BulkImportService { String errorMsg = "An error occurred when trying to bulk-import data from bulkImportDir: " + bulkImportDirName; logger.error(errorMsg, e); bulkImportReport.addEvent(errorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } finally { @@ -193,7 +193,7 @@ public class BulkImportServiceImpl implements BulkImportService { String errorMsg = "None of the files inside the bulkImportDir: " + bulkImportDirName + " were imported!"; logger.error(errorMsg); bulkImportReport.addEvent(errorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } else if ( numAllFailedFiles > 0 ) { // Some failed, but not all. @@ -204,7 +204,7 @@ public class BulkImportServiceImpl implements BulkImportService { logger.info(msg); } bulkImportReport.addEvent(msg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); // Merge the parquet files inside the table "payload_bulk_import", to improve performance of future operations. DatabaseConnector.databaseLock.lock(); @@ -212,7 +212,7 @@ public class BulkImportServiceImpl implements BulkImportService { DatabaseConnector.databaseLock.unlock(); if ( mergeErrorMsg != null ) { // the message is already logged bulkImportReport.addEvent(mergeErrorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName); return false; } @@ -220,7 +220,7 @@ public class BulkImportServiceImpl implements BulkImportService { String successMsg = "Finished the bulk-import procedure for " + provenance + " and bulkImportDir: " + bulkImportDirName; logger.info(successMsg); bulkImportReport.addEvent(successMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false); // The report-file will be overwritten every now and then, instead of appended, since we want to add an updated JSON report-object each time. // Also, we do not want to write the object in the end (in its final form), since we want the user to have the ability to request the report at any time, // after submitting the bulk-import request, to see its progress (since the number of files may be very large and the processing may take many hours). 
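A note on the writeToFile(..., true/false) changes above: judging only from this commit's call-sites (the concurrent per-segment writes keep "true", while the sequential top-level writes become "false"), the third argument appears to guard concurrent writers, while the report-file itself is always rewritten whole, as the comment explains. Below is a minimal sketch under that assumption; the real FileUtils.writeToFile() may well differ.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for FileUtils.writeToFile(); names and semantics are assumptions.
public class ReportFileSketch {

    private static final ReentrantLock fileLock = new ReentrantLock(true);

    public static boolean writeToFile(String fileLocation, String jsonReport, boolean shouldLockThreads) {
        if (shouldLockThreads) // Assumed: only needed when multiple segment-threads may write concurrently.
            fileLock.lock();
        try {
            // The file is truncated and rewritten whole, so a user polling the report
            // always reads one complete, up-to-date JSON object.
            Files.writeString(Paths.get(fileLocation), jsonReport, StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
            return true;
        } catch (IOException e) {
            return false;
        } finally {
            if (shouldLockThreads)
                fileLock.unlock();
        }
    }
}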
@@ -240,6 +240,7 @@ public class BulkImportServiceImpl implements BulkImportService { String msg = "Going to import " + numOfFilesInSegment + " files, for segment-" + segmentCounter; logger.debug(msg + additionalLoggingMsg); bulkImportReport.addEvent(msg); + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); List payloadRecords = new ArrayList<>(numOfFilesInSegment); @@ -254,7 +255,7 @@ public class BulkImportServiceImpl implements BulkImportService { try { record = processBulkImportedFile(fileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg); } catch (Exception e) { - String errorMsg = "Exception when uploading the files of segment_" + segmentCounter + " to the S3 Object Store. Will avoid uploading any file for this segment.."; + String errorMsg = "Exception when uploading the files of segment_" + segmentCounter + " to the S3 Object Store. Will avoid uploading the rest of the files for this segment.."; logger.error(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); for ( int j=i; j < numOfFilesInSegment; ++j ) @@ -269,14 +270,14 @@ public class BulkImportServiceImpl implements BulkImportService { failedFiles.add(fileLocation); } - if ( ((++counter) % 150) == 0 ) { // Every 150 files, report the status for this segment. + if ( ((++counter) % 150) == 0 ) { // Every 150 files, report the status for this segment and write it to the file. msg = "Progress for segment-" + segmentCounter + " : " + payloadRecords.size() + " files have been imported and " + failedFiles.size() + " have failed, out of " + numOfFilesInSegment + " files."; if ( logger.isTraceEnabled() ) logger.trace(msg + additionalLoggingMsg); bulkImportReport.addEvent(msg); fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); } - } + } // End of processing files for this segment. int numOfPayloadRecords = payloadRecords.size(); if ( numOfPayloadRecords == 0 ) { @@ -291,7 +292,6 @@ public class BulkImportServiceImpl implements BulkImportService { String errorMsg = failedFiles.size() + " out of " + numOfFilesInSegment + " files failed to be bulk-imported, for segment-" + segmentCounter + " !"; logger.warn(errorMsg + additionalLoggingMsg); bulkImportReport.addEvent(errorMsg); - fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); } // Construct the parquet file, upload it to HDFS and load it in the "payload_bulk_import" table. @@ -350,11 +350,12 @@ public class BulkImportServiceImpl implements BulkImportService { // Delete all files except the ones in the "failedHashSet". for ( String fileLocation : fileLocationsSegment ) { if ( !failedFiles.contains(fileLocation) ) - if ( !fileUtils.deleteFile(fileLocation) ) + if ( !fileUtils.deleteFile(fileLocation) ) // The error is logged inside "deleteFile()". bulkImportReport.addEvent("The file " + fileLocation + " could not be deleted! Please make sure you have provided the WRITE-permission."); } } + fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), true); return (numOfFilesInSegment - numOfPayloadRecords); // Return the numOfFailedFiles. 
} @@ -385,6 +386,10 @@ public class BulkImportServiceImpl implements BulkImportService { String datasourcePrefix = bulkImportSource.getDatasourcePrefix(); String fileNameID = fileLocationData.getFileNameID(); + String openAireId = generateOpenaireId(fileNameID, datasourcePrefix, bulkImportSource.getIsAuthoritative()); + if ( openAireId == null ) + return null; + String actualUrl = (bulkImportSource.getPdfUrlPrefix() + fileNameID); // This string-concatenation works with URLs of arXiv. A different construction may be needed for other datasources. String originalUrl = actualUrl; // We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link. @@ -403,10 +408,6 @@ public class BulkImportServiceImpl implements BulkImportService { DatabaseConnector.databaseLock.unlock(); } - String openAireId = generateOpenaireId(fileNameID, datasourcePrefix, bulkImportSource.getIsAuthoritative()); - if ( openAireId == null ) - return null; - String s3Url = null; if ( alreadyFoundFileLocation != null ) // If the full-text of this record is already-found and uploaded. diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java index 5199630..5790df6 100644 --- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java +++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java @@ -287,7 +287,7 @@ public class UrlsServiceImpl implements UrlsService { } else if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) { logger.error("Failed to get and/or upload the fullTexts for batch-assignments_" + curReportAssignmentsCounter); // The docUrls were still found! Just update ALL the fileLocations, sizes, hashes and mimetypes, to show that the files are not available. - fileUtils.updateUrlReportsToHaveNoFullTextFiles(urlReports, false); + fileUtils.removeUnretrievedFullTextsFromUrlReports(urlReports, false); // We write only the payloads which are connected with retrieved full-texts, uploaded to S3-Object-Store. // We continue with writing the "attempts", as we want to avoid re-checking the failed-urls later. // The urls which give full-text (no matter if we could not get it from the worker), are flagged as "couldRetry" anyway, so they will be picked-up to be checked again later. diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java index deb2366..395bc73 100644 --- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java +++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java @@ -243,7 +243,7 @@ public class FileUtils { if ( failedBatches == numOfBatches ) logger.error("None of the " + numOfBatches + " batches could be handled for assignments_" + assignmentsBatchCounter + ", for worker: " + workerId); - updateUrlReportsToHaveNoFullTextFiles(urlReports, true); // Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed). + removeUnretrievedFullTextsFromUrlReports(urlReports, true); // Make sure all records without an S3-Url have < null > file-data (some batches or uploads might have failed). deleteDirectory(new File(curAssignmentsBaseLocation)); // Check and warn about the number of failed payloads. 
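Regarding the reordering in BulkImportServiceImpl above: moving the generateOpenaireId() call to the top lets an invalid fileNameID fail fast, before any URL construction or database-lock acquisition. For illustration only, here is a sketch of such an ID builder, assuming the common OpenAIRE convention of <datasourcePrefix>::<md5-of-record-id>; the real method's rules (in particular for authoritative datasources) may differ.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch; not the repository's actual generateOpenaireId().
public class OpenaireIdSketch {

    public static String generateOpenaireId(String fileNameID, String datasourcePrefix, boolean isAuthoritative) {
        if (fileNameID == null || fileNameID.isEmpty())
            return null; // Fail fast: callers return before touching the database.
        String idPart = isAuthoritative ? fileNameID.toLowerCase() : fileNameID; // Assumed normalization rule.
        return datasourcePrefix + "::" + md5Hex(idPart);
    }

    private static String md5Hex(String input) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            StringBuilder sb = new StringBuilder(32);
            for (byte b : md.digest(input.getBytes(StandardCharsets.UTF_8)))
                sb.append(String.format("%02x", b));
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            return null; // Should not happen: MD5 is a required JDK algorithm.
        }
    }
}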
@@ -617,7 +617,7 @@ public class FileUtils { public static final int twentyFiveKb = 25_600; // 25 Kb - public static final int halfMb = 524_288; // 0.5 Mb = 512 Kb + public static final int halfMb = 524_288; // 0.5 Mb = 512 Kb = 524_288 bytes public static final int tenMb = (10 * 1_048_576); public boolean saveArchive(HttpURLConnection conn, File zstdFile) @@ -644,7 +644,7 @@ public class FileUtils { * @param urlReports * @param shouldCheckAndKeepS3UploadedFiles */ - public void updateUrlReportsToHaveNoFullTextFiles(List<UrlReport> urlReports, boolean shouldCheckAndKeepS3UploadedFiles) + public void removeUnretrievedFullTextsFromUrlReports(List<UrlReport> urlReports, boolean shouldCheckAndKeepS3UploadedFiles) { for ( UrlReport urlReport : urlReports ) { Payload payload = urlReport.getPayload(); @@ -703,9 +703,11 @@ public class FileUtils { // Get the id and url of any record which has not been processed yet. String getDataForPayloadPrefillQuery = "select distinct pu.id, pu.url\n" + "from " + DatabaseConnector.databaseName + ".publication_urls pu\n" + + // Exclude the "already-processed" pairs. "left anti join " + DatabaseConnector.databaseName + ".attempt a on a.id=pu.id and a.original_url=pu.url\n" + "left anti join " + DatabaseConnector.databaseName + ".payload p on p.id=pu.id and p.original_url=pu.url\n" + "left anti join " + DatabaseConnector.databaseName + ".assignment asgn on asgn.id=pu.id and asgn.original_url=pu.url\n" + + // Limit the urls to the ones matching the payload-urls found for the current assignments. "where pu.url in " + getQueryListString(urlsToRetrieveRelatedIDs, urlsToRetrieveRelatedIDsSize, stringBuilderCapacity); //logger.trace("getDataForPayloadPrefillQuery:\n" + getDataForPayloadPrefillQuery);
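On the rename above: the new name states what the method actually does. Based on the signature and the call-site comments ("Make sure all records without an S3-Url have < null > file-data"), here is a hypothetical, self-contained sketch; the stub types and the S3-prefix check are assumptions, and the real models live in eu.openaire.urls_controller.models.

import java.util.List;

// Hypothetical sketch of removeUnretrievedFullTextsFromUrlReports(); not the actual implementation.
public class UrlReportCleanupSketch {

    // Minimal stand-in types, for a self-contained example.
    static class Payload {
        String location, hash, mimeType; Long size;
        void clearFileData() { location = null; hash = null; mimeType = null; size = null; }
    }
    static class UrlReport { Payload payload; Payload getPayload() { return payload; } }

    static final String S3_URL_PREFIX = "https://example-s3-endpoint/"; // Assumed prefix, for illustration.

    // Any record whose full-text never made it to the S3-Object-Store gets null file-data,
    // so no stale local file-locations reach the database.
    public static void removeUnretrievedFullTextsFromUrlReports(List<UrlReport> urlReports, boolean shouldCheckAndKeepS3UploadedFiles) {
        for (UrlReport urlReport : urlReports) {
            Payload payload = urlReport.getPayload();
            if (payload == null)
                continue;
            if (shouldCheckAndKeepS3UploadedFiles
                    && payload.location != null && payload.location.startsWith(S3_URL_PREFIX))
                continue; // Keep records already uploaded to S3 (some batches may have succeeded).
            payload.clearFileData();
        }
    }
}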