Add support for Springer-bulkImport.

Lampros Smyrnaios 2024-06-14 15:39:44 +03:00
parent 0d63165b6d
commit 7e7fc35d1e
7 changed files with 151 additions and 24 deletions

View File

@@ -69,6 +69,7 @@ public class BulkImport {
private String datasourcePrefix;
private String fulltextUrlPrefix;
private String mimeType;
private String idMappingFilePath;
private boolean isAuthoritative;
@@ -107,6 +108,14 @@ public class BulkImport {
this.mimeType = (mimeType.isEmpty() ? null : mimeType);
}
public String getIdMappingFilePath() {
return idMappingFilePath;
}
public void setIdMappingFilePath(String idMappingFilePath) {
this.idMappingFilePath = (idMappingFilePath.isEmpty() ? null : idMappingFilePath);
}
public boolean getIsAuthoritative() {
return isAuthoritative;
}
@@ -118,6 +127,7 @@ public class BulkImport {
@Override
public String toString() {
return "BulkImportSource{" + "datasourceID='" + datasourceID + '\'' + ", datasourcePrefix='" + datasourcePrefix + '\'' + ", fulltextUrlPrefix='" + fulltextUrlPrefix + '\'' + ", mimeType='" + mimeType + '\'' +
", idMappingFilePath='" + idMappingFilePath + '\'' +
", isAuthoritative=" + isAuthoritative + '}';
}
}
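A minimal hedged sketch of how the new field behaves with Spring's YAML binding (it assumes BulkImportSource is a public nested class with a no-arg constructor, as configuration-binding usually requires): a literal "null" in application.yml arrives as an empty string, and the setter normalizes it back to a real null.
BulkImport.BulkImportSource source = new BulkImport.BulkImportSource();
source.setIdMappingFilePath(""); // what Spring injects for "idMappingFilePath: null"
System.out.println(source.getIdMappingFilePath()); // prints "null", since the setter maps "" to null
source.setIdMappingFilePath("_metadata_/bulk.ids.json"); // a real relative path is kept as-is
System.out.println(source.getIdMappingFilePath()); // prints "_metadata_/bulk.ids.json"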

View File

@@ -11,7 +11,7 @@ public interface BulkImportService {
Boolean bulkImportFullTextsFromDirectory(BulkImportReport bulkImportReport, String relativeBulkImportDir, String bulkImportDirName, File bulkImportDir, String provenance, BulkImport.BulkImportSource bulkImportSource, boolean shouldDeleteFilesOnFinish);
List<String> getFileLocationsInsideDir(String directory);
List<String> getFileLocationsInsideDir(String directory, String idMappingsFilePath);
String getMD5Hash(String string);

View File

@@ -10,8 +10,10 @@ import eu.openaire.urls_controller.models.DocFileData;
import eu.openaire.urls_controller.models.FileLocationData;
import eu.openaire.urls_controller.util.FileUtils;
import eu.openaire.urls_controller.util.GenericUtils;
import eu.openaire.urls_controller.util.JsonUtils;
import eu.openaire.urls_controller.util.ParquetFileUtils;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -26,10 +28,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -42,6 +41,9 @@ public class BulkImportServiceImpl implements BulkImportService {
@Autowired
private FileUtils fileUtils;
@Autowired
private JsonUtils jsonUtils;
@Autowired
private ParquetFileUtils parquetFileUtils;
@@ -72,7 +74,7 @@ public class BulkImportServiceImpl implements BulkImportService {
return false;
}
List<String> fileLocations = getFileLocationsInsideDir(bulkImportDirName); // the error-msg has already been written
List<String> fileLocations = getFileLocationsInsideDir(bulkImportDirName, bulkImportSource.getIdMappingFilePath()); // the error-msg has already been written
if ( fileLocations == null ) {
String errorMsg = "Could not retrieve the files for bulk-import!";
logger.error(errorMsg + additionalLoggingMsg);
@@ -93,7 +95,7 @@ public class BulkImportServiceImpl implements BulkImportService {
}
if ( logger.isTraceEnabled() )
logger.trace("fileLocations: " + additionalLoggingMsg + GenericUtils.endOfLine + fileLocations);
logger.trace("fileLocations: (below)" + additionalLoggingMsg + GenericUtils.endOfLine + fileLocations);
String localParquetDir = parquetFileUtils.parquetBaseLocalDirectoryPath + "bulk_import_" + provenance + File.separator + relativeBulkImportDir; // This ends with "/".
try {
@@ -132,10 +134,25 @@ public class BulkImportServiceImpl implements BulkImportService {
bulkImportReport.addEvent(msg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
// If we have "provenance" = "springerImport", then we have to load the file-id-mappings.
final ConcurrentHashMap<String, String> idMappings;
if ( provenance.equals("springerImport") ) {
idMappings = jsonUtils.loadIdMappings(bulkImportDirName + bulkImportSource.getIdMappingFilePath(), numOfFiles, additionalLoggingMsg);
if ( idMappings == null ) {
String errorMsg = "Could not load the file-id-mappings! As a result, the OpenAIRE-IDs cannot be generated!";
logger.error(errorMsg + additionalLoggingMsg);
bulkImportReport.addEvent(errorMsg);
fileUtils.writeToFile(bulkImportReportLocation, bulkImportReport.getJsonReport(), false);
BulkImportController.bulkImportDirsUnderProcessing.remove(bulkImportDirName);
return false;
}
} else
idMappings = null; // This way the variable can remain "final", in order to be passed to the "callableTasksForFileSegments" below.
for ( int i = 0; i < subListsSize; ++i ) {
int finalI = i;
callableTasksForFileSegments.add(() -> { // Process a segment of the bulk-imported files.
return processBulkImportedFilesSegment(bulkImportReport, finalI, subLists.get(finalI), bulkImportDirName, localParquetDir, currentBulkImportHdfsDir, provenance, bulkImportSource, timeMillis, shouldDeleteFilesOnFinish, additionalLoggingMsg);
return processBulkImportedFilesSegment(bulkImportReport, finalI, subLists.get(finalI), bulkImportDirName, localParquetDir, currentBulkImportHdfsDir, provenance, bulkImportSource, idMappings, timeMillis, shouldDeleteFilesOnFinish, additionalLoggingMsg);
});
}
@@ -228,7 +245,7 @@ public class BulkImportServiceImpl implements BulkImportService {
private int processBulkImportedFilesSegment(BulkImportReport bulkImportReport, int segmentCounter, List<String> fileLocationsSegment, String bulkImportDirName, String localParquetDir, String currentBulkImportHdfsDir,
String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, boolean shouldDeleteFilesOnFinish, String additionalLoggingMsg)
String provenance, BulkImport.BulkImportSource bulkImportSource, ConcurrentHashMap<String, String> idMappings, long timeMillis, boolean shouldDeleteFilesOnFinish, String additionalLoggingMsg)
{
// Inside this thread, process a segment of the files.
String bulkImportReportLocation = bulkImportReport.getReportLocation();
@@ -292,7 +309,7 @@ public class BulkImportServiceImpl implements BulkImportService {
String alreadyRetrievedFileLocation = hashWithExistingLocationMap.get(docFileData.getHash());
GenericData.Record record = null;
try {
record = processBulkImportedFile(docFileData, alreadyRetrievedFileLocation, provenance, bulkImportSource, timeMillis, additionalLoggingMsg);
record = processBulkImportedFile(docFileData, alreadyRetrievedFileLocation, provenance, bulkImportDirName, bulkImportSource, idMappings, timeMillis, additionalLoggingMsg);
} catch (Exception e) {
String errorMsg = "Exception when uploading the files of segment_" + segmentCounter + " to the S3 Object Store. Will avoid uploading the rest of the files for this segment.. " + e.getMessage();
logger.error(errorMsg + additionalLoggingMsg);
@@ -402,19 +419,25 @@ public class BulkImportServiceImpl implements BulkImportService {
}
private GenericData.Record processBulkImportedFile(DocFileData docFileData, String alreadyRetrievedFileLocation, String provenance, BulkImport.BulkImportSource bulkImportSource, long timeMillis, String additionalLoggingMsg)
private GenericData.Record processBulkImportedFile(DocFileData docFileData, String alreadyRetrievedFileLocation, String provenance, String bulkImportDirName, BulkImport.BulkImportSource bulkImportSource, ConcurrentHashMap<String, String> idMappings, long timeMillis, String additionalLoggingMsg)
throws ConnectException, UnknownHostException
{
FileLocationData fileLocationData;
try {
fileLocationData = new FileLocationData(docFileData.getLocation());
} catch (RuntimeException re) {
logger.error(re.getMessage() + additionalLoggingMsg);
return null;
String fileLocation = docFileData.getLocation();
FileLocationData fileLocationData = null;
String fileId;
if ( provenance.equals("springerImport") ) {
String relativeFileLocation = StringUtils.replace(fileLocation, bulkImportDirName, "", 1);
if ( (fileId = idMappings.get(relativeFileLocation)) == null ) { // Get the "DOI"-id which is mapped to this file's relative path.
logger.error("File '" + fileLocation + "' could not be mapped to an ID through its relative path (" + relativeFileLocation + ")!" + additionalLoggingMsg);
return null;
}
//logger.trace("File '" + relativeFileLocation + "' was mapped with ID: '" + fileId + "'" + additionalLoggingMsg); // Uncomment only for debugging.
} else {
if ( (fileLocationData = getFileLocationData(fileLocation, additionalLoggingMsg)) == null )
return null;
fileId = fileLocationData.getFileNameID(); // Note: This method does not accept parentheses. If a publisher ever uses parentheses in its file names, we will need a different regex than the one used for the full-texts retrieved by the Workers.
}
String fileId = fileLocationData.getFileNameID(); // Note: This method not accept parentheses. If there is ever a publisher that uses parentheses, then we have to use another regex, than the one used for retrieved full-texts, from the Workers.
String openAireId = generateOpenaireId(fileId, bulkImportSource.getDatasourcePrefix(), bulkImportSource.getIsAuthoritative());
if ( openAireId == null ) // The error is logged inside.
return null;
@@ -432,13 +455,16 @@ public class BulkImportServiceImpl implements BulkImportService {
// The above analysis is educational; it does not need to take place and is not currently used.
s3Url = alreadyRetrievedFileLocation;
} else {
if ( fileLocationData == null ) // In case we have a "SpringerImport", this will not have been set.
if ( (fileLocationData = getFileLocationData(fileLocation, additionalLoggingMsg)) == null )
return null;
s3Url = fileUtils.constructS3FilenameAndUploadToS3(fileLocationData.getFileDir(), fileLocationData.getFileName(), openAireId, fileLocationData.getDotFileExtension(), bulkImportSource.getDatasourceID(), fileHash);
if ( s3Url == null )
return null;
}
// TODO - If another url-schema is introduced for other datasources, have a "switch"-statement and perform the right "actualUrl"-creation based on current schema.
String actualUrl = (bulkImportSource.getFulltextUrlPrefix() + fileId); // This string-concatenation, works with urls of Arvix. A different construction may be needed for other datasources.
String actualUrl = (bulkImportSource.getFulltextUrlPrefix() + fileId); // This string-concatenation works with the urls of arXiv. A different construction may be needed for other datasources.
String originalUrl = actualUrl; // We have the full-text files from bulk-import, so let's assume the original-url is also the full-text-link.
return parquetFileUtils.getPayloadParquetRecord(openAireId, originalUrl, actualUrl, timeMillis, bulkImportSource.getMimeType(),
@@ -446,10 +472,24 @@ public class BulkImportServiceImpl implements BulkImportService {
}
public List<String> getFileLocationsInsideDir(String directory)
private FileLocationData getFileLocationData(String fileLocation, String additionalLoggingMsg)
{
try {
return new FileLocationData(fileLocation);
} catch (RuntimeException re) {
logger.error(re.getMessage() + additionalLoggingMsg);
return null;
}
}
public List<String> getFileLocationsInsideDir(String directory, String idMappingsFilePath)
{
List<String> fileLocations = null;
try ( Stream<Path> walkStream = Files.find(Paths.get(directory), 20, (filePath, fileAttr) -> fileAttr.isRegularFile()) )
try ( Stream<Path> walkStream = Files.find(Paths.get(directory), 20,
(idMappingsFilePath != null)
? (filePath, fileAttr) -> fileAttr.isRegularFile() && !filePath.toString().contains(idMappingsFilePath)
: (filePath, fileAttr) -> fileAttr.isRegularFile()) )
// In case we ever include other type-of-Files inside the same directory, we need to add this filter: "&& !filePath.toString().endsWith("name.ext")"
{
fileLocations = walkStream.map(Path::toString).collect(Collectors.toList());
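A hedged usage sketch of the updated scan (the directories below are hypothetical, and "bulkImportService" stands for an injected BulkImportService): when an id-mappings path is given, the mappings file itself is excluded from the returned full-text locations.
// Springer: skip the id-mappings file while collecting the full-text locations.
List<String> springerFiles = bulkImportService.getFileLocationsInsideDir("/mnt/bulk_import/springer/batch_1/", "_metadata_/bulk.ids.json");
// arXiv: no id-mappings file is configured, so every regular file is returned.
List<String> arxivFiles = bulkImportService.getFileLocationsInsideDir("/mnt/bulk_import/arxiv/batch_1/", null);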

View File

@@ -95,9 +95,11 @@ public class UrlsServiceImpl implements UrlsService {
List<String> excludedIDs = new ArrayList<>();
for ( BulkImport.BulkImportSource source : bulkImportSources.values() ) {
if ( source.getFulltextUrlPrefix().contains("link.springer.com") )
continue; // Do not block the "made-up" springer-import datasourceID from crawling; it is only used for the "S3-folder", when uploading the fulltexts to S3.
String datasourceID = source.getDatasourceID();
if ( (datasourceID == null) || datasourceID.isEmpty() )
throw new RuntimeException("One of the bulk-imported datasourceIDs was not found! | source: " + source);
if ( datasourceID == null )
throw new RuntimeException("One of the bulk-imported datasourceIDs was not found! | source: " + source); // This may be the case for some bulkImports, as we do not want to block a datasource, since we are not sure we will get ALL fulltexts from it through bulkImport.
excludedIDs.add(datasourceID);
}
int exclusionListSize = excludedIDs.size(); // This list will not be empty.

View File

@@ -0,0 +1,64 @@
package eu.openaire.urls_controller.util;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class JsonUtils {
private static final Logger logger = LoggerFactory.getLogger(JsonUtils.class);
private static final Gson gson = new Gson(); // This is "transient" by default. It won't be included in any json object.
public ConcurrentHashMap<String, String> loadIdMappings(String filePath, int mappingsSize, String additional_message)
{
logger.debug("Going to load the idMappings from '" + filePath + "'." + additional_message);
ConcurrentHashMap<String, String> idMappings = new ConcurrentHashMap<>(mappingsSize);
try ( BufferedReader br = new BufferedReader(new FileReader(filePath), FileUtils.halfMb) ) {
JsonArray idMappingsList = gson.fromJson(br, JsonArray.class);
//logger.debug("IdMappingsList:\n" + idMappingsList); // DEBUG!
for ( JsonElement idMapping : idMappingsList ) {
JsonObject jsonObject = idMapping.getAsJsonObject();
if ( null != idMappings.put(jsonObject.get("file").getAsString(), jsonObject.get("id").getAsString()) )
logger.warn("There was a duplicate file '" + jsonObject.get("file") + "' (probably in a different sub-directory)!" + additional_message);
}
/*Function<JsonObject, String> keyMapper = key -> key.get("file").getAsString();
Function<JsonObject, String> valueMapper = value -> value.get("id").getAsString();
idMappings = (ConcurrentHashMap<String, String>) idMappingsList.asList().stream()
.flatMap(jsonElement -> Stream.of(jsonElement.getAsJsonObject()))
.collect(Collectors.toConcurrentMap(keyMapper, valueMapper,
(id1, id2) -> {logger.warn("There was a duplicate file '" + keyMapper.apply(id1) + "' (probably in a different sub-directory)!" + additional_message);; return id1;} )); // Keep the first-assigned id, for this duplicate file.
// TODO - How to get the KEY Inside the keyMapper..?
*/
} catch (FileNotFoundException fnfe) {
logger.error("Could not find the id-file-idMappings! " + fnfe.getMessage() + additional_message);
} catch (Exception e) {
logger.error("Could not load the id-file-idMappings!" + additional_message, e);
try ( BufferedReader br = new BufferedReader(new FileReader(filePath), FileUtils.halfMb) ) {
logger.warn(br.readLine() + br.readLine() + br.readLine());
} catch (Exception ex) {
logger.error("", ex);
}
}
//if ( idMappings != null ) // Uncomment, in case the "stream"-version is used.
//logger.debug("IdMappings:\n" + idMappings); // DEBUG!
return idMappings; // It may be null.
}
}
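For reference, a hedged sketch of the id-mappings file that "loadIdMappings" expects: a flat JSON array of objects with a "file" key (the path relative to the bulk-import directory) and an "id" key (the DOI). The entries, paths and DOIs below are made up.
// Hypothetical content of "_metadata_/bulk.ids.json":
// [
//   {"file": "chapter/fulltexts_1/978-3-030-00000-0_1.xml", "id": "10.1007/978-3-030-00000-0_1"},
//   {"file": "chapter/fulltexts_1/978-3-030-00000-0_2.xml", "id": "10.1007/978-3-030-00000-0_2"}
// ]
ConcurrentHashMap<String, String> idMappings = jsonUtils.loadIdMappings("/mnt/bulk_import/springer/batch_1/_metadata_/bulk.ids.json", 2, "");
String doi = idMappings.get("chapter/fulltexts_1/978-3-030-00000-0_1.xml"); // -> "10.1007/978-3-030-00000-0_1"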

View File

@@ -92,6 +92,8 @@ public class S3ObjectStore {
else {
if ( extension.equals("pdf") )
contentType = "application/pdf";
else if ( extension.equals("xml") )
contentType = "application/xml";
/*else if ( *//* TODO - other-extension-match *//* )
contentType = "application/EXTENSION"; */
else

View File

@@ -47,12 +47,21 @@ bulk-import:
datasourcePrefix: arXiv_______ # For PID-providing datasource, we use the PID-prefix here. (so not the datasource-prefix: "od________18")
fulltextUrlPrefix: https://arxiv.org/pdf/
mimeType: application/pdf
idMappingFilePath: null # This is interpreted as an empty string by Spring, so we explicitly set it back to null in "components/BulkImport.java".
isAuthoritative: true
springerImport:
datasourceID: Springer - bulk import # The files are not Springer-exclusive, so we cannot exclude (even multiple) Springer datasources from crawling.
datasourcePrefix: doi_________ # The OpenAIRE-IDs should be generated from the DOIs, so we use the DOI-prefix here.
fulltextUrlPrefix: https://link.springer.com/content/xml/ # This is a "dummy" url. Springer does not expose XML records to the public, only PDFs.
mimeType: application/xml
idMappingFilePath: _metadata_/bulk.ids.json
isAuthoritative: true # The IDs are DOIs which need to pass through "lowercase preprocessing".
# otherImport:
# datasourceID: othersource__::0123
# datasourcePrefix: other_______
# fulltextUrlPrefix: https://example.org/pdf/
# mimeType: application/pdf
# idMappingFilePath: null
# isAuthoritative: false
# For "authoritative" sources, a special prefix is selected, from: https://graph.openaire.eu/docs/data-model/pids-and-identifiers/#identifiers-in-the-graph