From ede7ca5a896853e3f038a23cb99b375993ab163e Mon Sep 17 00:00:00 2001
From: LSmyrnaios
Date: Tue, 26 Sep 2023 18:01:55 +0300
Subject: [PATCH] - Add bulk-import support for non-Authoritative data-sources.

- Update Spring Boot.
- Code polishing.
---
 build.gradle                              |  2 +-
 .../components/BulkImport.java            | 18 +++++++++---
 .../controllers/BulkImportController.java |  2 +-
 .../services/BulkImportService.java       |  2 +-
 .../services/BulkImportServiceImpl.java   | 29 +++++++++++++------
 .../services/ShutdownServiceImpl.java     |  1 -
 .../services/StatsServiceImpl.java        |  1 +
 .../services/UrlsServiceImpl.java         |  5 ++--
 .../urls_controller/util/FileUtils.java   |  4 +--
 src/main/resources/application.yml        |  8 +++++
 10 files changed, 50 insertions(+), 22 deletions(-)

diff --git a/build.gradle b/build.gradle
index f09e0df..05d6071 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,5 +1,5 @@
 plugins {
-	id 'org.springframework.boot' version '2.7.15'
+	id 'org.springframework.boot' version '2.7.16'
 	id 'io.spring.dependency-management' version '1.1.3'
 	id 'java'
 }
diff --git a/src/main/java/eu/openaire/urls_controller/components/BulkImport.java b/src/main/java/eu/openaire/urls_controller/components/BulkImport.java
index 7f1cfa4..85838f6 100644
--- a/src/main/java/eu/openaire/urls_controller/components/BulkImport.java
+++ b/src/main/java/eu/openaire/urls_controller/components/BulkImport.java
@@ -65,10 +65,11 @@ public class BulkImport {
 
 	public static class BulkImportSource {
 
-		String datasourceID;
-		String datasourcePrefix;
-		String pdfUrlPrefix;
-		String mimeType;
+		private String datasourceID;
+		private String datasourcePrefix;
+		private String pdfUrlPrefix;
+		private String mimeType;
+		private boolean isAuthoritative;
 
 
 		public BulkImportSource() {
@@ -107,6 +108,14 @@ public class BulkImport {
 			this.mimeType = mimeType;
 		}
 
+		public boolean isAuthoritative() {
+			return isAuthoritative;
+		}
+
+		public void setAuthoritative(boolean authoritative) {
+			isAuthoritative = authoritative;
+		}
+
 
 		@Override
 		public String toString() {
@@ -115,6 +124,7 @@ public class BulkImport {
 					", datasourcePrefix='" + datasourcePrefix + '\'' +
 					", pdfUrlPrefix='" + pdfUrlPrefix + '\'' +
 					", mimeType='" + mimeType + '\'' +
+					", isAuthoritative=" + isAuthoritative +
 					'}';
 		}
 	}
diff --git a/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java b/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java
index 6aaad0f..8bde43e 100644
--- a/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java
+++ b/src/main/java/eu/openaire/urls_controller/controllers/BulkImportController.java
@@ -212,7 +212,7 @@ public class BulkImportController {
 		);	// This directory, will be removed from "bulkImportDirsUnderProcessing", when the background job finishes.
 
-		return ResponseEntity.ok().body(new BulkImportResponse(msg, bulkImportReportID));	// The response is automatically serialized to json and it's of type "application/json".
+		return ResponseEntity.ok().body(new BulkImportResponse(msg, bulkImportReportID));	// The response is automatically serialized to json, and it has the type "application/json".
 	}
 
 
diff --git a/src/main/java/eu/openaire/urls_controller/services/BulkImportService.java b/src/main/java/eu/openaire/urls_controller/services/BulkImportService.java
index 3e2430f..69aaa52 100644
--- a/src/main/java/eu/openaire/urls_controller/services/BulkImportService.java
+++ b/src/main/java/eu/openaire/urls_controller/services/BulkImportService.java
@@ -13,6 +13,6 @@ public interface BulkImportService {
 
 	List<String> getFileLocationsInsideDir(String directory);
 
-	String getMD5hash(String string);
+	String getMD5Hash(String string);
 
 }
diff --git a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
index 7d426c1..bb930f1 100644
--- a/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/BulkImportServiceImpl.java
@@ -385,13 +385,10 @@ public class BulkImportServiceImpl implements BulkImportService {
 			DatabaseConnector.databaseLock.unlock();
 		}
 
-		String idMd5hash = getMD5hash(fileNameID.toLowerCase());
-		if ( idMd5hash == null )
+		String openAireId = generateOpenaireId(fileNameID, datasourcePrefix, bulkImportSource.isAuthoritative());
+		if ( openAireId == null )
 			return null;
 
-		// openaire id = <datasourcePrefix> + "::" + <md5(fileNameID.toLowerCase())>
-		String openAireId = (datasourcePrefix + "::" + idMd5hash);
-
 		String s3Url = null;
 
 		if ( alreadyFoundFileLocation != null )	// If the full-text of this record is already-found and uploaded.
@@ -426,7 +423,6 @@ public class BulkImportServiceImpl implements BulkImportService {
 		record.put("hash", fileHash);	// This is already checked and will not be null here.
 		record.put("location", s3Url);
 		record.put("provenance", ("bulk:" + provenance));	// Add the "bulk:" prefix in order to be more clear that this record comes from bulkImport, when looking all records in the "payload" VIEW.
-
 		return record;
 	}
@@ -434,7 +430,6 @@ public class BulkImportServiceImpl implements BulkImportService {
 	public List<String> getFileLocationsInsideDir(String directory)
 	{
 		List<String> fileLocations = null;
-
 		try ( Stream<Path> walkStream = Files.find(Paths.get(directory), Integer.MAX_VALUE, (filePath, fileAttr) -> fileAttr.isRegularFile()) )
 		// In case we ever include other type-of-Files inside the same directory, we need to add this filter: "&& !filePath.toString().endsWith("name.ext")"
 		{
@@ -444,12 +439,11 @@ public class BulkImportServiceImpl implements BulkImportService {
 			logger.error(errorMsg, e);
 			return null;
 		}
-
 		return fileLocations;
 	}
 
 
-	public String getMD5hash(String string)
+	public String getMD5Hash(String string)
 	{
 		String md5 = null;
 		try {
@@ -463,4 +457,21 @@ public class BulkImportServiceImpl implements BulkImportService {
 		return md5;
 	}
+
+	public String generateOpenaireId(String id, String datasourcePrefix, boolean isAuthoritative)
+	{
+		// If the "provenance" relates to an "authoritative" source, then its id has to be lowercase, before the md5() is applied to it.
+		// general_openaire_id = <datasourcePrefix> + "::" + <md5(id)>
+		// authoritative_openaire_id = <datasourcePrefix> + "::" + <md5(id.toLowerCase())>
+
+		if ( isAuthoritative )
+			id = id.toLowerCase();
+
+		String idMd5Hash = getMD5Hash(id);
+		if ( idMd5Hash == null )
+			return null;
+
+		return (datasourcePrefix + "::" + idMd5Hash);
+	}
+
 
 }
diff --git a/src/main/java/eu/openaire/urls_controller/services/ShutdownServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/ShutdownServiceImpl.java
index eff8ee7..0551848 100644
--- a/src/main/java/eu/openaire/urls_controller/services/ShutdownServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/ShutdownServiceImpl.java
@@ -32,7 +32,6 @@ public class ShutdownServiceImpl implements ShutdownService {
 			logger.error(initMsg + "The request came from another IP: " + remoteAddr + " | while the Controller has the IP: " + UriBuilder.ip);
 			return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
 		}
-
 		return null;	// The checks are passing.
 	}
 
diff --git a/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java
index ef0da24..4142811 100644
--- a/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/StatsServiceImpl.java
@@ -95,6 +95,7 @@ public class StatsServiceImpl implements StatsService {
 
 	// To get the human-friendly timestamp format from the BigInt in the database:
 	// select from_timestamp(CAST(CAST(`date` as decimal(30,0))/1000 AS timestamp), "yyyy-MM-dd HH:mm:ss.SSS") from payload
+	// Or simpler: select from_timestamp(CAST((`date`/1000) AS timestamp), "yyyy-MM-dd HH:mm:ss.SSS") from payload
 
 	private void sleep1min() {
 
diff --git a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
index 259d8fe..5127741 100644
--- a/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
+++ b/src/main/java/eu/openaire/urls_controller/services/UrlsServiceImpl.java
@@ -87,7 +87,6 @@ public class UrlsServiceImpl implements UrlsService {
 				throw new RuntimeException("One of the bulk-imported datasourceIDs was not found! | source: " + source);
 			excludedIDs.add(datasourceID);
 		}
-
 		int exclusionListSize = excludedIDs.size();	// This list will not be empty.
 
 		// Prepare the "excludedDatasourceIDsStringList" to be used inside the "findAssignmentsQuery". Create the following string-pattern:
@@ -100,9 +99,8 @@ public class UrlsServiceImpl implements UrlsService {
 				sb.append(", ");
 		}
 		sb.append(")");
-
 		excludedDatasourceIDsStringList = sb.toString();
-		logger.info("The following bulkImport-datasources will be excluded from crawling: " + excludedDatasourceIDsStringList);
+		logger.info("The following bulkImport data-sources will be excluded from crawling: " + excludedDatasourceIDsStringList);
 	}
 
 
@@ -400,6 +398,7 @@ public class UrlsServiceImpl implements UrlsService {
 		// there will always be a time when the counter will be just before the "golden-value" and then one workerReport has to be processed here and the counter will be incremented by one and signal the merging-time.
 		if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfWorkers.get()) == 0 )	// The workersNum should not be zero! If a "division by zero" exception is thrown below, then there's a big bug somewhere in the design.
 			if ( ! mergeWorkerRelatedTables(curWorkerId, curReportAssignmentsCounter, hasAttemptParquetFileProblem, hasPayloadParquetFileProblem) )
+				// The "postReportResultToWorker()" was called inside.
 				return false;
 
 		if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
diff --git a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
index c35604f..52d3341 100644
--- a/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
+++ b/src/main/java/eu/openaire/urls_controller/util/FileUtils.java
@@ -397,7 +397,7 @@ public class FileUtils {
 	{
 		HttpURLConnection conn;
 		try {
-			if ( (conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId)) == null )
+			if ( (conn = getConnectionForFullTextBatch(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId)) == null )
 				return false;
 		} catch (RuntimeException re) {
 			// The "cause" was logged inside "getConnection()".
@@ -435,7 +435,7 @@ public class FileUtils {
 	}
 
 
-	private HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) throws RuntimeException
+	private HttpURLConnection getConnectionForFullTextBatch(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) throws RuntimeException
 	{
 		baseUrl += batchNum + "/";
 		String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch);
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index a02f56d..0abbd4e 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -46,11 +46,19 @@ bulk-import:
       datasourcePrefix: arXiv_______  # For PID-providing datasource, we use the PID-prefix here. (so not the datasource-prefix: "od________18")
       pdfUrlPrefix: https://arxiv.org/pdf/
       mimeType: application/pdf
+      isAuthoritative: true
 #    otherImport:
 #      datasourceID: othersource__::0123
 #      datasourcePrefix: other_______
 #      pdfUrlPrefix: https://example.org/pdf/
 #      mimeType: application/pdf
+#      isAuthoritative: false
+
+# For "authoritative" sources, a special prefix is selected, from: https://graph.openaire.eu/docs/data-model/pids-and-identifiers/#identifiers-in-the-graph
+# For the rest, the "datasource_prefix" is selected, using this query:
+# select datasource.namespaceprefix.value
+# from openaire_prod_20230414.datasource  -- Here use the latest production-table.
+# where officialname.value = 'datasourceOfficialName';
 
 
 spring:
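
Illustration (not part of the patch): the standalone sketch below shows, in plain Java, the id-generation behaviour that the new "isAuthoritative" flag switches, mirroring the "generateOpenaireId()" method added to BulkImportServiceImpl above. The class name "OpenaireIdSketch", the "md5Hex()" helper and the sample arguments are hypothetical stand-ins for the real "getMD5Hash()" implementation and configuration values, whose exact code is not shown in this patch.

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public class OpenaireIdSketch {

        // Stand-in for "getMD5Hash()": hex-encoded MD5 of the given string, or null on failure.
        static String md5Hex(String input) {
            try {
                MessageDigest md = MessageDigest.getInstance("MD5");
                StringBuilder sb = new StringBuilder(32);
                for ( byte b : md.digest(input.getBytes(StandardCharsets.UTF_8)) )
                    sb.append(String.format("%02x", b));
                return sb.toString();
            } catch (NoSuchAlgorithmException e) {
                return null;
            }
        }

        // Authoritative sources: lowercase the record-id before hashing; all other sources: hash it as-is.
        // Either way the resulting OpenAIRE id is "<datasourcePrefix>::<md5(id)>".
        static String generateOpenaireId(String id, String datasourcePrefix, boolean isAuthoritative) {
            if ( isAuthoritative )
                id = id.toLowerCase();
            String idMd5Hash = md5Hex(id);
            return (idMd5Hash == null) ? null : (datasourcePrefix + "::" + idMd5Hash);
        }

        public static void main(String[] args) {
            // Hypothetical file-name id, combined with the arXiv PID-prefix configured in application.yml.
            System.out.println(generateOpenaireId("2309.01234", "arXiv_______", true));
        }
    }

The apparent rationale, per the comments added in the patch, is that authoritative (PID-based) ids are normalized to lowercase so the hash stays stable across case variants of the same PID, while non-authoritative record-ids are hashed verbatim.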