- Add bulk-import support for non-authoritative data-sources.

- Update Spring Boot to version 2.7.16.
- Code polishing.
Lampros Smyrnaios 2023-09-26 18:01:55 +03:00
parent 90a864ea61
commit ede7ca5a89
10 changed files with 50 additions and 22 deletions

View File

@@ -1,5 +1,5 @@
plugins {
id 'org.springframework.boot' version '2.7.15'
id 'org.springframework.boot' version '2.7.16'
id 'io.spring.dependency-management' version '1.1.3'
id 'java'
}

View File

@@ -65,10 +65,11 @@ public class BulkImport {
public static class BulkImportSource {
String datasourceID;
String datasourcePrefix;
String pdfUrlPrefix;
String mimeType;
private String datasourceID;
private String datasourcePrefix;
private String pdfUrlPrefix;
private String mimeType;
private boolean isAuthoritative;
public BulkImportSource() {
@@ -107,6 +108,14 @@ public class BulkImport {
this.mimeType = mimeType;
}
public boolean isAuthoritative() {
return isAuthoritative;
}
public void setAuthoritative(boolean authoritative) {
isAuthoritative = authoritative;
}
@Override
public String toString() {
@@ -115,6 +124,7 @@ public class BulkImport {
", datasourcePrefix='" + datasourcePrefix + '\'' +
", pdfUrlPrefix='" + pdfUrlPrefix + '\'' +
", mimeType='" + mimeType + '\'' +
", isAuthoritative=" + isAuthoritative +
'}';
}
}

View File

@@ -212,7 +212,7 @@ public class BulkImportController {
);
// This directory will be removed from "bulkImportDirsUnderProcessing" when the background job finishes.
return ResponseEntity.ok().body(new BulkImportResponse(msg, bulkImportReportID)); // The response is automatically serialized to json and it's of type "application/json".
return ResponseEntity.ok().body(new BulkImportResponse(msg, bulkImportReportID)); // The response is automatically serialized to JSON, and its content-type is "application/json".
}

View File

@@ -13,6 +13,6 @@ public interface BulkImportService {
List<String> getFileLocationsInsideDir(String directory);
String getMD5hash(String string);
String getMD5Hash(String string);
}

View File

@@ -385,13 +385,10 @@ public class BulkImportServiceImpl implements BulkImportService {
DatabaseConnector.databaseLock.unlock();
}
String idMd5hash = getMD5hash(fileNameID.toLowerCase());
if ( idMd5hash == null )
String openAireId = generateOpenaireId(fileNameID, datasourcePrefix, bulkImportSource.isAuthoritative());
if ( openAireId == null )
return null;
// openaire id = <datasourcePrefix> + "::" + <md5(lowercase(arxivId))>
String openAireId = (datasourcePrefix + "::" + idMd5hash);
String s3Url = null;
if ( alreadyFoundFileLocation != null ) // If the full-text of this record was already found and uploaded.
@@ -426,7 +423,6 @@ public class BulkImportServiceImpl implements BulkImportService {
record.put("hash", fileHash); // This is already checked and will not be null here.
record.put("location", s3Url);
record.put("provenance", ("bulk:" + provenance)); // Add the "bulk:" prefix in order to be more clear that this record comes from bulkImport, when looking all records in the "payload" VIEW.
return record;
}
@@ -434,7 +430,6 @@ public class BulkImportServiceImpl implements BulkImportService {
public List<String> getFileLocationsInsideDir(String directory)
{
List<String> fileLocations = null;
try ( Stream<Path> walkStream = Files.find(Paths.get(directory), Integer.MAX_VALUE, (filePath, fileAttr) -> fileAttr.isRegularFile()) )
// In case we ever include other types of files inside the same directory, we need to add this filter: "&& !filePath.toString().endsWith("name.ext")"
{
@@ -444,12 +439,11 @@ public class BulkImportServiceImpl implements BulkImportService {
logger.error(errorMsg, e);
return null;
}
return fileLocations;
}
public String getMD5hash(String string)
public String getMD5Hash(String string)
{
String md5 = null;
try {
@@ -463,4 +457,21 @@ public class BulkImportServiceImpl implements BulkImportService {
return md5;
}
public String generateOpenaireId(String id, String datasourcePrefix, boolean isAuthoritative)
{
// If the "provenance" relates to an "authoritative" source, then its id has to be lowercase, before the md5() is applied to it.
// general_openaire_id = <datasourcePrefix> + "::" + <md5(ID)>
// authoritative_openaire_id = <datasourcePrefix> + "::" + <md5(lowercase(ID))>
if ( isAuthoritative )
id = id.toLowerCase();
String idMd5Hash = getMD5Hash(id);
if ( idMd5Hash == null )
return null;
return (datasourcePrefix + "::" + idMd5Hash);
}
}
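
For clarity, here is a minimal, self-contained sketch of the new ID-generation rule, assuming "getMD5Hash()" is backed by java.security.MessageDigest (the class name and the sample IDs below are hypothetical):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class OpenaireIdSketch {

	// Hex-encoded MD5, mirroring what "getMD5Hash()" is assumed to produce.
	static String getMD5Hash(String string) {
		try {
			MessageDigest md = MessageDigest.getInstance("MD5");
			StringBuilder hex = new StringBuilder(32);
			for ( byte b : md.digest(string.getBytes(StandardCharsets.UTF_8)) )
				hex.append(String.format("%02x", b));
			return hex.toString();
		} catch (NoSuchAlgorithmException e) {
			return null;	// Mirrors the null-return on failure, seen above.
		}
	}

	// Same rule as "generateOpenaireId()" above: authoritative sources hash the lowercased ID.
	static String generateOpenaireId(String id, String datasourcePrefix, boolean isAuthoritative) {
		if ( isAuthoritative )
			id = id.toLowerCase();
		String idMd5Hash = getMD5Hash(id);
		return (idMd5Hash == null) ? null : (datasourcePrefix + "::" + idMd5Hash);
	}

	public static void main(String[] args) {
		// Hypothetical IDs, for illustration only.
		System.out.println(generateOpenaireId("2309.01234", "arXiv_______", true));
		System.out.println(generateOpenaireId("Record-001", "other_______", false));
	}
}

Note that only authoritative sources lowercase the ID before hashing, so two casings of the same ID map to different OpenAIRE ids when the source is non-authoritative.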

View File

@@ -32,7 +32,6 @@ public class ShutdownServiceImpl implements ShutdownService {
logger.error(initMsg + "The request came from another IP: " + remoteAddr + " | while the Controller has the IP: " + UriBuilder.ip);
return ResponseEntity.status(HttpStatus.FORBIDDEN).build();
}
return null; // All checks passed.
}

View File

@@ -95,6 +95,7 @@ public class StatsServiceImpl implements StatsService {
// To get the human-friendly timestamp format from the BigInt in the database:
// select from_timestamp(CAST(CAST(`date` as decimal(30,0))/1000 AS timestamp), "yyyy-MM-dd HH:mm:ss.SSS") from payload
// Or simpler: select from_timestamp(CAST((`date`/1000) AS timestamp), "yyyy-MM-dd HH:mm:ss.SSS") from payload
private void sleep1min() {

View File

@@ -87,7 +87,6 @@ public class UrlsServiceImpl implements UrlsService {
throw new RuntimeException("One of the bulk-imported datasourceIDs was not found! | source: " + source);
excludedIDs.add(datasourceID);
}
int exclusionListSize = excludedIDs.size(); // This list will not be empty.
// Prepare the "excludedDatasourceIDsStringList" to be used inside the "findAssignmentsQuery". Create the following string-pattern:
@@ -100,9 +99,8 @@ public class UrlsServiceImpl implements UrlsService {
sb.append(", ");
}
sb.append(")");
excludedDatasourceIDsStringList = sb.toString();
logger.info("The following bulkImport-datasources will be excluded from crawling: " + excludedDatasourceIDsStringList);
logger.info("The following bulkImport data-sources will be excluded from crawling: " + excludedDatasourceIDsStringList);
}
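
For illustration, a standalone sketch of the string-pattern assembled above; the IDs and the exact quoting are assumed, and the result is presumably consumed by a "NOT IN"-style filter inside the "findAssignmentsQuery":

import java.util.List;

public class ExcludedIdsSketch {
	public static void main(String[] args) {
		List<String> excludedIDs = List.of("opendoar____::1234", "othersource__::0123");	// Hypothetical IDs.
		int exclusionListSize = excludedIDs.size();	// This list will not be empty.
		StringBuilder sb = new StringBuilder((exclusionListSize * 40) + 2);
		sb.append("(");
		for ( int i = 0; i < exclusionListSize; ++i ) {
			sb.append("\"").append(excludedIDs.get(i)).append("\"");	// The quoting is assumed here.
			if ( i < (exclusionListSize - 1) )
				sb.append(", ");
		}
		sb.append(")");
		// Prints: ("opendoar____::1234", "othersource__::0123")
		System.out.println(sb);
	}
}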
@@ -400,6 +398,7 @@ public class UrlsServiceImpl implements UrlsService {
// there will always be a moment when the counter is just below the "golden value"; then one more workerReport gets processed here, the counter is incremented by one, and the merging-time is signaled.
if ( (currentNumOfWorkerReportsProcessed % UrlsController.numOfWorkers.get()) == 0 ) // The workersNum should not be zero! If a "division by zero" exception is thrown below, then there's a big bug somewhere in the design.
if ( ! mergeWorkerRelatedTables(curWorkerId, curReportAssignmentsCounter, hasAttemptParquetFileProblem, hasPayloadParquetFileProblem) )
// The "postReportResultToWorker()" was called inside.
return false;
if ( uploadFullTextsResponse == FileUtils.UploadFullTextsResponse.unsuccessful ) {
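
The merge-trigger condition above can be seen in a tiny standalone sketch (the worker-count of 4 is purely hypothetical):

public class MergeTriggerSketch {
	public static void main(String[] args) {
		int numOfWorkers = 4;	// Hypothetical; must never be zero, as noted above.
		for ( int currentNumOfWorkerReportsProcessed = 1; currentNumOfWorkerReportsProcessed <= 10; ++currentNumOfWorkerReportsProcessed )
			if ( (currentNumOfWorkerReportsProcessed % numOfWorkers) == 0 )
				System.out.println("Merge the worker-related tables at report #" + currentNumOfWorkerReportsProcessed);
		// Prints the merge-points: reports #4 and #8 (the counter reaching a multiple of the worker-count signals the merging-time).
	}
}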

View File

@@ -397,7 +397,7 @@ public class FileUtils {
{
HttpURLConnection conn;
try {
if ( (conn = getConnection(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId)) == null )
if ( (conn = getConnectionForFullTextBatch(baseUrl, assignmentsBatchCounter, batchCounter, fileNamesForCurBatch, numOfBatches, workerId)) == null )
return false;
} catch (RuntimeException re) {
// The "cause" was logged inside "getConnection()".
@@ -435,7 +435,7 @@
}
private HttpURLConnection getConnection(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) throws RuntimeException
private HttpURLConnection getConnectionForFullTextBatch(String baseUrl, long assignmentsBatchCounter, int batchNum, List<String> fileNamesForCurBatch, int totalBatches, String workerId) throws RuntimeException
{
baseUrl += batchNum + "/";
String requestUrl = getRequestUrlForBatch(baseUrl, fileNamesForCurBatch);

View File

@@ -46,11 +46,19 @@ bulk-import:
datasourcePrefix: arXiv_______ # For a PID-providing datasource, we use the PID-prefix here (not the datasource-prefix: "od________18").
pdfUrlPrefix: https://arxiv.org/pdf/
mimeType: application/pdf
isAuthoritative: true
# otherImport:
# datasourceID: othersource__::0123
# datasourcePrefix: other_______
# pdfUrlPrefix: https://example.org/pdf/
# mimeType: application/pdf
# isAuthoritative: false
# For "authoritative" sources, a special prefix is selected, from: https://graph.openaire.eu/docs/data-model/pids-and-identifiers/#identifiers-in-the-graph
# For the rest, the "datasource_prefix" is selected, using this query:
# select datasource.namespaceprefix.value
# from openaire_prod_20230414.datasource -- Here use the latest production-table.
# where officialname.value = 'datasourceOfficialName';
spring: